001/*
002 * The contents of this file are subject to the license and copyright
003 * detailed in the LICENSE and NOTICE files at the root of the source
004 * tree.
005 */
006package org.fcrepo.kernel.api.rdf;
007
008import static java.util.Spliterator.IMMUTABLE;
009import static java.util.Spliterators.spliteratorUnknownSize;
010import static java.util.stream.Stream.empty;
011import static java.util.stream.StreamSupport.stream;
012
013import java.util.Comparator;
014import java.util.Objects;
015import java.util.function.Consumer;
016import java.util.function.Predicate;
017import java.util.stream.Stream;
018
019import org.apache.jena.graph.Node;
020import org.apache.jena.graph.Triple;
021import org.apache.jena.rdf.model.Model;
022import org.apache.jena.rdf.model.Statement;
023import org.fcrepo.kernel.api.utils.WrappingStream;
024import org.fcrepo.kernel.api.RdfStream;
025
026/**
027 * Implementation of a context-bearing RDF stream
028 *
029 * @author acoburn
030 * @since Dec 6, 2015
031 */
032public class DefaultRdfStream extends WrappingStream<Triple> implements RdfStream {
033
034    private final Node node;
035
036    /**
037     * Create an RdfStream
038     * @param node the topic of the stream
039     */
040    public DefaultRdfStream(final Node node) {
041        this(node, empty());
042    }
043
044    /**
045     * Create an RdfStream
046     * @param node the topic of the stream
047     * @param stream the incoming stream
048     */
049    public DefaultRdfStream(final Node node, final Stream<Triple> stream) {
050        Objects.requireNonNull(node);
051        this.node = node;
052        this.stream = stream;
053    }
054
055    /**
056     * Create an RdfStream from an existing Model.
057     * @param node The subject node
058     * @param model An input Model
059     * @return a new RdfStream object
060     */
061    public static RdfStream fromModel(final Node node, final Model model) {
062        return new DefaultRdfStream(node,
063                stream(spliteratorUnknownSize(model.listStatements(), IMMUTABLE), false).map(Statement::asTriple));
064    }
065
066    /**
067     * Concatenate a Triple stream to the existing stream
068     * @param stream additional triples
069     */
070    protected void concat(final Stream<Triple> stream) {
071        this.stream = Stream.concat(this.stream, stream);
072    }
073
074    @Override
075    public Node topic() {
076        return node;
077    }
078
079    @Override
080    public RdfStream distinct() {
081        return new DefaultRdfStream(topic(), stream.distinct());
082    }
083
084    @Override
085    public RdfStream filter(final Predicate<? super Triple> predicate) {
086        return new DefaultRdfStream(topic(), stream.filter(predicate));
087    }
088
089    @Override
090    public RdfStream limit(final long maxSize) {
091        return new DefaultRdfStream(topic(), stream.limit(maxSize));
092    }
093
094    @Override
095    public RdfStream peek(final Consumer<? super Triple> action) {
096        return new DefaultRdfStream(topic(), stream.peek(action));
097    }
098
099    @Override
100    public RdfStream skip(final long n) {
101        return new DefaultRdfStream(topic(), stream.skip(n));
102    }
103
104    @Override
105    public RdfStream sorted() {
106        return new DefaultRdfStream(topic(), stream.sorted());
107    }
108
109    @Override
110    public RdfStream sorted(final Comparator<? super Triple> comparator) {
111        return new DefaultRdfStream(topic(), stream.sorted(comparator));
112    }
113
114    @Override
115    public RdfStream onClose(final Runnable closeHandler) {
116        return new DefaultRdfStream(topic(), stream.onClose(closeHandler));
117    }
118
119    @Override
120    public RdfStream parallel() {
121        return new DefaultRdfStream(topic(), stream.parallel());
122    }
123
124    @Override
125    public RdfStream sequential() {
126        return new DefaultRdfStream(topic(), stream.sequential());
127    }
128
129    @Override
130    public RdfStream unordered() {
131        return new DefaultRdfStream(topic(), stream.unordered());
132    }
133}