001/*
002 * Licensed to DuraSpace under one or more contributor license agreements.
003 * See the NOTICE file distributed with this work for additional information
004 * regarding copyright ownership.
005 *
006 * DuraSpace licenses this file to you under the Apache License,
007 * Version 2.0 (the "License"); you may not use this file except in
008 * compliance with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.fcrepo.kernel.api.rdf;
019
020import static java.util.Spliterator.IMMUTABLE;
021import static java.util.Spliterators.spliteratorUnknownSize;
022import static java.util.stream.Stream.empty;
023import static java.util.stream.StreamSupport.stream;
024
025import java.util.Comparator;
026import java.util.Objects;
027import java.util.function.Consumer;
028import java.util.function.Predicate;
029import java.util.stream.Stream;
030
031import com.hp.hpl.jena.graph.Node;
032import com.hp.hpl.jena.graph.Triple;
033import com.hp.hpl.jena.rdf.model.Model;
034import com.hp.hpl.jena.rdf.model.Statement;
035import org.fcrepo.kernel.api.utils.WrappingStream;
036import org.fcrepo.kernel.api.RdfStream;
037
038/**
039 * Implementation of a context-bearing RDF stream
040 *
041 * @author acoburn
042 * @since Dec 6, 2015
043 */
044public class DefaultRdfStream extends WrappingStream<Triple> implements RdfStream {
045
046    protected final Node node;
047
048    /**
049     * Create an RdfStream
050     * @param node the topic of the stream
051     */
052    public DefaultRdfStream(final Node node) {
053        this(node, empty());
054    }
055
056    /**
057     * Create an RdfStream
058     * @param node the topic of the stream
059     * @param stream the incoming stream
060     */
061    public DefaultRdfStream(final Node node, final Stream<Triple> stream) {
062        Objects.requireNonNull(node);
063        this.node = node;
064        this.stream = stream;
065    }
066
067    /**
068     * Create an RdfStream from an existing Model.
069     * @param node The subject node
070     * @param model An input Model
071     * @return a new RdfStream object
072     */
073    public static RdfStream fromModel(final Node node, final Model model) {
074        return new DefaultRdfStream(node,
075                stream(spliteratorUnknownSize(model.listStatements(), IMMUTABLE), false).map(Statement::asTriple));
076    }
077
078    /**
079     * Concatenate a Triple stream to the existing stream
080     * @param stream additional triples
081     */
082    protected void concat(final Stream<Triple> stream) {
083        this.stream = Stream.concat(this.stream, stream);
084    }
085
086    @Override
087    public Node topic() {
088        return node;
089    }
090
091    @Override
092    public RdfStream distinct() {
093        return new DefaultRdfStream(topic(), stream.distinct());
094    }
095
096    @Override
097    public RdfStream filter(final Predicate<? super Triple> predicate) {
098        return new DefaultRdfStream(topic(), stream.filter(predicate));
099    }
100
101    @Override
102    public RdfStream limit(final long maxSize) {
103        return new DefaultRdfStream(topic(), stream.limit(maxSize));
104    }
105
106    @Override
107    public RdfStream peek(final Consumer<? super Triple> action) {
108        return new DefaultRdfStream(topic(), stream.peek(action));
109    }
110
111    @Override
112    public RdfStream skip(final long n) {
113        return new DefaultRdfStream(topic(), stream.skip(n));
114    }
115
116    @Override
117    public RdfStream sorted() {
118        return new DefaultRdfStream(topic(), stream.sorted());
119    }
120
121    @Override
122    public RdfStream sorted(final Comparator<? super Triple> comparator) {
123        return new DefaultRdfStream(topic(), stream.sorted(comparator));
124    }
125
126    @Override
127    public RdfStream onClose(final Runnable closeHandler) {
128        return new DefaultRdfStream(topic(), stream.onClose(closeHandler));
129    }
130
131    @Override
132    public RdfStream parallel() {
133        return new DefaultRdfStream(topic(), stream.parallel());
134    }
135
136    @Override
137    public RdfStream sequential() {
138        return new DefaultRdfStream(topic(), stream.sequential());
139    }
140
141    @Override
142    public RdfStream unordered() {
143        return new DefaultRdfStream(topic(), stream.unordered());
144    }
145}