001/* 002 * The contents of this file are subject to the license and copyright 003 * detailed in the LICENSE and NOTICE files at the root of the source 004 * tree. 005 */ 006package org.fcrepo.kernel.api.rdf; 007 008import static java.util.Spliterator.IMMUTABLE; 009import static java.util.Spliterators.spliteratorUnknownSize; 010import static java.util.stream.Stream.empty; 011import static java.util.stream.StreamSupport.stream; 012 013import java.util.Comparator; 014import java.util.Objects; 015import java.util.function.Consumer; 016import java.util.function.Predicate; 017import java.util.stream.Stream; 018 019import org.apache.jena.graph.Node; 020import org.apache.jena.graph.Triple; 021import org.apache.jena.rdf.model.Model; 022import org.apache.jena.rdf.model.Statement; 023import org.fcrepo.kernel.api.utils.WrappingStream; 024import org.fcrepo.kernel.api.RdfStream; 025 026/** 027 * Implementation of a context-bearing RDF stream 028 * 029 * @author acoburn 030 * @since Dec 6, 2015 031 */ 032public class DefaultRdfStream extends WrappingStream<Triple> implements RdfStream { 033 034 private final Node node; 035 036 /** 037 * Create an RdfStream 038 * @param node the topic of the stream 039 */ 040 public DefaultRdfStream(final Node node) { 041 this(node, empty()); 042 } 043 044 /** 045 * Create an RdfStream 046 * @param node the topic of the stream 047 * @param stream the incoming stream 048 */ 049 public DefaultRdfStream(final Node node, final Stream<Triple> stream) { 050 Objects.requireNonNull(node); 051 this.node = node; 052 this.stream = stream; 053 } 054 055 /** 056 * Create an RdfStream from an existing Model. 057 * @param node The subject node 058 * @param model An input Model 059 * @return a new RdfStream object 060 */ 061 public static RdfStream fromModel(final Node node, final Model model) { 062 return new DefaultRdfStream(node, 063 stream(spliteratorUnknownSize(model.listStatements(), IMMUTABLE), false).map(Statement::asTriple)); 064 } 065 066 /** 067 * Concatenate a Triple stream to the existing stream 068 * @param stream additional triples 069 */ 070 protected void concat(final Stream<Triple> stream) { 071 this.stream = Stream.concat(this.stream, stream); 072 } 073 074 @Override 075 public Node topic() { 076 return node; 077 } 078 079 @Override 080 public RdfStream distinct() { 081 return new DefaultRdfStream(topic(), stream.distinct()); 082 } 083 084 @Override 085 public RdfStream filter(final Predicate<? super Triple> predicate) { 086 return new DefaultRdfStream(topic(), stream.filter(predicate)); 087 } 088 089 @Override 090 public RdfStream limit(final long maxSize) { 091 return new DefaultRdfStream(topic(), stream.limit(maxSize)); 092 } 093 094 @Override 095 public RdfStream peek(final Consumer<? super Triple> action) { 096 return new DefaultRdfStream(topic(), stream.peek(action)); 097 } 098 099 @Override 100 public RdfStream skip(final long n) { 101 return new DefaultRdfStream(topic(), stream.skip(n)); 102 } 103 104 @Override 105 public RdfStream sorted() { 106 return new DefaultRdfStream(topic(), stream.sorted()); 107 } 108 109 @Override 110 public RdfStream sorted(final Comparator<? super Triple> comparator) { 111 return new DefaultRdfStream(topic(), stream.sorted(comparator)); 112 } 113 114 @Override 115 public RdfStream onClose(final Runnable closeHandler) { 116 return new DefaultRdfStream(topic(), stream.onClose(closeHandler)); 117 } 118 119 @Override 120 public RdfStream parallel() { 121 return new DefaultRdfStream(topic(), stream.parallel()); 122 } 123 124 @Override 125 public RdfStream sequential() { 126 return new DefaultRdfStream(topic(), stream.sequential()); 127 } 128 129 @Override 130 public RdfStream unordered() { 131 return new DefaultRdfStream(topic(), stream.unordered()); 132 } 133}