001/* 002 * Licensed to DuraSpace under one or more contributor license agreements. 003 * See the NOTICE file distributed with this work for additional information 004 * regarding copyright ownership. 005 * 006 * DuraSpace licenses this file to you under the Apache License, 007 * Version 2.0 (the "License"); you may not use this file except in 008 * compliance with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.fcrepo.kernel.modeshape.utils.iterators; 019 020import static org.apache.jena.rdf.model.ModelFactory.createDefaultModel; 021import static org.apache.jena.rdf.model.ResourceFactory.createResource; 022import static org.apache.jena.vocabulary.RDF.type; 023import static java.lang.String.join; 024import static org.fcrepo.kernel.modeshape.rdf.ManagedRdf.isManagedMixin; 025import static org.fcrepo.kernel.modeshape.utils.FedoraTypesUtils.getJcrNode; 026import static org.fcrepo.kernel.modeshape.utils.FedoraTypesUtils.isFedoraBinary; 027import static org.fcrepo.kernel.api.FedoraTypes.FCR_METADATA; 028import static org.slf4j.LoggerFactory.getLogger; 029 030import org.fcrepo.kernel.api.models.FedoraResource; 031import org.fcrepo.kernel.api.exception.ConstraintViolationException; 032import org.fcrepo.kernel.api.exception.IncorrectTripleSubjectException; 033import org.fcrepo.kernel.api.exception.MalformedRdfException; 034import org.fcrepo.kernel.api.exception.ServerManagedTypeException; 035import org.fcrepo.kernel.api.exception.OutOfDomainSubjectException; 036import org.fcrepo.kernel.api.exception.RepositoryRuntimeException; 037import org.fcrepo.kernel.api.identifiers.IdentifierConverter; 038 039import javax.jcr.RepositoryException; 040import javax.jcr.Session; 041 042import org.fcrepo.kernel.api.RdfStream; 043import org.fcrepo.kernel.modeshape.rdf.JcrRdfTools; 044import org.fcrepo.kernel.api.rdf.DefaultRdfStream; 045import org.slf4j.Logger; 046 047import com.google.common.util.concurrent.ListenableFuture; 048import com.google.common.util.concurrent.SettableFuture; 049import org.apache.jena.graph.Node; 050import org.apache.jena.graph.Triple; 051import org.apache.jena.rdf.model.Model; 052import org.apache.jena.rdf.model.Resource; 053import org.apache.jena.rdf.model.Statement; 054 055import java.util.ArrayList; 056import java.util.List; 057import java.util.function.Predicate; 058 059/** 060 * @author ajs6f 061 * @since Oct 24, 2013 062 */ 063public abstract class PersistingRdfStreamConsumer implements RdfStreamConsumer { 064 065 private final RdfStream stream; 066 067 private final IdentifierConverter<Resource, FedoraResource> idTranslator; 068 069 // if it's not about a Fedora resource, we don't care. 070 private final Predicate<Triple> isFedoraSubjectTriple; 071 072 private final JcrRdfTools jcrRdfTools; 073 074 private static final Model m = createDefaultModel(); 075 076 private static final Logger LOGGER = getLogger(PersistingRdfStreamConsumer.class); 077 078 private final List<String> exceptions; 079 080 /** 081 * Ordinary constructor. 082 * 083 * @param idTranslator the id translator 084 * @param session the session 085 * @param stream the rdf stream 086 */ 087 public PersistingRdfStreamConsumer(final IdentifierConverter<Resource, FedoraResource> idTranslator, 088 final Session session, final RdfStream stream) { 089 this.idTranslator = idTranslator; 090 this.jcrRdfTools = new JcrRdfTools(idTranslator, session); 091 this.isFedoraSubjectTriple = t -> { 092 093 final Node subject = t.getSubject(); 094 final Node topic = stream().topic(); 095 096 // blank nodes are okay 097 if (!t.getSubject().isBlank()) { 098 final String subjectURI = subject.getURI(); 099 final int hashIndex = subjectURI.lastIndexOf("#"); 100 // a hash URI with the same base as the topic is okay, as is the topic itself 101 if ((hashIndex > 0 && topic.getURI().equals(subjectURI.substring(0, hashIndex))) 102 || topic.equals(subject)) { 103 LOGGER.debug("Discovered a Fedora-relevant subject in triple: {}.", t); 104 return true; 105 } else if (topic.getURI().equals(subject.getURI() + "/" + FCR_METADATA) 106 && isFedoraBinary.test(getJcrNode(translator().convert(createResource(subject.getURI()))))) { 107 LOGGER.debug("Discovered a NonRDFSource subject in triple: {}.", t); 108 return true; 109 } 110 // the subject was inappropriate in one of two ways 111 if (translator().inDomain(m.asRDFNode(subject).asResource())) { 112 // it was in-domain, but not within this resource 113 LOGGER.error("{} is not in the topic of this RDF, which is {}.", subject, topic); 114 throw new IncorrectTripleSubjectException(subject + 115 " is not in the topic of this RDF, which is " + topic); 116 } 117 // it wasn't even in in-domain! 118 LOGGER.error("subject {} is not in repository domain.", subject); 119 throw new OutOfDomainSubjectException(subject); 120 } 121 return true; 122 }; 123 124 this.stream = new DefaultRdfStream(stream.topic(), stream.filter(isFedoraSubjectTriple)); 125 126 this.exceptions = new ArrayList<>(); 127 } 128 129 @Override 130 public void consume() { 131 stream.forEach(t -> { 132 final Statement s = m.asStatement(t); 133 LOGGER.debug("Operating on triple {}.", s); 134 135 try { 136 operateOnTriple(s); 137 } catch (final MalformedRdfException e) { 138 exceptions.add(e.getMessage()); 139 } catch (final ConstraintViolationException e) { 140 throw e; 141 } 142 }); 143 144 if (!exceptions.isEmpty()) { 145 throw new MalformedRdfException(join("\n", exceptions)); 146 } 147 } 148 149 protected void operateOnTriple(final Statement input) { 150 try { 151 152 final Statement t = jcrRdfTools.skolemize(idTranslator, input, stream().topic().toString()); 153 154 final Resource subject = t.getSubject(); 155 final FedoraResource subjectNode = translator().convert(subject); 156 157 // if this is a user-managed RDF type assertion, update the node's 158 // mixins. If it isn't, treat it as a "data" property. 159 if (t.getPredicate().equals(type) && t.getObject().isResource()) { 160 final Resource mixinResource = t.getObject().asResource(); 161 if (!isManagedMixin.test(mixinResource)) { 162 LOGGER.debug("Operating on node: {} with mixin: {}.", 163 subjectNode, mixinResource); 164 operateOnMixin(mixinResource, subjectNode); 165 } else { 166 LOGGER.error("Found repository-managed mixin {} in triple {} on which we will not operate.", 167 mixinResource, t); 168 throw new ServerManagedTypeException(String.format( 169 "The repository type (%s) of this resource is system managed.", mixinResource)); 170 } 171 } else { 172 LOGGER.debug("Operating on node: {} from triple: {}.", subjectNode, 173 t); 174 operateOnProperty(t, subjectNode); 175 } 176 } catch (final ConstraintViolationException e) { 177 throw e; 178 } catch (final RepositoryException | RepositoryRuntimeException e) { 179 throw new MalformedRdfException(e.getMessage(), e); 180 } 181 } 182 183 protected abstract void operateOnProperty(final Statement t, 184 final FedoraResource subjectNode) throws RepositoryException; 185 186 protected abstract void operateOnMixin(final Resource mixinResource, 187 final FedoraResource subjectNode) throws RepositoryException; 188 189 @Override 190 public ListenableFuture<Boolean> consumeAsync() { 191 // TODO make this actually asynch 192 final SettableFuture<Boolean> result = SettableFuture.create(); 193 try { 194 consume(); 195 result.set(true); 196 } catch (final MalformedRdfException e) { 197 LOGGER.warn("Got exception consuming RDF stream", e); 198 result.setException(e); 199 result.set(false); 200 } 201 return result; 202 } 203 204 205 /** 206 * @return the stream 207 */ 208 public RdfStream stream() { 209 return stream; 210 } 211 212 /** 213 * @return the idTranslator 214 */ 215 public IdentifierConverter<Resource, FedoraResource> translator() { 216 return idTranslator; 217 } 218 219 /** 220 * @return the jcrRdfTools 221 */ 222 public JcrRdfTools jcrRdfTools() { 223 return jcrRdfTools; 224 } 225 226}