001/**
002 * Copyright 2015 DuraSpace, Inc.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.fcrepo.kernel.modeshape.utils.iterators;
017
018import static com.hp.hpl.jena.rdf.model.ModelFactory.createDefaultModel;
019import static com.hp.hpl.jena.rdf.model.ResourceFactory.createResource;
020import static com.hp.hpl.jena.vocabulary.RDF.type;
021import static java.lang.String.join;
022import static org.fcrepo.kernel.modeshape.rdf.ManagedRdf.isManagedMixin;
023import static org.fcrepo.kernel.modeshape.utils.FedoraTypesUtils.isFedoraBinary;
024import static org.fcrepo.kernel.api.FedoraTypes.FCR_METADATA;
025import static org.slf4j.LoggerFactory.getLogger;
026
027import org.fcrepo.kernel.api.models.FedoraResource;
028import org.fcrepo.kernel.api.exception.ConstraintViolationException;
029import org.fcrepo.kernel.api.exception.IncorrectTripleSubjectException;
030import org.fcrepo.kernel.api.exception.MalformedRdfException;
031import org.fcrepo.kernel.api.exception.ServerManagedTypeException;
032import org.fcrepo.kernel.api.exception.OutOfDomainSubjectException;
033import org.fcrepo.kernel.api.exception.RepositoryRuntimeException;
034import org.fcrepo.kernel.api.identifiers.IdentifierConverter;
035
036import javax.jcr.RepositoryException;
037import javax.jcr.Session;
038
039import org.fcrepo.kernel.api.utils.iterators.RdfStream;
040import org.fcrepo.kernel.api.utils.iterators.RdfStreamConsumer;
041import org.fcrepo.kernel.modeshape.rdf.JcrRdfTools;
042import org.slf4j.Logger;
043
044import com.google.common.util.concurrent.ListenableFuture;
045import com.google.common.util.concurrent.SettableFuture;
046import com.hp.hpl.jena.graph.Node;
047import com.hp.hpl.jena.graph.Triple;
048import com.hp.hpl.jena.rdf.model.Model;
049import com.hp.hpl.jena.rdf.model.Resource;
050import com.hp.hpl.jena.rdf.model.Statement;
051
052import java.util.ArrayList;
053import java.util.List;
054import java.util.function.Predicate;
055
056/**
057 * @author ajs6f
058 * @since Oct 24, 2013
059 */
060public abstract class PersistingRdfStreamConsumer implements RdfStreamConsumer {
061
062    private final RdfStream stream;
063
064    private final IdentifierConverter<Resource, FedoraResource> idTranslator;
065
066    // if it's not about a Fedora resource, we don't care.
067    protected final Predicate<Triple> isFedoraSubjectTriple;
068
069    private final JcrRdfTools jcrRdfTools;
070
071    private static final Model m = createDefaultModel();
072
073    private static final Logger LOGGER = getLogger(PersistingRdfStreamConsumer.class);
074
075    private final List<String> exceptions;
076
077    /**
078     * Ordinary constructor.
079     *
080     * @param idTranslator the id translator
081     * @param session the session
082     * @param stream the rdf stream
083     */
084    public PersistingRdfStreamConsumer(final IdentifierConverter<Resource, FedoraResource> idTranslator,
085            final Session session, final RdfStream stream) {
086        this.idTranslator = idTranslator;
087        this.jcrRdfTools = new JcrRdfTools(idTranslator, session);
088        this.isFedoraSubjectTriple = t -> {
089
090            final Node subject = t.getSubject();
091            final Node topic = stream().topic();
092
093            // blank nodes are okay
094            if (!t.getSubject().isBlank()) {
095                final String subjectURI = subject.getURI();
096                final int hashIndex = subjectURI.lastIndexOf("#");
097                // a hash URI with the same base as the topic is okay, as is the topic itself
098                if ((hashIndex > 0 && topic.getURI().equals(subjectURI.substring(0, hashIndex)))
099                        || topic.equals(subject)) {
100                    LOGGER.debug("Discovered a Fedora-relevant subject in triple: {}.", t);
101                    return true;
102                } else if (topic.getURI().equals(subject.getURI() + "/" + FCR_METADATA)
103                       && isFedoraBinary.test(translator().convert(createResource(subject.getURI())).getNode())) {
104                    LOGGER.debug("Discovered a NonRDFSource subject in triple: {}.", t);
105                    return true;
106                }
107                // the subject was inappropriate in one of two ways
108                if (translator().inDomain(m.asRDFNode(subject).asResource())) {
109                    // it was in-domain, but not within this resource
110                    LOGGER.error("{} is not in the topic of this RDF, which is {}.", subject, topic);
111                    throw new IncorrectTripleSubjectException(subject +
112                            " is not in the topic of this RDF, which is " + topic);
113                }
114                // it wasn't even in in-domain!
115                LOGGER.error("subject {} is not in repository domain.", subject);
116                throw new OutOfDomainSubjectException(subject);
117            }
118            return true;
119        };
120        // we fail on non-Fedora RDF
121        this.stream = stream.withThisContext(stream.filter(isFedoraSubjectTriple::test));
122
123        this.exceptions = new ArrayList<>();
124    }
125
126    @Override
127    public void consume() throws MalformedRdfException {
128        while (stream.hasNext()) {
129            final Statement t = m.asStatement(stream.next());
130            LOGGER.debug("Operating triple {}.", t);
131
132            try {
133                operateOnTriple(t);
134            } catch (final ConstraintViolationException e) {
135                throw e;
136            } catch (final MalformedRdfException e) {
137                exceptions.add(e.getMessage());
138            }
139        }
140
141        if (!exceptions.isEmpty()) {
142            throw new MalformedRdfException(join("\n", exceptions));
143        }
144    }
145
146    protected void operateOnTriple(final Statement input) throws MalformedRdfException {
147        try {
148
149            final Statement t = jcrRdfTools.skolemize(idTranslator, input);
150
151            final Resource subject = t.getSubject();
152            final FedoraResource subjectNode = translator().convert(subject);
153
154            // if this is a user-managed RDF type assertion, update the node's
155            // mixins. If it isn't, treat it as a "data" property.
156            if (t.getPredicate().equals(type) && t.getObject().isResource()) {
157                final Resource mixinResource = t.getObject().asResource();
158                if (!isManagedMixin.test(mixinResource)) {
159                    LOGGER.debug("Operating on node: {} with mixin: {}.",
160                            subjectNode, mixinResource);
161                    operateOnMixin(mixinResource, subjectNode);
162                } else {
163                    LOGGER.error("Found repository-managed mixin {} in triple {} on which we will not operate.",
164                            mixinResource, t);
165                    throw new ServerManagedTypeException(String.format(
166                            "The repository type (%s) of this resource is system managed.", mixinResource));
167                }
168            } else {
169                LOGGER.debug("Operating on node: {} from triple: {}.", subjectNode,
170                        t);
171                operateOnProperty(t, subjectNode);
172            }
173        } catch (final ConstraintViolationException e) {
174            throw e;
175        } catch (final RepositoryException | RepositoryRuntimeException e) {
176            throw new MalformedRdfException(e.getMessage(), e);
177        }
178    }
179
180    protected abstract void operateOnProperty(final Statement t,
181        final FedoraResource subjectNode) throws RepositoryException;
182
183    protected abstract void operateOnMixin(final Resource mixinResource,
184        final FedoraResource subjectNode) throws RepositoryException;
185
186    @Override
187    public ListenableFuture<Boolean> consumeAsync() {
188        // TODO make this actually asynch
189        final SettableFuture<Boolean> result = SettableFuture.create();
190        try {
191            consume();
192            result.set(true);
193        } catch (final MalformedRdfException e) {
194            LOGGER.warn("Got exception consuming RDF stream", e);
195            result.setException(e);
196            result.set(false);
197        }
198        return result;
199    }
200
201
202    /**
203     * @return the stream
204     */
205    public RdfStream stream() {
206        return stream;
207    }
208
209    /**
210     * @return the idTranslator
211     */
212    public IdentifierConverter<Resource, FedoraResource> translator() {
213        return idTranslator;
214    }
215
216    /**
217     * @return the jcrRdfTools
218     */
219    public JcrRdfTools jcrRdfTools() {
220        return jcrRdfTools;
221    }
222
223}