001/*
002 * Licensed to DuraSpace under one or more contributor license agreements.
003 * See the NOTICE file distributed with this work for additional information
004 * regarding copyright ownership.
005 *
006 * DuraSpace licenses this file to you under the Apache License,
007 * Version 2.0 (the "License"); you may not use this file except in
008 * compliance with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.fcrepo.jms;
019
020import static org.slf4j.LoggerFactory.getLogger;
021
022import java.io.IOException;
023
024import javax.annotation.PostConstruct;
025import javax.annotation.PreDestroy;
026import javax.inject.Inject;
027import javax.jms.Connection;
028import javax.jms.JMSException;
029import javax.jms.Message;
030import javax.jms.MessageProducer;
031import javax.jms.Session;
032
033import org.apache.activemq.ActiveMQConnectionFactory;
034import org.fcrepo.kernel.api.observer.FedoraEvent;
035import org.slf4j.Logger;
036
037import com.google.common.eventbus.EventBus;
038import com.google.common.eventbus.Subscribe;
039
040/**
041 * Machinery to publish JMS messages when an EventBus
042 * message is received.
043 *
044 * @author barmintor
045 * @author awoods
046 */
047public class JMSTopicPublisher {
048
049    @Inject
050    private EventBus eventBus;
051
052    @Inject
053    private ActiveMQConnectionFactory connectionFactory;
054
055    @Inject
056    private JMSEventMessageFactory eventFactory;
057
058    private Connection connection;
059
060    private Session jmsSession;
061
062    private MessageProducer producer;
063
064    private static final Logger LOGGER = getLogger(JMSTopicPublisher.class);
065
066    /**
067     * When an EventBus mesage is received, map it to our JMS
068     * message payload and push it onto the queue.
069     *
070     * @param fedoraEvent the fedora event
071     * @throws JMSException if JMS exception occurred
072     * @throws IOException if IO exception occurred
073     */
074    @Subscribe
075    public void publishJCREvent(final FedoraEvent fedoraEvent) throws JMSException, IOException {
076        LOGGER.debug("Received an event from the internal bus.");
077        final Message tm =
078                eventFactory.getMessage(fedoraEvent, jmsSession);
079        LOGGER.debug("Transformed the event to a JMS message.");
080        producer.send(tm);
081
082        LOGGER.debug("Put event: {} onto JMS.", tm.getJMSMessageID());
083    }
084
085    /**
086     * Connect to JCR Repostory and JMS queue
087     *
088     * @throws JMSException if JMS Exception occurred
089     */
090    @PostConstruct
091    public void acquireConnections() throws JMSException {
092        LOGGER.debug("Initializing: {}", this.getClass().getCanonicalName());
093
094        connection = connectionFactory.createConnection();
095        connection.start();
096        jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
097        producer = jmsSession.createProducer(jmsSession.createTopic("fedora"));
098        eventBus.register(this);
099    }
100
101    /**
102     * Close external connections
103     *
104     * @throws JMSException if JMS exception occurred
105     */
106    @PreDestroy
107    public void releaseConnections() throws JMSException {
108        LOGGER.debug("Tearing down: {}", this.getClass().getCanonicalName());
109
110        producer.close();
111        jmsSession.close();
112        connection.close();
113        eventBus.unregister(this);
114    }
115}