001/**
002 * Copyright 2015 DuraSpace, Inc.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.fcrepo.kernel.impl.utils.iterators;
017
018import static com.hp.hpl.jena.rdf.model.ModelFactory.createDefaultModel;
019import static com.hp.hpl.jena.vocabulary.RDF.type;
020import static org.fcrepo.kernel.impl.rdf.ManagedRdf.isManagedMixin;
021import static org.slf4j.LoggerFactory.getLogger;
022
023import com.google.common.base.Joiner;
024import org.fcrepo.kernel.models.FedoraResource;
025import org.fcrepo.kernel.exception.MalformedRdfException;
026import org.fcrepo.kernel.exception.RepositoryRuntimeException;
027import org.fcrepo.kernel.identifiers.IdentifierConverter;
028
029import javax.jcr.RepositoryException;
030import javax.jcr.Session;
031
032import org.fcrepo.kernel.impl.rdf.JcrRdfTools;
033import org.fcrepo.kernel.utils.iterators.RdfStream;
034import org.fcrepo.kernel.utils.iterators.RdfStreamConsumer;
035import org.slf4j.Logger;
036
037import com.google.common.base.Predicate;
038import com.google.common.util.concurrent.ListenableFuture;
039import com.google.common.util.concurrent.SettableFuture;
040import com.hp.hpl.jena.graph.Triple;
041import com.hp.hpl.jena.rdf.model.Model;
042import com.hp.hpl.jena.rdf.model.Resource;
043import com.hp.hpl.jena.rdf.model.Statement;
044
045import java.util.ArrayList;
046import java.util.List;
047
048/**
049 * @author ajs6f
050 * @since Oct 24, 2013
051 */
052public abstract class PersistingRdfStreamConsumer implements RdfStreamConsumer {
053
054    private final RdfStream stream;
055
056    private final IdentifierConverter<Resource, FedoraResource> idTranslator;
057
058    // if it's not about a Fedora resource, we don't care.
059    protected final Predicate<Triple> isFedoraSubjectTriple;
060
061    private final JcrRdfTools jcrRdfTools;
062
063    private static final Model m = createDefaultModel();
064
065    private static final Logger LOGGER = getLogger(PersistingRdfStreamConsumer.class);
066
067    private final List<String> exceptions;
068
069    /**
070     * Ordinary constructor.
071     *
072     * @param idTranslator the id translator
073     * @param session the session
074     * @param stream the rdf stream
075     */
076    public PersistingRdfStreamConsumer(final IdentifierConverter<Resource, FedoraResource> idTranslator,
077            final Session session, final RdfStream stream) {
078        this.idTranslator = idTranslator;
079        this.jcrRdfTools = new JcrRdfTools(idTranslator, session);
080        this.isFedoraSubjectTriple = new Predicate<Triple>() {
081
082            @Override
083            public boolean apply(final Triple t) {
084
085                final boolean result = idTranslator.inDomain(m.asStatement(t).getSubject())
086                        || t.getSubject().isBlank();
087                if (result) {
088                    LOGGER.debug(
089                            "Discovered a Fedora-relevant subject in triple: {}.",
090                            t);
091                } else {
092                    LOGGER.debug("Ignoring triple: {}.", t);
093                }
094                return result;
095            }
096
097        };
098        // we knock out non-Fedora RDF
099        this.stream =
100                stream.withThisContext(stream.filter(isFedoraSubjectTriple));
101
102        this.exceptions = new ArrayList<>();
103    }
104
105    @Override
106    public void consume() throws MalformedRdfException {
107        while (stream.hasNext()) {
108            final Statement t = m.asStatement(stream.next());
109            LOGGER.debug("Operating triple {}.", t);
110
111            try {
112                operateOnTriple(t);
113            } catch (final MalformedRdfException e) {
114                exceptions.add(e.getMessage());
115            }
116        }
117
118        if (!exceptions.isEmpty()) {
119            throw new MalformedRdfException(Joiner.on("\n").join(exceptions));
120        }
121    }
122
123    protected void operateOnTriple(final Statement input) throws MalformedRdfException {
124        try {
125
126            final Statement t = jcrRdfTools.skolemize(idTranslator, input);
127
128            final Resource subject = t.getSubject();
129            final FedoraResource subjectNode = translator().convert(subject);
130
131            // if this is a user-managed RDF type assertion, update the node's
132            // mixins. If it isn't, treat it as a "data" property.
133            if (t.getPredicate().equals(type) && t.getObject().isResource()) {
134                final Resource mixinResource = t.getObject().asResource();
135                if (!isManagedMixin.apply(mixinResource)) {
136                    LOGGER.debug("Operating on node: {} with mixin: {}.",
137                            subjectNode, mixinResource);
138                    operateOnMixin(mixinResource, subjectNode);
139                } else {
140                    LOGGER.debug("Found repository-managed mixin on which we will not operate.");
141                }
142            } else {
143                LOGGER.debug("Operating on node: {} from triple: {}.", subjectNode,
144                        t);
145                operateOnProperty(t, subjectNode);
146            }
147        } catch (final RepositoryException | RepositoryRuntimeException e) {
148            throw new MalformedRdfException(e.getMessage(), e);
149        }
150    }
151
152    protected abstract void operateOnProperty(final Statement t,
153        final FedoraResource subjectNode) throws RepositoryException;
154
155    protected abstract void operateOnMixin(final Resource mixinResource,
156        final FedoraResource subjectNode) throws RepositoryException;
157
158    @Override
159    public ListenableFuture<Boolean> consumeAsync() {
160        // TODO make this actually asynch
161        final SettableFuture<Boolean> result = SettableFuture.create();
162        try {
163            consume();
164            result.set(true);
165        } catch (final MalformedRdfException e) {
166            LOGGER.warn("Got exception consuming RDF stream", e);
167            result.setException(e);
168            result.set(false);
169        }
170        return result;
171    }
172
173
174    /**
175     * @return the stream
176     */
177    public RdfStream stream() {
178        return stream;
179    }
180
181
182    /**
183     * @return the idTranslator
184     */
185    public IdentifierConverter<Resource,FedoraResource> translator() {
186        return idTranslator;
187    }
188
189
190    /**
191     * @return the jcrRdfTools
192     */
193    public JcrRdfTools jcrRdfTools() {
194        return jcrRdfTools;
195    }
196
197}