001/**
002 * Copyright 2015 DuraSpace, Inc.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.fcrepo.jms.observer;
017
018import static org.slf4j.LoggerFactory.getLogger;
019
020import java.io.IOException;
021
022import javax.annotation.PostConstruct;
023import javax.annotation.PreDestroy;
024import javax.inject.Inject;
025import javax.jcr.RepositoryException;
026import javax.jms.Connection;
027import javax.jms.JMSException;
028import javax.jms.Message;
029import javax.jms.MessageProducer;
030import javax.jms.Session;
031
032import org.apache.activemq.ActiveMQConnectionFactory;
033import org.fcrepo.kernel.observer.FedoraEvent;
034import org.slf4j.Logger;
035
036import com.google.common.eventbus.EventBus;
037import com.google.common.eventbus.Subscribe;
038
039/**
040 * Machinery to publish JMS messages when an EventBus
041 * message is received.
042 *
043 * @author barmintor
044 * @author awoods
045 */
046public class JMSTopicPublisher {
047
048    @Inject
049    private EventBus eventBus;
050
051    @Inject
052    private ActiveMQConnectionFactory connectionFactory;
053
054    @Inject
055    private JMSEventMessageFactory eventFactory;
056
057    private Connection connection;
058
059    private Session jmsSession;
060
061    private MessageProducer producer;
062
063    private static final Logger LOGGER = getLogger(JMSTopicPublisher.class);
064
065    /**
066     * When an EventBus mesage is received, map it to our JMS
067     * message payload and push it onto the queue.
068     *
069     * @param fedoraEvent the fedora event
070     * @throws JMSException if JMS exception occurred
071     * @throws RepositoryException if repository exception occurred
072     * @throws IOException if IO exception occurred
073     */
074    @Subscribe
075    public void publishJCREvent(final FedoraEvent fedoraEvent) throws JMSException,
076        RepositoryException, IOException {
077        LOGGER.debug("Received an event from the internal bus.");
078        final Message tm =
079                eventFactory.getMessage(fedoraEvent, jmsSession);
080        LOGGER.debug("Transformed the event to a JMS message.");
081        producer.send(tm);
082
083        LOGGER.debug("Put event: {} onto JMS.", tm.getJMSMessageID());
084    }
085
086    /**
087     * Connect to JCR Repostory and JMS queue
088     *
089     * @throws JMSException if JMS Exception occurred
090     */
091    @PostConstruct
092    public void acquireConnections() throws JMSException {
093        LOGGER.debug("Initializing: {}", this.getClass().getCanonicalName());
094
095        connection = connectionFactory.createConnection();
096        connection.start();
097        jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
098        producer = jmsSession.createProducer(jmsSession.createTopic("fedora"));
099        eventBus.register(this);
100    }
101
102    /**
103     * Close external connections
104     *
105     * @throws JMSException if JMS exception occurred
106     */
107    @PreDestroy
108    public void releaseConnections() throws JMSException {
109        LOGGER.debug("Tearing down: {}", this.getClass().getCanonicalName());
110
111        producer.close();
112        jmsSession.close();
113        connection.close();
114        eventBus.unregister(this);
115    }
116}