001/** 002 * Copyright 2015 DuraSpace, Inc. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.fcrepo.kernel.api.utils.iterators; 017 018import static com.google.common.base.Objects.equal; 019import static com.google.common.collect.ImmutableSet.copyOf; 020import static com.google.common.collect.Iterators.singletonIterator; 021import static com.hp.hpl.jena.rdf.model.ModelFactory.createDefaultModel; 022import static java.util.Objects.hash; 023 024import java.util.Collection; 025import java.util.HashMap; 026import java.util.Iterator; 027import java.util.Map; 028import java.util.function.Function; 029import java.util.function.Predicate; 030 031import javax.jcr.Session; 032 033import com.google.common.collect.ForwardingIterator; 034import com.google.common.collect.Iterators; 035import com.hp.hpl.jena.graph.Node; 036import com.hp.hpl.jena.graph.Triple; 037import com.hp.hpl.jena.rdf.model.Model; 038import com.hp.hpl.jena.rdf.model.Statement; 039 040/** 041 * A stream of RDF triples along with some useful context. 042 * 043 * @author ajs6f 044 * @since Oct 9, 2013 045 */ 046public class RdfStream extends ForwardingIterator<Triple> { 047 048 private final Map<String, String> namespaces = new HashMap<>(); 049 050 protected Iterator<Triple> triples; 051 052 protected Session context; 053 054 protected Node topic; 055 056 private static final Triple[] NONE = new Triple[] {}; 057 058 /** 059 * Constructor that begins the stream with proffered triples. 060 * 061 * @param triples the triples 062 * @param <Tr> extends {@link Triple} 063 * @param <T> extends {@link Iterable} 064 */ 065 public <Tr extends Triple, T extends Iterator<Tr>> RdfStream(final T triples) { 066 super(); 067 this.triples = Iterators.transform(triples, cast()::apply); 068 } 069 070 /** 071 * Constructor that begins the stream with proffered triples. 072 * 073 * @param triples the triples 074 * @param <Tr> extends {@link Triple} 075 * @param <T> extends {@link Iterable} 076 */ 077 public <Tr extends Triple, T extends Iterable<Tr>> RdfStream(final T triples) { 078 this(triples.iterator()); 079 } 080 081 /** 082 * Constructor that begins the stream with proffered triples. 083 * 084 * @param triples the triples 085 * @param <Tr> extends {@link Triple} 086 * @param <T> extends {@link Collection} 087 */ 088 public <Tr extends Triple, T extends Collection<Tr>> RdfStream( 089 final T triples) { 090 this(triples.iterator()); 091 } 092 093 /** 094 * Constructor that begins the stream with proffered triples. 095 * 096 * @param triples the triples 097 * @param <T> extends {@link Triple} 098 */ 099 @SafeVarargs 100 public <T extends Triple> RdfStream(final T... triples) { 101 this(Iterators.forArray(triples)); 102 } 103 104 /** 105 * Constructor that begins the stream with proffered statements. 106 * 107 * @param statements the statements 108 * @param <T> extends {@link Statement} 109 */ 110 @SafeVarargs 111 public <T extends Statement> RdfStream(final T... statements) { 112 this(Iterators.transform(Iterators.forArray(statements), 113 x -> x.asTriple())); 114 } 115 116 /** 117 * Constructor that begins the stream with proffered triple. 118 * 119 * @param triple the triple 120 * @param <T> extends {@link Triple} 121 */ 122 public <T extends Triple> RdfStream(final T triple) { 123 this(Iterators.forArray(new Triple[] { triple })); 124 } 125 126 /** 127 * Constructor that begins the stream without any triples. 128 */ 129 public RdfStream() { 130 this(NONE); 131 } 132 133 /** 134 * Returns the proffered {@link Triple}s with the context of this RdfStream. 135 * 136 * @param stream the stream 137 * @param <Tr> extends {@link Triple} 138 * @param <T> extends {@link Iterator} 139 * @return proffered Triples with the context of this RDFStream 140 */ 141 public <Tr extends Triple, T extends Iterator<Tr>> RdfStream withThisContext(final T stream) { 142 return new RdfStream(stream).namespaces(namespaces()).topic(topic()); 143 } 144 145 /** 146 * Returns the proffered {@link Triple}s with the context of this RdfStream. 147 * 148 * @param stream the stream 149 * @param <Tr> extends {@link Triple} 150 * @param <T> extends {@link Iterator} 151 * @return proffered Triples with the context of this RDFStream 152 */ 153 public <Tr extends Triple, T extends Iterable<Tr>> RdfStream withThisContext(final T stream) { 154 return new RdfStream(stream).namespaces(namespaces()).topic(topic()); 155 } 156 157 /** 158 * @param newTriples Triples to add. 159 * @return This object for continued use. 160 */ 161 public RdfStream concat(final Iterator<? extends Triple> newTriples) { 162 triples = Iterators.concat(triples, newTriples); 163 return this; 164 } 165 166 /** 167 * @param newTriple Triples to add. 168 * @param <T> extends {@link Triple} 169 * @return This object for continued use. 170 */ 171 public <T extends Triple> RdfStream concat(final T newTriple) { 172 triples = Iterators.concat(triples, singletonIterator(newTriple)); 173 return this; 174 } 175 176 /** 177 * @param newTriples Triples to add. 178 * @param <T> extends {@link Triple} 179 * @return This object for continued use. 180 */ 181 @SuppressWarnings("unchecked") 182 public <T extends Triple> RdfStream concat(final T... newTriples) { 183 triples = Iterators.concat(triples, Iterators.forArray(newTriples)); 184 return this; 185 } 186 187 /** 188 * @param newTriples Triples to add. 189 * @return This object for continued use. 190 */ 191 public RdfStream concat(final Collection<? extends Triple> newTriples) { 192 triples = Iterators.concat(triples, newTriples.iterator()); 193 return this; 194 } 195 196 /** 197 * As {@link Iterators#limit(Iterator, int)} while maintaining context. 198 * 199 * @param limit the limit 200 * @return RDFStream 201 */ 202 public RdfStream limit(final Integer limit) { 203 return (limit == -1) ? this : withThisContext(Iterators.limit(this, limit)); 204 } 205 206 /** 207 * As {@link Iterators#advance(Iterator, int)} while maintaining context. 208 * 209 * @param skipNum the skip number 210 * @return RDFStream 211 */ 212 public RdfStream skip(final Integer skipNum) { 213 Iterators.advance(this, skipNum); 214 return this; 215 } 216 217 /** 218 * Filter the RDF triples while maintaining context. 219 * 220 * @param predicate the predicate 221 * @return RdfStream 222 */ 223 public RdfStream filter(final Predicate<? super Triple> predicate) { 224 return withThisContext(Iterators.filter(this, predicate::test)); 225 } 226 227 /** 228 * Apply a Function to an Iterator. 229 * 230 * @param f the parameter f 231 * @param <ToType> extends {@link Iterator} 232 * @return Iterator 233 */ 234 public <ToType> Iterator<ToType> transform(final Function<? super Triple, ToType> f) { 235 return Iterators.transform(this, f::apply); 236 } 237 238 /** 239 * RdfStream 240 * 241 * @param prefix the prefix 242 * @param uri the uri 243 * @return This object for continued use. 244 */ 245 public RdfStream namespace(final String prefix, final String uri) { 246 namespaces.put(prefix, uri); 247 return this; 248 } 249 250 /** 251 * @param nses the property of nses 252 * @return This object for continued use. 253 */ 254 public RdfStream namespaces(final Map<String, String> nses) { 255 namespaces.putAll(nses); 256 return this; 257 } 258 259 /** 260 * @return The {@link Session} in context 261 */ 262 public Session session() { 263 return this.context; 264 } 265 266 /** 267 * Sets the JCR context of this stream 268 * 269 * @param session The {@link Session} in context 270 * @return the JCR context of this stream 271 */ 272 public RdfStream session(final Session session) { 273 this.context = session; 274 return this; 275 } 276 277 /** 278 * @return The {@link Node} topic in context 279 */ 280 public Node topic() { 281 return this.topic; 282 } 283 284 /** 285 * Sets the topic of this stream 286 * 287 * @param topic The {@link Node} topic in context 288 * @return the stream 289 */ 290 public RdfStream topic(final Node topic) { 291 this.topic = topic; 292 return this; 293 } 294 295 /** 296 * WARNING! This method exhausts the RdfStream on which it is called! 297 * 298 * @return A {@link Model} containing the prefix mappings and triples in this stream of RDF 299 */ 300 public Model asModel() { 301 final Model model = createDefaultModel(); 302 model.setNsPrefixes(namespaces()); 303 for (final Triple t : this.iterable()) { 304 model.add(model.asStatement(t)); 305 } 306 return model; 307 } 308 309 /** 310 * @param model A {@link Model} containing the prefix mappings and triples to be put into this stream of RDF 311 * @return RDFStream 312 */ 313 public static RdfStream fromModel(final Model model) { 314 final Iterator<Triple> triples = Iterators.transform(model.listStatements(), x -> x.asTriple()); 315 return new RdfStream(triples).namespaces(model.getNsPrefixMap()); 316 } 317 318 @Override 319 protected Iterator<Triple> delegate() { 320 return triples; 321 } 322 323 /** 324 * @return an anonymous {@literal Iterable<Triple>} for use with for-each etc. 325 */ 326 public Iterable<Triple> iterable() { 327 return new Iterable<Triple>() { 328 329 @Override 330 public Iterator<Triple> iterator() { 331 return triples; 332 } 333 }; 334 } 335 336 /** 337 * @return Namespaces in scope for this stream. 338 */ 339 public Map<String, String> namespaces() { 340 return namespaces; 341 } 342 343 private static <T extends Triple> Function<T, Triple> cast() { 344 return new Function<T, Triple>() { 345 346 @Override 347 public Triple apply(final T prototriple) { 348 return prototriple; 349 } 350 351 }; 352 } 353 354 protected static <From, To> Iterator<To> flatMap(final Iterator<From> i, final Function<From, Iterator<To>> f) { 355 return Iterators.concat(Iterators.transform(i, f::apply)); 356 } 357 358 /* 359 * We ignore duplicated triples for equality. (non-Javadoc) 360 * @see java.lang.Object#equals(java.lang.Object) 361 */ 362 @Override 363 public boolean equals(final Object o) { 364 if (o == null) { 365 return false; 366 } 367 if (o == this) { 368 return true; 369 } 370 if (!(o instanceof RdfStream)) { 371 return false; 372 } 373 final RdfStream rdfo = (RdfStream) o; 374 375 final boolean triplesEqual = 376 equal(copyOf(rdfo.triples), copyOf(this.triples)); 377 378 final boolean namespaceMappingsEqual = 379 equal(rdfo.namespaces(), this.namespaces()); 380 381 final boolean topicEqual = 382 equal(rdfo.topic(), this.topic()); 383 384 return triplesEqual && namespaceMappingsEqual && topicEqual; 385 386 } 387 388 @Override 389 public int hashCode() { 390 return hash(namespaces(), triples, topic()); 391 } 392 393}