001/** 002 * Copyright 2015 DuraSpace, Inc. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.fcrepo.jms.observer; 017 018import static org.slf4j.LoggerFactory.getLogger; 019 020import java.io.IOException; 021 022import javax.annotation.PostConstruct; 023import javax.annotation.PreDestroy; 024import javax.inject.Inject; 025import javax.jms.Connection; 026import javax.jms.JMSException; 027import javax.jms.Message; 028import javax.jms.MessageProducer; 029import javax.jms.Session; 030 031import org.apache.activemq.ActiveMQConnectionFactory; 032import org.fcrepo.kernel.api.observer.FedoraEvent; 033import org.slf4j.Logger; 034 035import com.google.common.eventbus.EventBus; 036import com.google.common.eventbus.Subscribe; 037 038/** 039 * Machinery to publish JMS messages when an EventBus 040 * message is received. 041 * 042 * @author barmintor 043 * @author awoods 044 */ 045public class JMSTopicPublisher { 046 047 @Inject 048 private EventBus eventBus; 049 050 @Inject 051 private ActiveMQConnectionFactory connectionFactory; 052 053 @Inject 054 private JMSEventMessageFactory eventFactory; 055 056 private Connection connection; 057 058 private Session jmsSession; 059 060 private MessageProducer producer; 061 062 private static final Logger LOGGER = getLogger(JMSTopicPublisher.class); 063 064 /** 065 * When an EventBus mesage is received, map it to our JMS 066 * message payload and push it onto the queue. 067 * 068 * @param fedoraEvent the fedora event 069 * @throws JMSException if JMS exception occurred 070 * @throws IOException if IO exception occurred 071 */ 072 @Subscribe 073 public void publishJCREvent(final FedoraEvent fedoraEvent) throws JMSException, IOException { 074 LOGGER.debug("Received an event from the internal bus."); 075 final Message tm = 076 eventFactory.getMessage(fedoraEvent, jmsSession); 077 LOGGER.debug("Transformed the event to a JMS message."); 078 producer.send(tm); 079 080 LOGGER.debug("Put event: {} onto JMS.", tm.getJMSMessageID()); 081 } 082 083 /** 084 * Connect to JCR Repostory and JMS queue 085 * 086 * @throws JMSException if JMS Exception occurred 087 */ 088 @PostConstruct 089 public void acquireConnections() throws JMSException { 090 LOGGER.debug("Initializing: {}", this.getClass().getCanonicalName()); 091 092 connection = connectionFactory.createConnection(); 093 connection.start(); 094 jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 095 producer = jmsSession.createProducer(jmsSession.createTopic("fedora")); 096 eventBus.register(this); 097 } 098 099 /** 100 * Close external connections 101 * 102 * @throws JMSException if JMS exception occurred 103 */ 104 @PreDestroy 105 public void releaseConnections() throws JMSException { 106 LOGGER.debug("Tearing down: {}", this.getClass().getCanonicalName()); 107 108 producer.close(); 109 jmsSession.close(); 110 connection.close(); 111 eventBus.unregister(this); 112 } 113}