001/*
002 * Licensed to DuraSpace under one or more contributor license agreements.
003 * See the NOTICE file distributed with this work for additional information
004 * regarding copyright ownership.
005 *
006 * DuraSpace licenses this file to you under the Apache License,
007 * Version 2.0 (the "License"); you may not use this file except in
008 * compliance with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.fcrepo.http.commons.responses;
019
020import static javax.ws.rs.core.Response.Status.NOT_ACCEPTABLE;
021import static org.apache.jena.riot.Lang.JSONLD;
022import static org.apache.jena.riot.Lang.RDFXML;
023import static org.apache.jena.riot.RDFLanguages.contentTypeToLang;
024import static org.apache.jena.riot.RDFLanguages.getRegisteredLanguages;
025import static org.apache.jena.riot.RDFFormat.RDFXML_PLAIN;
026import static org.apache.jena.riot.RDFFormat.JSONLD_COMPACT_FLAT;
027import static org.apache.jena.riot.RDFFormat.JSONLD_EXPAND_FLAT;
028import static org.apache.jena.riot.RDFFormat.JSONLD_FLATTEN_FLAT;
029import static org.apache.jena.riot.system.StreamRDFWriter.defaultSerialization;
030import static org.apache.jena.riot.system.StreamRDFWriter.getWriterStream;
031import static org.fcrepo.kernel.api.RdfCollectors.toModel;
032import static org.slf4j.LoggerFactory.getLogger;
033import static org.fcrepo.kernel.api.RdfLexicon.RDF_NAMESPACE;
034
035import java.io.OutputStream;
036import java.util.HashMap;
037import java.util.HashSet;
038import java.util.List;
039import java.util.Map;
040import java.util.Map.Entry;
041import java.util.stream.Collectors;
042import java.util.Optional;
043import java.util.Set;
044import javax.ws.rs.WebApplicationException;
045import javax.ws.rs.core.MediaType;
046import javax.ws.rs.core.StreamingOutput;
047
048import com.google.common.util.concurrent.AbstractFuture;
049import org.apache.jena.riot.RiotException;
050import org.apache.jena.graph.Triple;
051import org.apache.jena.rdf.model.Model;
052import org.apache.jena.rdf.model.NsIterator;
053import org.apache.jena.riot.Lang;
054import org.apache.jena.riot.RDFDataMgr;
055import org.apache.jena.riot.RDFFormat;
056import org.apache.jena.riot.system.StreamRDF;
057import org.fcrepo.kernel.api.RdfStream;
058import org.slf4j.Logger;
059
060/**
061 * Serializes an {@link RdfStream}.
062 *
063 * @author ajs6f
064 * @since Oct 30, 2013
065 */
066public class RdfStreamStreamingOutput extends AbstractFuture<Void> implements
067        StreamingOutput {
068
069    private static final Logger LOGGER = getLogger(RdfStreamStreamingOutput.class);
070
071    private static final String JSONLD_COMPACTED = "http://www.w3.org/ns/json-ld#compacted";
072
073    private static final String JSONLD_FLATTENED = "http://www.w3.org/ns/json-ld#flattened";
074
075    private static final String RDF_TYPE = RDF_NAMESPACE + "type";
076
077    private final Lang format;
078
079    private final MediaType mediaType;
080
081    private final RdfStream rdfStream;
082
083    private final Map<String, String> namespaces;
084
085    /**
086     * Normal constructor
087     *
088     * @param rdfStream the rdf stream
089     * @param namespaces a namespace mapping
090     * @param mediaType the media type
091     */
092    public RdfStreamStreamingOutput(final RdfStream rdfStream, final Map<String, String> namespaces,
093            final MediaType mediaType) {
094        super();
095
096        if (LOGGER.isDebugEnabled()) {
097            getRegisteredLanguages().forEach(format -> {
098                LOGGER.debug("Discovered RDF writer writeableFormats: {} with mimeTypes: {}",
099                        format.getName(), String.join(" ", format.getAltContentTypes()));
100            });
101        }
102        final Lang format = contentTypeToLang(mediaType.getType() + "/" + mediaType.getSubtype());
103        if (format != null) {
104            this.format = format;
105            this.mediaType = mediaType;
106            LOGGER.debug("Setting up to serialize to: {}", format);
107        } else {
108            throw new WebApplicationException(NOT_ACCEPTABLE);
109        }
110
111        this.rdfStream = rdfStream;
112        this.namespaces = namespaces;
113    }
114
115    @Override
116    public void write(final OutputStream output) {
117        try {
118            LOGGER.debug("Serializing RDF stream in: {}", format);
119            write(rdfStream, output, format, mediaType, namespaces);
120        } catch (final RiotException e) {
121            setException(e);
122            LOGGER.debug("Error serializing RDF", e.getMessage());
123            throw new WebApplicationException(e);
124        }
125    }
126
127    private static void write(final RdfStream rdfStream,
128                       final OutputStream output,
129                       final Lang dataFormat,
130                       final MediaType dataMediaType,
131                       final Map<String, String> nsPrefixes) {
132
133        final RDFFormat format = defaultSerialization(dataFormat);
134
135        // For formats that can be block-streamed (n-triples, turtle)
136        if (format != null) {
137            LOGGER.debug("Stream-based serialization of {}", dataFormat.toString());
138            if (RDFFormat.NTRIPLES.equals(format)) {
139                serializeNTriples(rdfStream, format, output);
140            } else {
141                serializeBlockStreamed(rdfStream, output, format, nsPrefixes);
142            }
143        // For formats that require analysis of the entire model and cannot be streamed directly (rdfxml, n3)
144        } else {
145            LOGGER.debug("Non-stream serialization of {}", dataFormat.toString());
146            serializeNonStreamed(rdfStream, output, dataFormat, dataMediaType, nsPrefixes);
147        }
148    }
149
150    private static void serializeNTriples(final RdfStream rdfStream, final RDFFormat format,
151            final OutputStream output) {
152        final StreamRDF stream = new SynchonizedStreamRDFWrapper(getWriterStream(output, format));
153        stream.start();
154        rdfStream.forEach(stream::triple);
155        stream.finish();
156    }
157
158    private static void serializeBlockStreamed(final RdfStream rdfStream, final OutputStream output,
159            final RDFFormat format, final Map<String, String> nsPrefixes) {
160
161        final Set<String> namespacesPresent = new HashSet<>();
162
163        final StreamRDF stream = new SynchonizedStreamRDFWrapper(getWriterStream(output, format));
164        stream.start();
165        // Must read the rdf stream before writing out ns prefixes, otherwise the prefixes come after the triples
166        final List<Triple> tripleList = rdfStream.peek(t -> {
167            // Collect the namespaces present in the RDF stream, using the same
168            // criteria for where to look that jena's model.listNameSpaces() does
169            namespacesPresent.add(t.getPredicate().getNameSpace());
170            if (RDF_TYPE.equals(t.getPredicate().getURI()) && t.getObject().isURI()) {
171                namespacesPresent.add(t.getObject().getNameSpace());
172            }
173        }).collect(Collectors.toList());
174
175        nsPrefixes.forEach((prefix, uri) -> {
176            // Only add namespace prefixes if the namespace is present in the rdf stream
177            if (namespacesPresent.contains(uri)) {
178                stream.prefix(prefix, uri);
179            }
180        });
181        tripleList.forEach(stream::triple);
182        stream.finish();
183    }
184
185    private static void serializeNonStreamed(final RdfStream rdfStream, final OutputStream output,
186            final Lang dataFormat, final MediaType dataMediaType, final Map<String, String> nsPrefixes) {
187        final Model model = rdfStream.collect(toModel());
188
189        model.setNsPrefixes(filterNamespacesToPresent(model, nsPrefixes));
190        // use block output streaming for RDFXML
191        if (RDFXML.equals(dataFormat)) {
192            RDFDataMgr.write(output, model.getGraph(), RDFXML_PLAIN);
193        } else if (JSONLD.equals(dataFormat)) {
194            final RDFFormat jsonldFormat = getFormatFromMediaType(dataMediaType);
195            RDFDataMgr.write(output, model.getGraph(), jsonldFormat);
196        } else {
197            RDFDataMgr.write(output, model.getGraph(), dataFormat);
198        }
199    }
200
201    /**
202     * Filters the map of namespace prefix mappings to just those containing namespace URIs present in the model
203     *
204     * @param model model
205     * @param nsPrefixes map of namespace to uris
206     * @return nsPrefixes filtered to namespaces found in the model
207     */
208    private static Map<String, String> filterNamespacesToPresent(final Model model,
209            final Map<String, String> nsPrefixes) {
210        final Map<String, String> resultNses = new HashMap<>();
211        final Set<Entry<String, String>> nsSet = nsPrefixes.entrySet();
212        final NsIterator nsIt = model.listNameSpaces();
213        while (nsIt.hasNext()) {
214            final String ns = nsIt.next();
215
216            final Optional<Entry<String, String>> nsOpt = nsSet.stream()
217                    .filter(nsEntry -> nsEntry.getValue().equals(ns))
218                    .findFirst();
219            if (nsOpt.isPresent()) {
220                final Entry<String, String> nsMatch = nsOpt.get();
221                resultNses.put(nsMatch.getKey(), nsMatch.getValue());
222            }
223        }
224
225        return resultNses;
226    }
227
228    private static RDFFormat getFormatFromMediaType(final MediaType mediaType) {
229        final String profile = mediaType.getParameters().getOrDefault("profile", "");
230        if (profile.equals(JSONLD_COMPACTED)) {
231            return JSONLD_COMPACT_FLAT;
232        } else if (profile.equals(JSONLD_FLATTENED)) {
233            return JSONLD_FLATTEN_FLAT;
234        }
235        return JSONLD_EXPAND_FLAT;
236    }
237}