001/*
002 * Licensed to DuraSpace under one or more contributor license agreements.
003 * See the NOTICE file distributed with this work for additional information
004 * regarding copyright ownership.
005 *
006 * DuraSpace licenses this file to you under the Apache License,
007 * Version 2.0 (the "License"); you may not use this file except in
008 * compliance with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.fcrepo.kernel.modeshape.observer;
019
020import static com.google.common.collect.Iterators.filter;
021import static java.util.stream.Collectors.toSet;
022import static java.util.stream.Stream.concat;
023import static java.util.stream.Stream.of;
024import static javax.jcr.observation.Event.NODE_ADDED;
025import static javax.jcr.observation.Event.NODE_MOVED;
026import static javax.jcr.observation.Event.NODE_REMOVED;
027import static javax.jcr.observation.Event.PROPERTY_ADDED;
028import static javax.jcr.observation.Event.PROPERTY_CHANGED;
029import static javax.jcr.observation.Event.PROPERTY_REMOVED;
030import static org.fcrepo.kernel.api.FedoraTypes.FEDORA_BINARY;
031import static org.fcrepo.kernel.api.FedoraTypes.FEDORA_CONTAINER;
032import static org.fcrepo.kernel.api.FedoraTypes.FEDORA_NON_RDF_SOURCE_DESCRIPTION;
033import static org.fcrepo.kernel.api.FedoraTypes.FEDORA_REPOSITORY_ROOT;
034import static org.fcrepo.kernel.api.FedoraTypes.FEDORA_RESOURCE;
035import static org.fcrepo.kernel.api.FedoraTypes.LDP_BASIC_CONTAINER;
036import static org.fcrepo.kernel.api.FedoraTypes.LDP_CONTAINER;
037import static org.fcrepo.kernel.api.FedoraTypes.LDP_NON_RDF_SOURCE;
038import static org.fcrepo.kernel.api.FedoraTypes.LDP_RDF_SOURCE;
039import static org.fcrepo.kernel.api.observer.EventType.RESOURCE_DELETION;
040import static org.fcrepo.kernel.api.observer.EventType.RESOURCE_RELOCATION;
041import static org.fcrepo.kernel.modeshape.FedoraSessionImpl.getJcrSession;
042import static org.fcrepo.kernel.modeshape.FedoraJcrConstants.ROOT;
043import static org.fcrepo.kernel.modeshape.RdfJcrLexicon.JCR_NAMESPACE;
044import static org.fcrepo.kernel.modeshape.RdfJcrLexicon.JCR_NT_NAMESPACE;
045import static org.fcrepo.kernel.modeshape.RdfJcrLexicon.MIX_NAMESPACE;
046import static org.fcrepo.kernel.modeshape.RdfJcrLexicon.MODE_NAMESPACE;
047import static org.fcrepo.kernel.modeshape.utils.NamespaceTools.getNamespaceRegistry;
048import static org.fcrepo.kernel.modeshape.utils.UncheckedFunction.uncheck;
049import static org.fcrepo.kernel.modeshape.utils.StreamUtils.iteratorToStream;
050import static org.slf4j.LoggerFactory.getLogger;
051
052import java.util.Iterator;
053import java.util.Map;
054import java.util.Set;
055import java.util.function.Function;
056import java.util.stream.Stream;
057
058import javax.annotation.PostConstruct;
059import javax.annotation.PreDestroy;
060import javax.inject.Inject;
061import javax.jcr.NamespaceRegistry;
062import javax.jcr.RepositoryException;
063import javax.jcr.Session;
064import javax.jcr.observation.Event;
065import javax.jcr.observation.EventListener;
066
067import org.fcrepo.kernel.api.FedoraRepository;
068import org.fcrepo.kernel.api.FedoraSession;
069import org.fcrepo.kernel.api.exception.RepositoryRuntimeException;
070import org.fcrepo.kernel.api.models.FedoraResource;
071import org.fcrepo.kernel.api.observer.FedoraEvent;
072import org.fcrepo.kernel.modeshape.FedoraResourceImpl;
073import org.fcrepo.kernel.modeshape.FedoraSessionImpl;
074import org.fcrepo.kernel.modeshape.observer.eventmappings.InternalExternalEventMapper;
075import org.fcrepo.kernel.modeshape.utils.FedoraSessionUserUtil;
076
077import org.slf4j.Logger;
078
079import com.google.common.collect.ImmutableSet;
080import com.google.common.eventbus.EventBus;
081
082/**
083 * Simple JCR EventListener that filters JCR Events through a Fedora EventFilter, maps the results through a mapper,
084 * and puts the resulting stream onto the internal Fedora EventBus as a stream of FedoraEvents.
085 *
086 * @author eddies
087 * @author ajs6f
088 * @since Feb 7, 2013
089 */
090public class SimpleObserver implements EventListener {
091
092    private static final Logger LOGGER = getLogger(SimpleObserver.class);
093
094    private static final Set<String> filteredNamespaces = ImmutableSet.of(
095            JCR_NAMESPACE, MIX_NAMESPACE, JCR_NT_NAMESPACE, MODE_NAMESPACE);
096
097    static final Integer EVENT_TYPES = NODE_ADDED + NODE_REMOVED + NODE_MOVED + PROPERTY_ADDED + PROPERTY_CHANGED
098            + PROPERTY_REMOVED;
099
100    /**
101     * Note: This function maps a FedoraEvent to a Stream of some number of FedoraEvents. This is because a MOVE event
102     * may lead to an arbitrarily large number of additional events for any child resources. In the event of this not
103     * being a MOVE event, the same FedoraEvent is returned, wrapped in a Stream. For a MOVEd resource, the resource in
104     * question will be translated to two FedoraEvents: a MOVED event for the new resource location and a REMOVED event
105     * corresponding to the old location. The same pair of FedoraEvents will also be generated for each child resource.
106     */
107    private static Function<FedoraEvent, Stream<FedoraEvent>> handleMoveEvents(final Session session) {
108        return evt -> {
109            if (evt.getTypes().contains(RESOURCE_RELOCATION)) {
110                final Map<String, String> movePath = evt.getInfo();
111                final String dest = movePath.get("destAbsPath");
112                final String src = movePath.get("srcAbsPath");
113                final FedoraSession fsession = new FedoraSessionImpl(session);
114
115                try {
116                    final FedoraResource resource = new FedoraResourceImpl(session.getNode(evt.getPath()));
117                    return concat(of(evt), resource.getChildren(true).map(FedoraResource::getPath)
118                        .flatMap(path -> of(
119                            new FedoraEventImpl(RESOURCE_RELOCATION, path, evt.getResourceTypes(), evt.getUserID(),
120                                fsession.getUserURI(), evt.getDate(), evt.getInfo()),
121                            new FedoraEventImpl(RESOURCE_DELETION, path.replaceFirst(dest, src), evt.getResourceTypes(),
122                                evt.getUserID(), fsession.getUserURI(), evt.getDate(), evt.getInfo()))));
123                } catch (final RepositoryException ex) {
124                    throw new RepositoryRuntimeException(ex);
125                }
126            }
127            return of(evt);
128        };
129    }
130
131    /**
132     * Note: Certain RDF types are generated dynamically. These are added here, based on
133     * certain type hints.
134     */
135    private static final Function<String, Stream<String>> dynamicTypes = type -> {
136        if (type.equals(ROOT)) {
137            return of(FEDORA_REPOSITORY_ROOT, FEDORA_RESOURCE, FEDORA_CONTAINER, LDP_CONTAINER, LDP_RDF_SOURCE,
138                    LDP_BASIC_CONTAINER);
139        } else if (type.equals(FEDORA_CONTAINER)) {
140            return of(FEDORA_CONTAINER, LDP_CONTAINER, LDP_RDF_SOURCE);
141        } else if (type.equals(FEDORA_BINARY)) {
142            return of(FEDORA_BINARY, LDP_NON_RDF_SOURCE);
143        } else if (type.equals(FEDORA_NON_RDF_SOURCE_DESCRIPTION)) {
144            return of(FEDORA_BINARY, LDP_NON_RDF_SOURCE);
145        } else {
146            return of(type);
147        }
148    };
149
150    private static Function<FedoraEvent, FedoraEvent> filterAndDerefResourceTypes(final Session session) {
151        final NamespaceRegistry registry = getNamespaceRegistry(session);
152        return evt -> {
153            final Set<String> resourceTypes = evt.getResourceTypes().stream()
154                .flatMap(dynamicTypes).map(type -> type.split(":"))
155                .filter(pair -> pair.length == 2).map(uncheck(pair -> new String[]{registry.getURI(pair[0]), pair[1]}))
156                .filter(pair -> !filteredNamespaces.contains(pair[0])).map(pair -> pair[0] + pair[1]).collect(toSet());
157            return new FedoraEventImpl(evt.getTypes(), evt.getPath(), resourceTypes, evt.getUserID(),
158                    FedoraSessionUserUtil.getUserURI(evt.getUserID()), evt.getDate(), evt.getInfo());
159        };
160    }
161
162    @Inject
163    private FedoraRepository repository;
164
165    @Inject
166    private EventBus eventBus;
167
168    @Inject
169    private InternalExternalEventMapper eventMapper;
170
171    @Inject
172    private EventFilter eventFilter;
173
174    // THIS SESSION SHOULD NOT BE USED TO LOOK UP NODES
175    // it is used only to register and deregister this observer to the JCR
176    private Session session;
177
178    /**
179     * Register this observer with the JCR event listeners
180     *
181     * @throws RepositoryException if repository exception occurred
182     */
183    @PostConstruct
184    public void buildListener() throws RepositoryException {
185        LOGGER.debug("Constructing an observer for JCR events...");
186        session = getJcrSession(repository.login());
187        session.getWorkspace().getObservationManager()
188                .addEventListener(this, EVENT_TYPES, "/", true, null, null, false);
189        session.save();
190    }
191
192    /**
193     * logout of the session
194     *
195     * @throws RepositoryException if repository exception occurred
196     */
197    @PreDestroy
198    public void stopListening() throws RepositoryException {
199        try {
200            LOGGER.debug("Destroying an observer for JCR events...");
201            session.getWorkspace().getObservationManager().removeEventListener(this);
202        } finally {
203            session.logout();
204        }
205    }
206
207    /**
208     * Filter JCR events and transform them into our own FedoraEvents.
209     *
210     * @param events the JCR events
211     */
212    @Override
213    public void onEvent(final javax.jcr.observation.EventIterator events) {
214        Session lookupSession = null;
215        try {
216            lookupSession = getJcrSession(repository.login());
217
218            @SuppressWarnings("unchecked")
219            final Iterator<Event> filteredEvents = filter(events, eventFilter::test);
220            eventMapper.apply(iteratorToStream(filteredEvents))
221                .map(filterAndDerefResourceTypes(lookupSession))
222                .flatMap(handleMoveEvents(lookupSession))
223                .forEach(this::post);
224        } finally {
225            if (lookupSession != null) {
226                lookupSession.logout();
227            }
228        }
229    }
230
231    private void post(final FedoraEvent evt) {
232        eventBus.post(evt);
233    }
234}