001/*
002 * Copyright 2015 DuraSpace, Inc.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.fcrepo.kernel.api.rdf;
017
018import static java.util.Spliterator.IMMUTABLE;
019import static java.util.Spliterators.spliteratorUnknownSize;
020import static java.util.stream.Stream.empty;
021import static java.util.stream.StreamSupport.stream;
022
023import java.util.Comparator;
024import java.util.Objects;
025import java.util.function.Consumer;
026import java.util.function.Predicate;
027import java.util.stream.Stream;
028
029import com.hp.hpl.jena.graph.Node;
030import com.hp.hpl.jena.graph.Triple;
031import com.hp.hpl.jena.rdf.model.Model;
032import com.hp.hpl.jena.rdf.model.Statement;
033import org.fcrepo.kernel.api.utils.WrappingStream;
034import org.fcrepo.kernel.api.RdfStream;
035
036/**
037 * Implementation of a context-bearing RDF stream
038 *
039 * @author acoburn
040 * @since Dec 6, 2015
041 */
042public class DefaultRdfStream extends WrappingStream<Triple> implements RdfStream {
043
044    protected final Node node;
045
046    /**
047     * Create an RdfStream
048     * @param node the topic of the stream
049     */
050    public DefaultRdfStream(final Node node) {
051        this(node, empty());
052    }
053
054    /**
055     * Create an RdfStream
056     * @param node the topic of the stream
057     * @param stream the incoming stream
058     */
059    public DefaultRdfStream(final Node node, final Stream<Triple> stream) {
060        Objects.requireNonNull(node);
061        this.node = node;
062        this.stream = stream;
063    }
064
065    /**
066     * Create an RdfStream from an existing Model.
067     * @param node The subject node
068     * @param model An input Model
069     * @return a new RdfStream object
070     */
071    public static RdfStream fromModel(final Node node, final Model model) {
072        return new DefaultRdfStream(node,
073                stream(spliteratorUnknownSize(model.listStatements(), IMMUTABLE), false).map(Statement::asTriple));
074    }
075
076    /**
077     * Concatenate a Triple stream to the existing stream
078     * @param stream additional triples
079     */
080    protected void concat(final Stream<Triple> stream) {
081        this.stream = Stream.concat(this.stream, stream);
082    }
083
084    @Override
085    public Node topic() {
086        return node;
087    }
088
089    @Override
090    public RdfStream distinct() {
091        return new DefaultRdfStream(topic(), stream.distinct());
092    }
093
094    @Override
095    public RdfStream filter(final Predicate<? super Triple> predicate) {
096        return new DefaultRdfStream(topic(), stream.filter(predicate));
097    }
098
099    @Override
100    public RdfStream limit(final long maxSize) {
101        return new DefaultRdfStream(topic(), stream.limit(maxSize));
102    }
103
104    @Override
105    public RdfStream peek(final Consumer<? super Triple> action) {
106        return new DefaultRdfStream(topic(), stream.peek(action));
107    }
108
109    @Override
110    public RdfStream skip(final long n) {
111        return new DefaultRdfStream(topic(), stream.skip(n));
112    }
113
114    @Override
115    public RdfStream sorted() {
116        return new DefaultRdfStream(topic(), stream.sorted());
117    }
118
119    @Override
120    public RdfStream sorted(final Comparator<? super Triple> comparator) {
121        return new DefaultRdfStream(topic(), stream.sorted(comparator));
122    }
123
124    @Override
125    public RdfStream onClose(final Runnable closeHandler) {
126        return new DefaultRdfStream(topic(), stream.onClose(closeHandler));
127    }
128
129    @Override
130    public RdfStream parallel() {
131        return new DefaultRdfStream(topic(), stream.parallel());
132    }
133
134    @Override
135    public RdfStream sequential() {
136        return new DefaultRdfStream(topic(), stream.sequential());
137    }
138
139    @Override
140    public RdfStream unordered() {
141        return new DefaultRdfStream(topic(), stream.unordered());
142    }
143}