001/**
002 * Copyright 2015 DuraSpace, Inc.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.fcrepo.kernel.api.utils.iterators;
017
018import static com.google.common.base.Objects.equal;
019import static com.google.common.collect.ImmutableSet.copyOf;
020import static com.google.common.collect.Iterators.singletonIterator;
021import static com.hp.hpl.jena.rdf.model.ModelFactory.createDefaultModel;
022import static java.util.Objects.hash;
023
024import java.util.Collection;
025import java.util.HashMap;
026import java.util.Iterator;
027import java.util.Map;
028import java.util.function.Function;
029import java.util.function.Predicate;
030
031import javax.jcr.Session;
032
033import com.google.common.collect.ForwardingIterator;
034import com.google.common.collect.Iterators;
035import com.hp.hpl.jena.graph.Node;
036import com.hp.hpl.jena.graph.Triple;
037import com.hp.hpl.jena.rdf.model.Model;
038import com.hp.hpl.jena.rdf.model.Statement;
039
040/**
041 * A stream of RDF triples along with some useful context.
042 *
043 * @author ajs6f
044 * @since Oct 9, 2013
045 */
046public class RdfStream extends ForwardingIterator<Triple> {
047
048    private final Map<String, String> namespaces = new HashMap<>();
049
050    protected Iterator<Triple> triples;
051
052    protected Session context;
053
054    protected Node topic;
055
056    private static final Triple[] NONE = new Triple[] {};
057
058    /**
059     * Constructor that begins the stream with proffered triples.
060     *
061     * @param triples the triples
062     * @param <Tr> extends {@link Triple}
063     * @param <T> extends {@link Iterable}
064     */
065    public <Tr extends Triple, T extends Iterator<Tr>> RdfStream(final T triples) {
066        super();
067        this.triples = Iterators.transform(triples, cast()::apply);
068    }
069
070    /**
071     * Constructor that begins the stream with proffered triples.
072     *
073     * @param triples the triples
074     * @param <Tr> extends {@link Triple}
075     * @param <T> extends {@link Iterable}
076     */
077    public <Tr extends Triple, T extends Iterable<Tr>> RdfStream(final T triples) {
078        this(triples.iterator());
079    }
080
081    /**
082     * Constructor that begins the stream with proffered triples.
083     *
084     * @param triples the triples
085     * @param <Tr> extends {@link Triple}
086     * @param <T> extends {@link Collection}
087     */
088    public <Tr extends Triple, T extends Collection<Tr>> RdfStream(
089            final T triples) {
090        this(triples.iterator());
091    }
092
093    /**
094     * Constructor that begins the stream with proffered triples.
095     *
096     * @param triples the triples
097     * @param <T> extends {@link Triple}
098     */
099    @SafeVarargs
100    public <T extends Triple> RdfStream(final T... triples) {
101        this(Iterators.forArray(triples));
102    }
103
104    /**
105     * Constructor that begins the stream with proffered statements.
106     *
107     * @param statements the statements
108     * @param <T> extends {@link Statement}
109     */
110    @SafeVarargs
111    public <T extends Statement> RdfStream(final T... statements) {
112        this(Iterators.transform(Iterators.forArray(statements),
113                x -> x.asTriple()));
114    }
115
116    /**
117     * Constructor that begins the stream with proffered triple.
118     *
119     * @param triple the triple
120     * @param <T> extends {@link Triple}
121     */
122    public <T extends Triple> RdfStream(final T triple) {
123        this(Iterators.forArray(new Triple[] { triple }));
124    }
125
126    /**
127     * Constructor that begins the stream without any triples.
128     */
129    public RdfStream() {
130        this(NONE);
131    }
132
133    /**
134     * Returns the proffered {@link Triple}s with the context of this RdfStream.
135     *
136     * @param stream the stream
137     * @param <Tr> extends {@link Triple}
138     * @param <T> extends {@link Iterator}
139     * @return proffered Triples with the context of this RDFStream
140     */
141    public <Tr extends Triple, T extends Iterator<Tr>> RdfStream withThisContext(final T stream) {
142        return new RdfStream(stream).namespaces(namespaces()).topic(topic());
143    }
144
145    /**
146     * Returns the proffered {@link Triple}s with the context of this RdfStream.
147     *
148     * @param stream the stream
149     * @param <Tr> extends {@link Triple}
150     * @param <T> extends {@link Iterator}
151     * @return proffered Triples with the context of this RDFStream
152     */
153    public <Tr extends Triple, T extends Iterable<Tr>> RdfStream withThisContext(final T stream) {
154        return new RdfStream(stream).namespaces(namespaces()).topic(topic());
155    }
156
157    /**
158     * @param newTriples Triples to add.
159     * @return This object for continued use.
160     */
161    public RdfStream concat(final Iterator<? extends Triple> newTriples) {
162        triples = Iterators.concat(triples, newTriples);
163        return this;
164    }
165
166    /**
167     * @param newTriple Triples to add.
168     * @param <T> extends {@link Triple}
169     * @return This object for continued use.
170     */
171    public <T extends Triple> RdfStream concat(final T newTriple) {
172        triples = Iterators.concat(triples, singletonIterator(newTriple));
173        return this;
174    }
175
176    /**
177     * @param newTriples Triples to add.
178     * @param <T> extends {@link Triple}
179     * @return This object for continued use.
180     */
181    @SuppressWarnings("unchecked")
182    public <T extends Triple> RdfStream concat(final T... newTriples) {
183        triples = Iterators.concat(triples, Iterators.forArray(newTriples));
184        return this;
185    }
186
187    /**
188     * @param newTriples Triples to add.
189     * @return This object for continued use.
190     */
191    public RdfStream concat(final Collection<? extends Triple> newTriples) {
192        triples = Iterators.concat(triples, newTriples.iterator());
193        return this;
194    }
195
196    /**
197     * As {@link Iterators#limit(Iterator, int)} while maintaining context.
198     *
199     * @param limit the limit
200     * @return RDFStream
201     */
202    public RdfStream limit(final Integer limit) {
203        return (limit == -1) ? this : withThisContext(Iterators.limit(this, limit));
204    }
205
206    /**
207     * As {@link Iterators#advance(Iterator, int)} while maintaining context.
208     *
209     * @param skipNum the skip number
210     * @return RDFStream
211     */
212    public RdfStream skip(final Integer skipNum) {
213        Iterators.advance(this, skipNum);
214        return this;
215    }
216
217    /**
218     * Filter the RDF triples while maintaining context.
219     *
220     * @param predicate the predicate
221     * @return RdfStream
222     */
223    public RdfStream filter(final Predicate<? super Triple> predicate) {
224        return withThisContext(Iterators.filter(this, predicate::test));
225    }
226
227    /**
228     * Apply a Function to an Iterator.
229     *
230     * @param f the parameter f
231     * @param <ToType> extends {@link Iterator}
232     * @return Iterator
233     */
234    public <ToType> Iterator<ToType> transform(final Function<? super Triple, ToType> f) {
235        return Iterators.transform(this, f::apply);
236    }
237
238    /**
239     * RdfStream
240     *
241     * @param prefix the prefix
242     * @param uri the uri
243     * @return This object for continued use.
244     */
245    public RdfStream namespace(final String prefix, final String uri) {
246        namespaces.put(prefix, uri);
247        return this;
248    }
249
250    /**
251     * @param nses the property of nses
252     * @return This object for continued use.
253     */
254    public RdfStream namespaces(final Map<String, String> nses) {
255        namespaces.putAll(nses);
256        return this;
257    }
258
259    /**
260     * @return The {@link Session} in context
261     */
262    public Session session() {
263        return this.context;
264    }
265
266    /**
267     * Sets the JCR context of this stream
268     *
269     * @param session The {@link Session} in context
270     * @return the JCR context of this stream
271     */
272    public RdfStream session(final Session session) {
273        this.context = session;
274        return this;
275    }
276
277    /**
278     * @return The {@link Node} topic in context
279     */
280    public Node topic() {
281        return this.topic;
282    }
283
284    /**
285     * Sets the topic of this stream
286     *
287     * @param topic The {@link Node} topic in context
288     * @return the stream
289     */
290    public RdfStream topic(final Node topic) {
291        this.topic = topic;
292        return this;
293    }
294
295    /**
296     * WARNING! This method exhausts the RdfStream on which it is called!
297     *
298     * @return A {@link Model} containing the prefix mappings and triples in this stream of RDF
299     */
300    public Model asModel() {
301        final Model model = createDefaultModel();
302        model.setNsPrefixes(namespaces());
303        for (final Triple t : this.iterable()) {
304            model.add(model.asStatement(t));
305        }
306        return model;
307    }
308
309    /**
310     * @param model A {@link Model} containing the prefix mappings and triples to be put into this stream of RDF
311     * @return RDFStream
312     */
313    public static RdfStream fromModel(final Model model) {
314        final Iterator<Triple> triples = Iterators.transform(model.listStatements(), x -> x.asTriple());
315        return new RdfStream(triples).namespaces(model.getNsPrefixMap());
316    }
317
318    @Override
319    protected Iterator<Triple> delegate() {
320        return triples;
321    }
322
323    /**
324     * @return an anonymous {@literal Iterable<Triple>} for use with for-each etc.
325     */
326    public Iterable<Triple> iterable() {
327        return new Iterable<Triple>() {
328
329            @Override
330            public Iterator<Triple> iterator() {
331                return triples;
332            }
333        };
334    }
335
336    /**
337     * @return Namespaces in scope for this stream.
338     */
339    public Map<String, String> namespaces() {
340        return namespaces;
341    }
342
343    private static <T extends Triple> Function<T, Triple> cast() {
344        return new Function<T, Triple>() {
345
346            @Override
347            public Triple apply(final T prototriple) {
348                return prototriple;
349            }
350
351        };
352    }
353
354    protected static <From, To> Iterator<To> flatMap(final Iterator<From> i, final Function<From, Iterator<To>> f) {
355        return Iterators.concat(Iterators.transform(i, f::apply));
356    }
357
358    /*
359     * We ignore duplicated triples for equality. (non-Javadoc)
360     * @see java.lang.Object#equals(java.lang.Object)
361     */
362    @Override
363    public boolean equals(final Object o) {
364        if (o == null) {
365            return false;
366        }
367        if (o == this) {
368            return true;
369        }
370        if (!(o instanceof RdfStream)) {
371            return false;
372        }
373        final RdfStream rdfo = (RdfStream) o;
374
375        final boolean triplesEqual =
376                equal(copyOf(rdfo.triples), copyOf(this.triples));
377
378        final boolean namespaceMappingsEqual =
379                equal(rdfo.namespaces(), this.namespaces());
380
381        final boolean topicEqual =
382                equal(rdfo.topic(), this.topic());
383
384        return triplesEqual && namespaceMappingsEqual && topicEqual;
385
386    }
387
388    @Override
389    public int hashCode() {
390        return hash(namespaces(), triples, topic());
391    }
392
393}