001/**
002 * Copyright 2015 DuraSpace, Inc.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.fcrepo.kernel.modeshape.observer;
017
018import static com.codahale.metrics.MetricRegistry.name;
019import static com.google.common.collect.Iterators.filter;
020import static com.google.common.collect.Iterators.transform;
021import static javax.jcr.observation.Event.NODE_ADDED;
022import static javax.jcr.observation.Event.NODE_MOVED;
023import static javax.jcr.observation.Event.NODE_REMOVED;
024import static javax.jcr.observation.Event.PROPERTY_ADDED;
025import static javax.jcr.observation.Event.PROPERTY_CHANGED;
026import static javax.jcr.observation.Event.PROPERTY_REMOVED;
027import static org.slf4j.LoggerFactory.getLogger;
028
029import  org.fcrepo.metrics.RegistryService;
030
031import java.util.Iterator;
032import javax.annotation.PostConstruct;
033import javax.annotation.PreDestroy;
034import javax.inject.Inject;
035import javax.jcr.RepositoryException;
036import javax.jcr.Session;
037import javax.jcr.observation.Event;
038import javax.jcr.observation.EventListener;
039
040import org.fcrepo.kernel.api.exception.RepositoryRuntimeException;
041import org.fcrepo.kernel.api.observer.EventFilter;
042import org.fcrepo.kernel.api.observer.FedoraEvent;
043import org.fcrepo.kernel.api.observer.eventmappings.InternalExternalEventMapper;
044
045import org.modeshape.jcr.api.Repository;
046import org.slf4j.Logger;
047
048import com.codahale.metrics.Counter;
049import com.google.common.eventbus.EventBus;
050
051/**
052 * Simple JCR EventListener that filters JCR Events through a Fedora EventFilter, maps the results through a mapper,
053 * and puts the resulting stream onto the internal Fedora EventBus as a stream of FedoraEvents.
054 *
055 * @author eddies
056 * @author ajs6f
057 * @since Feb 7, 2013
058 */
059public class SimpleObserver implements EventListener {
060
061    private static final Logger LOGGER = getLogger(SimpleObserver.class);
062
063    /**
064     * A simple counter of events that pass through this observer
065     */
066    static final Counter EVENT_COUNTER =
067            RegistryService.getInstance().getMetrics().counter(name(SimpleObserver.class, "onEvent"));
068
069    static final Integer EVENT_TYPES = NODE_ADDED + NODE_REMOVED + NODE_MOVED + PROPERTY_ADDED + PROPERTY_CHANGED
070            + PROPERTY_REMOVED;
071
072    @Inject
073    private Repository repository;
074
075    @Inject
076    private EventBus eventBus;
077
078    @Inject
079    private InternalExternalEventMapper eventMapper;
080
081    @Inject
082    private EventFilter eventFilter;
083
084    // THIS SESSION SHOULD NOT BE USED TO LOOK UP NODES
085    // it is used only to register and deregister this observer to the JCR
086    private Session session;
087
088    /**
089     * Register this observer with the JCR event listeners
090     *
091     * @throws RepositoryException if repository exception occurred
092     */
093    @PostConstruct
094    public void buildListener() throws RepositoryException {
095        LOGGER.debug("Constructing an observer for JCR events...");
096        session = repository.login();
097        session.getWorkspace().getObservationManager()
098                .addEventListener(this, EVENT_TYPES, "/", true, null, null, false);
099        session.save();
100    }
101
102    /**
103     * logout of the session
104     *
105     * @throws RepositoryException if repository exception occurred
106     */
107    @PreDestroy
108    public void stopListening() throws RepositoryException {
109        try {
110            LOGGER.debug("Destroying an observer for JCR events...");
111            session.getWorkspace().getObservationManager().removeEventListener(this);
112        } finally {
113            session.logout();
114        }
115    }
116
117    /**
118     * Filter JCR events and transform them into our own FedoraEvents.
119     *
120     * @param events the JCR events
121     */
122    @Override
123    public void onEvent(final javax.jcr.observation.EventIterator events) {
124        Session lookupSession = null;
125        try {
126            lookupSession = repository.login();
127            @SuppressWarnings("unchecked")
128            final Iterator<Event> filteredEvents = filter(events, eventFilter::test);
129            final Iterator<FedoraEvent> publishableEvents = eventMapper.apply(filteredEvents);
130            transform(publishableEvents, new GetNamespacedProperties(lookupSession)::apply)
131                .forEachRemaining(this::post);
132        } catch (final RepositoryException ex) {
133            throw new RepositoryRuntimeException(ex);
134        } finally {
135            if (lookupSession != null) {
136                lookupSession.logout();
137            }
138        }
139    }
140
141    private void post(final FedoraEvent evt) {
142        eventBus.post(evt);
143        EVENT_COUNTER.inc();
144    }
145}