001/*
002 * Licensed to DuraSpace under one or more contributor license agreements.
003 * See the NOTICE file distributed with this work for additional information
004 * regarding copyright ownership.
005 *
006 * DuraSpace licenses this file to you under the Apache License,
007 * Version 2.0 (the "License"); you may not use this file except in
008 * compliance with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.fcrepo.jms;
019
020import static org.slf4j.LoggerFactory.getLogger;
021
022import javax.annotation.PostConstruct;
023import javax.annotation.PreDestroy;
024import javax.inject.Inject;
025import javax.jms.Connection;
026import javax.jms.Destination;
027import javax.jms.JMSException;
028import javax.jms.Message;
029import javax.jms.MessageProducer;
030import javax.jms.Session;
031
032import com.google.common.eventbus.AllowConcurrentEvents;
033import org.apache.activemq.ActiveMQConnectionFactory;
034import org.fcrepo.kernel.api.observer.Event;
035import org.slf4j.Logger;
036
037import com.google.common.eventbus.EventBus;
038import com.google.common.eventbus.Subscribe;
039
040/**
041 * Machinery to publish JMS messages when an EventBus
042 * message is received.
043 *
044 * @author barmintor
045 * @author awoods
046 * @author acoburn
047 */
048public abstract class AbstractJMSPublisher {
049
050    @Inject
051    private EventBus eventBus;
052
053    @Inject
054    private ActiveMQConnectionFactory connectionFactory;
055
056    @Inject
057    private JMSEventMessageFactory eventFactory;
058
059    private Connection connection;
060
061    protected Session jmsSession;
062
063    private MessageProducer producer;
064
065    private static final Logger LOGGER = getLogger(AbstractJMSPublisher.class);
066
067    protected abstract Destination createDestination() throws JMSException;
068
069    /**
070     * When an EventBus message is received, map it to our JMS
071     * message payload and push it onto the queue.
072     *
073     * @param event the fedora event
074     * @throws JMSException if JMS exception occurred
075     */
076    @Subscribe
077    @AllowConcurrentEvents
078    public void publishJCREvent(final Event event) throws JMSException {
079        LOGGER.debug("Received an event from the internal bus. {}", event);
080        final Message tm =
081                eventFactory.getMessage(event, jmsSession);
082        LOGGER.trace("Transformed the event to a JMS message.");
083        producer.send(tm);
084
085        LOGGER.debug("Put event: {} onto JMS.", tm.getJMSMessageID());
086    }
087
088    /**
089     * Connect to JCR Repository and JMS queue
090     *
091     * @throws JMSException if JMS Exception occurred
092     */
093    @PostConstruct
094    public void acquireConnections() throws JMSException {
095        LOGGER.debug("Initializing: {}", this.getClass().getCanonicalName());
096
097        connection = connectionFactory.createConnection();
098        connection.start();
099        jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
100        producer = jmsSession.createProducer(createDestination());
101        eventBus.register(this);
102    }
103
104    /**
105     * Close external connections
106     *
107     * @throws JMSException if JMS exception occurred
108     */
109    @PreDestroy
110    public void releaseConnections() throws JMSException {
111        LOGGER.debug("Tearing down: {}", this.getClass().getCanonicalName());
112
113        producer.close();
114        jmsSession.close();
115        connection.close();
116        eventBus.unregister(this);
117    }
118}