001/*
002 * Licensed to DuraSpace under one or more contributor license agreements.
003 * See the NOTICE file distributed with this work for additional information
004 * regarding copyright ownership.
005 *
006 * DuraSpace licenses this file to you under the Apache License,
007 * Version 2.0 (the "License"); you may not use this file except in
008 * compliance with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.fcrepo.http.commons.responses;
019
020import static javax.ws.rs.core.Response.Status.NOT_ACCEPTABLE;
021import static org.openrdf.model.impl.ValueFactoryImpl.getInstance;
022import static org.openrdf.model.util.Literals.createLiteral;
023import static org.slf4j.LoggerFactory.getLogger;
024
025import java.io.OutputStream;
026import java.util.Map;
027import java.util.function.Function;
028import java.util.stream.Stream;
029
030import javax.ws.rs.WebApplicationException;
031import javax.ws.rs.core.MediaType;
032import javax.ws.rs.core.StreamingOutput;
033
034import org.fcrepo.kernel.api.exception.RepositoryRuntimeException;
035import org.fcrepo.kernel.api.RdfStream;
036
037import org.openrdf.model.Resource;
038import org.openrdf.model.Statement;
039import org.openrdf.model.URI;
040import org.openrdf.model.Value;
041import org.openrdf.model.ValueFactory;
042import org.openrdf.rio.RDFFormat;
043import org.openrdf.rio.RDFHandlerException;
044import org.openrdf.rio.RDFWriter;
045import org.openrdf.rio.RDFWriterRegistry;
046import org.openrdf.rio.Rio;
047import org.openrdf.rio.WriterConfig;
048import org.slf4j.Logger;
049
050import com.google.common.util.concurrent.AbstractFuture;
051import com.hp.hpl.jena.graph.Node;
052import com.hp.hpl.jena.graph.Triple;
053
054/**
055 * Serializes an {@link RdfStream}.
056 *
057 * @author ajs6f
058 * @since Oct 30, 2013
059 */
060public class RdfStreamStreamingOutput extends AbstractFuture<Void> implements
061        StreamingOutput {
062
063    private static final Logger LOGGER =
064        getLogger(RdfStreamStreamingOutput.class);
065
066    private static ValueFactory vfactory = getInstance();
067
068    /**
069     * This field is used to determine the correct {@link org.openrdf.rio.RDFWriter} created for the
070     * {@link javax.ws.rs.core.StreamingOutput}.
071     */
072    private final RDFFormat format;
073
074    /**
075     * This field is used to determine the {@link org.openrdf.rio.WriterConfig} details used by the created
076     * {@link org.openrdf.rio.RDFWriter}.
077     */
078    private final MediaType mediaType;
079
080    private final RdfStream rdfStream;
081
082    private final Map<String, String> namespaces;
083
084    /**
085     * Normal constructor
086     *
087     * @param rdfStream the rdf stream
088     * @param namespaces a namespace mapping
089     * @param mediaType the media type
090     */
091    public RdfStreamStreamingOutput(final RdfStream rdfStream, final Map<String, String> namespaces,
092            final MediaType mediaType) {
093        super();
094
095        if (LOGGER.isDebugEnabled()) {
096            for (final RDFFormat writeableFormats : RDFWriterRegistry.getInstance().getKeys()) {
097                LOGGER.debug("Discovered RDF writer writeableFormats: {} with mimeTypes: {}",
098                        writeableFormats.getName(), String.join(" ", writeableFormats.getMIMETypes()));
099            }
100        }
101        final RDFFormat format = Rio.getWriterFormatForMIMEType(mediaType.toString());
102        if (format != null) {
103            this.format = format;
104            this.mediaType = mediaType;
105            LOGGER.debug("Setting up to serialize to: {}", format);
106        } else {
107            throw new WebApplicationException(NOT_ACCEPTABLE);
108        }
109
110        this.rdfStream = rdfStream;
111        this.namespaces = namespaces;
112    }
113
114    @Override
115    public void write(final OutputStream output) {
116        LOGGER.debug("Serializing RDF stream in: {}", format);
117        try {
118            write(rdfStream.map(toStatement), output, format, mediaType, namespaces);
119        } catch (final RDFHandlerException e) {
120            setException(e);
121            LOGGER.debug("Error serializing RDF", e);
122            throw new WebApplicationException(e);
123        }
124    }
125
126    private static void write(final Stream<Statement> model,
127                       final OutputStream output,
128                       final RDFFormat dataFormat,
129                       final MediaType dataMediaType,
130                       final Map<String, String> nsPrefixes)
131            throws RDFHandlerException {
132        final WriterConfig settings = WriterConfigHelper.apply(dataMediaType);
133        final RDFWriter writer = Rio.createWriter(dataFormat, output);
134        writer.setWriterConfig(settings);
135
136        /**
137         * We exclude:
138         *  - xmlns, which Sesame helpfully serializes, but normal parsers may complain
139         *     about in some serializations (e.g. RDF/XML where xmlns:xmlns is forbidden by XML);
140         */
141        nsPrefixes.entrySet().stream().filter(e -> !e.getKey().equals("xmlns"))
142                .forEach(x -> {
143                    try {
144                        writer.handleNamespace(x.getKey(), x.getValue());
145                    } catch (final RDFHandlerException e) {
146                        throw new RepositoryRuntimeException(e);
147                    }
148                });
149
150        Rio.write((Iterable<Statement>)model::iterator, writer);
151    }
152
153    protected static final Function<? super Triple, Statement> toStatement = t -> {
154        final Resource subject = getResourceForSubject(t.getSubject());
155        final URI predicate = vfactory.createURI(t.getPredicate().getURI());
156        final Value object = getValueForObject(t.getObject());
157        return vfactory.createStatement(subject, predicate, object);
158    };
159
160    private static Resource getResourceForSubject(final Node subjectNode) {
161        return subjectNode.isBlank() ? vfactory.createBNode(subjectNode.getBlankNodeLabel())
162                : vfactory.createURI(subjectNode.getURI());
163    }
164
165    protected static Value getValueForObject(final Node object) {
166        if (object.isURI()) {
167            return vfactory.createURI(object.getURI());
168        } else if (object.isBlank()) {
169            return vfactory.createBNode(object.getBlankNodeLabel());
170        } else if (object.isLiteral()) {
171            final String literalValue = object.getLiteralLexicalForm();
172
173            final String literalDatatypeURI = object.getLiteralDatatypeURI();
174
175            if (!object.getLiteralLanguage().isEmpty()) {
176                return vfactory.createLiteral(literalValue, object.getLiteralLanguage());
177            } else if (literalDatatypeURI != null) {
178                final URI uri = vfactory.createURI(literalDatatypeURI);
179                return vfactory.createLiteral(literalValue, uri);
180            } else {
181                return createLiteral(vfactory, object.getLiteralValue());
182            }
183        }
184        throw new AssertionError("Unable to convert " + object +
185                " to a value, it is neither URI, blank, nor literal!");
186    }
187}