001/*
002 * Licensed to DuraSpace under one or more contributor license agreements.
003 * See the NOTICE file distributed with this work for additional information
004 * regarding copyright ownership.
005 *
006 * DuraSpace licenses this file to you under the Apache License,
007 * Version 2.0 (the "License"); you may not use this file except in
008 * compliance with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.fcrepo.kernel.modeshape.utils.iterators;
019
020import static com.hp.hpl.jena.rdf.model.ModelFactory.createDefaultModel;
021import static com.hp.hpl.jena.rdf.model.ResourceFactory.createResource;
022import static com.hp.hpl.jena.vocabulary.RDF.type;
023import static java.lang.String.join;
024import static org.fcrepo.kernel.modeshape.rdf.ManagedRdf.isManagedMixin;
025import static org.fcrepo.kernel.modeshape.utils.FedoraTypesUtils.getJcrNode;
026import static org.fcrepo.kernel.modeshape.utils.FedoraTypesUtils.isFedoraBinary;
027import static org.fcrepo.kernel.api.FedoraTypes.FCR_METADATA;
028import static org.slf4j.LoggerFactory.getLogger;
029
030import org.fcrepo.kernel.api.models.FedoraResource;
031import org.fcrepo.kernel.api.exception.ConstraintViolationException;
032import org.fcrepo.kernel.api.exception.IncorrectTripleSubjectException;
033import org.fcrepo.kernel.api.exception.MalformedRdfException;
034import org.fcrepo.kernel.api.exception.ServerManagedTypeException;
035import org.fcrepo.kernel.api.exception.OutOfDomainSubjectException;
036import org.fcrepo.kernel.api.exception.RepositoryRuntimeException;
037import org.fcrepo.kernel.api.identifiers.IdentifierConverter;
038
039import javax.jcr.RepositoryException;
040import javax.jcr.Session;
041
042import org.fcrepo.kernel.api.RdfStream;
043import org.fcrepo.kernel.modeshape.rdf.JcrRdfTools;
044import org.fcrepo.kernel.api.rdf.DefaultRdfStream;
045import org.slf4j.Logger;
046
047import com.google.common.util.concurrent.ListenableFuture;
048import com.google.common.util.concurrent.SettableFuture;
049import com.hp.hpl.jena.graph.Node;
050import com.hp.hpl.jena.graph.Triple;
051import com.hp.hpl.jena.rdf.model.Model;
052import com.hp.hpl.jena.rdf.model.Resource;
053import com.hp.hpl.jena.rdf.model.Statement;
054
055import java.util.ArrayList;
056import java.util.List;
057import java.util.function.Predicate;
058
059/**
060 * @author ajs6f
061 * @since Oct 24, 2013
062 */
063public abstract class PersistingRdfStreamConsumer implements RdfStreamConsumer {
064
065    private final RdfStream stream;
066
067    private final IdentifierConverter<Resource, FedoraResource> idTranslator;
068
069    // if it's not about a Fedora resource, we don't care.
070    protected final Predicate<Triple> isFedoraSubjectTriple;
071
072    private final JcrRdfTools jcrRdfTools;
073
074    private static final Model m = createDefaultModel();
075
076    private static final Logger LOGGER = getLogger(PersistingRdfStreamConsumer.class);
077
078    private final List<String> exceptions;
079
080    /**
081     * Ordinary constructor.
082     *
083     * @param idTranslator the id translator
084     * @param session the session
085     * @param stream the rdf stream
086     */
087    public PersistingRdfStreamConsumer(final IdentifierConverter<Resource, FedoraResource> idTranslator,
088            final Session session, final RdfStream stream) {
089        this.idTranslator = idTranslator;
090        this.jcrRdfTools = new JcrRdfTools(idTranslator, session);
091        this.isFedoraSubjectTriple = t -> {
092
093            final Node subject = t.getSubject();
094            final Node topic = stream().topic();
095
096            // blank nodes are okay
097            if (!t.getSubject().isBlank()) {
098                final String subjectURI = subject.getURI();
099                final int hashIndex = subjectURI.lastIndexOf("#");
100                // a hash URI with the same base as the topic is okay, as is the topic itself
101                if ((hashIndex > 0 && topic.getURI().equals(subjectURI.substring(0, hashIndex)))
102                        || topic.equals(subject)) {
103                    LOGGER.debug("Discovered a Fedora-relevant subject in triple: {}.", t);
104                    return true;
105                } else if (topic.getURI().equals(subject.getURI() + "/" + FCR_METADATA)
106                       && isFedoraBinary.test(getJcrNode(translator().convert(createResource(subject.getURI()))))) {
107                    LOGGER.debug("Discovered a NonRDFSource subject in triple: {}.", t);
108                    return true;
109                }
110                // the subject was inappropriate in one of two ways
111                if (translator().inDomain(m.asRDFNode(subject).asResource())) {
112                    // it was in-domain, but not within this resource
113                    LOGGER.error("{} is not in the topic of this RDF, which is {}.", subject, topic);
114                    throw new IncorrectTripleSubjectException(subject +
115                            " is not in the topic of this RDF, which is " + topic);
116                }
117                // it wasn't even in in-domain!
118                LOGGER.error("subject {} is not in repository domain.", subject);
119                throw new OutOfDomainSubjectException(subject);
120            }
121            return true;
122        };
123
124        this.stream = new DefaultRdfStream(stream.topic(), stream.filter(isFedoraSubjectTriple));
125
126        this.exceptions = new ArrayList<>();
127    }
128
129    @Override
130    public void consume() throws MalformedRdfException {
131        stream.forEach(t -> {
132            final Statement s = m.asStatement(t);
133            LOGGER.debug("Operating on triple {}.", s);
134
135            try {
136                operateOnTriple(s);
137            } catch (final ConstraintViolationException e) {
138                throw e;
139            } catch (final MalformedRdfException e) {
140                exceptions.add(e.getMessage());
141            }
142        });
143
144        if (!exceptions.isEmpty()) {
145            throw new MalformedRdfException(join("\n", exceptions));
146        }
147    }
148
149    protected void operateOnTriple(final Statement input) throws MalformedRdfException {
150        try {
151
152            final Statement t = jcrRdfTools.skolemize(idTranslator, input);
153
154            final Resource subject = t.getSubject();
155            final FedoraResource subjectNode = translator().convert(subject);
156
157            // if this is a user-managed RDF type assertion, update the node's
158            // mixins. If it isn't, treat it as a "data" property.
159            if (t.getPredicate().equals(type) && t.getObject().isResource()) {
160                final Resource mixinResource = t.getObject().asResource();
161                if (!isManagedMixin.test(mixinResource)) {
162                    LOGGER.debug("Operating on node: {} with mixin: {}.",
163                            subjectNode, mixinResource);
164                    operateOnMixin(mixinResource, subjectNode);
165                } else {
166                    LOGGER.error("Found repository-managed mixin {} in triple {} on which we will not operate.",
167                            mixinResource, t);
168                    throw new ServerManagedTypeException(String.format(
169                            "The repository type (%s) of this resource is system managed.", mixinResource));
170                }
171            } else {
172                LOGGER.debug("Operating on node: {} from triple: {}.", subjectNode,
173                        t);
174                operateOnProperty(t, subjectNode);
175            }
176        } catch (final ConstraintViolationException e) {
177            throw e;
178        } catch (final RepositoryException | RepositoryRuntimeException e) {
179            throw new MalformedRdfException(e.getMessage(), e);
180        }
181    }
182
183    protected abstract void operateOnProperty(final Statement t,
184        final FedoraResource subjectNode) throws RepositoryException;
185
186    protected abstract void operateOnMixin(final Resource mixinResource,
187        final FedoraResource subjectNode) throws RepositoryException;
188
189    @Override
190    public ListenableFuture<Boolean> consumeAsync() {
191        // TODO make this actually asynch
192        final SettableFuture<Boolean> result = SettableFuture.create();
193        try {
194            consume();
195            result.set(true);
196        } catch (final MalformedRdfException e) {
197            LOGGER.warn("Got exception consuming RDF stream", e);
198            result.setException(e);
199            result.set(false);
200        }
201        return result;
202    }
203
204
205    /**
206     * @return the stream
207     */
208    public RdfStream stream() {
209        return stream;
210    }
211
212    /**
213     * @return the idTranslator
214     */
215    public IdentifierConverter<Resource, FedoraResource> translator() {
216        return idTranslator;
217    }
218
219    /**
220     * @return the jcrRdfTools
221     */
222    public JcrRdfTools jcrRdfTools() {
223        return jcrRdfTools;
224    }
225
226}