001/**
002 * Copyright 2014 DuraSpace, Inc.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.fcrepo.kernel.utils.iterators;
018
019import static com.google.common.base.Objects.equal;
020import static com.google.common.collect.ImmutableSet.copyOf;
021import static com.google.common.collect.Iterators.singletonIterator;
022import static com.hp.hpl.jena.rdf.model.ModelFactory.createDefaultModel;
023import static java.util.Objects.hash;
024
025import java.util.Collection;
026import java.util.HashMap;
027import java.util.Iterator;
028import java.util.Map;
029
030import javax.jcr.Session;
031
032import com.google.common.base.Function;
033import com.google.common.base.Predicate;
034import com.google.common.collect.ForwardingIterator;
035import com.google.common.collect.Iterators;
036import com.hp.hpl.jena.graph.Node;
037import com.hp.hpl.jena.graph.Triple;
038import com.hp.hpl.jena.rdf.model.Model;
039import com.hp.hpl.jena.rdf.model.Statement;
040
041/**
042 * A stream of RDF triples along with some useful context.
043 * 
044 * @author ajs6f
045 * @since Oct 9, 2013
046 */
047public class RdfStream extends ForwardingIterator<Triple> {
048
049    private Map<String, String> namespaces = new HashMap<>();
050
051    protected Iterator<Triple> triples;
052
053    protected Session context;
054
055    protected Node topic;
056
057    private static final Triple[] NONE = new Triple[] {};
058
059    /**
060     * Constructor that begins the stream with proffered triples.
061     * 
062     * @param triples
063     */
064    public <Tr extends Triple, T extends Iterator<Tr>> RdfStream(final T triples) {
065        super();
066        this.triples = Iterators.transform(triples, cast());
067    }
068
069    /**
070     * Constructor that begins the stream with proffered triples.
071     * 
072     * @param triples
073     */
074    public <Tr extends Triple, T extends Iterable<Tr>> RdfStream(final T triples) {
075        this(triples.iterator());
076    }
077
078    /**
079     * Constructor that begins the stream with proffered triples.
080     * 
081     * @param triples
082     */
083    public <Tr extends Triple, T extends Collection<Tr>> RdfStream(
084            final T triples) {
085        this(triples.iterator());
086    }
087
088    /**
089     * Constructor that begins the stream with proffered triples.
090     * 
091     * @param triples
092     */
093    @SafeVarargs
094    public <T extends Triple> RdfStream(final T... triples) {
095        this(Iterators.forArray(triples));
096    }
097
098    /**
099     * Constructor that begins the stream with proffered statements.
100     * 
101     * @param statements
102     */
103    @SafeVarargs
104    public <T extends Statement> RdfStream(final T... statements) {
105        this(Iterators.transform(Iterators.forArray(statements),
106                statement2triple));
107    }
108
109    /**
110     * Constructor that begins the stream with proffered triple.
111     * 
112     * @param triple
113     */
114    public <T extends Triple> RdfStream(final T triple) {
115        this(Iterators.forArray(new Triple[] { triple }));
116    }
117
118    /**
119     * Constructor that begins the stream without any triples.
120     */
121    public RdfStream() {
122        this(NONE);
123    }
124
125    /**
126     * Returns the proffered {@link Triple}s with the context of this RdfStream.
127     * 
128     * @param stream
129     * @return proffered Triples with the context of this RDFStream
130     */
131    public <Tr extends Triple, T extends Iterator<Tr>> RdfStream withThisContext(final T stream) {
132        return new RdfStream(stream).namespaces(namespaces()).topic(topic());
133    }
134
135    /**
136     * Returns the proffered {@link Triple}s with the context of this RdfStream.
137     * 
138     * @param stream
139     * @return proffered Triples with the context of this RDFStream
140     */
141    public <Tr extends Triple, T extends Iterable<Tr>> RdfStream withThisContext(final T stream) {
142        return new RdfStream(stream).namespaces(namespaces()).topic(topic());
143    }
144
145    /**
146     * @param newTriples Triples to add.
147     * @return This object for continued use.
148     */
149    public RdfStream concat(final Iterator<? extends Triple> newTriples) {
150        triples = Iterators.concat(triples, newTriples);
151        return this;
152    }
153
154    /**
155     * @param newTriple Triples to add.
156     * @return This object for continued use.
157     */
158    public <T extends Triple> RdfStream concat(final T newTriple) {
159        triples = Iterators.concat(triples, singletonIterator(newTriple));
160        return this;
161    }
162
163    /**
164     * @param newTriples Triples to add.
165     * @return This object for continued use.
166     */
167    public <T extends Triple> RdfStream concat(@SuppressWarnings("unchecked") final T... newTriples) {
168        triples = Iterators.concat(triples, Iterators.forArray(newTriples));
169        return this;
170    }
171
172    /**
173     * @param newTriples Triples to add.
174     * @return This object for continued use.
175     */
176    public RdfStream concat(final Collection<? extends Triple> newTriples) {
177        triples = Iterators.concat(triples, newTriples.iterator());
178        return this;
179    }
180
181    /**
182     * As {@link Iterators#limit(Iterator, int)} while maintaining context.
183     * 
184     * @param limit
185     * @return RDFStream
186     */
187    public RdfStream limit(final Integer limit) {
188        return (limit == -1) ? this : withThisContext(Iterators.limit(this, limit));
189    }
190
191    /**
192     * As {@link Iterators#advance(Iterator, int)} while maintaining context.
193     * 
194     * @param skipNum
195     * @return RDFStream
196     */
197    public RdfStream skip(final Integer skipNum) {
198        Iterators.advance(this, skipNum);
199        return this;
200    }
201
202    /**
203     * As {@link Iterators#filter(Iterator, Predicate)} while maintaining context.
204     * 
205     * @param predicate
206     * @return RdfStream
207     */
208    public RdfStream filter(final Predicate<? super Triple> predicate) {
209        return withThisContext(Iterators.filter(this, predicate));
210    }
211
212    /**
213     * As {@link Iterators#transform(Iterator, Function)}.
214     * 
215     * @param f
216     * @return Iterator
217     */
218    public <ToType> Iterator<ToType> transform(final Function<? super Triple, ToType> f) {
219        return Iterators.transform(this, f);
220    }
221
222    /**
223     * RdfStream
224     * 
225     * @param prefix
226     * @param uri
227     * @return This object for continued use.
228     */
229    public RdfStream namespace(final String prefix, final String uri) {
230        namespaces.put(prefix, uri);
231        return this;
232    }
233
234    /**
235     * @param nses
236     * @return This object for continued use.
237     */
238    public RdfStream namespaces(final Map<String, String> nses) {
239        namespaces.putAll(nses);
240        return this;
241    }
242
243    /**
244     * @return The {@link Session} in context
245     */
246    public Session session() {
247        return this.context;
248    }
249
250    /**
251     * Sets the JCR context of this stream
252     * 
253     * @param session The {@link Session} in context
254     */
255    public RdfStream session(final Session session) {
256        this.context = session;
257        return this;
258    }
259
260    /**
261     * @return The {@link Node} topic in context
262     */
263    public Node topic() {
264        return this.topic;
265    }
266
267    /**
268     * Sets the topic of this stream
269     * 
270     * @param topic The {@link Node} topic in context
271     */
272    public RdfStream topic(final Node topic) {
273        this.topic = topic;
274        return this;
275    }
276
277    /**
278     * WARNING! This method exhausts the RdfStream on which it is called!
279     * 
280     * @return A {@link Model} containing the prefix mappings and triples in this stream of RDF
281     */
282    public Model asModel() {
283        final Model model = createDefaultModel();
284        model.setNsPrefixes(namespaces());
285        for (final Triple t : this.iterable()) {
286            model.add(model.asStatement(t));
287        }
288        return model;
289    }
290
291    /**
292     * @param model A {@link Model} containing the prefix mappings and triples to be put into this stream of RDF
293     * @return RDFStream
294     */
295    public static RdfStream fromModel(final Model model) {
296        final Iterator<Triple> triples = Iterators.transform(model.listStatements(), statement2triple);
297        return new RdfStream(triples).namespaces(model.getNsPrefixMap());
298    }
299
300    private static Function<Statement, Triple> statement2triple = new Function<Statement, Triple>() {
301
302        @Override
303        public Triple apply(final Statement s) {
304            return s.asTriple();
305        }
306
307    };
308
309    @Override
310    protected Iterator<Triple> delegate() {
311        return triples;
312    }
313
314    /**
315     * @return an anonymous Iterable<Triple> for use with for-each etc.
316     */
317    public Iterable<Triple> iterable() {
318        return new Iterable<Triple>() {
319
320            @Override
321            public Iterator<Triple> iterator() {
322                return triples;
323            }
324        };
325    }
326
327    /**
328     * @return Namespaces in scope for this stream.
329     */
330    public Map<String, String> namespaces() {
331        return namespaces;
332    }
333
334    private static <T extends Triple> Function<T, Triple> cast() {
335        return new Function<T, Triple>() {
336
337            @Override
338            public Triple apply(final T prototriple) {
339                return prototriple;
340            }
341
342        };
343    }
344
345    /*
346     * We ignore duplicated triples for equality. (non-Javadoc)
347     * @see java.lang.Object#equals(java.lang.Object)
348     */
349    @Override
350    public boolean equals(final Object o) {
351        if (o == null) {
352            return false;
353        }
354        if (o == this) {
355            return true;
356        }
357        if (!(o instanceof RdfStream)) {
358            return false;
359        }
360        final RdfStream rdfo = (RdfStream) o;
361
362        final boolean triplesEqual =
363                equal(copyOf(rdfo.triples), copyOf(this.triples));
364
365        final boolean namespaceMappingsEqual =
366                equal(rdfo.namespaces(), this.namespaces());
367
368        final boolean topicEqual =
369                equal(rdfo.topic(), this.topic());
370
371        return triplesEqual && namespaceMappingsEqual && topicEqual;
372
373    }
374
375    @Override
376    public int hashCode() {
377        return hash(namespaces(), triples, topic());
378    }
379
380}