001/** 002 * Copyright 2015 DuraSpace, Inc. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.fcrepo.kernel.impl.utils.iterators; 017 018import static com.hp.hpl.jena.rdf.model.ModelFactory.createDefaultModel; 019import static com.hp.hpl.jena.vocabulary.RDF.type; 020import static org.fcrepo.kernel.impl.rdf.ManagedRdf.isManagedMixin; 021import static org.slf4j.LoggerFactory.getLogger; 022 023import com.google.common.base.Joiner; 024import org.fcrepo.kernel.models.FedoraResource; 025import org.fcrepo.kernel.exception.MalformedRdfException; 026import org.fcrepo.kernel.exception.RepositoryRuntimeException; 027import org.fcrepo.kernel.identifiers.IdentifierConverter; 028 029import javax.jcr.RepositoryException; 030import javax.jcr.Session; 031 032import org.fcrepo.kernel.impl.rdf.JcrRdfTools; 033import org.fcrepo.kernel.utils.iterators.RdfStream; 034import org.fcrepo.kernel.utils.iterators.RdfStreamConsumer; 035import org.slf4j.Logger; 036 037import com.google.common.base.Predicate; 038import com.google.common.util.concurrent.ListenableFuture; 039import com.google.common.util.concurrent.SettableFuture; 040import com.hp.hpl.jena.graph.Triple; 041import com.hp.hpl.jena.rdf.model.Model; 042import com.hp.hpl.jena.rdf.model.Resource; 043import com.hp.hpl.jena.rdf.model.Statement; 044 045import java.util.ArrayList; 046import java.util.List; 047 048/** 049 * @author ajs6f 050 * @since Oct 24, 2013 051 */ 052public abstract class PersistingRdfStreamConsumer implements RdfStreamConsumer { 053 054 private final RdfStream stream; 055 056 private final IdentifierConverter<Resource, FedoraResource> idTranslator; 057 058 // if it's not about a Fedora resource, we don't care. 059 protected final Predicate<Triple> isFedoraSubjectTriple; 060 061 private final JcrRdfTools jcrRdfTools; 062 063 private static final Model m = createDefaultModel(); 064 065 private static final Logger LOGGER = getLogger(PersistingRdfStreamConsumer.class); 066 067 private final List<String> exceptions; 068 069 /** 070 * Ordinary constructor. 071 * 072 * @param idTranslator the id translator 073 * @param session the session 074 * @param stream the rdf stream 075 */ 076 public PersistingRdfStreamConsumer(final IdentifierConverter<Resource, FedoraResource> idTranslator, 077 final Session session, final RdfStream stream) { 078 this.idTranslator = idTranslator; 079 this.jcrRdfTools = new JcrRdfTools(idTranslator, session); 080 this.isFedoraSubjectTriple = new Predicate<Triple>() { 081 082 @Override 083 public boolean apply(final Triple t) { 084 085 final boolean result = idTranslator.inDomain(m.asStatement(t).getSubject()) 086 || t.getSubject().isBlank(); 087 if (result) { 088 LOGGER.debug( 089 "Discovered a Fedora-relevant subject in triple: {}.", 090 t); 091 } else { 092 LOGGER.debug("Ignoring triple: {}.", t); 093 } 094 return result; 095 } 096 097 }; 098 // we knock out non-Fedora RDF 099 this.stream = 100 stream.withThisContext(stream.filter(isFedoraSubjectTriple)); 101 102 this.exceptions = new ArrayList<>(); 103 } 104 105 @Override 106 public void consume() throws MalformedRdfException { 107 while (stream.hasNext()) { 108 final Statement t = m.asStatement(stream.next()); 109 LOGGER.debug("Operating triple {}.", t); 110 111 try { 112 operateOnTriple(t); 113 } catch (final MalformedRdfException e) { 114 exceptions.add(e.getMessage()); 115 } 116 } 117 118 if (!exceptions.isEmpty()) { 119 throw new MalformedRdfException(Joiner.on("\n").join(exceptions)); 120 } 121 } 122 123 protected void operateOnTriple(final Statement input) throws MalformedRdfException { 124 try { 125 126 final Statement t = jcrRdfTools.skolemize(idTranslator, input); 127 128 final Resource subject = t.getSubject(); 129 final FedoraResource subjectNode = translator().convert(subject); 130 131 // if this is a user-managed RDF type assertion, update the node's 132 // mixins. If it isn't, treat it as a "data" property. 133 if (t.getPredicate().equals(type) && t.getObject().isResource()) { 134 final Resource mixinResource = t.getObject().asResource(); 135 if (!isManagedMixin.apply(mixinResource)) { 136 LOGGER.debug("Operating on node: {} with mixin: {}.", 137 subjectNode, mixinResource); 138 operateOnMixin(mixinResource, subjectNode); 139 } else { 140 LOGGER.debug("Found repository-managed mixin on which we will not operate."); 141 } 142 } else { 143 LOGGER.debug("Operating on node: {} from triple: {}.", subjectNode, 144 t); 145 operateOnProperty(t, subjectNode); 146 } 147 } catch (final RepositoryException | RepositoryRuntimeException e) { 148 throw new MalformedRdfException(e.getMessage(), e); 149 } 150 } 151 152 protected abstract void operateOnProperty(final Statement t, 153 final FedoraResource subjectNode) throws RepositoryException; 154 155 protected abstract void operateOnMixin(final Resource mixinResource, 156 final FedoraResource subjectNode) throws RepositoryException; 157 158 @Override 159 public ListenableFuture<Boolean> consumeAsync() { 160 // TODO make this actually asynch 161 final SettableFuture<Boolean> result = SettableFuture.create(); 162 try { 163 consume(); 164 result.set(true); 165 } catch (final MalformedRdfException e) { 166 LOGGER.warn("Got exception consuming RDF stream", e); 167 result.setException(e); 168 result.set(false); 169 } 170 return result; 171 } 172 173 174 /** 175 * @return the stream 176 */ 177 public RdfStream stream() { 178 return stream; 179 } 180 181 182 /** 183 * @return the idTranslator 184 */ 185 public IdentifierConverter<Resource,FedoraResource> translator() { 186 return idTranslator; 187 } 188 189 190 /** 191 * @return the jcrRdfTools 192 */ 193 public JcrRdfTools jcrRdfTools() { 194 return jcrRdfTools; 195 } 196 197}