001/**
002 * Copyright 2015 DuraSpace, Inc.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.fcrepo.kernel.modeshape.utils.iterators;
017
018import static com.hp.hpl.jena.rdf.model.ModelFactory.createDefaultModel;
019import static com.hp.hpl.jena.vocabulary.RDF.type;
020import static java.lang.String.join;
021import static org.fcrepo.kernel.modeshape.rdf.ManagedRdf.isManagedMixin;
022import static org.slf4j.LoggerFactory.getLogger;
023
024import org.fcrepo.kernel.api.models.FedoraResource;
025import org.fcrepo.kernel.api.exception.ConstraintViolationException;
026import org.fcrepo.kernel.api.exception.IncorrectTripleSubjectException;
027import org.fcrepo.kernel.api.exception.MalformedRdfException;
028import org.fcrepo.kernel.api.exception.ServerManagedTypeException;
029import org.fcrepo.kernel.api.exception.OutOfDomainSubjectException;
030import org.fcrepo.kernel.api.exception.RepositoryRuntimeException;
031import org.fcrepo.kernel.api.identifiers.IdentifierConverter;
032
033import javax.jcr.RepositoryException;
034import javax.jcr.Session;
035
036import org.fcrepo.kernel.api.utils.iterators.RdfStream;
037import org.fcrepo.kernel.api.utils.iterators.RdfStreamConsumer;
038import org.fcrepo.kernel.modeshape.rdf.JcrRdfTools;
039import org.slf4j.Logger;
040
041import com.google.common.util.concurrent.ListenableFuture;
042import com.google.common.util.concurrent.SettableFuture;
043import com.hp.hpl.jena.graph.Node;
044import com.hp.hpl.jena.graph.Triple;
045import com.hp.hpl.jena.rdf.model.Model;
046import com.hp.hpl.jena.rdf.model.Resource;
047import com.hp.hpl.jena.rdf.model.Statement;
048
049import java.util.ArrayList;
050import java.util.List;
051import java.util.function.Predicate;
052
053/**
054 * @author ajs6f
055 * @since Oct 24, 2013
056 */
057public abstract class PersistingRdfStreamConsumer implements RdfStreamConsumer {
058
059    private final RdfStream stream;
060
061    private final IdentifierConverter<Resource, FedoraResource> idTranslator;
062
063    // if it's not about a Fedora resource, we don't care.
064    protected final Predicate<Triple> isFedoraSubjectTriple;
065
066    private final JcrRdfTools jcrRdfTools;
067
068    private static final Model m = createDefaultModel();
069
070    private static final Logger LOGGER = getLogger(PersistingRdfStreamConsumer.class);
071
072    private final List<String> exceptions;
073
074    /**
075     * Ordinary constructor.
076     *
077     * @param idTranslator the id translator
078     * @param session the session
079     * @param stream the rdf stream
080     */
081    public PersistingRdfStreamConsumer(final IdentifierConverter<Resource, FedoraResource> idTranslator,
082            final Session session, final RdfStream stream) {
083        this.idTranslator = idTranslator;
084        this.jcrRdfTools = new JcrRdfTools(idTranslator, session);
085        this.isFedoraSubjectTriple = t -> {
086
087            final Node subject = t.getSubject();
088            final Node topic = stream().topic();
089
090            // blank nodes are okay
091            if (!t.getSubject().isBlank()) {
092                final String subjectURI = subject.getURI();
093                final int hashIndex = subjectURI.lastIndexOf("#");
094                // a hash URI with the same base as the topic is okay, as is the topic itself
095                if ((hashIndex > 0 && topic.getURI().equals(subjectURI.substring(0, hashIndex)))
096                        || topic.equals(subject)) {
097                    LOGGER.debug("Discovered a Fedora-relevant subject in triple: {}.", t);
098                    return true;
099                }
100                // the subject was inappropriate in one of two ways
101                if (translator().inDomain(m.asRDFNode(subject).asResource())) {
102                    // it was in-domain, but not within this resource
103                    LOGGER.error("{} is not in the topic of this RDF, which is {}.", subject, topic);
104                    throw new IncorrectTripleSubjectException(subject +
105                            " is not in the topic of this RDF, which is " + topic);
106                }
107                // it wasn't even in in-domain!
108                LOGGER.error("subject {} is not in repository domain.", subject);
109                throw new OutOfDomainSubjectException(subject);
110            }
111            return true;
112        };
113        // we fail on non-Fedora RDF
114        this.stream = stream.withThisContext(stream.filter(isFedoraSubjectTriple::test));
115
116        this.exceptions = new ArrayList<>();
117    }
118
119    @Override
120    public void consume() throws MalformedRdfException {
121        while (stream.hasNext()) {
122            final Statement t = m.asStatement(stream.next());
123            LOGGER.debug("Operating triple {}.", t);
124
125            try {
126                operateOnTriple(t);
127            } catch (final ConstraintViolationException e) {
128                throw e;
129            } catch (final MalformedRdfException e) {
130                exceptions.add(e.getMessage());
131            }
132        }
133
134        if (!exceptions.isEmpty()) {
135            throw new MalformedRdfException(join("\n", exceptions));
136        }
137    }
138
139    protected void operateOnTriple(final Statement input) throws MalformedRdfException {
140        try {
141
142            final Statement t = jcrRdfTools.skolemize(idTranslator, input);
143
144            final Resource subject = t.getSubject();
145            final FedoraResource subjectNode = translator().convert(subject);
146
147            // if this is a user-managed RDF type assertion, update the node's
148            // mixins. If it isn't, treat it as a "data" property.
149            if (t.getPredicate().equals(type) && t.getObject().isResource()) {
150                final Resource mixinResource = t.getObject().asResource();
151                if (!isManagedMixin.test(mixinResource)) {
152                    LOGGER.debug("Operating on node: {} with mixin: {}.",
153                            subjectNode, mixinResource);
154                    operateOnMixin(mixinResource, subjectNode);
155                } else {
156                    LOGGER.error("Found repository-managed mixin {} in triple {} on which we will not operate.",
157                            mixinResource, t);
158                    throw new ServerManagedTypeException(String.format(
159                            "The repository type (%s) of this resource is system managed.", mixinResource));
160                }
161            } else {
162                LOGGER.debug("Operating on node: {} from triple: {}.", subjectNode,
163                        t);
164                operateOnProperty(t, subjectNode);
165            }
166        } catch (final ConstraintViolationException e) {
167            throw e;
168        } catch (final RepositoryException | RepositoryRuntimeException e) {
169            throw new MalformedRdfException(e.getMessage(), e);
170        }
171    }
172
173    protected abstract void operateOnProperty(final Statement t,
174        final FedoraResource subjectNode) throws RepositoryException;
175
176    protected abstract void operateOnMixin(final Resource mixinResource,
177        final FedoraResource subjectNode) throws RepositoryException;
178
179    @Override
180    public ListenableFuture<Boolean> consumeAsync() {
181        // TODO make this actually asynch
182        final SettableFuture<Boolean> result = SettableFuture.create();
183        try {
184            consume();
185            result.set(true);
186        } catch (final MalformedRdfException e) {
187            LOGGER.warn("Got exception consuming RDF stream", e);
188            result.setException(e);
189            result.set(false);
190        }
191        return result;
192    }
193
194
195    /**
196     * @return the stream
197     */
198    public RdfStream stream() {
199        return stream;
200    }
201
202    /**
203     * @return the idTranslator
204     */
205    public IdentifierConverter<Resource, FedoraResource> translator() {
206        return idTranslator;
207    }
208
209    /**
210     * @return the jcrRdfTools
211     */
212    public JcrRdfTools jcrRdfTools() {
213        return jcrRdfTools;
214    }
215
216}