001/**
002 * Copyright 2015 DuraSpace, Inc.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.fcrepo.jms.observer;
017
018import static org.slf4j.LoggerFactory.getLogger;
019
020import java.io.IOException;
021
022import javax.annotation.PostConstruct;
023import javax.annotation.PreDestroy;
024import javax.inject.Inject;
025import javax.jms.Connection;
026import javax.jms.JMSException;
027import javax.jms.Message;
028import javax.jms.MessageProducer;
029import javax.jms.Session;
030
031import org.apache.activemq.ActiveMQConnectionFactory;
032import org.fcrepo.kernel.observer.FedoraEvent;
033import org.slf4j.Logger;
034
035import com.google.common.eventbus.EventBus;
036import com.google.common.eventbus.Subscribe;
037
038/**
039 * Machinery to publish JMS messages when an EventBus
040 * message is received.
041 *
042 * @author barmintor
043 * @author awoods
044 */
045public class JMSTopicPublisher {
046
047    @Inject
048    private EventBus eventBus;
049
050    @Inject
051    private ActiveMQConnectionFactory connectionFactory;
052
053    @Inject
054    private JMSEventMessageFactory eventFactory;
055
056    private Connection connection;
057
058    private Session jmsSession;
059
060    private MessageProducer producer;
061
062    private static final Logger LOGGER = getLogger(JMSTopicPublisher.class);
063
064    /**
065     * When an EventBus mesage is received, map it to our JMS
066     * message payload and push it onto the queue.
067     *
068     * @param fedoraEvent the fedora event
069     * @throws JMSException if JMS exception occurred
070     * @throws IOException if IO exception occurred
071     */
072    @Subscribe
073    public void publishJCREvent(final FedoraEvent fedoraEvent) throws JMSException, IOException {
074        LOGGER.debug("Received an event from the internal bus.");
075        final Message tm =
076                eventFactory.getMessage(fedoraEvent, jmsSession);
077        LOGGER.debug("Transformed the event to a JMS message.");
078        producer.send(tm);
079
080        LOGGER.debug("Put event: {} onto JMS.", tm.getJMSMessageID());
081    }
082
083    /**
084     * Connect to JCR Repostory and JMS queue
085     *
086     * @throws JMSException if JMS Exception occurred
087     */
088    @PostConstruct
089    public void acquireConnections() throws JMSException {
090        LOGGER.debug("Initializing: {}", this.getClass().getCanonicalName());
091
092        connection = connectionFactory.createConnection();
093        connection.start();
094        jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
095        producer = jmsSession.createProducer(jmsSession.createTopic("fedora"));
096        eventBus.register(this);
097    }
098
099    /**
100     * Close external connections
101     *
102     * @throws JMSException if JMS exception occurred
103     */
104    @PreDestroy
105    public void releaseConnections() throws JMSException {
106        LOGGER.debug("Tearing down: {}", this.getClass().getCanonicalName());
107
108        producer.close();
109        jmsSession.close();
110        connection.close();
111        eventBus.unregister(this);
112    }
113}