001/** 002 * Copyright 2015 DuraSpace, Inc. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.fcrepo.kernel.utils.iterators; 017 018import static com.google.common.base.Objects.equal; 019import static com.google.common.collect.ImmutableSet.copyOf; 020import static com.google.common.collect.Iterators.singletonIterator; 021import static com.hp.hpl.jena.rdf.model.ModelFactory.createDefaultModel; 022import static java.util.Objects.hash; 023 024import java.util.Collection; 025import java.util.HashMap; 026import java.util.Iterator; 027import java.util.Map; 028 029import javax.jcr.Session; 030 031import com.google.common.base.Function; 032import com.google.common.base.Predicate; 033import com.google.common.collect.ForwardingIterator; 034import com.google.common.collect.Iterators; 035import com.hp.hpl.jena.graph.Node; 036import com.hp.hpl.jena.graph.Triple; 037import com.hp.hpl.jena.rdf.model.Model; 038import com.hp.hpl.jena.rdf.model.Statement; 039 040/** 041 * A stream of RDF triples along with some useful context. 042 * 043 * @author ajs6f 044 * @since Oct 9, 2013 045 */ 046public class RdfStream extends ForwardingIterator<Triple> { 047 048 private Map<String, String> namespaces = new HashMap<>(); 049 050 protected Iterator<Triple> triples; 051 052 protected Session context; 053 054 protected Node topic; 055 056 private static final Triple[] NONE = new Triple[] {}; 057 058 /** 059 * Constructor that begins the stream with proffered triples. 060 * 061 * @param triples 062 */ 063 public <Tr extends Triple, T extends Iterator<Tr>> RdfStream(final T triples) { 064 super(); 065 this.triples = Iterators.transform(triples, cast()); 066 } 067 068 /** 069 * Constructor that begins the stream with proffered triples. 070 * 071 * @param triples 072 */ 073 public <Tr extends Triple, T extends Iterable<Tr>> RdfStream(final T triples) { 074 this(triples.iterator()); 075 } 076 077 /** 078 * Constructor that begins the stream with proffered triples. 079 * 080 * @param triples 081 */ 082 public <Tr extends Triple, T extends Collection<Tr>> RdfStream( 083 final T triples) { 084 this(triples.iterator()); 085 } 086 087 /** 088 * Constructor that begins the stream with proffered triples. 089 * 090 * @param triples 091 */ 092 @SafeVarargs 093 public <T extends Triple> RdfStream(final T... triples) { 094 this(Iterators.forArray(triples)); 095 } 096 097 /** 098 * Constructor that begins the stream with proffered statements. 099 * 100 * @param statements 101 */ 102 @SafeVarargs 103 public <T extends Statement> RdfStream(final T... statements) { 104 this(Iterators.transform(Iterators.forArray(statements), 105 statement2triple)); 106 } 107 108 /** 109 * Constructor that begins the stream with proffered triple. 110 * 111 * @param triple 112 */ 113 public <T extends Triple> RdfStream(final T triple) { 114 this(Iterators.forArray(new Triple[] { triple })); 115 } 116 117 /** 118 * Constructor that begins the stream without any triples. 119 */ 120 public RdfStream() { 121 this(NONE); 122 } 123 124 /** 125 * Returns the proffered {@link Triple}s with the context of this RdfStream. 126 * 127 * @param stream 128 * @return proffered Triples with the context of this RDFStream 129 */ 130 public <Tr extends Triple, T extends Iterator<Tr>> RdfStream withThisContext(final T stream) { 131 return new RdfStream(stream).namespaces(namespaces()).topic(topic()); 132 } 133 134 /** 135 * Returns the proffered {@link Triple}s with the context of this RdfStream. 136 * 137 * @param stream 138 * @return proffered Triples with the context of this RDFStream 139 */ 140 public <Tr extends Triple, T extends Iterable<Tr>> RdfStream withThisContext(final T stream) { 141 return new RdfStream(stream).namespaces(namespaces()).topic(topic()); 142 } 143 144 /** 145 * @param newTriples Triples to add. 146 * @return This object for continued use. 147 */ 148 public RdfStream concat(final Iterator<? extends Triple> newTriples) { 149 triples = Iterators.concat(triples, newTriples); 150 return this; 151 } 152 153 /** 154 * @param newTriple Triples to add. 155 * @return This object for continued use. 156 */ 157 public <T extends Triple> RdfStream concat(final T newTriple) { 158 triples = Iterators.concat(triples, singletonIterator(newTriple)); 159 return this; 160 } 161 162 /** 163 * @param newTriples Triples to add. 164 * @return This object for continued use. 165 */ 166 public <T extends Triple> RdfStream concat(@SuppressWarnings("unchecked") final T... newTriples) { 167 triples = Iterators.concat(triples, Iterators.forArray(newTriples)); 168 return this; 169 } 170 171 /** 172 * @param newTriples Triples to add. 173 * @return This object for continued use. 174 */ 175 public RdfStream concat(final Collection<? extends Triple> newTriples) { 176 triples = Iterators.concat(triples, newTriples.iterator()); 177 return this; 178 } 179 180 /** 181 * As {@link Iterators#limit(Iterator, int)} while maintaining context. 182 * 183 * @param limit 184 * @return RDFStream 185 */ 186 public RdfStream limit(final Integer limit) { 187 return (limit == -1) ? this : withThisContext(Iterators.limit(this, limit)); 188 } 189 190 /** 191 * As {@link Iterators#advance(Iterator, int)} while maintaining context. 192 * 193 * @param skipNum 194 * @return RDFStream 195 */ 196 public RdfStream skip(final Integer skipNum) { 197 Iterators.advance(this, skipNum); 198 return this; 199 } 200 201 /** 202 * As {@link Iterators#filter(Iterator, Predicate)} while maintaining context. 203 * 204 * @param predicate 205 * @return RdfStream 206 */ 207 public RdfStream filter(final Predicate<? super Triple> predicate) { 208 return withThisContext(Iterators.filter(this, predicate)); 209 } 210 211 /** 212 * As {@link Iterators#transform(Iterator, Function)}. 213 * 214 * @param f 215 * @return Iterator 216 */ 217 public <ToType> Iterator<ToType> transform(final Function<? super Triple, ToType> f) { 218 return Iterators.transform(this, f); 219 } 220 221 /** 222 * RdfStream 223 * 224 * @param prefix 225 * @param uri 226 * @return This object for continued use. 227 */ 228 public RdfStream namespace(final String prefix, final String uri) { 229 namespaces.put(prefix, uri); 230 return this; 231 } 232 233 /** 234 * @param nses 235 * @return This object for continued use. 236 */ 237 public RdfStream namespaces(final Map<String, String> nses) { 238 namespaces.putAll(nses); 239 return this; 240 } 241 242 /** 243 * @return The {@link Session} in context 244 */ 245 public Session session() { 246 return this.context; 247 } 248 249 /** 250 * Sets the JCR context of this stream 251 * 252 * @param session The {@link Session} in context 253 */ 254 public RdfStream session(final Session session) { 255 this.context = session; 256 return this; 257 } 258 259 /** 260 * @return The {@link Node} topic in context 261 */ 262 public Node topic() { 263 return this.topic; 264 } 265 266 /** 267 * Sets the topic of this stream 268 * 269 * @param topic The {@link Node} topic in context 270 */ 271 public RdfStream topic(final Node topic) { 272 this.topic = topic; 273 return this; 274 } 275 276 /** 277 * WARNING! This method exhausts the RdfStream on which it is called! 278 * 279 * @return A {@link Model} containing the prefix mappings and triples in this stream of RDF 280 */ 281 public Model asModel() { 282 final Model model = createDefaultModel(); 283 model.setNsPrefixes(namespaces()); 284 for (final Triple t : this.iterable()) { 285 model.add(model.asStatement(t)); 286 } 287 return model; 288 } 289 290 /** 291 * @param model A {@link Model} containing the prefix mappings and triples to be put into this stream of RDF 292 * @return RDFStream 293 */ 294 public static RdfStream fromModel(final Model model) { 295 final Iterator<Triple> triples = Iterators.transform(model.listStatements(), statement2triple); 296 return new RdfStream(triples).namespaces(model.getNsPrefixMap()); 297 } 298 299 private static Function<Statement, Triple> statement2triple = new Function<Statement, Triple>() { 300 301 @Override 302 public Triple apply(final Statement s) { 303 return s.asTriple(); 304 } 305 306 }; 307 308 @Override 309 protected Iterator<Triple> delegate() { 310 return triples; 311 } 312 313 /** 314 * @return an anonymous Iterable<Triple> for use with for-each etc. 315 */ 316 public Iterable<Triple> iterable() { 317 return new Iterable<Triple>() { 318 319 @Override 320 public Iterator<Triple> iterator() { 321 return triples; 322 } 323 }; 324 } 325 326 /** 327 * @return Namespaces in scope for this stream. 328 */ 329 public Map<String, String> namespaces() { 330 return namespaces; 331 } 332 333 private static <T extends Triple> Function<T, Triple> cast() { 334 return new Function<T, Triple>() { 335 336 @Override 337 public Triple apply(final T prototriple) { 338 return prototriple; 339 } 340 341 }; 342 } 343 344 /* 345 * We ignore duplicated triples for equality. (non-Javadoc) 346 * @see java.lang.Object#equals(java.lang.Object) 347 */ 348 @Override 349 public boolean equals(final Object o) { 350 if (o == null) { 351 return false; 352 } 353 if (o == this) { 354 return true; 355 } 356 if (!(o instanceof RdfStream)) { 357 return false; 358 } 359 final RdfStream rdfo = (RdfStream) o; 360 361 final boolean triplesEqual = 362 equal(copyOf(rdfo.triples), copyOf(this.triples)); 363 364 final boolean namespaceMappingsEqual = 365 equal(rdfo.namespaces(), this.namespaces()); 366 367 final boolean topicEqual = 368 equal(rdfo.topic(), this.topic()); 369 370 return triplesEqual && namespaceMappingsEqual && topicEqual; 371 372 } 373 374 @Override 375 public int hashCode() { 376 return hash(namespaces(), triples, topic()); 377 } 378 379}