001/** 002 * Copyright 2015 DuraSpace, Inc. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.fcrepo.kernel.modeshape.utils.iterators; 017 018import static com.hp.hpl.jena.rdf.model.ModelFactory.createDefaultModel; 019import static com.hp.hpl.jena.vocabulary.RDF.type; 020import static java.lang.String.join; 021import static org.fcrepo.kernel.modeshape.rdf.ManagedRdf.isManagedMixin; 022import static org.slf4j.LoggerFactory.getLogger; 023 024import org.fcrepo.kernel.api.models.FedoraResource; 025import org.fcrepo.kernel.api.exception.ConstraintViolationException; 026import org.fcrepo.kernel.api.exception.IncorrectTripleSubjectException; 027import org.fcrepo.kernel.api.exception.MalformedRdfException; 028import org.fcrepo.kernel.api.exception.ServerManagedTypeException; 029import org.fcrepo.kernel.api.exception.OutOfDomainSubjectException; 030import org.fcrepo.kernel.api.exception.RepositoryRuntimeException; 031import org.fcrepo.kernel.api.identifiers.IdentifierConverter; 032 033import javax.jcr.RepositoryException; 034import javax.jcr.Session; 035 036import org.fcrepo.kernel.api.utils.iterators.RdfStream; 037import org.fcrepo.kernel.api.utils.iterators.RdfStreamConsumer; 038import org.fcrepo.kernel.modeshape.rdf.JcrRdfTools; 039import org.slf4j.Logger; 040 041import com.google.common.util.concurrent.ListenableFuture; 042import com.google.common.util.concurrent.SettableFuture; 043import com.hp.hpl.jena.graph.Node; 044import com.hp.hpl.jena.graph.Triple; 045import com.hp.hpl.jena.rdf.model.Model; 046import com.hp.hpl.jena.rdf.model.Resource; 047import com.hp.hpl.jena.rdf.model.Statement; 048 049import java.util.ArrayList; 050import java.util.List; 051import java.util.function.Predicate; 052 053/** 054 * @author ajs6f 055 * @since Oct 24, 2013 056 */ 057public abstract class PersistingRdfStreamConsumer implements RdfStreamConsumer { 058 059 private final RdfStream stream; 060 061 private final IdentifierConverter<Resource, FedoraResource> idTranslator; 062 063 // if it's not about a Fedora resource, we don't care. 064 protected final Predicate<Triple> isFedoraSubjectTriple; 065 066 private final JcrRdfTools jcrRdfTools; 067 068 private static final Model m = createDefaultModel(); 069 070 private static final Logger LOGGER = getLogger(PersistingRdfStreamConsumer.class); 071 072 private final List<String> exceptions; 073 074 /** 075 * Ordinary constructor. 076 * 077 * @param idTranslator the id translator 078 * @param session the session 079 * @param stream the rdf stream 080 */ 081 public PersistingRdfStreamConsumer(final IdentifierConverter<Resource, FedoraResource> idTranslator, 082 final Session session, final RdfStream stream) { 083 this.idTranslator = idTranslator; 084 this.jcrRdfTools = new JcrRdfTools(idTranslator, session); 085 this.isFedoraSubjectTriple = t -> { 086 087 final Node subject = t.getSubject(); 088 final Node topic = stream().topic(); 089 090 // blank nodes are okay 091 if (!t.getSubject().isBlank()) { 092 final String subjectURI = subject.getURI(); 093 final int hashIndex = subjectURI.lastIndexOf("#"); 094 // a hash URI with the same base as the topic is okay, as is the topic itself 095 if ((hashIndex > 0 && topic.getURI().equals(subjectURI.substring(0, hashIndex))) 096 || topic.equals(subject)) { 097 LOGGER.debug("Discovered a Fedora-relevant subject in triple: {}.", t); 098 return true; 099 } 100 // the subject was inappropriate in one of two ways 101 if (translator().inDomain(m.asRDFNode(subject).asResource())) { 102 // it was in-domain, but not within this resource 103 LOGGER.error("{} is not in the topic of this RDF, which is {}.", subject, topic); 104 throw new IncorrectTripleSubjectException(subject + 105 " is not in the topic of this RDF, which is " + topic); 106 } 107 // it wasn't even in in-domain! 108 LOGGER.error("subject {} is not in repository domain.", subject); 109 throw new OutOfDomainSubjectException(subject); 110 } 111 return true; 112 }; 113 // we fail on non-Fedora RDF 114 this.stream = stream.withThisContext(stream.filter(isFedoraSubjectTriple::test)); 115 116 this.exceptions = new ArrayList<>(); 117 } 118 119 @Override 120 public void consume() throws MalformedRdfException { 121 while (stream.hasNext()) { 122 final Statement t = m.asStatement(stream.next()); 123 LOGGER.debug("Operating triple {}.", t); 124 125 try { 126 operateOnTriple(t); 127 } catch (final ConstraintViolationException e) { 128 throw e; 129 } catch (final MalformedRdfException e) { 130 exceptions.add(e.getMessage()); 131 } 132 } 133 134 if (!exceptions.isEmpty()) { 135 throw new MalformedRdfException(join("\n", exceptions)); 136 } 137 } 138 139 protected void operateOnTriple(final Statement input) throws MalformedRdfException { 140 try { 141 142 final Statement t = jcrRdfTools.skolemize(idTranslator, input); 143 144 final Resource subject = t.getSubject(); 145 final FedoraResource subjectNode = translator().convert(subject); 146 147 // if this is a user-managed RDF type assertion, update the node's 148 // mixins. If it isn't, treat it as a "data" property. 149 if (t.getPredicate().equals(type) && t.getObject().isResource()) { 150 final Resource mixinResource = t.getObject().asResource(); 151 if (!isManagedMixin.test(mixinResource)) { 152 LOGGER.debug("Operating on node: {} with mixin: {}.", 153 subjectNode, mixinResource); 154 operateOnMixin(mixinResource, subjectNode); 155 } else { 156 LOGGER.error("Found repository-managed mixin {} in triple {} on which we will not operate.", 157 mixinResource, t); 158 throw new ServerManagedTypeException(String.format( 159 "The repository type (%s) of this resource is system managed.", mixinResource)); 160 } 161 } else { 162 LOGGER.debug("Operating on node: {} from triple: {}.", subjectNode, 163 t); 164 operateOnProperty(t, subjectNode); 165 } 166 } catch (final ConstraintViolationException e) { 167 throw e; 168 } catch (final RepositoryException | RepositoryRuntimeException e) { 169 throw new MalformedRdfException(e.getMessage(), e); 170 } 171 } 172 173 protected abstract void operateOnProperty(final Statement t, 174 final FedoraResource subjectNode) throws RepositoryException; 175 176 protected abstract void operateOnMixin(final Resource mixinResource, 177 final FedoraResource subjectNode) throws RepositoryException; 178 179 @Override 180 public ListenableFuture<Boolean> consumeAsync() { 181 // TODO make this actually asynch 182 final SettableFuture<Boolean> result = SettableFuture.create(); 183 try { 184 consume(); 185 result.set(true); 186 } catch (final MalformedRdfException e) { 187 LOGGER.warn("Got exception consuming RDF stream", e); 188 result.setException(e); 189 result.set(false); 190 } 191 return result; 192 } 193 194 195 /** 196 * @return the stream 197 */ 198 public RdfStream stream() { 199 return stream; 200 } 201 202 /** 203 * @return the idTranslator 204 */ 205 public IdentifierConverter<Resource, FedoraResource> translator() { 206 return idTranslator; 207 } 208 209 /** 210 * @return the jcrRdfTools 211 */ 212 public JcrRdfTools jcrRdfTools() { 213 return jcrRdfTools; 214 } 215 216}