001/**
002 * Copyright 2015 DuraSpace, Inc.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.fcrepo.kernel.utils.iterators;
017
018import static com.google.common.base.Objects.equal;
019import static com.google.common.collect.ImmutableSet.copyOf;
020import static com.google.common.collect.Iterators.singletonIterator;
021import static com.hp.hpl.jena.rdf.model.ModelFactory.createDefaultModel;
022import static java.util.Objects.hash;
023
024import java.util.Collection;
025import java.util.HashMap;
026import java.util.Iterator;
027import java.util.Map;
028
029import javax.jcr.Session;
030
031import com.google.common.base.Function;
032import com.google.common.base.Predicate;
033import com.google.common.collect.ForwardingIterator;
034import com.google.common.collect.Iterators;
035import com.hp.hpl.jena.graph.Node;
036import com.hp.hpl.jena.graph.Triple;
037import com.hp.hpl.jena.rdf.model.Model;
038import com.hp.hpl.jena.rdf.model.Statement;
039
040/**
041 * A stream of RDF triples along with some useful context.
042 * 
043 * @author ajs6f
044 * @since Oct 9, 2013
045 */
046public class RdfStream extends ForwardingIterator<Triple> {
047
048    private Map<String, String> namespaces = new HashMap<>();
049
050    protected Iterator<Triple> triples;
051
052    protected Session context;
053
054    protected Node topic;
055
056    private static final Triple[] NONE = new Triple[] {};
057
058    /**
059     * Constructor that begins the stream with proffered triples.
060     * 
061     * @param triples the triples
062     * @param <Tr> extends {@link Triple}
063     * @param <T> extends {@link Iterable}
064     */
065    public <Tr extends Triple, T extends Iterator<Tr>> RdfStream(final T triples) {
066        super();
067        this.triples = Iterators.transform(triples, cast());
068    }
069
070    /**
071     * Constructor that begins the stream with proffered triples.
072     * 
073     * @param triples the triples
074     * @param <Tr> extends {@link Triple}
075     * @param <T> extends {@link Iterable}
076     */
077    public <Tr extends Triple, T extends Iterable<Tr>> RdfStream(final T triples) {
078        this(triples.iterator());
079    }
080
081    /**
082     * Constructor that begins the stream with proffered triples.
083     * 
084     * @param triples the triples
085     * @param <Tr> extends {@link Triple}
086     * @param <T> extends {@link Collection}
087     */
088    public <Tr extends Triple, T extends Collection<Tr>> RdfStream(
089            final T triples) {
090        this(triples.iterator());
091    }
092
093    /**
094     * Constructor that begins the stream with proffered triples.
095     * 
096     * @param triples the triples
097     * @param <T> extends {@link Triple}
098     */
099    @SafeVarargs
100    public <T extends Triple> RdfStream(final T... triples) {
101        this(Iterators.forArray(triples));
102    }
103
104    /**
105     * Constructor that begins the stream with proffered statements.
106     * 
107     * @param statements the statements
108     * @param <T> extends {@link Statement}
109     */
110    @SafeVarargs
111    public <T extends Statement> RdfStream(final T... statements) {
112        this(Iterators.transform(Iterators.forArray(statements),
113                statement2triple));
114    }
115
116    /**
117     * Constructor that begins the stream with proffered triple.
118     * 
119     * @param triple the triple
120     * @param <T> extends {@link Triple}
121     */
122    public <T extends Triple> RdfStream(final T triple) {
123        this(Iterators.forArray(new Triple[] { triple }));
124    }
125
126    /**
127     * Constructor that begins the stream without any triples.
128     */
129    public RdfStream() {
130        this(NONE);
131    }
132
133    /**
134     * Returns the proffered {@link Triple}s with the context of this RdfStream.
135     * 
136     * @param stream the stream
137     * @param <Tr> extends {@link Triple}
138     * @param <T> extends {@link Iterator}
139     * @return proffered Triples with the context of this RDFStream
140     */
141    public <Tr extends Triple, T extends Iterator<Tr>> RdfStream withThisContext(final T stream) {
142        return new RdfStream(stream).namespaces(namespaces()).topic(topic());
143    }
144
145    /**
146     * Returns the proffered {@link Triple}s with the context of this RdfStream.
147     * 
148     * @param stream the stream
149     * @param <Tr> extends {@link Triple}
150     * @param <T> extends {@link Iterator}
151     * @return proffered Triples with the context of this RDFStream
152     */
153    public <Tr extends Triple, T extends Iterable<Tr>> RdfStream withThisContext(final T stream) {
154        return new RdfStream(stream).namespaces(namespaces()).topic(topic());
155    }
156
157    /**
158     * @param newTriples Triples to add.
159     * @return This object for continued use.
160     */
161    public RdfStream concat(final Iterator<? extends Triple> newTriples) {
162        triples = Iterators.concat(triples, newTriples);
163        return this;
164    }
165
166    /**
167     * @param newTriple Triples to add.
168     * @param <T> extends {@link Triple}
169     * @return This object for continued use.
170     */
171    public <T extends Triple> RdfStream concat(final T newTriple) {
172        triples = Iterators.concat(triples, singletonIterator(newTriple));
173        return this;
174    }
175
176    /**
177     * @param newTriples Triples to add.
178     * @param <T> extends {@link Triple}
179     * @return This object for continued use.
180     */
181    public <T extends Triple> RdfStream concat(@SuppressWarnings("unchecked") final T... newTriples) {
182        triples = Iterators.concat(triples, Iterators.forArray(newTriples));
183        return this;
184    }
185
186    /**
187     * @param newTriples Triples to add.
188     * @return This object for continued use.
189     */
190    public RdfStream concat(final Collection<? extends Triple> newTriples) {
191        triples = Iterators.concat(triples, newTriples.iterator());
192        return this;
193    }
194
195    /**
196     * As {@link Iterators#limit(Iterator, int)} while maintaining context.
197     * 
198     * @param limit the limit
199     * @return RDFStream
200     */
201    public RdfStream limit(final Integer limit) {
202        return (limit == -1) ? this : withThisContext(Iterators.limit(this, limit));
203    }
204
205    /**
206     * As {@link Iterators#advance(Iterator, int)} while maintaining context.
207     * 
208     * @param skipNum the skip number
209     * @return RDFStream
210     */
211    public RdfStream skip(final Integer skipNum) {
212        Iterators.advance(this, skipNum);
213        return this;
214    }
215
216    /**
217     * As {@link Iterators#filter(Iterator, Predicate)} while maintaining context.
218     * 
219     * @param predicate the predicate
220     * @return RdfStream
221     */
222    public RdfStream filter(final Predicate<? super Triple> predicate) {
223        return withThisContext(Iterators.filter(this, predicate));
224    }
225
226    /**
227     * As {@link Iterators#transform(Iterator, Function)}.
228     * 
229     * @param f the parameter f
230     * @param <ToType> extends {@link Iterator}
231     * @return Iterator
232     */
233    public <ToType> Iterator<ToType> transform(final Function<? super Triple, ToType> f) {
234        return Iterators.transform(this, f);
235    }
236
237    /**
238     * RdfStream
239     * 
240     * @param prefix the prefix
241     * @param uri the uri
242     * @return This object for continued use.
243     */
244    public RdfStream namespace(final String prefix, final String uri) {
245        namespaces.put(prefix, uri);
246        return this;
247    }
248
249    /**
250     * @param nses the property of nses
251     * @return This object for continued use.
252     */
253    public RdfStream namespaces(final Map<String, String> nses) {
254        namespaces.putAll(nses);
255        return this;
256    }
257
258    /**
259     * @return The {@link Session} in context
260     */
261    public Session session() {
262        return this.context;
263    }
264
265    /**
266     * Sets the JCR context of this stream
267     * 
268     * @param session The {@link Session} in context
269     * @return the JCR context of this stream
270     */
271    public RdfStream session(final Session session) {
272        this.context = session;
273        return this;
274    }
275
276    /**
277     * @return The {@link Node} topic in context
278     */
279    public Node topic() {
280        return this.topic;
281    }
282
283    /**
284     * Sets the topic of this stream
285     * 
286     * @param topic The {@link Node} topic in context
287     * @return the stream
288     */
289    public RdfStream topic(final Node topic) {
290        this.topic = topic;
291        return this;
292    }
293
294    /**
295     * WARNING! This method exhausts the RdfStream on which it is called!
296     * 
297     * @return A {@link Model} containing the prefix mappings and triples in this stream of RDF
298     */
299    public Model asModel() {
300        final Model model = createDefaultModel();
301        model.setNsPrefixes(namespaces());
302        for (final Triple t : this.iterable()) {
303            model.add(model.asStatement(t));
304        }
305        return model;
306    }
307
308    /**
309     * @param model A {@link Model} containing the prefix mappings and triples to be put into this stream of RDF
310     * @return RDFStream
311     */
312    public static RdfStream fromModel(final Model model) {
313        final Iterator<Triple> triples = Iterators.transform(model.listStatements(), statement2triple);
314        return new RdfStream(triples).namespaces(model.getNsPrefixMap());
315    }
316
317    private static Function<Statement, Triple> statement2triple = new Function<Statement, Triple>() {
318
319        @Override
320        public Triple apply(final Statement s) {
321            return s.asTriple();
322        }
323
324    };
325
326    @Override
327    protected Iterator<Triple> delegate() {
328        return triples;
329    }
330
331    /**
332     * @return an anonymous {@literal Iterable<Triple>} for use with for-each etc.
333     */
334    public Iterable<Triple> iterable() {
335        return new Iterable<Triple>() {
336
337            @Override
338            public Iterator<Triple> iterator() {
339                return triples;
340            }
341        };
342    }
343
344    /**
345     * @return Namespaces in scope for this stream.
346     */
347    public Map<String, String> namespaces() {
348        return namespaces;
349    }
350
351    private static <T extends Triple> Function<T, Triple> cast() {
352        return new Function<T, Triple>() {
353
354            @Override
355            public Triple apply(final T prototriple) {
356                return prototriple;
357            }
358
359        };
360    }
361
362    /*
363     * We ignore duplicated triples for equality. (non-Javadoc)
364     * @see java.lang.Object#equals(java.lang.Object)
365     */
366    @Override
367    public boolean equals(final Object o) {
368        if (o == null) {
369            return false;
370        }
371        if (o == this) {
372            return true;
373        }
374        if (!(o instanceof RdfStream)) {
375            return false;
376        }
377        final RdfStream rdfo = (RdfStream) o;
378
379        final boolean triplesEqual =
380                equal(copyOf(rdfo.triples), copyOf(this.triples));
381
382        final boolean namespaceMappingsEqual =
383                equal(rdfo.namespaces(), this.namespaces());
384
385        final boolean topicEqual =
386                equal(rdfo.topic(), this.topic());
387
388        return triplesEqual && namespaceMappingsEqual && topicEqual;
389
390    }
391
392    @Override
393    public int hashCode() {
394        return hash(namespaces(), triples, topic());
395    }
396
397}