001/** 002 * Copyright 2015 DuraSpace, Inc. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.fcrepo.kernel.modeshape.utils.iterators; 017 018import static com.hp.hpl.jena.rdf.model.ModelFactory.createDefaultModel; 019import static com.hp.hpl.jena.rdf.model.ResourceFactory.createResource; 020import static com.hp.hpl.jena.vocabulary.RDF.type; 021import static java.lang.String.join; 022import static org.fcrepo.kernel.modeshape.rdf.ManagedRdf.isManagedMixin; 023import static org.fcrepo.kernel.modeshape.utils.FedoraTypesUtils.isFedoraBinary; 024import static org.fcrepo.kernel.api.FedoraTypes.FCR_METADATA; 025import static org.slf4j.LoggerFactory.getLogger; 026 027import org.fcrepo.kernel.api.models.FedoraResource; 028import org.fcrepo.kernel.api.exception.ConstraintViolationException; 029import org.fcrepo.kernel.api.exception.IncorrectTripleSubjectException; 030import org.fcrepo.kernel.api.exception.MalformedRdfException; 031import org.fcrepo.kernel.api.exception.ServerManagedTypeException; 032import org.fcrepo.kernel.api.exception.OutOfDomainSubjectException; 033import org.fcrepo.kernel.api.exception.RepositoryRuntimeException; 034import org.fcrepo.kernel.api.identifiers.IdentifierConverter; 035 036import javax.jcr.RepositoryException; 037import javax.jcr.Session; 038 039import org.fcrepo.kernel.api.utils.iterators.RdfStream; 040import org.fcrepo.kernel.api.utils.iterators.RdfStreamConsumer; 041import org.fcrepo.kernel.modeshape.rdf.JcrRdfTools; 042import org.slf4j.Logger; 043 044import com.google.common.util.concurrent.ListenableFuture; 045import com.google.common.util.concurrent.SettableFuture; 046import com.hp.hpl.jena.graph.Node; 047import com.hp.hpl.jena.graph.Triple; 048import com.hp.hpl.jena.rdf.model.Model; 049import com.hp.hpl.jena.rdf.model.Resource; 050import com.hp.hpl.jena.rdf.model.Statement; 051 052import java.util.ArrayList; 053import java.util.List; 054import java.util.function.Predicate; 055 056/** 057 * @author ajs6f 058 * @since Oct 24, 2013 059 */ 060public abstract class PersistingRdfStreamConsumer implements RdfStreamConsumer { 061 062 private final RdfStream stream; 063 064 private final IdentifierConverter<Resource, FedoraResource> idTranslator; 065 066 // if it's not about a Fedora resource, we don't care. 067 protected final Predicate<Triple> isFedoraSubjectTriple; 068 069 private final JcrRdfTools jcrRdfTools; 070 071 private static final Model m = createDefaultModel(); 072 073 private static final Logger LOGGER = getLogger(PersistingRdfStreamConsumer.class); 074 075 private final List<String> exceptions; 076 077 /** 078 * Ordinary constructor. 079 * 080 * @param idTranslator the id translator 081 * @param session the session 082 * @param stream the rdf stream 083 */ 084 public PersistingRdfStreamConsumer(final IdentifierConverter<Resource, FedoraResource> idTranslator, 085 final Session session, final RdfStream stream) { 086 this.idTranslator = idTranslator; 087 this.jcrRdfTools = new JcrRdfTools(idTranslator, session); 088 this.isFedoraSubjectTriple = t -> { 089 090 final Node subject = t.getSubject(); 091 final Node topic = stream().topic(); 092 093 // blank nodes are okay 094 if (!t.getSubject().isBlank()) { 095 final String subjectURI = subject.getURI(); 096 final int hashIndex = subjectURI.lastIndexOf("#"); 097 // a hash URI with the same base as the topic is okay, as is the topic itself 098 if ((hashIndex > 0 && topic.getURI().equals(subjectURI.substring(0, hashIndex))) 099 || topic.equals(subject)) { 100 LOGGER.debug("Discovered a Fedora-relevant subject in triple: {}.", t); 101 return true; 102 } else if (topic.getURI().equals(subject.getURI() + "/" + FCR_METADATA) 103 && isFedoraBinary.test(translator().convert(createResource(subject.getURI())).getNode())) { 104 LOGGER.debug("Discovered a NonRDFSource subject in triple: {}.", t); 105 return true; 106 } 107 // the subject was inappropriate in one of two ways 108 if (translator().inDomain(m.asRDFNode(subject).asResource())) { 109 // it was in-domain, but not within this resource 110 LOGGER.error("{} is not in the topic of this RDF, which is {}.", subject, topic); 111 throw new IncorrectTripleSubjectException(subject + 112 " is not in the topic of this RDF, which is " + topic); 113 } 114 // it wasn't even in in-domain! 115 LOGGER.error("subject {} is not in repository domain.", subject); 116 throw new OutOfDomainSubjectException(subject); 117 } 118 return true; 119 }; 120 // we fail on non-Fedora RDF 121 this.stream = stream.withThisContext(stream.filter(isFedoraSubjectTriple::test)); 122 123 this.exceptions = new ArrayList<>(); 124 } 125 126 @Override 127 public void consume() throws MalformedRdfException { 128 while (stream.hasNext()) { 129 final Statement t = m.asStatement(stream.next()); 130 LOGGER.debug("Operating triple {}.", t); 131 132 try { 133 operateOnTriple(t); 134 } catch (final ConstraintViolationException e) { 135 throw e; 136 } catch (final MalformedRdfException e) { 137 exceptions.add(e.getMessage()); 138 } 139 } 140 141 if (!exceptions.isEmpty()) { 142 throw new MalformedRdfException(join("\n", exceptions)); 143 } 144 } 145 146 protected void operateOnTriple(final Statement input) throws MalformedRdfException { 147 try { 148 149 final Statement t = jcrRdfTools.skolemize(idTranslator, input); 150 151 final Resource subject = t.getSubject(); 152 final FedoraResource subjectNode = translator().convert(subject); 153 154 // if this is a user-managed RDF type assertion, update the node's 155 // mixins. If it isn't, treat it as a "data" property. 156 if (t.getPredicate().equals(type) && t.getObject().isResource()) { 157 final Resource mixinResource = t.getObject().asResource(); 158 if (!isManagedMixin.test(mixinResource)) { 159 LOGGER.debug("Operating on node: {} with mixin: {}.", 160 subjectNode, mixinResource); 161 operateOnMixin(mixinResource, subjectNode); 162 } else { 163 LOGGER.error("Found repository-managed mixin {} in triple {} on which we will not operate.", 164 mixinResource, t); 165 throw new ServerManagedTypeException(String.format( 166 "The repository type (%s) of this resource is system managed.", mixinResource)); 167 } 168 } else { 169 LOGGER.debug("Operating on node: {} from triple: {}.", subjectNode, 170 t); 171 operateOnProperty(t, subjectNode); 172 } 173 } catch (final ConstraintViolationException e) { 174 throw e; 175 } catch (final RepositoryException | RepositoryRuntimeException e) { 176 throw new MalformedRdfException(e.getMessage(), e); 177 } 178 } 179 180 protected abstract void operateOnProperty(final Statement t, 181 final FedoraResource subjectNode) throws RepositoryException; 182 183 protected abstract void operateOnMixin(final Resource mixinResource, 184 final FedoraResource subjectNode) throws RepositoryException; 185 186 @Override 187 public ListenableFuture<Boolean> consumeAsync() { 188 // TODO make this actually asynch 189 final SettableFuture<Boolean> result = SettableFuture.create(); 190 try { 191 consume(); 192 result.set(true); 193 } catch (final MalformedRdfException e) { 194 LOGGER.warn("Got exception consuming RDF stream", e); 195 result.setException(e); 196 result.set(false); 197 } 198 return result; 199 } 200 201 202 /** 203 * @return the stream 204 */ 205 public RdfStream stream() { 206 return stream; 207 } 208 209 /** 210 * @return the idTranslator 211 */ 212 public IdentifierConverter<Resource, FedoraResource> translator() { 213 return idTranslator; 214 } 215 216 /** 217 * @return the jcrRdfTools 218 */ 219 public JcrRdfTools jcrRdfTools() { 220 return jcrRdfTools; 221 } 222 223}