001/* 002 * Licensed to DuraSpace under one or more contributor license agreements. 003 * See the NOTICE file distributed with this work for additional information 004 * regarding copyright ownership. 005 * 006 * DuraSpace licenses this file to you under the Apache License, 007 * Version 2.0 (the "License"); you may not use this file except in 008 * compliance with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.fcrepo.kernel.modeshape.observer; 019 020import static com.google.common.collect.Iterators.filter; 021import static java.util.stream.Collectors.toSet; 022import static java.util.stream.Stream.concat; 023import static java.util.stream.Stream.of; 024import static javax.jcr.observation.Event.NODE_ADDED; 025import static javax.jcr.observation.Event.NODE_MOVED; 026import static javax.jcr.observation.Event.NODE_REMOVED; 027import static javax.jcr.observation.Event.PROPERTY_ADDED; 028import static javax.jcr.observation.Event.PROPERTY_CHANGED; 029import static javax.jcr.observation.Event.PROPERTY_REMOVED; 030import static org.fcrepo.kernel.api.FedoraTypes.FEDORA_BINARY; 031import static org.fcrepo.kernel.api.FedoraTypes.FEDORA_CONTAINER; 032import static org.fcrepo.kernel.api.FedoraTypes.FEDORA_NON_RDF_SOURCE_DESCRIPTION; 033import static org.fcrepo.kernel.api.FedoraTypes.FEDORA_REPOSITORY_ROOT; 034import static org.fcrepo.kernel.api.FedoraTypes.FEDORA_RESOURCE; 035import static org.fcrepo.kernel.api.FedoraTypes.LDP_BASIC_CONTAINER; 036import static org.fcrepo.kernel.api.FedoraTypes.LDP_CONTAINER; 037import static org.fcrepo.kernel.api.FedoraTypes.LDP_NON_RDF_SOURCE; 038import static org.fcrepo.kernel.api.FedoraTypes.LDP_RDF_SOURCE; 039import static org.fcrepo.kernel.api.observer.EventType.RESOURCE_DELETION; 040import static org.fcrepo.kernel.api.observer.EventType.RESOURCE_RELOCATION; 041import static org.fcrepo.kernel.modeshape.FedoraSessionImpl.getJcrSession; 042import static org.fcrepo.kernel.modeshape.FedoraJcrConstants.ROOT; 043import static org.fcrepo.kernel.modeshape.RdfJcrLexicon.JCR_NAMESPACE; 044import static org.fcrepo.kernel.modeshape.RdfJcrLexicon.JCR_NT_NAMESPACE; 045import static org.fcrepo.kernel.modeshape.RdfJcrLexicon.MIX_NAMESPACE; 046import static org.fcrepo.kernel.modeshape.RdfJcrLexicon.MODE_NAMESPACE; 047import static org.fcrepo.kernel.modeshape.utils.NamespaceTools.getNamespaceRegistry; 048import static org.fcrepo.kernel.modeshape.utils.UncheckedFunction.uncheck; 049import static org.fcrepo.kernel.modeshape.utils.StreamUtils.iteratorToStream; 050import static org.slf4j.LoggerFactory.getLogger; 051 052import java.util.Iterator; 053import java.util.Map; 054import java.util.Set; 055import java.util.function.Function; 056import java.util.stream.Stream; 057 058import javax.annotation.PostConstruct; 059import javax.annotation.PreDestroy; 060import javax.inject.Inject; 061import javax.jcr.NamespaceRegistry; 062import javax.jcr.RepositoryException; 063import javax.jcr.Session; 064import javax.jcr.observation.Event; 065import javax.jcr.observation.EventListener; 066 067import org.fcrepo.kernel.api.FedoraRepository; 068import org.fcrepo.kernel.api.FedoraSession; 069import org.fcrepo.kernel.api.exception.RepositoryRuntimeException; 070import org.fcrepo.kernel.api.models.FedoraResource; 071import org.fcrepo.kernel.api.observer.FedoraEvent; 072import org.fcrepo.kernel.modeshape.FedoraResourceImpl; 073import org.fcrepo.kernel.modeshape.FedoraSessionImpl; 074import org.fcrepo.kernel.modeshape.observer.eventmappings.InternalExternalEventMapper; 075import org.fcrepo.kernel.modeshape.utils.FedoraSessionUserUtil; 076 077import org.slf4j.Logger; 078 079import com.google.common.collect.ImmutableSet; 080import com.google.common.eventbus.EventBus; 081 082/** 083 * Simple JCR EventListener that filters JCR Events through a Fedora EventFilter, maps the results through a mapper, 084 * and puts the resulting stream onto the internal Fedora EventBus as a stream of FedoraEvents. 085 * 086 * @author eddies 087 * @author ajs6f 088 * @since Feb 7, 2013 089 */ 090public class SimpleObserver implements EventListener { 091 092 private static final Logger LOGGER = getLogger(SimpleObserver.class); 093 094 private static final Set<String> filteredNamespaces = ImmutableSet.of( 095 JCR_NAMESPACE, MIX_NAMESPACE, JCR_NT_NAMESPACE, MODE_NAMESPACE); 096 097 static final Integer EVENT_TYPES = NODE_ADDED + NODE_REMOVED + NODE_MOVED + PROPERTY_ADDED + PROPERTY_CHANGED 098 + PROPERTY_REMOVED; 099 100 /** 101 * Note: This function maps a FedoraEvent to a Stream of some number of FedoraEvents. This is because a MOVE event 102 * may lead to an arbitrarily large number of additional events for any child resources. In the event of this not 103 * being a MOVE event, the same FedoraEvent is returned, wrapped in a Stream. For a MOVEd resource, the resource in 104 * question will be translated to two FedoraEvents: a MOVED event for the new resource location and a REMOVED event 105 * corresponding to the old location. The same pair of FedoraEvents will also be generated for each child resource. 106 */ 107 private static Function<FedoraEvent, Stream<FedoraEvent>> handleMoveEvents(final Session session) { 108 return evt -> { 109 if (evt.getTypes().contains(RESOURCE_RELOCATION)) { 110 final Map<String, String> movePath = evt.getInfo(); 111 final String dest = movePath.get("destAbsPath"); 112 final String src = movePath.get("srcAbsPath"); 113 final FedoraSession fsession = new FedoraSessionImpl(session); 114 115 try { 116 final FedoraResource resource = new FedoraResourceImpl(session.getNode(evt.getPath())); 117 return concat(of(evt), resource.getChildren(true).map(FedoraResource::getPath) 118 .flatMap(path -> of( 119 new FedoraEventImpl(RESOURCE_RELOCATION, path, evt.getResourceTypes(), evt.getUserID(), 120 fsession.getUserURI(), evt.getDate(), evt.getInfo()), 121 new FedoraEventImpl(RESOURCE_DELETION, path.replaceFirst(dest, src), evt.getResourceTypes(), 122 evt.getUserID(), fsession.getUserURI(), evt.getDate(), evt.getInfo())))); 123 } catch (final RepositoryException ex) { 124 throw new RepositoryRuntimeException(ex); 125 } 126 } 127 return of(evt); 128 }; 129 } 130 131 /** 132 * Note: Certain RDF types are generated dynamically. These are added here, based on 133 * certain type hints. 134 */ 135 private static final Function<String, Stream<String>> dynamicTypes = type -> { 136 if (type.equals(ROOT)) { 137 return of(FEDORA_REPOSITORY_ROOT, FEDORA_RESOURCE, FEDORA_CONTAINER, LDP_CONTAINER, LDP_RDF_SOURCE, 138 LDP_BASIC_CONTAINER); 139 } else if (type.equals(FEDORA_CONTAINER)) { 140 return of(FEDORA_CONTAINER, LDP_CONTAINER, LDP_RDF_SOURCE); 141 } else if (type.equals(FEDORA_BINARY)) { 142 return of(FEDORA_BINARY, LDP_NON_RDF_SOURCE); 143 } else if (type.equals(FEDORA_NON_RDF_SOURCE_DESCRIPTION)) { 144 return of(FEDORA_BINARY, LDP_NON_RDF_SOURCE); 145 } else { 146 return of(type); 147 } 148 }; 149 150 private static Function<FedoraEvent, FedoraEvent> filterAndDerefResourceTypes(final Session session) { 151 final NamespaceRegistry registry = getNamespaceRegistry(session); 152 return evt -> { 153 final Set<String> resourceTypes = evt.getResourceTypes().stream() 154 .flatMap(dynamicTypes).map(type -> type.split(":")) 155 .filter(pair -> pair.length == 2).map(uncheck(pair -> new String[]{registry.getURI(pair[0]), pair[1]})) 156 .filter(pair -> !filteredNamespaces.contains(pair[0])).map(pair -> pair[0] + pair[1]).collect(toSet()); 157 return new FedoraEventImpl(evt.getTypes(), evt.getPath(), resourceTypes, evt.getUserID(), 158 FedoraSessionUserUtil.getUserURI(evt.getUserID()), evt.getDate(), evt.getInfo()); 159 }; 160 } 161 162 @Inject 163 private FedoraRepository repository; 164 165 @Inject 166 private EventBus eventBus; 167 168 @Inject 169 private InternalExternalEventMapper eventMapper; 170 171 @Inject 172 private EventFilter eventFilter; 173 174 // THIS SESSION SHOULD NOT BE USED TO LOOK UP NODES 175 // it is used only to register and deregister this observer to the JCR 176 private Session session; 177 178 /** 179 * Register this observer with the JCR event listeners 180 * 181 * @throws RepositoryException if repository exception occurred 182 */ 183 @PostConstruct 184 public void buildListener() throws RepositoryException { 185 LOGGER.debug("Constructing an observer for JCR events..."); 186 session = getJcrSession(repository.login()); 187 session.getWorkspace().getObservationManager() 188 .addEventListener(this, EVENT_TYPES, "/", true, null, null, false); 189 session.save(); 190 } 191 192 /** 193 * logout of the session 194 * 195 * @throws RepositoryException if repository exception occurred 196 */ 197 @PreDestroy 198 public void stopListening() throws RepositoryException { 199 try { 200 LOGGER.debug("Destroying an observer for JCR events..."); 201 session.getWorkspace().getObservationManager().removeEventListener(this); 202 } finally { 203 session.logout(); 204 } 205 } 206 207 /** 208 * Filter JCR events and transform them into our own FedoraEvents. 209 * 210 * @param events the JCR events 211 */ 212 @Override 213 public void onEvent(final javax.jcr.observation.EventIterator events) { 214 Session lookupSession = null; 215 try { 216 lookupSession = getJcrSession(repository.login()); 217 218 @SuppressWarnings("unchecked") 219 final Iterator<Event> filteredEvents = filter(events, eventFilter::test); 220 eventMapper.apply(iteratorToStream(filteredEvents)) 221 .map(filterAndDerefResourceTypes(lookupSession)) 222 .flatMap(handleMoveEvents(lookupSession)) 223 .forEach(this::post); 224 } finally { 225 if (lookupSession != null) { 226 lookupSession.logout(); 227 } 228 } 229 } 230 231 private void post(final FedoraEvent evt) { 232 eventBus.post(evt); 233 } 234}