001/* 002 * Licensed to DuraSpace under one or more contributor license agreements. 003 * See the NOTICE file distributed with this work for additional information 004 * regarding copyright ownership. 005 * 006 * DuraSpace licenses this file to you under the Apache License, 007 * Version 2.0 (the "License"); you may not use this file except in 008 * compliance with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.fcrepo.kernel.modeshape.observer; 019 020import static com.codahale.metrics.MetricRegistry.name; 021import static com.google.common.collect.Iterators.filter; 022import static java.util.stream.Collectors.toSet; 023import static java.util.stream.Stream.concat; 024import static java.util.stream.Stream.of; 025import static javax.jcr.observation.Event.NODE_ADDED; 026import static javax.jcr.observation.Event.NODE_MOVED; 027import static javax.jcr.observation.Event.NODE_REMOVED; 028import static javax.jcr.observation.Event.PROPERTY_ADDED; 029import static javax.jcr.observation.Event.PROPERTY_CHANGED; 030import static javax.jcr.observation.Event.PROPERTY_REMOVED; 031import static org.fcrepo.kernel.api.FedoraTypes.FEDORA_BINARY; 032import static org.fcrepo.kernel.api.FedoraTypes.FEDORA_CONTAINER; 033import static org.fcrepo.kernel.api.FedoraTypes.FEDORA_REPOSITORY_ROOT; 034import static org.fcrepo.kernel.api.FedoraTypes.FEDORA_RESOURCE; 035import static org.fcrepo.kernel.api.FedoraTypes.LDP_BASIC_CONTAINER; 036import static org.fcrepo.kernel.api.FedoraTypes.LDP_CONTAINER; 037import static org.fcrepo.kernel.api.FedoraTypes.LDP_NON_RDF_SOURCE; 038import static org.fcrepo.kernel.api.FedoraTypes.LDP_RDF_SOURCE; 039import static org.fcrepo.kernel.api.observer.EventType.RESOURCE_DELETION; 040import static org.fcrepo.kernel.api.observer.EventType.RESOURCE_RELOCATION; 041import static org.fcrepo.kernel.modeshape.FedoraSessionImpl.getJcrSession; 042import static org.fcrepo.kernel.modeshape.FedoraJcrConstants.ROOT; 043import static org.fcrepo.kernel.modeshape.RdfJcrLexicon.JCR_NAMESPACE; 044import static org.fcrepo.kernel.modeshape.RdfJcrLexicon.JCR_NT_NAMESPACE; 045import static org.fcrepo.kernel.modeshape.RdfJcrLexicon.MIX_NAMESPACE; 046import static org.fcrepo.kernel.modeshape.RdfJcrLexicon.MODE_NAMESPACE; 047import static org.fcrepo.kernel.modeshape.utils.NamespaceTools.getNamespaceRegistry; 048import static org.fcrepo.kernel.modeshape.utils.UncheckedFunction.uncheck; 049import static org.fcrepo.kernel.modeshape.utils.StreamUtils.iteratorToStream; 050import static org.slf4j.LoggerFactory.getLogger; 051 052import org.fcrepo.metrics.RegistryService; 053 054import java.util.Iterator; 055import java.util.Map; 056import java.util.Set; 057import java.util.function.Function; 058import java.util.stream.Stream; 059 060import javax.annotation.PostConstruct; 061import javax.annotation.PreDestroy; 062import javax.inject.Inject; 063import javax.jcr.NamespaceRegistry; 064import javax.jcr.RepositoryException; 065import javax.jcr.Session; 066import javax.jcr.observation.Event; 067import javax.jcr.observation.EventListener; 068 069import org.fcrepo.kernel.api.FedoraRepository; 070import org.fcrepo.kernel.api.exception.RepositoryRuntimeException; 071import org.fcrepo.kernel.api.models.FedoraResource; 072import org.fcrepo.kernel.api.observer.FedoraEvent; 073import org.fcrepo.kernel.modeshape.FedoraResourceImpl; 074import org.fcrepo.kernel.modeshape.observer.eventmappings.InternalExternalEventMapper; 075 076import org.slf4j.Logger; 077 078import com.codahale.metrics.Counter; 079import com.google.common.collect.ImmutableSet; 080import com.google.common.eventbus.EventBus; 081 082/** 083 * Simple JCR EventListener that filters JCR Events through a Fedora EventFilter, maps the results through a mapper, 084 * and puts the resulting stream onto the internal Fedora EventBus as a stream of FedoraEvents. 085 * 086 * @author eddies 087 * @author ajs6f 088 * @since Feb 7, 2013 089 */ 090public class SimpleObserver implements EventListener { 091 092 private static final Logger LOGGER = getLogger(SimpleObserver.class); 093 094 private static final Set<String> filteredNamespaces = ImmutableSet.of( 095 JCR_NAMESPACE, MIX_NAMESPACE, JCR_NT_NAMESPACE, MODE_NAMESPACE); 096 097 /** 098 * A simple counter of events that pass through this observer 099 */ 100 static final Counter EVENT_COUNTER = 101 RegistryService.getInstance().getMetrics().counter(name(SimpleObserver.class, "onEvent")); 102 103 static final Integer EVENT_TYPES = NODE_ADDED + NODE_REMOVED + NODE_MOVED + PROPERTY_ADDED + PROPERTY_CHANGED 104 + PROPERTY_REMOVED; 105 106 /** 107 * Note: This function maps a FedoraEvent to a Stream of some number of FedoraEvents. This is because a MOVE event 108 * may lead to an arbitrarily large number of additional events for any child resources. In the event of this not 109 * being a MOVE event, the same FedoraEvent is returned, wrapped in a Stream. For a MOVEd resource, the resource in 110 * question will be translated to two FedoraEvents: a MOVED event for the new resource location and a REMOVED event 111 * corresponding to the old location. The same pair of FedoraEvents will also be generated for each child resource. 112 */ 113 private static Function<FedoraEvent, Stream<FedoraEvent>> handleMoveEvents(final Session session) { 114 return evt -> { 115 if (evt.getTypes().contains(RESOURCE_RELOCATION)) { 116 final Map<String, String> movePath = evt.getInfo(); 117 final String dest = movePath.get("destAbsPath"); 118 final String src = movePath.get("srcAbsPath"); 119 try { 120 final FedoraResource resource = new FedoraResourceImpl(session.getNode(evt.getPath())); 121 return concat(of(evt), resource.getChildren(true).map(FedoraResource::getPath) 122 .flatMap(path -> of( 123 new FedoraEventImpl(RESOURCE_RELOCATION, path, evt.getResourceTypes(), evt.getUserID(), 124 evt.getDate(), evt.getInfo()), 125 new FedoraEventImpl(RESOURCE_DELETION, path.replaceFirst(dest, src), evt.getResourceTypes(), 126 evt.getUserID(), evt.getDate(), evt.getInfo())))); 127 } catch (final RepositoryException ex) { 128 throw new RepositoryRuntimeException(ex); 129 } 130 } 131 return of(evt); 132 }; 133 } 134 135 /** 136 * Note: Certain RDF types are generated dynamically. These are added here, based on 137 * certain type hints. 138 */ 139 private static Function<String, Stream<String>> dynamicTypes = type -> { 140 if (type.equals(ROOT)) { 141 return of(FEDORA_REPOSITORY_ROOT, FEDORA_RESOURCE, FEDORA_CONTAINER, LDP_CONTAINER, LDP_RDF_SOURCE, 142 LDP_BASIC_CONTAINER); 143 } else if (type.equals(FEDORA_CONTAINER)) { 144 return of(FEDORA_CONTAINER, LDP_CONTAINER, LDP_RDF_SOURCE); 145 } else if (type.equals(FEDORA_BINARY)) { 146 return of(FEDORA_BINARY, LDP_NON_RDF_SOURCE); 147 } else { 148 return of(type); 149 } 150 }; 151 152 private static Function<FedoraEvent, FedoraEvent> filterAndDerefResourceTypes(final Session session) { 153 final NamespaceRegistry registry = getNamespaceRegistry(session); 154 return evt -> { 155 final Set<String> resourceTypes = evt.getResourceTypes().stream() 156 .flatMap(dynamicTypes).map(type -> type.split(":")) 157 .filter(pair -> pair.length == 2).map(uncheck(pair -> new String[]{registry.getURI(pair[0]), pair[1]})) 158 .filter(pair -> !filteredNamespaces.contains(pair[0])).map(pair -> pair[0] + pair[1]).collect(toSet()); 159 return new FedoraEventImpl(evt.getTypes(), evt.getPath(), resourceTypes, evt.getUserID(), 160 evt.getDate(), evt.getInfo()); 161 }; 162 } 163 164 @Inject 165 private FedoraRepository repository; 166 167 @Inject 168 private EventBus eventBus; 169 170 @Inject 171 private InternalExternalEventMapper eventMapper; 172 173 @Inject 174 private EventFilter eventFilter; 175 176 // THIS SESSION SHOULD NOT BE USED TO LOOK UP NODES 177 // it is used only to register and deregister this observer to the JCR 178 private Session session; 179 180 /** 181 * Register this observer with the JCR event listeners 182 * 183 * @throws RepositoryException if repository exception occurred 184 */ 185 @PostConstruct 186 public void buildListener() throws RepositoryException { 187 LOGGER.debug("Constructing an observer for JCR events..."); 188 session = getJcrSession(repository.login()); 189 session.getWorkspace().getObservationManager() 190 .addEventListener(this, EVENT_TYPES, "/", true, null, null, false); 191 session.save(); 192 } 193 194 /** 195 * logout of the session 196 * 197 * @throws RepositoryException if repository exception occurred 198 */ 199 @PreDestroy 200 public void stopListening() throws RepositoryException { 201 try { 202 LOGGER.debug("Destroying an observer for JCR events..."); 203 session.getWorkspace().getObservationManager().removeEventListener(this); 204 } finally { 205 session.logout(); 206 } 207 } 208 209 /** 210 * Filter JCR events and transform them into our own FedoraEvents. 211 * 212 * @param events the JCR events 213 */ 214 @Override 215 public void onEvent(final javax.jcr.observation.EventIterator events) { 216 Session lookupSession = null; 217 try { 218 lookupSession = getJcrSession(repository.login()); 219 220 @SuppressWarnings("unchecked") 221 final Iterator<Event> filteredEvents = filter(events, eventFilter::test); 222 eventMapper.apply(iteratorToStream(filteredEvents)) 223 .map(filterAndDerefResourceTypes(lookupSession)) 224 .flatMap(handleMoveEvents(lookupSession)) 225 .forEach(this::post); 226 } finally { 227 if (lookupSession != null) { 228 lookupSession.logout(); 229 } 230 } 231 } 232 233 private void post(final FedoraEvent evt) { 234 eventBus.post(evt); 235 EVENT_COUNTER.inc(); 236 } 237}