001/**
002 * Copyright 2015 DuraSpace, Inc.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.fcrepo.kernel.utils.iterators;
017
018import static com.google.common.base.Objects.equal;
019import static com.google.common.collect.ImmutableSet.copyOf;
020import static com.google.common.collect.Iterators.singletonIterator;
021import static com.hp.hpl.jena.rdf.model.ModelFactory.createDefaultModel;
022import static java.util.Objects.hash;
023
024import java.util.Collection;
025import java.util.HashMap;
026import java.util.Iterator;
027import java.util.Map;
028
029import javax.jcr.Session;
030
031import com.google.common.base.Function;
032import com.google.common.base.Predicate;
033import com.google.common.collect.ForwardingIterator;
034import com.google.common.collect.Iterators;
035import com.hp.hpl.jena.graph.Node;
036import com.hp.hpl.jena.graph.Triple;
037import com.hp.hpl.jena.rdf.model.Model;
038import com.hp.hpl.jena.rdf.model.Statement;
039
040/**
041 * A stream of RDF triples along with some useful context.
042 * 
043 * @author ajs6f
044 * @since Oct 9, 2013
045 */
046public class RdfStream extends ForwardingIterator<Triple> {
047
048    private Map<String, String> namespaces = new HashMap<>();
049
050    protected Iterator<Triple> triples;
051
052    protected Session context;
053
054    protected Node topic;
055
056    private static final Triple[] NONE = new Triple[] {};
057
058    /**
059     * Constructor that begins the stream with proffered triples.
060     * 
061     * @param triples
062     */
063    public <Tr extends Triple, T extends Iterator<Tr>> RdfStream(final T triples) {
064        super();
065        this.triples = Iterators.transform(triples, cast());
066    }
067
068    /**
069     * Constructor that begins the stream with proffered triples.
070     * 
071     * @param triples
072     */
073    public <Tr extends Triple, T extends Iterable<Tr>> RdfStream(final T triples) {
074        this(triples.iterator());
075    }
076
077    /**
078     * Constructor that begins the stream with proffered triples.
079     * 
080     * @param triples
081     */
082    public <Tr extends Triple, T extends Collection<Tr>> RdfStream(
083            final T triples) {
084        this(triples.iterator());
085    }
086
087    /**
088     * Constructor that begins the stream with proffered triples.
089     * 
090     * @param triples
091     */
092    @SafeVarargs
093    public <T extends Triple> RdfStream(final T... triples) {
094        this(Iterators.forArray(triples));
095    }
096
097    /**
098     * Constructor that begins the stream with proffered statements.
099     * 
100     * @param statements
101     */
102    @SafeVarargs
103    public <T extends Statement> RdfStream(final T... statements) {
104        this(Iterators.transform(Iterators.forArray(statements),
105                statement2triple));
106    }
107
108    /**
109     * Constructor that begins the stream with proffered triple.
110     * 
111     * @param triple
112     */
113    public <T extends Triple> RdfStream(final T triple) {
114        this(Iterators.forArray(new Triple[] { triple }));
115    }
116
117    /**
118     * Constructor that begins the stream without any triples.
119     */
120    public RdfStream() {
121        this(NONE);
122    }
123
124    /**
125     * Returns the proffered {@link Triple}s with the context of this RdfStream.
126     * 
127     * @param stream
128     * @return proffered Triples with the context of this RDFStream
129     */
130    public <Tr extends Triple, T extends Iterator<Tr>> RdfStream withThisContext(final T stream) {
131        return new RdfStream(stream).namespaces(namespaces()).topic(topic());
132    }
133
134    /**
135     * Returns the proffered {@link Triple}s with the context of this RdfStream.
136     * 
137     * @param stream
138     * @return proffered Triples with the context of this RDFStream
139     */
140    public <Tr extends Triple, T extends Iterable<Tr>> RdfStream withThisContext(final T stream) {
141        return new RdfStream(stream).namespaces(namespaces()).topic(topic());
142    }
143
144    /**
145     * @param newTriples Triples to add.
146     * @return This object for continued use.
147     */
148    public RdfStream concat(final Iterator<? extends Triple> newTriples) {
149        triples = Iterators.concat(triples, newTriples);
150        return this;
151    }
152
153    /**
154     * @param newTriple Triples to add.
155     * @return This object for continued use.
156     */
157    public <T extends Triple> RdfStream concat(final T newTriple) {
158        triples = Iterators.concat(triples, singletonIterator(newTriple));
159        return this;
160    }
161
162    /**
163     * @param newTriples Triples to add.
164     * @return This object for continued use.
165     */
166    public <T extends Triple> RdfStream concat(@SuppressWarnings("unchecked") final T... newTriples) {
167        triples = Iterators.concat(triples, Iterators.forArray(newTriples));
168        return this;
169    }
170
171    /**
172     * @param newTriples Triples to add.
173     * @return This object for continued use.
174     */
175    public RdfStream concat(final Collection<? extends Triple> newTriples) {
176        triples = Iterators.concat(triples, newTriples.iterator());
177        return this;
178    }
179
180    /**
181     * As {@link Iterators#limit(Iterator, int)} while maintaining context.
182     * 
183     * @param limit
184     * @return RDFStream
185     */
186    public RdfStream limit(final Integer limit) {
187        return (limit == -1) ? this : withThisContext(Iterators.limit(this, limit));
188    }
189
190    /**
191     * As {@link Iterators#advance(Iterator, int)} while maintaining context.
192     * 
193     * @param skipNum
194     * @return RDFStream
195     */
196    public RdfStream skip(final Integer skipNum) {
197        Iterators.advance(this, skipNum);
198        return this;
199    }
200
201    /**
202     * As {@link Iterators#filter(Iterator, Predicate)} while maintaining context.
203     * 
204     * @param predicate
205     * @return RdfStream
206     */
207    public RdfStream filter(final Predicate<? super Triple> predicate) {
208        return withThisContext(Iterators.filter(this, predicate));
209    }
210
211    /**
212     * As {@link Iterators#transform(Iterator, Function)}.
213     * 
214     * @param f
215     * @return Iterator
216     */
217    public <ToType> Iterator<ToType> transform(final Function<? super Triple, ToType> f) {
218        return Iterators.transform(this, f);
219    }
220
221    /**
222     * RdfStream
223     * 
224     * @param prefix
225     * @param uri
226     * @return This object for continued use.
227     */
228    public RdfStream namespace(final String prefix, final String uri) {
229        namespaces.put(prefix, uri);
230        return this;
231    }
232
233    /**
234     * @param nses
235     * @return This object for continued use.
236     */
237    public RdfStream namespaces(final Map<String, String> nses) {
238        namespaces.putAll(nses);
239        return this;
240    }
241
242    /**
243     * @return The {@link Session} in context
244     */
245    public Session session() {
246        return this.context;
247    }
248
249    /**
250     * Sets the JCR context of this stream
251     * 
252     * @param session The {@link Session} in context
253     */
254    public RdfStream session(final Session session) {
255        this.context = session;
256        return this;
257    }
258
259    /**
260     * @return The {@link Node} topic in context
261     */
262    public Node topic() {
263        return this.topic;
264    }
265
266    /**
267     * Sets the topic of this stream
268     * 
269     * @param topic The {@link Node} topic in context
270     */
271    public RdfStream topic(final Node topic) {
272        this.topic = topic;
273        return this;
274    }
275
276    /**
277     * WARNING! This method exhausts the RdfStream on which it is called!
278     * 
279     * @return A {@link Model} containing the prefix mappings and triples in this stream of RDF
280     */
281    public Model asModel() {
282        final Model model = createDefaultModel();
283        model.setNsPrefixes(namespaces());
284        for (final Triple t : this.iterable()) {
285            model.add(model.asStatement(t));
286        }
287        return model;
288    }
289
290    /**
291     * @param model A {@link Model} containing the prefix mappings and triples to be put into this stream of RDF
292     * @return RDFStream
293     */
294    public static RdfStream fromModel(final Model model) {
295        final Iterator<Triple> triples = Iterators.transform(model.listStatements(), statement2triple);
296        return new RdfStream(triples).namespaces(model.getNsPrefixMap());
297    }
298
299    private static Function<Statement, Triple> statement2triple = new Function<Statement, Triple>() {
300
301        @Override
302        public Triple apply(final Statement s) {
303            return s.asTriple();
304        }
305
306    };
307
308    @Override
309    protected Iterator<Triple> delegate() {
310        return triples;
311    }
312
313    /**
314     * @return an anonymous Iterable<Triple> for use with for-each etc.
315     */
316    public Iterable<Triple> iterable() {
317        return new Iterable<Triple>() {
318
319            @Override
320            public Iterator<Triple> iterator() {
321                return triples;
322            }
323        };
324    }
325
326    /**
327     * @return Namespaces in scope for this stream.
328     */
329    public Map<String, String> namespaces() {
330        return namespaces;
331    }
332
333    private static <T extends Triple> Function<T, Triple> cast() {
334        return new Function<T, Triple>() {
335
336            @Override
337            public Triple apply(final T prototriple) {
338                return prototriple;
339            }
340
341        };
342    }
343
344    /*
345     * We ignore duplicated triples for equality. (non-Javadoc)
346     * @see java.lang.Object#equals(java.lang.Object)
347     */
348    @Override
349    public boolean equals(final Object o) {
350        if (o == null) {
351            return false;
352        }
353        if (o == this) {
354            return true;
355        }
356        if (!(o instanceof RdfStream)) {
357            return false;
358        }
359        final RdfStream rdfo = (RdfStream) o;
360
361        final boolean triplesEqual =
362                equal(copyOf(rdfo.triples), copyOf(this.triples));
363
364        final boolean namespaceMappingsEqual =
365                equal(rdfo.namespaces(), this.namespaces());
366
367        final boolean topicEqual =
368                equal(rdfo.topic(), this.topic());
369
370        return triplesEqual && namespaceMappingsEqual && topicEqual;
371
372    }
373
374    @Override
375    public int hashCode() {
376        return hash(namespaces(), triples, topic());
377    }
378
379}