001/*
002 * Copyright 2015 DuraSpace, Inc.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.fcrepo.kernel.modeshape.observer;
017
018import static com.codahale.metrics.MetricRegistry.name;
019import static com.google.common.collect.Iterators.filter;
020import static java.util.Collections.emptyMap;
021import static java.util.stream.Stream.concat;
022import static java.util.stream.Stream.of;
023import static javax.jcr.observation.Event.NODE_ADDED;
024import static javax.jcr.observation.Event.NODE_MOVED;
025import static javax.jcr.observation.Event.NODE_REMOVED;
026import static javax.jcr.observation.Event.PROPERTY_ADDED;
027import static javax.jcr.observation.Event.PROPERTY_CHANGED;
028import static javax.jcr.observation.Event.PROPERTY_REMOVED;
029import static org.fcrepo.kernel.modeshape.rdf.JcrRdfTools.getRDFNamespaceForJcrNamespace;
030import static org.fcrepo.kernel.modeshape.utils.FedoraTypesUtils.isPublicJcrProperty;
031import static org.fcrepo.kernel.modeshape.utils.NamespaceTools.getNamespaceRegistry;
032import static org.fcrepo.kernel.modeshape.utils.StreamUtils.iteratorToStream;
033import static org.slf4j.LoggerFactory.getLogger;
034
035import  org.fcrepo.metrics.RegistryService;
036
037import java.util.Iterator;
038import java.util.Map;
039import java.util.function.Function;
040import java.util.stream.Stream;
041
042import javax.annotation.PostConstruct;
043import javax.annotation.PreDestroy;
044import javax.inject.Inject;
045import javax.jcr.NamespaceRegistry;
046import javax.jcr.RepositoryException;
047import javax.jcr.Session;
048import javax.jcr.observation.Event;
049import javax.jcr.observation.EventListener;
050
051import org.fcrepo.kernel.api.exception.RepositoryRuntimeException;
052import org.fcrepo.kernel.api.models.FedoraResource;
053import org.fcrepo.kernel.api.observer.FedoraEvent;
054import org.fcrepo.kernel.api.observer.EventType;
055import org.fcrepo.kernel.modeshape.FedoraResourceImpl;
056import org.fcrepo.kernel.modeshape.observer.eventmappings.InternalExternalEventMapper;
057
058import org.modeshape.jcr.api.Repository;
059import org.slf4j.Logger;
060
061import com.codahale.metrics.Counter;
062import com.google.common.eventbus.EventBus;
063
064/**
065 * Simple JCR EventListener that filters JCR Events through a Fedora EventFilter, maps the results through a mapper,
066 * and puts the resulting stream onto the internal Fedora EventBus as a stream of FedoraEvents.
067 *
068 * @author eddies
069 * @author ajs6f
070 * @since Feb 7, 2013
071 */
072public class SimpleObserver implements EventListener {
073
074    private static final Logger LOGGER = getLogger(SimpleObserver.class);
075
076    /**
077     * A simple counter of events that pass through this observer
078     */
079    static final Counter EVENT_COUNTER =
080            RegistryService.getInstance().getMetrics().counter(name(SimpleObserver.class, "onEvent"));
081
082    static final Integer EVENT_TYPES = NODE_ADDED + NODE_REMOVED + NODE_MOVED + PROPERTY_ADDED + PROPERTY_CHANGED
083            + PROPERTY_REMOVED;
084
085    /**
086     * Note: In order to resolve the JCR-based properties (e.g. jcr:lastModified or
087     * ebucore:mimeType) a new FedoarEvent is created and the dereferenced properties
088     * are added to that new event.
089     */
090    private static Function<Session, Function<FedoraEvent, FedoraEvent>> namespaceResolver = session -> evt -> {
091        final NamespaceRegistry namespaceRegistry = getNamespaceRegistry(session);
092        final FedoraEvent event = new FedoraEventImpl(evt.getTypes(), evt.getPath(), evt.getUserID(),
093                evt.getUserData(), evt.getDate(), evt.getInfo());
094
095        evt.getProperties().stream()
096            .filter(isPublicJcrProperty.or(property -> !property.startsWith("jcr:")))
097            .forEach(property -> {
098                final String[] parts = property.split(":", 2);
099                if (parts.length == 2) {
100                    try {
101                        event.addProperty(
102                            getRDFNamespaceForJcrNamespace(namespaceRegistry.getURI(parts[0])) + parts[1]);
103                    } catch (final RepositoryException ex) {
104                        LOGGER.warn("Prefix could not be dereferenced using the namespace registry, skipping: {}",
105                            property);
106                    }
107                } else {
108                    LOGGER.warn("Prefix could not be determined, skipping: {}", property);
109                }
110            });
111        return event;
112    };
113
114    /**
115     * Note: This function maps a FedoraEvent to a Stream of some number of FedoraEvents. This is because a MOVE event
116     * may lead to an arbitrarily large number of additional events for any child resources. In the event of this not
117     * being a MOVE event, the same FedoraEvent is returned, wrapped in a Stream. For a MOVEd resource, the resource in
118     * question will be translated to two FedoraEvents: a MOVED event for the new resource location and a REMOVED event
119     * corresponding to the old location. The same pair of FedoraEvents will also be generated for each child resource.
120     */
121    private static Function<Session, Function<FedoraEvent, Stream<FedoraEvent>>> handleMoveEvents = session -> evt -> {
122        if (evt.getTypes().contains(EventType.NODE_MOVED)) {
123            final Map<String, String> movePath = evt.getInfo();
124            final String dest = movePath.get("destAbsPath");
125            final String src = movePath.get("srcAbsPath");
126            try {
127                final FedoraResource resource = new FedoraResourceImpl(session.getNode(evt.getPath()));
128                return concat(of(evt), resource.getChildren(true).map(FedoraResource::getPath)
129                    .flatMap(path -> of(
130                        new FedoraEventImpl(EventType.NODE_MOVED, path, evt.getUserID(),
131                            evt.getUserData(), evt.getDate(), emptyMap()),
132                        new FedoraEventImpl(EventType.NODE_REMOVED, path.replaceFirst(dest, src),
133                            evt.getUserID(), evt.getUserData(), evt.getDate(), emptyMap()))));
134            } catch (final RepositoryException ex) {
135                throw new RepositoryRuntimeException(ex);
136            }
137        }
138        return of(evt);
139    };
140
141    @Inject
142    private Repository repository;
143
144    @Inject
145    private EventBus eventBus;
146
147    @Inject
148    private InternalExternalEventMapper eventMapper;
149
150    @Inject
151    private EventFilter eventFilter;
152
153    // THIS SESSION SHOULD NOT BE USED TO LOOK UP NODES
154    // it is used only to register and deregister this observer to the JCR
155    private Session session;
156
157    /**
158     * Register this observer with the JCR event listeners
159     *
160     * @throws RepositoryException if repository exception occurred
161     */
162    @PostConstruct
163    public void buildListener() throws RepositoryException {
164        LOGGER.debug("Constructing an observer for JCR events...");
165        session = repository.login();
166        session.getWorkspace().getObservationManager()
167                .addEventListener(this, EVENT_TYPES, "/", true, null, null, false);
168        session.save();
169    }
170
171    /**
172     * logout of the session
173     *
174     * @throws RepositoryException if repository exception occurred
175     */
176    @PreDestroy
177    public void stopListening() throws RepositoryException {
178        try {
179            LOGGER.debug("Destroying an observer for JCR events...");
180            session.getWorkspace().getObservationManager().removeEventListener(this);
181        } finally {
182            session.logout();
183        }
184    }
185
186    /**
187     * Filter JCR events and transform them into our own FedoraEvents.
188     *
189     * @param events the JCR events
190     */
191    @Override
192    public void onEvent(final javax.jcr.observation.EventIterator events) {
193        Session lookupSession = null;
194        try {
195            lookupSession = repository.login();
196            final Function<FedoraEvent, Stream<FedoraEvent>> moveEventHandler = handleMoveEvents.apply(lookupSession);
197
198            @SuppressWarnings("unchecked")
199            final Iterator<Event> filteredEvents = filter(events, eventFilter::test);
200            eventMapper.apply(iteratorToStream(filteredEvents))
201                .flatMap(moveEventHandler)
202                .map(namespaceResolver.apply(lookupSession))
203                .forEach(this::post);
204        } catch (final RepositoryException ex) {
205            throw new RepositoryRuntimeException(ex);
206        } finally {
207            if (lookupSession != null) {
208                lookupSession.logout();
209            }
210        }
211    }
212
213    private void post(final FedoraEvent evt) {
214        eventBus.post(evt);
215        EVENT_COUNTER.inc();
216    }
217}