/*
 * Licensed to DuraSpace under one or more contributor license agreements.
 * See the NOTICE file distributed with this work for additional information
 * regarding copyright ownership.
 *
 * DuraSpace licenses this file to you under the Apache License,
 * Version 2.0 (the "License"); you may not use this file except in
 * compliance with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.fcrepo.http.commons.responses;

import static javax.ws.rs.core.Response.Status.NOT_ACCEPTABLE;
import static org.apache.jena.riot.Lang.JSONLD;
import static org.apache.jena.riot.Lang.RDFXML;
import static org.apache.jena.riot.RDFLanguages.contentTypeToLang;
import static org.apache.jena.riot.RDFLanguages.getRegisteredLanguages;
import static org.apache.jena.riot.RDFFormat.RDFXML_PLAIN;
import static org.apache.jena.riot.RDFFormat.JSONLD_COMPACT_FLAT;
import static org.apache.jena.riot.RDFFormat.JSONLD_EXPAND_FLAT;
import static org.apache.jena.riot.RDFFormat.JSONLD_FLATTEN_FLAT;
import static org.apache.jena.riot.system.StreamRDFWriter.defaultSerialization;
import static org.apache.jena.riot.system.StreamRDFWriter.getWriterStream;
import static org.fcrepo.kernel.api.RdfCollectors.toModel;
import static org.slf4j.LoggerFactory.getLogger;
import static org.fcrepo.kernel.api.RdfLexicon.RDF_NAMESPACE;

import java.io.OutputStream;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.stream.Collectors;
import java.util.Optional;
import java.util.Set;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.StreamingOutput;

import com.google.common.util.concurrent.AbstractFuture;
import org.apache.jena.riot.RiotException;
import org.apache.jena.graph.Triple;
import org.apache.jena.rdf.model.Model;
import org.apache.jena.rdf.model.NsIterator;
import org.apache.jena.riot.Lang;
import org.apache.jena.riot.RDFDataMgr;
import org.apache.jena.riot.RDFFormat;
import org.apache.jena.riot.system.StreamRDF;
import org.fcrepo.kernel.api.RdfStream;
import org.slf4j.Logger;

/**
 * Serializes an {@link RdfStream}.
 *
 * @author ajs6f
 * @since Oct 30, 2013
 */
public class RdfStreamStreamingOutput extends AbstractFuture<Void> implements
        StreamingOutput {

    private static final Logger LOGGER = getLogger(RdfStreamStreamingOutput.class);

    private static final String JSONLD_COMPACTED = "http://www.w3.org/ns/json-ld#compacted";

    private static final String JSONLD_FLATTENED = "http://www.w3.org/ns/json-ld#flattened";

    private static final String RDF_TYPE = RDF_NAMESPACE + "type";

    private final Lang format;

    private final MediaType mediaType;

    private final RdfStream rdfStream;

    private final Map<String, String> namespaces;

    /**
     * Normal constructor
     *
     * @param rdfStream the rdf stream
     * @param namespaces a namespace mapping
     * @param mediaType the media type
     * @throws WebApplicationException with status 406 (Not Acceptable) when the type/subtype of
     *         {@code mediaType} does not resolve to a registered RDF language
     */
    public RdfStreamStreamingOutput(final RdfStream rdfStream, final Map<String, String> namespaces,
            final MediaType mediaType) {
        super();

        if (LOGGER.isDebugEnabled()) {
            getRegisteredLanguages().forEach(lang -> {
                LOGGER.debug("Discovered RDF writer writeableFormats: {} with mimeTypes: {}",
                        lang.getName(), String.join(" ", lang.getAltContentTypes()));
            });
        }

        // Only type/subtype take part in the language lookup; media type parameters are ignored here.
        final Lang resolvedLang = contentTypeToLang(mediaType.getType() + "/" + mediaType.getSubtype());
        if (resolvedLang == null) {
            throw new WebApplicationException(NOT_ACCEPTABLE);
        }
        LOGGER.debug("Setting up to serialize to: {}", resolvedLang);

        this.format = resolvedLang;
        this.mediaType = mediaType;
        this.rdfStream = rdfStream;
        this.namespaces = namespaces;
    }

    /**
     * Serializes the wrapped RDF stream to the given output in the language selected at construction.
     *
     * If serialization raises a {@link RiotException}, the failure is recorded on this future via
     * {@code setException}, logged at debug level, and rethrown wrapped in a
     * {@link WebApplicationException}.
     *
     * @param output the stream to write the serialized RDF to
     */
    @Override
    public void write(final OutputStream output) {
        try {
            LOGGER.debug("Serializing RDF stream in: {}", format);
            write(rdfStream, output, format, mediaType, namespaces);
        } catch (final RiotException e) {
            setException(e);
            // The placeholder is required: without it SLF4J silently discards the message argument.
            LOGGER.debug("Error serializing RDF: {}", e.getMessage());
            throw new WebApplicationException(e);
        }
    }

    /**
     * Chooses a serialization strategy for the requested language and writes the stream with it.
     *
     * @param rdfStream the triples to serialize
     * @param output the destination stream
     * @param dataFormat the RDF language to serialize to
     * @param dataMediaType the requested media type, consulted for JSON-LD profile selection
     * @param nsPrefixes map of prefix to namespace URI
     */
    private static void write(final RdfStream rdfStream,
                       final OutputStream output,
                       final Lang dataFormat,
                       final MediaType dataMediaType,
                       final Map<String, String> nsPrefixes) {

        final RDFFormat format = defaultSerialization(dataFormat);

        // For formats that can be block-streamed (n-triples, turtle)
        if (format != null) {
            LOGGER.debug("Stream-based serialization of {}", dataFormat);
            if (RDFFormat.NTRIPLES.equals(format)) {
                serializeNTriples(rdfStream, format, output);
            } else {
                serializeBlockStreamed(rdfStream, output, format, nsPrefixes);
            }
        // For formats that require analysis of the entire model and cannot be streamed directly (rdfxml, n3)
        } else {
            LOGGER.debug("Non-stream serialization of {}", dataFormat);
            serializeNonStreamed(rdfStream, output, dataFormat, dataMediaType, nsPrefixes);
        }
    }

    /**
     * Writes each triple to the output as it is read from the stream; no namespace prefixes are emitted.
     *
     * @param rdfStream the triples to serialize
     * @param format the streaming format to write
     * @param output the destination stream
     */
    private static void serializeNTriples(final RdfStream rdfStream, final RDFFormat format,
            final OutputStream output) {
        final StreamRDF stream = new SynchonizedStreamRDFWrapper(getWriterStream(output, format));
        stream.start();
        rdfStream.forEach(stream::triple);
        stream.finish();
    }

    /**
     * Writes the stream using a streaming writer, emitting only those namespace prefixes whose
     * namespace occurs in the triples.
     *
     * @param rdfStream the triples to serialize
     * @param output the destination stream
     * @param format the streaming format to write
     * @param nsPrefixes map of prefix to namespace URI
     */
    private static void serializeBlockStreamed(final RdfStream rdfStream, final OutputStream output,
            final RDFFormat format, final Map<String, String> nsPrefixes) {

        final Set<String> namespacesPresent = new HashSet<>();

        final StreamRDF stream = new SynchonizedStreamRDFWrapper(getWriterStream(output, format));
        stream.start();
        // Must read the rdf stream before writing out ns prefixes, otherwise the prefixes come after the triples
        final List<Triple> tripleList = rdfStream.collect(Collectors.toList());

        // Collect the namespaces present in the RDF stream, using the same
        // criteria for where to look that jena's model.listNameSpaces() does
        for (final Triple t : tripleList) {
            namespacesPresent.add(t.getPredicate().getNameSpace());
            if (RDF_TYPE.equals(t.getPredicate().getURI()) && t.getObject().isURI()) {
                namespacesPresent.add(t.getObject().getNameSpace());
            }
        }

        nsPrefixes.forEach((prefix, uri) -> {
            // Only add namespace prefixes if the namespace is present in the rdf stream
            if (namespacesPresent.contains(uri)) {
                stream.prefix(prefix, uri);
            }
        });
        tripleList.forEach(stream::triple);
        stream.finish();
    }

    /**
     * Collects the whole stream into a model and writes it in one pass, for languages without a
     * streaming serialization.
     *
     * @param rdfStream the triples to serialize
     * @param output the destination stream
     * @param dataFormat the RDF language to serialize to
     * @param dataMediaType the requested media type, consulted for JSON-LD profile selection
     * @param nsPrefixes map of prefix to namespace URI
     */
    private static void serializeNonStreamed(final RdfStream rdfStream, final OutputStream output,
            final Lang dataFormat, final MediaType dataMediaType, final Map<String, String> nsPrefixes) {
        final Model model = rdfStream.collect(toModel());

        model.setNsPrefixes(filterNamespacesToPresent(model, nsPrefixes));
        // use block output streaming for RDFXML
        if (RDFXML.equals(dataFormat)) {
            RDFDataMgr.write(output, model.getGraph(), RDFXML_PLAIN);
        } else if (JSONLD.equals(dataFormat)) {
            final RDFFormat jsonldFormat = getFormatFromMediaType(dataMediaType);
            RDFDataMgr.write(output, model.getGraph(), jsonldFormat);
        } else {
            RDFDataMgr.write(output, model.getGraph(), dataFormat);
        }
    }

    /**
     * Filters the map of namespace prefix mappings to just those containing namespace URIs present in the model
     *
     * @param model model
     * @param nsPrefixes map of namespace to uris
     * @return nsPrefixes filtered to namespaces found in the model
     */
    private static Map<String, String> filterNamespacesToPresent(final Model model,
            final Map<String, String> nsPrefixes) {
        final Map<String, String> resultNses = new HashMap<>();
        final Set<Entry<String, String>> nsSet = nsPrefixes.entrySet();
        final NsIterator nsIt = model.listNameSpaces();
        while (nsIt.hasNext()) {
            final String ns = nsIt.next();

            // Keep only the first prefix (in entry-set order) that is bound to this namespace.
            final Optional<Entry<String, String>> nsOpt = nsSet.stream()
                    .filter(nsEntry -> nsEntry.getValue().equals(ns))
                    .findFirst();
            nsOpt.ifPresent(nsMatch -> resultNses.put(nsMatch.getKey(), nsMatch.getValue()));
        }

        return resultNses;
    }

    /**
     * Selects the JSON-LD output format from the {@code profile} parameter of the media type.
     *
     * @param mediaType the requested media type
     * @return the compacted or flattened format when the profile names one of them, otherwise the
     *         expanded format
     */
    private static RDFFormat getFormatFromMediaType(final MediaType mediaType) {
        final String profile = mediaType.getParameters().getOrDefault("profile", "");
        if (profile.equals(JSONLD_COMPACTED)) {
            return JSONLD_COMPACT_FLAT;
        } else if (profile.equals(JSONLD_FLATTENED)) {
            return JSONLD_FLATTEN_FLAT;
        }
        return JSONLD_EXPAND_FLAT;
    }
}