001/* 002 * Licensed to DuraSpace under one or more contributor license agreements. 003 * See the NOTICE file distributed with this work for additional information 004 * regarding copyright ownership. 005 * 006 * DuraSpace licenses this file to you under the Apache License, 007 * Version 2.0 (the "License"); you may not use this file except in 008 * compliance with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.fcrepo.kernel.modeshape.observer; 019 020import static com.codahale.metrics.MetricRegistry.name; 021import static com.google.common.collect.Iterators.filter; 022import static java.util.stream.Collectors.toSet; 023import static java.util.stream.Stream.concat; 024import static java.util.stream.Stream.of; 025import static javax.jcr.observation.Event.NODE_ADDED; 026import static javax.jcr.observation.Event.NODE_MOVED; 027import static javax.jcr.observation.Event.NODE_REMOVED; 028import static javax.jcr.observation.Event.PROPERTY_ADDED; 029import static javax.jcr.observation.Event.PROPERTY_CHANGED; 030import static javax.jcr.observation.Event.PROPERTY_REMOVED; 031import static org.fcrepo.kernel.api.FedoraTypes.FEDORA_BINARY; 032import static org.fcrepo.kernel.api.FedoraTypes.FEDORA_CONTAINER; 033import static org.fcrepo.kernel.api.FedoraTypes.FEDORA_REPOSITORY_ROOT; 034import static org.fcrepo.kernel.api.FedoraTypes.FEDORA_RESOURCE; 035import static org.fcrepo.kernel.api.FedoraTypes.LDP_BASIC_CONTAINER; 036import static org.fcrepo.kernel.api.FedoraTypes.LDP_CONTAINER; 037import static org.fcrepo.kernel.api.FedoraTypes.LDP_NON_RDF_SOURCE; 038import static org.fcrepo.kernel.api.FedoraTypes.LDP_RDF_SOURCE; 039import static org.fcrepo.kernel.api.RdfLexicon.JCR_NAMESPACE; 040import static org.fcrepo.kernel.api.RdfLexicon.JCR_NT_NAMESPACE; 041import static org.fcrepo.kernel.api.RdfLexicon.MIX_NAMESPACE; 042import static org.fcrepo.kernel.api.RdfLexicon.MODE_NAMESPACE; 043import static org.fcrepo.kernel.api.observer.EventType.RESOURCE_DELETION; 044import static org.fcrepo.kernel.api.observer.EventType.RESOURCE_RELOCATION; 045import static org.fcrepo.kernel.modeshape.FedoraJcrConstants.ROOT; 046import static org.fcrepo.kernel.modeshape.utils.NamespaceTools.getNamespaceRegistry; 047import static org.fcrepo.kernel.modeshape.utils.UncheckedFunction.uncheck; 048import static org.fcrepo.kernel.modeshape.utils.StreamUtils.iteratorToStream; 049import static org.slf4j.LoggerFactory.getLogger; 050 051import org.fcrepo.metrics.RegistryService; 052 053import java.util.Iterator; 054import java.util.Map; 055import java.util.Set; 056import java.util.function.Function; 057import java.util.stream.Stream; 058 059import javax.annotation.PostConstruct; 060import javax.annotation.PreDestroy; 061import javax.inject.Inject; 062import javax.jcr.NamespaceRegistry; 063import javax.jcr.RepositoryException; 064import javax.jcr.Session; 065import javax.jcr.observation.Event; 066import javax.jcr.observation.EventListener; 067 068import org.fcrepo.kernel.api.exception.RepositoryRuntimeException; 069import org.fcrepo.kernel.api.models.FedoraResource; 070import org.fcrepo.kernel.api.observer.FedoraEvent; 071import org.fcrepo.kernel.modeshape.FedoraResourceImpl; 072import org.fcrepo.kernel.modeshape.observer.eventmappings.InternalExternalEventMapper; 073 074import org.modeshape.jcr.api.Repository; 075import org.slf4j.Logger; 076 077import com.codahale.metrics.Counter; 078import com.google.common.collect.ImmutableSet; 079import com.google.common.eventbus.EventBus; 080 081/** 082 * Simple JCR EventListener that filters JCR Events through a Fedora EventFilter, maps the results through a mapper, 083 * and puts the resulting stream onto the internal Fedora EventBus as a stream of FedoraEvents. 084 * 085 * @author eddies 086 * @author ajs6f 087 * @since Feb 7, 2013 088 */ 089public class SimpleObserver implements EventListener { 090 091 private static final Logger LOGGER = getLogger(SimpleObserver.class); 092 093 private static final Set<String> filteredNamespaces = ImmutableSet.of( 094 JCR_NAMESPACE, MIX_NAMESPACE, JCR_NT_NAMESPACE, MODE_NAMESPACE); 095 096 /** 097 * A simple counter of events that pass through this observer 098 */ 099 static final Counter EVENT_COUNTER = 100 RegistryService.getInstance().getMetrics().counter(name(SimpleObserver.class, "onEvent")); 101 102 static final Integer EVENT_TYPES = NODE_ADDED + NODE_REMOVED + NODE_MOVED + PROPERTY_ADDED + PROPERTY_CHANGED 103 + PROPERTY_REMOVED; 104 105 /** 106 * Note: This function maps a FedoraEvent to a Stream of some number of FedoraEvents. This is because a MOVE event 107 * may lead to an arbitrarily large number of additional events for any child resources. In the event of this not 108 * being a MOVE event, the same FedoraEvent is returned, wrapped in a Stream. For a MOVEd resource, the resource in 109 * question will be translated to two FedoraEvents: a MOVED event for the new resource location and a REMOVED event 110 * corresponding to the old location. The same pair of FedoraEvents will also be generated for each child resource. 111 */ 112 private static Function<FedoraEvent, Stream<FedoraEvent>> handleMoveEvents(final Session session) { 113 return evt -> { 114 if (evt.getTypes().contains(RESOURCE_RELOCATION)) { 115 final Map<String, String> movePath = evt.getInfo(); 116 final String dest = movePath.get("destAbsPath"); 117 final String src = movePath.get("srcAbsPath"); 118 try { 119 final FedoraResource resource = new FedoraResourceImpl(session.getNode(evt.getPath())); 120 return concat(of(evt), resource.getChildren(true).map(FedoraResource::getPath) 121 .flatMap(path -> of( 122 new FedoraEventImpl(RESOURCE_RELOCATION, path, evt.getResourceTypes(), evt.getUserID(), 123 evt.getDate(), evt.getInfo()), 124 new FedoraEventImpl(RESOURCE_DELETION, path.replaceFirst(dest, src), evt.getResourceTypes(), 125 evt.getUserID(), evt.getDate(), evt.getInfo())))); 126 } catch (final RepositoryException ex) { 127 throw new RepositoryRuntimeException(ex); 128 } 129 } 130 return of(evt); 131 }; 132 } 133 134 /** 135 * Note: Certain RDF types are generated dynamically. These are added here, based on 136 * certain type hints. 137 */ 138 private static Function<String, Stream<String>> dynamicTypes = type -> { 139 if (type.equals(ROOT)) { 140 return of(FEDORA_REPOSITORY_ROOT, FEDORA_RESOURCE, FEDORA_CONTAINER, LDP_CONTAINER, LDP_RDF_SOURCE, 141 LDP_BASIC_CONTAINER); 142 } else if (type.equals(FEDORA_CONTAINER)) { 143 return of(FEDORA_CONTAINER, LDP_CONTAINER, LDP_RDF_SOURCE); 144 } else if (type.equals(FEDORA_BINARY)) { 145 return of(FEDORA_BINARY, LDP_NON_RDF_SOURCE); 146 } else { 147 return of(type); 148 } 149 }; 150 151 private static Function<FedoraEvent, FedoraEvent> filterAndDerefResourceTypes(final Session session) { 152 final NamespaceRegistry registry = getNamespaceRegistry(session); 153 return evt -> { 154 final Set<String> resourceTypes = evt.getResourceTypes().stream() 155 .flatMap(dynamicTypes).map(type -> type.split(":")) 156 .filter(pair -> pair.length == 2).map(uncheck(pair -> new String[]{registry.getURI(pair[0]), pair[1]})) 157 .filter(pair -> !filteredNamespaces.contains(pair[0])).map(pair -> pair[0] + pair[1]).collect(toSet()); 158 return new FedoraEventImpl(evt.getTypes(), evt.getPath(), resourceTypes, evt.getUserID(), 159 evt.getDate(), evt.getInfo()); 160 }; 161 } 162 163 @Inject 164 private Repository repository; 165 166 @Inject 167 private EventBus eventBus; 168 169 @Inject 170 private InternalExternalEventMapper eventMapper; 171 172 @Inject 173 private EventFilter eventFilter; 174 175 // THIS SESSION SHOULD NOT BE USED TO LOOK UP NODES 176 // it is used only to register and deregister this observer to the JCR 177 private Session session; 178 179 /** 180 * Register this observer with the JCR event listeners 181 * 182 * @throws RepositoryException if repository exception occurred 183 */ 184 @PostConstruct 185 public void buildListener() throws RepositoryException { 186 LOGGER.debug("Constructing an observer for JCR events..."); 187 session = repository.login(); 188 session.getWorkspace().getObservationManager() 189 .addEventListener(this, EVENT_TYPES, "/", true, null, null, false); 190 session.save(); 191 } 192 193 /** 194 * logout of the session 195 * 196 * @throws RepositoryException if repository exception occurred 197 */ 198 @PreDestroy 199 public void stopListening() throws RepositoryException { 200 try { 201 LOGGER.debug("Destroying an observer for JCR events..."); 202 session.getWorkspace().getObservationManager().removeEventListener(this); 203 } finally { 204 session.logout(); 205 } 206 } 207 208 /** 209 * Filter JCR events and transform them into our own FedoraEvents. 210 * 211 * @param events the JCR events 212 */ 213 @Override 214 public void onEvent(final javax.jcr.observation.EventIterator events) { 215 Session lookupSession = null; 216 try { 217 lookupSession = repository.login(); 218 219 @SuppressWarnings("unchecked") 220 final Iterator<Event> filteredEvents = filter(events, eventFilter::test); 221 eventMapper.apply(iteratorToStream(filteredEvents)) 222 .map(filterAndDerefResourceTypes(lookupSession)) 223 .flatMap(handleMoveEvents(lookupSession)) 224 .forEach(this::post); 225 } catch (final RepositoryException ex) { 226 throw new RepositoryRuntimeException(ex); 227 } finally { 228 if (lookupSession != null) { 229 lookupSession.logout(); 230 } 231 } 232 } 233 234 private void post(final FedoraEvent evt) { 235 eventBus.post(evt); 236 EVENT_COUNTER.inc(); 237 } 238}