001/*
002 * Licensed to DuraSpace under one or more contributor license agreements.
003 * See the NOTICE file distributed with this work for additional information
004 * regarding copyright ownership.
005 *
006 * DuraSpace licenses this file to you under the Apache License,
007 * Version 2.0 (the "License"); you may not use this file except in
008 * compliance with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.fcrepo.kernel.modeshape.observer;
019
020import static com.codahale.metrics.MetricRegistry.name;
021import static com.google.common.collect.Iterators.filter;
022import static java.util.stream.Collectors.toSet;
023import static java.util.stream.Stream.concat;
024import static java.util.stream.Stream.of;
025import static javax.jcr.observation.Event.NODE_ADDED;
026import static javax.jcr.observation.Event.NODE_MOVED;
027import static javax.jcr.observation.Event.NODE_REMOVED;
028import static javax.jcr.observation.Event.PROPERTY_ADDED;
029import static javax.jcr.observation.Event.PROPERTY_CHANGED;
030import static javax.jcr.observation.Event.PROPERTY_REMOVED;
031import static org.fcrepo.kernel.api.FedoraTypes.FEDORA_BINARY;
032import static org.fcrepo.kernel.api.FedoraTypes.FEDORA_CONTAINER;
033import static org.fcrepo.kernel.api.FedoraTypes.FEDORA_REPOSITORY_ROOT;
034import static org.fcrepo.kernel.api.FedoraTypes.FEDORA_RESOURCE;
035import static org.fcrepo.kernel.api.FedoraTypes.LDP_BASIC_CONTAINER;
036import static org.fcrepo.kernel.api.FedoraTypes.LDP_CONTAINER;
037import static org.fcrepo.kernel.api.FedoraTypes.LDP_NON_RDF_SOURCE;
038import static org.fcrepo.kernel.api.FedoraTypes.LDP_RDF_SOURCE;
039import static org.fcrepo.kernel.api.observer.EventType.RESOURCE_DELETION;
040import static org.fcrepo.kernel.api.observer.EventType.RESOURCE_RELOCATION;
041import static org.fcrepo.kernel.modeshape.FedoraSessionImpl.getJcrSession;
042import static org.fcrepo.kernel.modeshape.FedoraJcrConstants.ROOT;
043import static org.fcrepo.kernel.modeshape.RdfJcrLexicon.JCR_NAMESPACE;
044import static org.fcrepo.kernel.modeshape.RdfJcrLexicon.JCR_NT_NAMESPACE;
045import static org.fcrepo.kernel.modeshape.RdfJcrLexicon.MIX_NAMESPACE;
046import static org.fcrepo.kernel.modeshape.RdfJcrLexicon.MODE_NAMESPACE;
047import static org.fcrepo.kernel.modeshape.utils.NamespaceTools.getNamespaceRegistry;
048import static org.fcrepo.kernel.modeshape.utils.UncheckedFunction.uncheck;
049import static org.fcrepo.kernel.modeshape.utils.StreamUtils.iteratorToStream;
050import static org.slf4j.LoggerFactory.getLogger;
051
052import  org.fcrepo.metrics.RegistryService;
053
054import java.util.Iterator;
055import java.util.Map;
056import java.util.Set;
057import java.util.function.Function;
058import java.util.stream.Stream;
059
060import javax.annotation.PostConstruct;
061import javax.annotation.PreDestroy;
062import javax.inject.Inject;
063import javax.jcr.NamespaceRegistry;
064import javax.jcr.RepositoryException;
065import javax.jcr.Session;
066import javax.jcr.observation.Event;
067import javax.jcr.observation.EventListener;
068
069import org.fcrepo.kernel.api.FedoraRepository;
070import org.fcrepo.kernel.api.exception.RepositoryRuntimeException;
071import org.fcrepo.kernel.api.models.FedoraResource;
072import org.fcrepo.kernel.api.observer.FedoraEvent;
073import org.fcrepo.kernel.modeshape.FedoraResourceImpl;
074import org.fcrepo.kernel.modeshape.observer.eventmappings.InternalExternalEventMapper;
075
076import org.slf4j.Logger;
077
078import com.codahale.metrics.Counter;
079import com.google.common.collect.ImmutableSet;
080import com.google.common.eventbus.EventBus;
081
082/**
083 * Simple JCR EventListener that filters JCR Events through a Fedora EventFilter, maps the results through a mapper,
084 * and puts the resulting stream onto the internal Fedora EventBus as a stream of FedoraEvents.
085 *
086 * @author eddies
087 * @author ajs6f
088 * @since Feb 7, 2013
089 */
090public class SimpleObserver implements EventListener {
091
092    private static final Logger LOGGER = getLogger(SimpleObserver.class);
093
094    private static final Set<String> filteredNamespaces = ImmutableSet.of(
095            JCR_NAMESPACE, MIX_NAMESPACE, JCR_NT_NAMESPACE, MODE_NAMESPACE);
096
097    /**
098     * A simple counter of events that pass through this observer
099     */
100    static final Counter EVENT_COUNTER =
101            RegistryService.getInstance().getMetrics().counter(name(SimpleObserver.class, "onEvent"));
102
103    static final Integer EVENT_TYPES = NODE_ADDED + NODE_REMOVED + NODE_MOVED + PROPERTY_ADDED + PROPERTY_CHANGED
104            + PROPERTY_REMOVED;
105
106    /**
107     * Note: This function maps a FedoraEvent to a Stream of some number of FedoraEvents. This is because a MOVE event
108     * may lead to an arbitrarily large number of additional events for any child resources. In the event of this not
109     * being a MOVE event, the same FedoraEvent is returned, wrapped in a Stream. For a MOVEd resource, the resource in
110     * question will be translated to two FedoraEvents: a MOVED event for the new resource location and a REMOVED event
111     * corresponding to the old location. The same pair of FedoraEvents will also be generated for each child resource.
112     */
113    private static Function<FedoraEvent, Stream<FedoraEvent>> handleMoveEvents(final Session session) {
114        return evt -> {
115            if (evt.getTypes().contains(RESOURCE_RELOCATION)) {
116                final Map<String, String> movePath = evt.getInfo();
117                final String dest = movePath.get("destAbsPath");
118                final String src = movePath.get("srcAbsPath");
119                try {
120                    final FedoraResource resource = new FedoraResourceImpl(session.getNode(evt.getPath()));
121                    return concat(of(evt), resource.getChildren(true).map(FedoraResource::getPath)
122                        .flatMap(path -> of(
123                            new FedoraEventImpl(RESOURCE_RELOCATION, path, evt.getResourceTypes(), evt.getUserID(),
124                                evt.getDate(), evt.getInfo()),
125                            new FedoraEventImpl(RESOURCE_DELETION, path.replaceFirst(dest, src), evt.getResourceTypes(),
126                                evt.getUserID(), evt.getDate(), evt.getInfo()))));
127                } catch (final RepositoryException ex) {
128                    throw new RepositoryRuntimeException(ex);
129                }
130            }
131            return of(evt);
132        };
133    }
134
135    /**
136     * Note: Certain RDF types are generated dynamically. These are added here, based on
137     * certain type hints.
138     */
139    private static Function<String, Stream<String>> dynamicTypes = type -> {
140        if (type.equals(ROOT)) {
141            return of(FEDORA_REPOSITORY_ROOT, FEDORA_RESOURCE, FEDORA_CONTAINER, LDP_CONTAINER, LDP_RDF_SOURCE,
142                    LDP_BASIC_CONTAINER);
143        } else if (type.equals(FEDORA_CONTAINER)) {
144            return of(FEDORA_CONTAINER, LDP_CONTAINER, LDP_RDF_SOURCE);
145        } else if (type.equals(FEDORA_BINARY)) {
146            return of(FEDORA_BINARY, LDP_NON_RDF_SOURCE);
147        } else {
148            return of(type);
149        }
150    };
151
152    private static Function<FedoraEvent, FedoraEvent> filterAndDerefResourceTypes(final Session session) {
153        final NamespaceRegistry registry = getNamespaceRegistry(session);
154        return evt -> {
155            final Set<String> resourceTypes = evt.getResourceTypes().stream()
156                .flatMap(dynamicTypes).map(type -> type.split(":"))
157                .filter(pair -> pair.length == 2).map(uncheck(pair -> new String[]{registry.getURI(pair[0]), pair[1]}))
158                .filter(pair -> !filteredNamespaces.contains(pair[0])).map(pair -> pair[0] + pair[1]).collect(toSet());
159            return new FedoraEventImpl(evt.getTypes(), evt.getPath(), resourceTypes, evt.getUserID(),
160                    evt.getDate(), evt.getInfo());
161        };
162    }
163
164    @Inject
165    private FedoraRepository repository;
166
167    @Inject
168    private EventBus eventBus;
169
170    @Inject
171    private InternalExternalEventMapper eventMapper;
172
173    @Inject
174    private EventFilter eventFilter;
175
176    // THIS SESSION SHOULD NOT BE USED TO LOOK UP NODES
177    // it is used only to register and deregister this observer to the JCR
178    private Session session;
179
180    /**
181     * Register this observer with the JCR event listeners
182     *
183     * @throws RepositoryException if repository exception occurred
184     */
185    @PostConstruct
186    public void buildListener() throws RepositoryException {
187        LOGGER.debug("Constructing an observer for JCR events...");
188        session = getJcrSession(repository.login());
189        session.getWorkspace().getObservationManager()
190                .addEventListener(this, EVENT_TYPES, "/", true, null, null, false);
191        session.save();
192    }
193
194    /**
195     * logout of the session
196     *
197     * @throws RepositoryException if repository exception occurred
198     */
199    @PreDestroy
200    public void stopListening() throws RepositoryException {
201        try {
202            LOGGER.debug("Destroying an observer for JCR events...");
203            session.getWorkspace().getObservationManager().removeEventListener(this);
204        } finally {
205            session.logout();
206        }
207    }
208
209    /**
210     * Filter JCR events and transform them into our own FedoraEvents.
211     *
212     * @param events the JCR events
213     */
214    @Override
215    public void onEvent(final javax.jcr.observation.EventIterator events) {
216        Session lookupSession = null;
217        try {
218            lookupSession = getJcrSession(repository.login());
219
220            @SuppressWarnings("unchecked")
221            final Iterator<Event> filteredEvents = filter(events, eventFilter::test);
222            eventMapper.apply(iteratorToStream(filteredEvents))
223                .map(filterAndDerefResourceTypes(lookupSession))
224                .flatMap(handleMoveEvents(lookupSession))
225                .forEach(this::post);
226        } finally {
227            if (lookupSession != null) {
228                lookupSession.logout();
229            }
230        }
231    }
232
233    private void post(final FedoraEvent evt) {
234        eventBus.post(evt);
235        EVENT_COUNTER.inc();
236    }
237}