001/*
002 * Licensed to DuraSpace under one or more contributor license agreements.
003 * See the NOTICE file distributed with this work for additional information
004 * regarding copyright ownership.
005 *
006 * DuraSpace licenses this file to you under the Apache License,
007 * Version 2.0 (the "License"); you may not use this file except in
008 * compliance with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.fcrepo.http.commons.responses;
019
020import static javax.ws.rs.core.Response.Status.NOT_ACCEPTABLE;
021import static org.apache.jena.riot.Lang.JSONLD;
022import static org.apache.jena.riot.Lang.RDFXML;
023import static org.apache.jena.riot.RDFLanguages.contentTypeToLang;
024import static org.apache.jena.riot.RDFLanguages.getRegisteredLanguages;
025import static org.apache.jena.riot.RDFFormat.RDFXML_PLAIN;
026import static org.apache.jena.riot.RDFFormat.JSONLD_COMPACT_FLAT;
027import static org.apache.jena.riot.RDFFormat.JSONLD_EXPAND_FLAT;
028import static org.apache.jena.riot.RDFFormat.JSONLD_FLATTEN_FLAT;
029import static org.apache.jena.riot.system.StreamRDFWriter.defaultSerialization;
030import static org.apache.jena.riot.system.StreamRDFWriter.getWriterStream;
031import static org.fcrepo.kernel.api.RdfCollectors.toModel;
032import static org.slf4j.LoggerFactory.getLogger;
033
034import java.io.IOException;
035import java.io.OutputStream;
036import java.util.Map;
037
038import javax.ws.rs.WebApplicationException;
039import javax.ws.rs.core.MediaType;
040import javax.ws.rs.core.StreamingOutput;
041
042import com.google.common.util.concurrent.AbstractFuture;
043import org.apache.jena.riot.RiotException;
044import org.apache.jena.rdf.model.Model;
045import org.apache.jena.riot.Lang;
046import org.apache.jena.riot.RDFDataMgr;
047import org.apache.jena.riot.RDFFormat;
048import org.apache.jena.riot.system.StreamRDF;
049import org.fcrepo.kernel.api.RdfStream;
050import org.slf4j.Logger;
051
052/**
053 * Serializes an {@link RdfStream}.
054 *
055 * @author ajs6f
056 * @since Oct 30, 2013
057 */
058public class RdfStreamStreamingOutput extends AbstractFuture<Void> implements
059        StreamingOutput {
060
061    private static final Logger LOGGER = getLogger(RdfStreamStreamingOutput.class);
062
063    private static final String JSONLD_COMPACTED = "http://www.w3.org/ns/json-ld#compacted";
064
065    private static final String JSONLD_FLATTENED = "http://www.w3.org/ns/json-ld#flattened";
066
067    private final Lang format;
068
069    private final MediaType mediaType;
070
071    private final RdfStream rdfStream;
072
073    private final Map<String, String> namespaces;
074
075    /**
076     * Normal constructor
077     *
078     * @param rdfStream the rdf stream
079     * @param namespaces a namespace mapping
080     * @param mediaType the media type
081     */
082    public RdfStreamStreamingOutput(final RdfStream rdfStream, final Map<String, String> namespaces,
083            final MediaType mediaType) {
084        super();
085
086        if (LOGGER.isDebugEnabled()) {
087            getRegisteredLanguages().forEach(format -> {
088                LOGGER.debug("Discovered RDF writer writeableFormats: {} with mimeTypes: {}",
089                        format.getName(), String.join(" ", format.getAltContentTypes()));
090            });
091        }
092        final Lang format = contentTypeToLang(mediaType.getType() + "/" + mediaType.getSubtype());
093        if (format != null) {
094            this.format = format;
095            this.mediaType = mediaType;
096            LOGGER.debug("Setting up to serialize to: {}", format);
097        } else {
098            throw new WebApplicationException(NOT_ACCEPTABLE);
099        }
100
101        this.rdfStream = rdfStream;
102        this.namespaces = namespaces;
103    }
104
105    @Override
106    public void write(final OutputStream output) {
107        try {
108            LOGGER.debug("Serializing RDF stream in: {}", format);
109            write(rdfStream, output, format, mediaType, namespaces);
110        } catch (final IOException | RiotException e) {
111            setException(e);
112            LOGGER.debug("Error serializing RDF", e.getMessage());
113            throw new WebApplicationException(e);
114        }
115    }
116
117    private static void write(final RdfStream rdfStream,
118                       final OutputStream output,
119                       final Lang dataFormat,
120                       final MediaType dataMediaType,
121                       final Map<String, String> nsPrefixes) throws IOException {
122
123        final RDFFormat format = defaultSerialization(dataFormat);
124
125        // For formats that can be block-streamed (n-triples, turtle)
126        if (format != null) {
127            LOGGER.debug("Stream-based serialization of {}", dataFormat.toString());
128            final StreamRDF stream = new SynchonizedStreamRDFWrapper(getWriterStream(output, format));
129            stream.start();
130            nsPrefixes.forEach(stream::prefix);
131            rdfStream.forEach(stream::triple);
132            stream.finish();
133
134        // For formats that require analysis of the entire model and cannot be streamed directly (rdfxml, n3)
135        } else {
136            LOGGER.debug("Non-stream serialization of {}", dataFormat.toString());
137            final Model model = rdfStream.collect(toModel());
138            model.setNsPrefixes(nsPrefixes);
139            // use block output streaming for RDFXML
140            if (RDFXML.equals(dataFormat)) {
141                RDFDataMgr.write(output, model.getGraph(), RDFXML_PLAIN);
142            } else if (JSONLD.equals(dataFormat)) {
143                final RDFFormat jsonldFormat = getFormatFromMediaType(dataMediaType);
144                RDFDataMgr.write(output, model.getGraph(), jsonldFormat);
145            } else {
146                RDFDataMgr.write(output, model.getGraph(), dataFormat);
147            }
148        }
149    }
150
151    private static RDFFormat getFormatFromMediaType(final MediaType mediaType) {
152        final String profile = mediaType.getParameters().getOrDefault("profile", "");
153        if (profile.equals(JSONLD_COMPACTED)) {
154            return JSONLD_COMPACT_FLAT;
155        } else if (profile.equals(JSONLD_FLATTENED)) {
156            return JSONLD_FLATTEN_FLAT;
157        }
158        return JSONLD_EXPAND_FLAT;
159    }
160}