001/** 002 * Copyright 2015 DuraSpace, Inc. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.fcrepo.kernel.modeshape.utils.iterators; 017 018import static com.hp.hpl.jena.rdf.model.ModelFactory.createDefaultModel; 019import static com.hp.hpl.jena.vocabulary.RDF.type; 020import static java.lang.String.join; 021import static org.fcrepo.kernel.modeshape.rdf.ManagedRdf.isManagedMixin; 022import static org.slf4j.LoggerFactory.getLogger; 023 024import org.fcrepo.kernel.api.models.FedoraResource; 025import org.fcrepo.kernel.api.exception.ConstraintViolationException; 026import org.fcrepo.kernel.api.exception.IncorrectTripleSubjectException; 027import org.fcrepo.kernel.api.exception.MalformedRdfException; 028import org.fcrepo.kernel.api.exception.ServerManagedTypeException; 029import org.fcrepo.kernel.api.exception.OutOfDomainSubjectException; 030import org.fcrepo.kernel.api.exception.RepositoryRuntimeException; 031import org.fcrepo.kernel.api.identifiers.IdentifierConverter; 032 033import javax.jcr.RepositoryException; 034import javax.jcr.Session; 035 036import org.fcrepo.kernel.api.utils.iterators.RdfStream; 037import org.fcrepo.kernel.api.utils.iterators.RdfStreamConsumer; 038import org.fcrepo.kernel.modeshape.rdf.JcrRdfTools; 039import org.slf4j.Logger; 040 041import com.google.common.util.concurrent.ListenableFuture; 042import com.google.common.util.concurrent.SettableFuture; 043import com.hp.hpl.jena.graph.Node; 044import com.hp.hpl.jena.graph.Triple; 045import com.hp.hpl.jena.rdf.model.Model; 046import com.hp.hpl.jena.rdf.model.Resource; 047import com.hp.hpl.jena.rdf.model.Statement; 048 049import java.util.ArrayList; 050import java.util.List; 051import java.util.function.Predicate; 052 053/** 054 * @author ajs6f 055 * @since Oct 24, 2013 056 */ 057public abstract class PersistingRdfStreamConsumer implements RdfStreamConsumer { 058 059 private final RdfStream stream; 060 061 private final IdentifierConverter<Resource, FedoraResource> idTranslator; 062 063 // if it's not about a Fedora resource, we don't care. 064 protected final Predicate<Triple> isFedoraSubjectTriple; 065 066 private final JcrRdfTools jcrRdfTools; 067 068 private static final Model m = createDefaultModel(); 069 070 private static final Logger LOGGER = getLogger(PersistingRdfStreamConsumer.class); 071 072 private final List<String> exceptions; 073 074 /** 075 * Ordinary constructor. 076 * 077 * @param idTranslator the id translator 078 * @param session the session 079 * @param stream the rdf stream 080 */ 081 public PersistingRdfStreamConsumer(final IdentifierConverter<Resource, FedoraResource> idTranslator, 082 final Session session, final RdfStream stream) { 083 this.idTranslator = idTranslator; 084 this.jcrRdfTools = new JcrRdfTools(idTranslator, session); 085 this.isFedoraSubjectTriple = t -> { 086 087 final Node subject = t.getSubject(); 088 final Node topic = stream().topic(); 089 090 // blank nodes are okay 091 if (!t.getSubject().isBlank()) { 092 final String subjectURI = subject.getURI(); 093 final int hashIndex = subjectURI.lastIndexOf("#"); 094 // a hash URI with the same base as the topic is okay, as is the topic itself 095 if ((hashIndex > 0 && topic.getURI().equals(subjectURI.substring(0, hashIndex))) 096 || topic.equals(subject)) { 097 LOGGER.debug("Discovered a Fedora-relevant subject in triple: {}.", t); 098 return true; 099 } 100 // the subject was inappropriate in one of two ways 101 if (translator().inDomain(m.asRDFNode(subject).asResource())) { 102 // it was in-domain, but not within this resource 103 LOGGER.error("{} is not in the topic of this RDF, which is {}.", subject, topic); 104 throw new IncorrectTripleSubjectException(subject + 105 " is not in the topic of this RDF, which is " + topic); 106 } 107 // it wasn't even in in-domain! 108 LOGGER.error("subject {} is not in repository domain.", subject); 109 throw new OutOfDomainSubjectException(subject); 110 } 111 return true; 112 }; 113 // we fail on non-Fedora RDF 114 this.stream = stream.withThisContext(stream.filter(isFedoraSubjectTriple::test)); 115 116 this.exceptions = new ArrayList<>(); 117 } 118 119 @Override 120 public void consume() throws MalformedRdfException { 121 while (stream.hasNext()) { 122 final Statement t = m.asStatement(stream.next()); 123 LOGGER.debug("Operating triple {}.", t); 124 125 try { 126 operateOnTriple(t); 127 } catch (final ConstraintViolationException e) { 128 throw e; 129 } catch (final MalformedRdfException e) { 130 exceptions.add(e.getMessage()); 131 } 132 } 133 134 if (!exceptions.isEmpty()) { 135 throw new MalformedRdfException(join("\n", exceptions)); 136 } 137 } 138 139 protected void operateOnTriple(final Statement input) throws MalformedRdfException { 140 try { 141 142 final Statement t = jcrRdfTools.skolemize(idTranslator, input); 143 144 final Resource subject = t.getSubject(); 145 final FedoraResource subjectNode = translator().convert(subject); 146 147 // if this is a user-managed RDF type assertion, update the node's 148 // mixins. If it isn't, treat it as a "data" property. 149 if (t.getPredicate().equals(type) && t.getObject().isResource()) { 150 final Resource mixinResource = t.getObject().asResource(); 151 if (!isManagedMixin.apply(mixinResource)) { 152 LOGGER.debug("Operating on node: {} with mixin: {}.", 153 subjectNode, mixinResource); 154 operateOnMixin(mixinResource, subjectNode); 155 } else { 156 LOGGER.debug("Found repository-managed mixin on which we will not operate."); 157 throw new ServerManagedTypeException(String.format( 158 "The repository type (%s) of this resource is system managed.", mixinResource)); 159 } 160 } else { 161 LOGGER.debug("Operating on node: {} from triple: {}.", subjectNode, 162 t); 163 operateOnProperty(t, subjectNode); 164 } 165 } catch (final ConstraintViolationException e) { 166 throw e; 167 } catch (final RepositoryException | RepositoryRuntimeException e) { 168 throw new MalformedRdfException(e.getMessage(), e); 169 } 170 } 171 172 protected abstract void operateOnProperty(final Statement t, 173 final FedoraResource subjectNode) throws RepositoryException; 174 175 protected abstract void operateOnMixin(final Resource mixinResource, 176 final FedoraResource subjectNode) throws RepositoryException; 177 178 @Override 179 public ListenableFuture<Boolean> consumeAsync() { 180 // TODO make this actually asynch 181 final SettableFuture<Boolean> result = SettableFuture.create(); 182 try { 183 consume(); 184 result.set(true); 185 } catch (final MalformedRdfException e) { 186 LOGGER.warn("Got exception consuming RDF stream", e); 187 result.setException(e); 188 result.set(false); 189 } 190 return result; 191 } 192 193 194 /** 195 * @return the stream 196 */ 197 public RdfStream stream() { 198 return stream; 199 } 200 201 /** 202 * @return the idTranslator 203 */ 204 public IdentifierConverter<Resource, FedoraResource> translator() { 205 return idTranslator; 206 } 207 208 /** 209 * @return the jcrRdfTools 210 */ 211 public JcrRdfTools jcrRdfTools() { 212 return jcrRdfTools; 213 } 214 215}