001/* 002 * Licensed to DuraSpace under one or more contributor license agreements. 003 * See the NOTICE file distributed with this work for additional information 004 * regarding copyright ownership. 005 * 006 * DuraSpace licenses this file to you under the Apache License, 007 * Version 2.0 (the "License"); you may not use this file except in 008 * compliance with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.fcrepo.kernel.api.rdf; 019 020import static java.util.Spliterator.IMMUTABLE; 021import static java.util.Spliterators.spliteratorUnknownSize; 022import static java.util.stream.Stream.empty; 023import static java.util.stream.StreamSupport.stream; 024 025import java.util.Comparator; 026import java.util.Objects; 027import java.util.function.Consumer; 028import java.util.function.Predicate; 029import java.util.stream.Stream; 030 031import com.hp.hpl.jena.graph.Node; 032import com.hp.hpl.jena.graph.Triple; 033import com.hp.hpl.jena.rdf.model.Model; 034import com.hp.hpl.jena.rdf.model.Statement; 035import org.fcrepo.kernel.api.utils.WrappingStream; 036import org.fcrepo.kernel.api.RdfStream; 037 038/** 039 * Implementation of a context-bearing RDF stream 040 * 041 * @author acoburn 042 * @since Dec 6, 2015 043 */ 044public class DefaultRdfStream extends WrappingStream<Triple> implements RdfStream { 045 046 protected final Node node; 047 048 /** 049 * Create an RdfStream 050 * @param node the topic of the stream 051 */ 052 public DefaultRdfStream(final Node node) { 053 this(node, empty()); 054 } 055 056 /** 057 * Create an RdfStream 058 * @param node the topic of the stream 059 * @param stream the incoming stream 060 */ 061 public DefaultRdfStream(final Node node, final Stream<Triple> stream) { 062 Objects.requireNonNull(node); 063 this.node = node; 064 this.stream = stream; 065 } 066 067 /** 068 * Create an RdfStream from an existing Model. 069 * @param node The subject node 070 * @param model An input Model 071 * @return a new RdfStream object 072 */ 073 public static RdfStream fromModel(final Node node, final Model model) { 074 return new DefaultRdfStream(node, 075 stream(spliteratorUnknownSize(model.listStatements(), IMMUTABLE), false).map(Statement::asTriple)); 076 } 077 078 /** 079 * Concatenate a Triple stream to the existing stream 080 * @param stream additional triples 081 */ 082 protected void concat(final Stream<Triple> stream) { 083 this.stream = Stream.concat(this.stream, stream); 084 } 085 086 @Override 087 public Node topic() { 088 return node; 089 } 090 091 @Override 092 public RdfStream distinct() { 093 return new DefaultRdfStream(topic(), stream.distinct()); 094 } 095 096 @Override 097 public RdfStream filter(final Predicate<? super Triple> predicate) { 098 return new DefaultRdfStream(topic(), stream.filter(predicate)); 099 } 100 101 @Override 102 public RdfStream limit(final long maxSize) { 103 return new DefaultRdfStream(topic(), stream.limit(maxSize)); 104 } 105 106 @Override 107 public RdfStream peek(final Consumer<? super Triple> action) { 108 return new DefaultRdfStream(topic(), stream.peek(action)); 109 } 110 111 @Override 112 public RdfStream skip(final long n) { 113 return new DefaultRdfStream(topic(), stream.skip(n)); 114 } 115 116 @Override 117 public RdfStream sorted() { 118 return new DefaultRdfStream(topic(), stream.sorted()); 119 } 120 121 @Override 122 public RdfStream sorted(final Comparator<? super Triple> comparator) { 123 return new DefaultRdfStream(topic(), stream.sorted(comparator)); 124 } 125 126 @Override 127 public RdfStream onClose(final Runnable closeHandler) { 128 return new DefaultRdfStream(topic(), stream.onClose(closeHandler)); 129 } 130 131 @Override 132 public RdfStream parallel() { 133 return new DefaultRdfStream(topic(), stream.parallel()); 134 } 135 136 @Override 137 public RdfStream sequential() { 138 return new DefaultRdfStream(topic(), stream.sequential()); 139 } 140 141 @Override 142 public RdfStream unordered() { 143 return new DefaultRdfStream(topic(), stream.unordered()); 144 } 145}