001/** 002 * Copyright 2014 DuraSpace, Inc. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.fcrepo.kernel.utils.iterators; 018 019import static com.google.common.base.Objects.equal; 020import static com.google.common.collect.ImmutableSet.copyOf; 021import static com.google.common.collect.Iterators.singletonIterator; 022import static com.hp.hpl.jena.rdf.model.ModelFactory.createDefaultModel; 023import static java.util.Objects.hash; 024 025import java.util.Collection; 026import java.util.HashMap; 027import java.util.Iterator; 028import java.util.Map; 029 030import javax.jcr.Session; 031 032import com.google.common.base.Function; 033import com.google.common.base.Predicate; 034import com.google.common.collect.ForwardingIterator; 035import com.google.common.collect.Iterators; 036import com.hp.hpl.jena.graph.Node; 037import com.hp.hpl.jena.graph.Triple; 038import com.hp.hpl.jena.rdf.model.Model; 039import com.hp.hpl.jena.rdf.model.Statement; 040 041/** 042 * A stream of RDF triples along with some useful context. 043 * 044 * @author ajs6f 045 * @since Oct 9, 2013 046 */ 047public class RdfStream extends ForwardingIterator<Triple> { 048 049 private Map<String, String> namespaces = new HashMap<>(); 050 051 protected Iterator<Triple> triples; 052 053 protected Session context; 054 055 protected Node topic; 056 057 private static final Triple[] NONE = new Triple[] {}; 058 059 /** 060 * Constructor that begins the stream with proffered triples. 061 * 062 * @param triples 063 */ 064 public <Tr extends Triple, T extends Iterator<Tr>> RdfStream(final T triples) { 065 super(); 066 this.triples = Iterators.transform(triples, cast()); 067 } 068 069 /** 070 * Constructor that begins the stream with proffered triples. 071 * 072 * @param triples 073 */ 074 public <Tr extends Triple, T extends Iterable<Tr>> RdfStream(final T triples) { 075 this(triples.iterator()); 076 } 077 078 /** 079 * Constructor that begins the stream with proffered triples. 080 * 081 * @param triples 082 */ 083 public <Tr extends Triple, T extends Collection<Tr>> RdfStream( 084 final T triples) { 085 this(triples.iterator()); 086 } 087 088 /** 089 * Constructor that begins the stream with proffered triples. 090 * 091 * @param triples 092 */ 093 @SafeVarargs 094 public <T extends Triple> RdfStream(final T... triples) { 095 this(Iterators.forArray(triples)); 096 } 097 098 /** 099 * Constructor that begins the stream with proffered statements. 100 * 101 * @param statements 102 */ 103 @SafeVarargs 104 public <T extends Statement> RdfStream(final T... statements) { 105 this(Iterators.transform(Iterators.forArray(statements), 106 statement2triple)); 107 } 108 109 /** 110 * Constructor that begins the stream with proffered triple. 111 * 112 * @param triple 113 */ 114 public <T extends Triple> RdfStream(final T triple) { 115 this(Iterators.forArray(new Triple[] { triple })); 116 } 117 118 /** 119 * Constructor that begins the stream without any triples. 120 */ 121 public RdfStream() { 122 this(NONE); 123 } 124 125 /** 126 * Returns the proffered {@link Triple}s with the context of this RdfStream. 127 * 128 * @param stream 129 * @return proffered Triples with the context of this RDFStream 130 */ 131 public <Tr extends Triple, T extends Iterator<Tr>> RdfStream withThisContext(final T stream) { 132 return new RdfStream(stream).namespaces(namespaces()).topic(topic()); 133 } 134 135 /** 136 * Returns the proffered {@link Triple}s with the context of this RdfStream. 137 * 138 * @param stream 139 * @return proffered Triples with the context of this RDFStream 140 */ 141 public <Tr extends Triple, T extends Iterable<Tr>> RdfStream withThisContext(final T stream) { 142 return new RdfStream(stream).namespaces(namespaces()).topic(topic()); 143 } 144 145 /** 146 * @param newTriples Triples to add. 147 * @return This object for continued use. 148 */ 149 public RdfStream concat(final Iterator<? extends Triple> newTriples) { 150 triples = Iterators.concat(triples, newTriples); 151 return this; 152 } 153 154 /** 155 * @param newTriple Triples to add. 156 * @return This object for continued use. 157 */ 158 public <T extends Triple> RdfStream concat(final T newTriple) { 159 triples = Iterators.concat(triples, singletonIterator(newTriple)); 160 return this; 161 } 162 163 /** 164 * @param newTriples Triples to add. 165 * @return This object for continued use. 166 */ 167 public <T extends Triple> RdfStream concat(@SuppressWarnings("unchecked") final T... newTriples) { 168 triples = Iterators.concat(triples, Iterators.forArray(newTriples)); 169 return this; 170 } 171 172 /** 173 * @param newTriples Triples to add. 174 * @return This object for continued use. 175 */ 176 public RdfStream concat(final Collection<? extends Triple> newTriples) { 177 triples = Iterators.concat(triples, newTriples.iterator()); 178 return this; 179 } 180 181 /** 182 * As {@link Iterators#limit(Iterator, int)} while maintaining context. 183 * 184 * @param limit 185 * @return RDFStream 186 */ 187 public RdfStream limit(final Integer limit) { 188 return (limit == -1) ? this : withThisContext(Iterators.limit(this, limit)); 189 } 190 191 /** 192 * As {@link Iterators#advance(Iterator, int)} while maintaining context. 193 * 194 * @param skipNum 195 * @return RDFStream 196 */ 197 public RdfStream skip(final Integer skipNum) { 198 Iterators.advance(this, skipNum); 199 return this; 200 } 201 202 /** 203 * As {@link Iterators#filter(Iterator, Predicate)} while maintaining context. 204 * 205 * @param predicate 206 * @return RdfStream 207 */ 208 public RdfStream filter(final Predicate<? super Triple> predicate) { 209 return withThisContext(Iterators.filter(this, predicate)); 210 } 211 212 /** 213 * As {@link Iterators#transform(Iterator, Function)}. 214 * 215 * @param f 216 * @return Iterator 217 */ 218 public <ToType> Iterator<ToType> transform(final Function<? super Triple, ToType> f) { 219 return Iterators.transform(this, f); 220 } 221 222 /** 223 * RdfStream 224 * 225 * @param prefix 226 * @param uri 227 * @return This object for continued use. 228 */ 229 public RdfStream namespace(final String prefix, final String uri) { 230 namespaces.put(prefix, uri); 231 return this; 232 } 233 234 /** 235 * @param nses 236 * @return This object for continued use. 237 */ 238 public RdfStream namespaces(final Map<String, String> nses) { 239 namespaces.putAll(nses); 240 return this; 241 } 242 243 /** 244 * @return The {@link Session} in context 245 */ 246 public Session session() { 247 return this.context; 248 } 249 250 /** 251 * Sets the JCR context of this stream 252 * 253 * @param session The {@link Session} in context 254 */ 255 public RdfStream session(final Session session) { 256 this.context = session; 257 return this; 258 } 259 260 /** 261 * @return The {@link Node} topic in context 262 */ 263 public Node topic() { 264 return this.topic; 265 } 266 267 /** 268 * Sets the topic of this stream 269 * 270 * @param topic The {@link Node} topic in context 271 */ 272 public RdfStream topic(final Node topic) { 273 this.topic = topic; 274 return this; 275 } 276 277 /** 278 * WARNING! This method exhausts the RdfStream on which it is called! 279 * 280 * @return A {@link Model} containing the prefix mappings and triples in this stream of RDF 281 */ 282 public Model asModel() { 283 final Model model = createDefaultModel(); 284 model.setNsPrefixes(namespaces()); 285 for (final Triple t : this.iterable()) { 286 model.add(model.asStatement(t)); 287 } 288 return model; 289 } 290 291 /** 292 * @param model A {@link Model} containing the prefix mappings and triples to be put into this stream of RDF 293 * @return RDFStream 294 */ 295 public static RdfStream fromModel(final Model model) { 296 final Iterator<Triple> triples = Iterators.transform(model.listStatements(), statement2triple); 297 return new RdfStream(triples).namespaces(model.getNsPrefixMap()); 298 } 299 300 private static Function<Statement, Triple> statement2triple = new Function<Statement, Triple>() { 301 302 @Override 303 public Triple apply(final Statement s) { 304 return s.asTriple(); 305 } 306 307 }; 308 309 @Override 310 protected Iterator<Triple> delegate() { 311 return triples; 312 } 313 314 /** 315 * @return an anonymous Iterable<Triple> for use with for-each etc. 316 */ 317 public Iterable<Triple> iterable() { 318 return new Iterable<Triple>() { 319 320 @Override 321 public Iterator<Triple> iterator() { 322 return triples; 323 } 324 }; 325 } 326 327 /** 328 * @return Namespaces in scope for this stream. 329 */ 330 public Map<String, String> namespaces() { 331 return namespaces; 332 } 333 334 private static <T extends Triple> Function<T, Triple> cast() { 335 return new Function<T, Triple>() { 336 337 @Override 338 public Triple apply(final T prototriple) { 339 return prototriple; 340 } 341 342 }; 343 } 344 345 /* 346 * We ignore duplicated triples for equality. (non-Javadoc) 347 * @see java.lang.Object#equals(java.lang.Object) 348 */ 349 @Override 350 public boolean equals(final Object o) { 351 if (o == null) { 352 return false; 353 } 354 if (o == this) { 355 return true; 356 } 357 if (!(o instanceof RdfStream)) { 358 return false; 359 } 360 final RdfStream rdfo = (RdfStream) o; 361 362 final boolean triplesEqual = 363 equal(copyOf(rdfo.triples), copyOf(this.triples)); 364 365 final boolean namespaceMappingsEqual = 366 equal(rdfo.namespaces(), this.namespaces()); 367 368 final boolean topicEqual = 369 equal(rdfo.topic(), this.topic()); 370 371 return triplesEqual && namespaceMappingsEqual && topicEqual; 372 373 } 374 375 @Override 376 public int hashCode() { 377 return hash(namespaces(), triples, topic()); 378 } 379 380}