001/** 002 * Copyright 2015 DuraSpace, Inc. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.fcrepo.jms.observer; 017 018import static org.slf4j.LoggerFactory.getLogger; 019 020import java.io.IOException; 021 022import javax.annotation.PostConstruct; 023import javax.annotation.PreDestroy; 024import javax.inject.Inject; 025import javax.jcr.RepositoryException; 026import javax.jms.Connection; 027import javax.jms.JMSException; 028import javax.jms.Message; 029import javax.jms.MessageProducer; 030import javax.jms.Session; 031 032import org.apache.activemq.ActiveMQConnectionFactory; 033import org.fcrepo.kernel.observer.FedoraEvent; 034import org.slf4j.Logger; 035 036import com.google.common.eventbus.EventBus; 037import com.google.common.eventbus.Subscribe; 038 039/** 040 * Machinery to publish JMS messages when an EventBus 041 * message is received. 042 * 043 * @author barmintor 044 * @author awoods 045 */ 046public class JMSTopicPublisher { 047 048 @Inject 049 private EventBus eventBus; 050 051 @Inject 052 private ActiveMQConnectionFactory connectionFactory; 053 054 @Inject 055 private JMSEventMessageFactory eventFactory; 056 057 private Connection connection; 058 059 private Session jmsSession; 060 061 private MessageProducer producer; 062 063 private static final Logger LOGGER = getLogger(JMSTopicPublisher.class); 064 065 /** 066 * When an EventBus mesage is received, map it to our JMS 067 * message payload and push it onto the queue. 068 * 069 * @param fedoraEvent 070 * @throws JMSException 071 * @throws RepositoryException 072 * @throws IOException 073 */ 074 @Subscribe 075 public void publishJCREvent(final FedoraEvent fedoraEvent) throws JMSException, 076 RepositoryException, IOException { 077 LOGGER.debug("Received an event from the internal bus."); 078 final Message tm = 079 eventFactory.getMessage(fedoraEvent, jmsSession); 080 LOGGER.debug("Transformed the event to a JMS message."); 081 producer.send(tm); 082 083 LOGGER.debug("Put event: {} onto JMS.", tm.getJMSMessageID()); 084 } 085 086 /** 087 * Connect to JCR Repostory and JMS queue 088 * 089 * @throws JMSException 090 */ 091 @PostConstruct 092 public void acquireConnections() throws JMSException { 093 LOGGER.debug("Initializing: {}", this.getClass().getCanonicalName()); 094 095 connection = connectionFactory.createConnection(); 096 connection.start(); 097 jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 098 producer = jmsSession.createProducer(jmsSession.createTopic("fedora")); 099 eventBus.register(this); 100 } 101 102 /** 103 * Close external connections 104 * 105 * @throws JMSException 106 */ 107 @PreDestroy 108 public void releaseConnections() throws JMSException { 109 LOGGER.debug("Tearing down: {}", this.getClass().getCanonicalName()); 110 111 producer.close(); 112 jmsSession.close(); 113 connection.close(); 114 eventBus.unregister(this); 115 } 116}