/*
 * Copyright 2015 DuraSpace, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.fcrepo.kernel.modeshape.observer;

import static com.codahale.metrics.MetricRegistry.name;
import static com.google.common.collect.Iterators.filter;
import static java.util.Collections.emptyMap;
import static java.util.stream.Stream.concat;
import static java.util.stream.Stream.of;
import static javax.jcr.observation.Event.NODE_ADDED;
import static javax.jcr.observation.Event.NODE_MOVED;
import static javax.jcr.observation.Event.NODE_REMOVED;
import static javax.jcr.observation.Event.PROPERTY_ADDED;
import static javax.jcr.observation.Event.PROPERTY_CHANGED;
import static javax.jcr.observation.Event.PROPERTY_REMOVED;
import static org.fcrepo.kernel.modeshape.rdf.JcrRdfTools.getRDFNamespaceForJcrNamespace;
import static org.fcrepo.kernel.modeshape.utils.FedoraTypesUtils.isPublicJcrProperty;
import static org.fcrepo.kernel.modeshape.utils.NamespaceTools.getNamespaceRegistry;
import static org.fcrepo.kernel.modeshape.utils.StreamUtils.iteratorToStream;
import static org.slf4j.LoggerFactory.getLogger;

import org.fcrepo.metrics.RegistryService;

import java.util.Iterator;
import java.util.Map;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.inject.Inject;
import javax.jcr.NamespaceRegistry;
import javax.jcr.RepositoryException;
import javax.jcr.Session;
import javax.jcr.observation.Event;
import javax.jcr.observation.EventListener;

import org.fcrepo.kernel.api.exception.RepositoryRuntimeException;
import org.fcrepo.kernel.api.models.FedoraResource;
import org.fcrepo.kernel.api.observer.FedoraEvent;
import org.fcrepo.kernel.api.observer.EventType;
import org.fcrepo.kernel.modeshape.FedoraResourceImpl;
import org.fcrepo.kernel.modeshape.observer.eventmappings.InternalExternalEventMapper;

import org.modeshape.jcr.api.Repository;
import org.slf4j.Logger;

import com.codahale.metrics.Counter;
import com.google.common.eventbus.EventBus;

/**
 * Simple JCR EventListener that filters JCR Events through a Fedora EventFilter, maps the results through a mapper,
 * and puts the resulting stream onto the internal Fedora EventBus as a stream of FedoraEvents.
 *
 * @author eddies
 * @author ajs6f
 * @since Feb 7, 2013
 */
public class SimpleObserver implements EventListener {

    private static final Logger LOGGER = getLogger(SimpleObserver.class);

    /**
     * A simple counter of events that pass through this observer
     */
    static final Counter EVENT_COUNTER =
            RegistryService.getInstance().getMetrics().counter(name(SimpleObserver.class, "onEvent"));

    /**
     * Bit mask of the JCR event types this observer subscribes to. The JCR event type constants are distinct
     * bit flags, so they are combined with a bitwise OR.
     */
    static final Integer EVENT_TYPES = NODE_ADDED | NODE_REMOVED | NODE_MOVED | PROPERTY_ADDED | PROPERTY_CHANGED
            | PROPERTY_REMOVED;

    /**
     * Note: In order to resolve the JCR-based properties (e.g. jcr:lastModified or
     * ebucore:mimeType) a new FedoraEvent is created and the dereferenced properties
     * are added to that new event.
     *
     * Properties that are neither public JCR properties nor outside the "jcr:" prefix are dropped; properties
     * whose prefix is missing or cannot be resolved through the namespace registry are logged and skipped.
     */
    private static Function<Session, Function<FedoraEvent, FedoraEvent>> namespaceResolver = session -> evt -> {
        final NamespaceRegistry namespaceRegistry = getNamespaceRegistry(session);
        final FedoraEvent event = new FedoraEventImpl(evt.getTypes(), evt.getPath(), evt.getUserID(),
                evt.getUserData(), evt.getDate(), evt.getInfo());

        evt.getProperties().stream()
            .filter(isPublicJcrProperty.or(property -> !property.startsWith("jcr:")))
            .forEach(property -> {
                // split into at most two parts: the namespace prefix and the local name
                final String[] parts = property.split(":", 2);
                if (parts.length == 2) {
                    try {
                        event.addProperty(
                            getRDFNamespaceForJcrNamespace(namespaceRegistry.getURI(parts[0])) + parts[1]);
                    } catch (final RepositoryException ex) {
                        LOGGER.warn("Prefix could not be dereferenced using the namespace registry, skipping: {}",
                            property);
                    }
                } else {
                    LOGGER.warn("Prefix could not be determined, skipping: {}", property);
                }
            });
        return event;
    };

    /**
     * Note: This function maps a FedoraEvent to a Stream of some number of FedoraEvents. This is because a MOVE event
     * may lead to an arbitrarily large number of additional events for any child resources. In the event of this not
     * being a MOVE event, the same FedoraEvent is returned, wrapped in a Stream. For a MOVEd resource, the resource in
     * question will be translated to two FedoraEvents: a MOVED event for the new resource location and a REMOVED event
     * corresponding to the old location. The same pair of FedoraEvents will also be generated for each child resource.
     *
     * The returned stream is lazy: child resources are read from the supplied session while the stream is consumed,
     * so the session must remain open until then.
     */
    private static Function<Session, Function<FedoraEvent, Stream<FedoraEvent>>> handleMoveEvents =
        session -> evt -> {
            if (evt.getTypes().contains(EventType.NODE_MOVED)) {
                final Map<String, String> movePath = evt.getInfo();
                final String dest = movePath.get("destAbsPath");
                final String src = movePath.get("srcAbsPath");
                try {
                    final FedoraResource resource = new FedoraResourceImpl(session.getNode(evt.getPath()));
                    return concat(of(evt), resource.getChildren(true).map(FedoraResource::getPath)
                        .flatMap(path -> of(
                            new FedoraEventImpl(EventType.NODE_MOVED, path, evt.getUserID(),
                                evt.getUserData(), evt.getDate(), emptyMap()),
                            new FedoraEventImpl(EventType.NODE_REMOVED, preMovePath(path, dest, src),
                                evt.getUserID(), evt.getUserData(), evt.getDate(), emptyMap()))));
                } catch (final RepositoryException ex) {
                    throw new RepositoryRuntimeException(ex);
                }
            }
            return of(evt);
        };

    /**
     * Translate the path of a resource below a moved node back to the path it had before the move.
     *
     * The first occurrence of {@code dest} in {@code path} is replaced by {@code src}. Both values are treated as
     * literal text: repository paths may legitimately contain characters that are special in regular expressions
     * or in replacement templates (e.g. '.', '+', '(', '[', '$'), and must not be interpreted as such.
     *
     * @param path the current (post-move) path of the resource
     * @param dest the absolute destination path of the move
     * @param src the absolute source path of the move
     * @return the path with the destination prefix replaced by the source prefix, or {@code path} unchanged if it
     *         does not contain {@code dest}
     */
    private static String preMovePath(final String path, final String dest, final String src) {
        return path.replaceFirst(Pattern.quote(dest), Matcher.quoteReplacement(src));
    }

    @Inject
    private Repository repository;

    @Inject
    private EventBus eventBus;

    @Inject
    private InternalExternalEventMapper eventMapper;

    @Inject
    private EventFilter eventFilter;

    // THIS SESSION SHOULD NOT BE USED TO LOOK UP NODES
    // it is used only to register and deregister this observer to the JCR
    private Session session;

    /**
     * Register this observer with the JCR event listeners. A dedicated session is opened and kept for the
     * lifetime of the observer; the listener is registered for {@link #EVENT_TYPES} on the root path "/".
     *
     * @throws RepositoryException if repository exception occurred
     */
    @PostConstruct
    public void buildListener() throws RepositoryException {
        LOGGER.debug("Constructing an observer for JCR events...");
        session = repository.login();
        session.getWorkspace().getObservationManager()
                .addEventListener(this, EVENT_TYPES, "/", true, null, null, false);
        session.save();
    }

    /**
     * Deregister this observer from the JCR event listeners and logout of the session. The session is logged out
     * even if deregistration fails.
     *
     * @throws RepositoryException if repository exception occurred
     */
    @PreDestroy
    public void stopListening() throws RepositoryException {
        try {
            LOGGER.debug("Destroying an observer for JCR events...");
            session.getWorkspace().getObservationManager().removeEventListener(this);
        } finally {
            session.logout();
        }
    }

    /**
     * Filter JCR events and transform them into our own FedoraEvents.
     *
     * A short-lived session is opened for each batch of events to look up moved resources and namespaces; it is
     * always logged out before this method returns. Every resulting FedoraEvent is posted to the event bus.
     *
     * @param events the JCR events
     */
    @Override
    public void onEvent(final javax.jcr.observation.EventIterator events) {
        Session lookupSession = null;
        try {
            lookupSession = repository.login();
            final Function<FedoraEvent, Stream<FedoraEvent>> moveEventHandler = handleMoveEvents.apply(lookupSession);

            @SuppressWarnings("unchecked")
            final Iterator<Event> filteredEvents = filter(events, eventFilter::test);
            // the pipeline is consumed by forEach below, i.e. while lookupSession is still open
            eventMapper.apply(iteratorToStream(filteredEvents))
                .flatMap(moveEventHandler)
                .map(namespaceResolver.apply(lookupSession))
                .forEach(this::post);
        } catch (final RepositoryException ex) {
            throw new RepositoryRuntimeException(ex);
        } finally {
            if (lookupSession != null) {
                lookupSession.logout();
            }
        }
    }

    /**
     * Post a single event to the event bus and count it.
     *
     * @param evt the event to publish
     */
    private void post(final FedoraEvent evt) {
        eventBus.post(evt);
        EVENT_COUNTER.inc();
    }
}