001/*
002 * Licensed to DuraSpace under one or more contributor license agreements.
003 * See the NOTICE file distributed with this work for additional information
004 * regarding copyright ownership.
005 *
006 * DuraSpace licenses this file to you under the Apache License,
007 * Version 2.0 (the "License"); you may not use this file except in
008 * compliance with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.fcrepo.kernel.modeshape.observer;
019
020import static com.codahale.metrics.MetricRegistry.name;
021import static com.google.common.collect.Iterators.filter;
022import static java.util.stream.Collectors.toSet;
023import static java.util.stream.Stream.concat;
024import static java.util.stream.Stream.of;
025import static javax.jcr.observation.Event.NODE_ADDED;
026import static javax.jcr.observation.Event.NODE_MOVED;
027import static javax.jcr.observation.Event.NODE_REMOVED;
028import static javax.jcr.observation.Event.PROPERTY_ADDED;
029import static javax.jcr.observation.Event.PROPERTY_CHANGED;
030import static javax.jcr.observation.Event.PROPERTY_REMOVED;
031import static org.fcrepo.kernel.api.FedoraTypes.FEDORA_BINARY;
032import static org.fcrepo.kernel.api.FedoraTypes.FEDORA_CONTAINER;
033import static org.fcrepo.kernel.api.FedoraTypes.FEDORA_REPOSITORY_ROOT;
034import static org.fcrepo.kernel.api.FedoraTypes.FEDORA_RESOURCE;
035import static org.fcrepo.kernel.api.FedoraTypes.LDP_BASIC_CONTAINER;
036import static org.fcrepo.kernel.api.FedoraTypes.LDP_CONTAINER;
037import static org.fcrepo.kernel.api.FedoraTypes.LDP_NON_RDF_SOURCE;
038import static org.fcrepo.kernel.api.FedoraTypes.LDP_RDF_SOURCE;
039import static org.fcrepo.kernel.api.RdfLexicon.JCR_NAMESPACE;
040import static org.fcrepo.kernel.api.RdfLexicon.JCR_NT_NAMESPACE;
041import static org.fcrepo.kernel.api.RdfLexicon.MIX_NAMESPACE;
042import static org.fcrepo.kernel.api.RdfLexicon.MODE_NAMESPACE;
043import static org.fcrepo.kernel.api.observer.EventType.RESOURCE_DELETION;
044import static org.fcrepo.kernel.api.observer.EventType.RESOURCE_RELOCATION;
045import static org.fcrepo.kernel.modeshape.FedoraJcrConstants.ROOT;
046import static org.fcrepo.kernel.modeshape.utils.NamespaceTools.getNamespaceRegistry;
047import static org.fcrepo.kernel.modeshape.utils.UncheckedFunction.uncheck;
048import static org.fcrepo.kernel.modeshape.utils.StreamUtils.iteratorToStream;
049import static org.slf4j.LoggerFactory.getLogger;
050
051import  org.fcrepo.metrics.RegistryService;
052
053import java.util.Iterator;
054import java.util.Map;
055import java.util.Set;
056import java.util.function.Function;
057import java.util.stream.Stream;
058
059import javax.annotation.PostConstruct;
060import javax.annotation.PreDestroy;
061import javax.inject.Inject;
062import javax.jcr.NamespaceRegistry;
063import javax.jcr.RepositoryException;
064import javax.jcr.Session;
065import javax.jcr.observation.Event;
066import javax.jcr.observation.EventListener;
067
068import org.fcrepo.kernel.api.exception.RepositoryRuntimeException;
069import org.fcrepo.kernel.api.models.FedoraResource;
070import org.fcrepo.kernel.api.observer.FedoraEvent;
071import org.fcrepo.kernel.modeshape.FedoraResourceImpl;
072import org.fcrepo.kernel.modeshape.observer.eventmappings.InternalExternalEventMapper;
073
074import org.modeshape.jcr.api.Repository;
075import org.slf4j.Logger;
076
077import com.codahale.metrics.Counter;
078import com.google.common.collect.ImmutableSet;
079import com.google.common.eventbus.EventBus;
080
081/**
082 * Simple JCR EventListener that filters JCR Events through a Fedora EventFilter, maps the results through a mapper,
083 * and puts the resulting stream onto the internal Fedora EventBus as a stream of FedoraEvents.
084 *
085 * @author eddies
086 * @author ajs6f
087 * @since Feb 7, 2013
088 */
089public class SimpleObserver implements EventListener {
090
091    private static final Logger LOGGER = getLogger(SimpleObserver.class);
092
093    private static final Set<String> filteredNamespaces = ImmutableSet.of(
094            JCR_NAMESPACE, MIX_NAMESPACE, JCR_NT_NAMESPACE, MODE_NAMESPACE);
095
096    /**
097     * A simple counter of events that pass through this observer
098     */
099    static final Counter EVENT_COUNTER =
100            RegistryService.getInstance().getMetrics().counter(name(SimpleObserver.class, "onEvent"));
101
102    static final Integer EVENT_TYPES = NODE_ADDED + NODE_REMOVED + NODE_MOVED + PROPERTY_ADDED + PROPERTY_CHANGED
103            + PROPERTY_REMOVED;
104
105    /**
106     * Note: This function maps a FedoraEvent to a Stream of some number of FedoraEvents. This is because a MOVE event
107     * may lead to an arbitrarily large number of additional events for any child resources. In the event of this not
108     * being a MOVE event, the same FedoraEvent is returned, wrapped in a Stream. For a MOVEd resource, the resource in
109     * question will be translated to two FedoraEvents: a MOVED event for the new resource location and a REMOVED event
110     * corresponding to the old location. The same pair of FedoraEvents will also be generated for each child resource.
111     */
112    private static Function<FedoraEvent, Stream<FedoraEvent>> handleMoveEvents(final Session session) {
113        return evt -> {
114            if (evt.getTypes().contains(RESOURCE_RELOCATION)) {
115                final Map<String, String> movePath = evt.getInfo();
116                final String dest = movePath.get("destAbsPath");
117                final String src = movePath.get("srcAbsPath");
118                try {
119                    final FedoraResource resource = new FedoraResourceImpl(session.getNode(evt.getPath()));
120                    return concat(of(evt), resource.getChildren(true).map(FedoraResource::getPath)
121                        .flatMap(path -> of(
122                            new FedoraEventImpl(RESOURCE_RELOCATION, path, evt.getResourceTypes(), evt.getUserID(),
123                                evt.getDate(), evt.getInfo()),
124                            new FedoraEventImpl(RESOURCE_DELETION, path.replaceFirst(dest, src), evt.getResourceTypes(),
125                                evt.getUserID(), evt.getDate(), evt.getInfo()))));
126                } catch (final RepositoryException ex) {
127                    throw new RepositoryRuntimeException(ex);
128                }
129            }
130            return of(evt);
131        };
132    }
133
134    /**
135     * Note: Certain RDF types are generated dynamically. These are added here, based on
136     * certain type hints.
137     */
138    private static Function<String, Stream<String>> dynamicTypes = type -> {
139        if (type.equals(ROOT)) {
140            return of(FEDORA_REPOSITORY_ROOT, FEDORA_RESOURCE, FEDORA_CONTAINER, LDP_CONTAINER, LDP_RDF_SOURCE,
141                    LDP_BASIC_CONTAINER);
142        } else if (type.equals(FEDORA_CONTAINER)) {
143            return of(FEDORA_CONTAINER, LDP_CONTAINER, LDP_RDF_SOURCE);
144        } else if (type.equals(FEDORA_BINARY)) {
145            return of(FEDORA_BINARY, LDP_NON_RDF_SOURCE);
146        } else {
147            return of(type);
148        }
149    };
150
151    private static Function<FedoraEvent, FedoraEvent> filterAndDerefResourceTypes(final Session session) {
152        final NamespaceRegistry registry = getNamespaceRegistry(session);
153        return evt -> {
154            final Set<String> resourceTypes = evt.getResourceTypes().stream()
155                .flatMap(dynamicTypes).map(type -> type.split(":"))
156                .filter(pair -> pair.length == 2).map(uncheck(pair -> new String[]{registry.getURI(pair[0]), pair[1]}))
157                .filter(pair -> !filteredNamespaces.contains(pair[0])).map(pair -> pair[0] + pair[1]).collect(toSet());
158            return new FedoraEventImpl(evt.getTypes(), evt.getPath(), resourceTypes, evt.getUserID(),
159                    evt.getDate(), evt.getInfo());
160        };
161    }
162
163    @Inject
164    private Repository repository;
165
166    @Inject
167    private EventBus eventBus;
168
169    @Inject
170    private InternalExternalEventMapper eventMapper;
171
172    @Inject
173    private EventFilter eventFilter;
174
175    // THIS SESSION SHOULD NOT BE USED TO LOOK UP NODES
176    // it is used only to register and deregister this observer to the JCR
177    private Session session;
178
179    /**
180     * Register this observer with the JCR event listeners
181     *
182     * @throws RepositoryException if repository exception occurred
183     */
184    @PostConstruct
185    public void buildListener() throws RepositoryException {
186        LOGGER.debug("Constructing an observer for JCR events...");
187        session = repository.login();
188        session.getWorkspace().getObservationManager()
189                .addEventListener(this, EVENT_TYPES, "/", true, null, null, false);
190        session.save();
191    }
192
193    /**
194     * logout of the session
195     *
196     * @throws RepositoryException if repository exception occurred
197     */
198    @PreDestroy
199    public void stopListening() throws RepositoryException {
200        try {
201            LOGGER.debug("Destroying an observer for JCR events...");
202            session.getWorkspace().getObservationManager().removeEventListener(this);
203        } finally {
204            session.logout();
205        }
206    }
207
208    /**
209     * Filter JCR events and transform them into our own FedoraEvents.
210     *
211     * @param events the JCR events
212     */
213    @Override
214    public void onEvent(final javax.jcr.observation.EventIterator events) {
215        Session lookupSession = null;
216        try {
217            lookupSession = repository.login();
218
219            @SuppressWarnings("unchecked")
220            final Iterator<Event> filteredEvents = filter(events, eventFilter::test);
221            eventMapper.apply(iteratorToStream(filteredEvents))
222                .map(filterAndDerefResourceTypes(lookupSession))
223                .flatMap(handleMoveEvents(lookupSession))
224                .forEach(this::post);
225        } catch (final RepositoryException ex) {
226            throw new RepositoryRuntimeException(ex);
227        } finally {
228            if (lookupSession != null) {
229                lookupSession.logout();
230            }
231        }
232    }
233
234    private void post(final FedoraEvent evt) {
235        eventBus.post(evt);
236        EVENT_COUNTER.inc();
237    }
238}