001/* 002 * Licensed to DuraSpace under one or more contributor license agreements. 003 * See the NOTICE file distributed with this work for additional information 004 * regarding copyright ownership. 005 * 006 * DuraSpace licenses this file to you under the Apache License, 007 * Version 2.0 (the "License"); you may not use this file except in 008 * compliance with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.fcrepo.jms; 019 020import static org.slf4j.LoggerFactory.getLogger; 021 022import java.io.IOException; 023 024import javax.annotation.PostConstruct; 025import javax.annotation.PreDestroy; 026import javax.inject.Inject; 027import javax.jms.Connection; 028import javax.jms.JMSException; 029import javax.jms.Message; 030import javax.jms.MessageProducer; 031import javax.jms.Session; 032 033import org.apache.activemq.ActiveMQConnectionFactory; 034import org.fcrepo.kernel.api.observer.FedoraEvent; 035import org.slf4j.Logger; 036 037import com.google.common.eventbus.EventBus; 038import com.google.common.eventbus.Subscribe; 039 040/** 041 * Machinery to publish JMS messages when an EventBus 042 * message is received. 043 * 044 * @author barmintor 045 * @author awoods 046 */ 047public class JMSTopicPublisher { 048 049 @Inject 050 private EventBus eventBus; 051 052 @Inject 053 private ActiveMQConnectionFactory connectionFactory; 054 055 @Inject 056 private JMSEventMessageFactory eventFactory; 057 058 private Connection connection; 059 060 private Session jmsSession; 061 062 private MessageProducer producer; 063 064 private static final Logger LOGGER = getLogger(JMSTopicPublisher.class); 065 066 /** 067 * When an EventBus mesage is received, map it to our JMS 068 * message payload and push it onto the queue. 069 * 070 * @param fedoraEvent the fedora event 071 * @throws JMSException if JMS exception occurred 072 * @throws IOException if IO exception occurred 073 */ 074 @Subscribe 075 public void publishJCREvent(final FedoraEvent fedoraEvent) throws JMSException, IOException { 076 LOGGER.debug("Received an event from the internal bus."); 077 final Message tm = 078 eventFactory.getMessage(fedoraEvent, jmsSession); 079 LOGGER.debug("Transformed the event to a JMS message."); 080 producer.send(tm); 081 082 LOGGER.debug("Put event: {} onto JMS.", tm.getJMSMessageID()); 083 } 084 085 /** 086 * Connect to JCR Repostory and JMS queue 087 * 088 * @throws JMSException if JMS Exception occurred 089 */ 090 @PostConstruct 091 public void acquireConnections() throws JMSException { 092 LOGGER.debug("Initializing: {}", this.getClass().getCanonicalName()); 093 094 connection = connectionFactory.createConnection(); 095 connection.start(); 096 jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 097 producer = jmsSession.createProducer(jmsSession.createTopic("fedora")); 098 eventBus.register(this); 099 } 100 101 /** 102 * Close external connections 103 * 104 * @throws JMSException if JMS exception occurred 105 */ 106 @PreDestroy 107 public void releaseConnections() throws JMSException { 108 LOGGER.debug("Tearing down: {}", this.getClass().getCanonicalName()); 109 110 producer.close(); 111 jmsSession.close(); 112 connection.close(); 113 eventBus.unregister(this); 114 } 115}