001/* 002 * Licensed to DuraSpace under one or more contributor license agreements. 003 * See the NOTICE file distributed with this work for additional information 004 * regarding copyright ownership. 005 * 006 * DuraSpace licenses this file to you under the Apache License, 007 * Version 2.0 (the "License"); you may not use this file except in 008 * compliance with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.fcrepo.http.commons.responses; 019 020import static javax.ws.rs.core.Response.Status.NOT_ACCEPTABLE; 021import static org.apache.jena.riot.Lang.JSONLD; 022import static org.apache.jena.riot.Lang.RDFXML; 023import static org.apache.jena.riot.RDFLanguages.contentTypeToLang; 024import static org.apache.jena.riot.RDFLanguages.getRegisteredLanguages; 025import static org.apache.jena.riot.RDFFormat.RDFXML_PLAIN; 026import static org.apache.jena.riot.RDFFormat.JSONLD_COMPACT_FLAT; 027import static org.apache.jena.riot.RDFFormat.JSONLD_EXPAND_FLAT; 028import static org.apache.jena.riot.RDFFormat.JSONLD_FLATTEN_FLAT; 029import static org.apache.jena.riot.system.StreamRDFWriter.defaultSerialization; 030import static org.apache.jena.riot.system.StreamRDFWriter.getWriterStream; 031import static org.fcrepo.kernel.api.RdfCollectors.toModel; 032import static org.slf4j.LoggerFactory.getLogger; 033 034import java.io.IOException; 035import java.io.OutputStream; 036import java.util.Map; 037 038import javax.ws.rs.WebApplicationException; 039import javax.ws.rs.core.MediaType; 040import javax.ws.rs.core.StreamingOutput; 041 042import com.google.common.util.concurrent.AbstractFuture; 043import org.apache.jena.riot.RiotException; 044import org.apache.jena.rdf.model.Model; 045import org.apache.jena.riot.Lang; 046import org.apache.jena.riot.RDFDataMgr; 047import org.apache.jena.riot.RDFFormat; 048import org.apache.jena.riot.system.StreamRDF; 049import org.fcrepo.kernel.api.RdfStream; 050import org.slf4j.Logger; 051 052/** 053 * Serializes an {@link RdfStream}. 054 * 055 * @author ajs6f 056 * @since Oct 30, 2013 057 */ 058public class RdfStreamStreamingOutput extends AbstractFuture<Void> implements 059 StreamingOutput { 060 061 private static final Logger LOGGER = getLogger(RdfStreamStreamingOutput.class); 062 063 private static final String JSONLD_COMPACTED = "http://www.w3.org/ns/json-ld#compacted"; 064 065 private static final String JSONLD_FLATTENED = "http://www.w3.org/ns/json-ld#flattened"; 066 067 private final Lang format; 068 069 private final MediaType mediaType; 070 071 private final RdfStream rdfStream; 072 073 private final Map<String, String> namespaces; 074 075 /** 076 * Normal constructor 077 * 078 * @param rdfStream the rdf stream 079 * @param namespaces a namespace mapping 080 * @param mediaType the media type 081 */ 082 public RdfStreamStreamingOutput(final RdfStream rdfStream, final Map<String, String> namespaces, 083 final MediaType mediaType) { 084 super(); 085 086 if (LOGGER.isDebugEnabled()) { 087 getRegisteredLanguages().forEach(format -> { 088 LOGGER.debug("Discovered RDF writer writeableFormats: {} with mimeTypes: {}", 089 format.getName(), String.join(" ", format.getAltContentTypes())); 090 }); 091 } 092 final Lang format = contentTypeToLang(mediaType.getType() + "/" + mediaType.getSubtype()); 093 if (format != null) { 094 this.format = format; 095 this.mediaType = mediaType; 096 LOGGER.debug("Setting up to serialize to: {}", format); 097 } else { 098 throw new WebApplicationException(NOT_ACCEPTABLE); 099 } 100 101 this.rdfStream = rdfStream; 102 this.namespaces = namespaces; 103 } 104 105 @Override 106 public void write(final OutputStream output) { 107 try { 108 LOGGER.debug("Serializing RDF stream in: {}", format); 109 write(rdfStream, output, format, mediaType, namespaces); 110 } catch (final IOException | RiotException e) { 111 setException(e); 112 LOGGER.debug("Error serializing RDF", e.getMessage()); 113 throw new WebApplicationException(e); 114 } 115 } 116 117 private static void write(final RdfStream rdfStream, 118 final OutputStream output, 119 final Lang dataFormat, 120 final MediaType dataMediaType, 121 final Map<String, String> nsPrefixes) throws IOException { 122 123 final RDFFormat format = defaultSerialization(dataFormat); 124 125 // For formats that can be block-streamed (n-triples, turtle) 126 if (format != null) { 127 LOGGER.debug("Stream-based serialization of {}", dataFormat.toString()); 128 final StreamRDF stream = new SynchonizedStreamRDFWrapper(getWriterStream(output, format)); 129 stream.start(); 130 nsPrefixes.forEach(stream::prefix); 131 rdfStream.forEach(stream::triple); 132 stream.finish(); 133 134 // For formats that require analysis of the entire model and cannot be streamed directly (rdfxml, n3) 135 } else { 136 LOGGER.debug("Non-stream serialization of {}", dataFormat.toString()); 137 final Model model = rdfStream.collect(toModel()); 138 model.setNsPrefixes(nsPrefixes); 139 // use block output streaming for RDFXML 140 if (RDFXML.equals(dataFormat)) { 141 RDFDataMgr.write(output, model.getGraph(), RDFXML_PLAIN); 142 } else if (JSONLD.equals(dataFormat)) { 143 final RDFFormat jsonldFormat = getFormatFromMediaType(dataMediaType); 144 RDFDataMgr.write(output, model.getGraph(), jsonldFormat); 145 } else { 146 RDFDataMgr.write(output, model.getGraph(), dataFormat); 147 } 148 } 149 } 150 151 private static RDFFormat getFormatFromMediaType(final MediaType mediaType) { 152 final String profile = mediaType.getParameters().getOrDefault("profile", ""); 153 if (profile.equals(JSONLD_COMPACTED)) { 154 return JSONLD_COMPACT_FLAT; 155 } else if (profile.equals(JSONLD_FLATTENED)) { 156 return JSONLD_FLATTEN_FLAT; 157 } 158 return JSONLD_EXPAND_FLAT; 159 } 160}