001/* 002 * Licensed to DuraSpace under one or more contributor license agreements. 003 * See the NOTICE file distributed with this work for additional information 004 * regarding copyright ownership. 005 * 006 * DuraSpace licenses this file to you under the Apache License, 007 * Version 2.0 (the "License"); you may not use this file except in 008 * compliance with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.fcrepo.jms; 019 020import static org.slf4j.LoggerFactory.getLogger; 021 022import javax.annotation.PostConstruct; 023import javax.annotation.PreDestroy; 024import javax.inject.Inject; 025import javax.jms.Connection; 026import javax.jms.Destination; 027import javax.jms.JMSException; 028import javax.jms.Message; 029import javax.jms.MessageProducer; 030import javax.jms.Session; 031 032import com.google.common.eventbus.AllowConcurrentEvents; 033import org.apache.activemq.ActiveMQConnectionFactory; 034import org.fcrepo.kernel.api.observer.Event; 035import org.slf4j.Logger; 036 037import com.google.common.eventbus.EventBus; 038import com.google.common.eventbus.Subscribe; 039 040/** 041 * Machinery to publish JMS messages when an EventBus 042 * message is received. 043 * 044 * @author barmintor 045 * @author awoods 046 * @author acoburn 047 */ 048public abstract class AbstractJMSPublisher { 049 050 @Inject 051 private EventBus eventBus; 052 053 @Inject 054 private ActiveMQConnectionFactory connectionFactory; 055 056 @Inject 057 private JMSEventMessageFactory eventFactory; 058 059 private Connection connection; 060 061 protected Session jmsSession; 062 063 private MessageProducer producer; 064 065 private static final Logger LOGGER = getLogger(AbstractJMSPublisher.class); 066 067 protected abstract Destination createDestination() throws JMSException; 068 069 /** 070 * When an EventBus message is received, map it to our JMS 071 * message payload and push it onto the queue. 072 * 073 * @param event the fedora event 074 * @throws JMSException if JMS exception occurred 075 */ 076 @Subscribe 077 @AllowConcurrentEvents 078 public void publishJCREvent(final Event event) throws JMSException { 079 LOGGER.debug("Received an event from the internal bus. {}", event); 080 final Message tm = 081 eventFactory.getMessage(event, jmsSession); 082 LOGGER.trace("Transformed the event to a JMS message."); 083 producer.send(tm); 084 085 LOGGER.debug("Put event: {} onto JMS.", tm.getJMSMessageID()); 086 } 087 088 /** 089 * Connect to JCR Repository and JMS queue 090 * 091 * @throws JMSException if JMS Exception occurred 092 */ 093 @PostConstruct 094 public void acquireConnections() throws JMSException { 095 LOGGER.debug("Initializing: {}", this.getClass().getCanonicalName()); 096 097 connection = connectionFactory.createConnection(); 098 connection.start(); 099 jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 100 producer = jmsSession.createProducer(createDestination()); 101 eventBus.register(this); 102 } 103 104 /** 105 * Close external connections 106 * 107 * @throws JMSException if JMS exception occurred 108 */ 109 @PreDestroy 110 public void releaseConnections() throws JMSException { 111 LOGGER.debug("Tearing down: {}", this.getClass().getCanonicalName()); 112 113 producer.close(); 114 jmsSession.close(); 115 connection.close(); 116 eventBus.unregister(this); 117 } 118}