001/**
002 * Copyright 2015 DuraSpace, Inc.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.fcrepo.http.commons.responses;
017
018import static javax.ws.rs.core.Response.Status.NOT_ACCEPTABLE;
019import static org.openrdf.model.impl.ValueFactoryImpl.getInstance;
020import static org.openrdf.model.util.Literals.createLiteral;
021import static org.slf4j.LoggerFactory.getLogger;
022
023import java.io.OutputStream;
024import java.util.Iterator;
025import java.util.Map;
026
027import javax.ws.rs.WebApplicationException;
028import javax.ws.rs.core.MediaType;
029import javax.ws.rs.core.StreamingOutput;
030
031import com.google.common.base.Predicate;
032import com.google.common.collect.Iterables;
033import org.fcrepo.kernel.utils.iterators.RdfStream;
034import org.openrdf.model.Resource;
035import org.openrdf.model.Statement;
036import org.openrdf.model.URI;
037import org.openrdf.model.Value;
038import org.openrdf.model.ValueFactory;
039import org.openrdf.rio.RDFFormat;
040import org.openrdf.rio.RDFHandlerException;
041import org.openrdf.rio.RDFWriter;
042import org.openrdf.rio.RDFWriterRegistry;
043import org.openrdf.rio.Rio;
044import org.openrdf.rio.WriterConfig;
045import org.slf4j.Logger;
046
047import com.google.common.base.Function;
048import com.google.common.base.Joiner;
049import com.google.common.util.concurrent.AbstractFuture;
050import com.hp.hpl.jena.graph.Node;
051import com.hp.hpl.jena.graph.Triple;
052
053/**
054 * Serializes an {@link RdfStream}.
055 *
056 * @author ajs6f
057 * @since Oct 30, 2013
058 */
059public class RdfStreamStreamingOutput extends AbstractFuture<Void> implements
060        StreamingOutput {
061
062    private static final Logger LOGGER =
063        getLogger(RdfStreamStreamingOutput.class);
064
065    private static ValueFactory vfactory = getInstance();
066
067    private final RDFFormat format;
068
069    private final RdfStream rdfStream;
070
071    /**
072     * Normal constructor
073     *
074     * @param rdfStream the rdf stream
075     * @param mediaType the media type
076     */
077    public RdfStreamStreamingOutput(final RdfStream rdfStream,
078            final MediaType mediaType) {
079        super();
080
081        if (LOGGER.isDebugEnabled()) {
082            for (final RDFFormat writeableFormats : RDFWriterRegistry.getInstance().getKeys()) {
083                LOGGER.debug("Discovered RDF writer writeableFormats: {} with mimeTypes: {}",
084                        writeableFormats.getName(), Joiner.on(" ")
085                                .join(writeableFormats.getMIMETypes()));
086            }
087        }
088        final RDFFormat format = Rio.getWriterFormatForMIMEType(mediaType.toString());
089        if (format != null) {
090            this.format = format;
091            LOGGER.debug("Setting up to serialize to: {}", format);
092        } else {
093            throw new WebApplicationException(NOT_ACCEPTABLE);
094        }
095
096        this.rdfStream = rdfStream;
097    }
098
099    @Override
100    public void write(final OutputStream output) {
101        LOGGER.debug("Serializing RDF stream in: {}", format);
102        try {
103            write(asStatements(), output, format);
104        } catch (final RDFHandlerException e) {
105            setException(e);
106            LOGGER.debug("Error serializing RDF", e);
107            throw new WebApplicationException(e);
108        }
109    }
110
111    private void write(final Iterable<Statement> model,
112                       final OutputStream output,
113                       final RDFFormat dataFormat)
114            throws RDFHandlerException {
115        final WriterConfig settings = new WriterConfig();
116        final RDFWriter writer = Rio.createWriter(dataFormat, output);
117        writer.setWriterConfig(settings);
118
119        for (final Map.Entry<String, String> namespace : excludeProtectedNamespaces(rdfStream.namespaces())) {
120            writer.handleNamespace(namespace.getKey(), namespace.getValue());
121        }
122
123        Rio.write(model, writer);
124    }
125
126    private static Iterable<Map.Entry<String, String>> excludeProtectedNamespaces(
127            final Map<String, String> namespaces) {
128        /**
129         * We exclude:
130         *  - xmlns, which Sesame helpfully serializes, but normal parsers may complain
131         *     about in some serializations (e.g. RDF/XML where xmlns:xmlns is forbidden by XML);
132         */
133        return Iterables.filter(namespaces.entrySet(), new Predicate<Map.Entry<String, String>>() {
134            @Override
135            public boolean apply(final Map.Entry<String, String> input) {
136                return !input.getKey().equals("xmlns");
137            }
138        });
139    }
140
141
142    private Iterable<Statement> asStatements() {
143        return new Iterable<Statement>() {
144
145            @Override
146            public Iterator<Statement> iterator() {
147                return rdfStream.transform(toStatement);
148            }
149        };
150    }
151
152    protected static final Function<? super Triple, Statement> toStatement =
153        new Function<Triple, Statement>() {
154
155            @Override
156            public Statement apply(final Triple t) {
157                final Resource subject = getResourceForSubject(t.getSubject());
158                final URI predicate = vfactory.createURI(t.getPredicate().getURI());
159                final Value object = getValueForObject(t.getObject());
160                return vfactory.createStatement(subject, predicate, object);
161            }
162
163        };
164
165    private static Resource getResourceForSubject(final Node subjectNode) {
166        final Resource subject;
167
168        if (subjectNode.isBlank()) {
169            subject = vfactory.createBNode(subjectNode.getBlankNodeLabel());
170        } else {
171            subject = vfactory.createURI(subjectNode.getURI());
172        }
173
174        return subject;
175    }
176
177    protected static Value getValueForObject(final Node object) {
178        final Value value;
179        if (object.isURI()) {
180            value = vfactory.createURI(object.getURI());
181        } else if (object.isBlank()) {
182            value = vfactory.createBNode(object.getBlankNodeLabel());
183        } else if (object.isLiteral()) {
184            final String literalValue = object.getLiteralLexicalForm();
185
186            final String literalDatatypeURI = object.getLiteralDatatypeURI();
187
188            if (!object.getLiteralLanguage().isEmpty()) {
189                value = vfactory.createLiteral(literalValue, object.getLiteralLanguage());
190            } else if (literalDatatypeURI != null) {
191                final URI uri = vfactory.createURI(literalDatatypeURI);
192                value = vfactory.createLiteral(literalValue, uri);
193            } else {
194                value = createLiteral(vfactory, object.getLiteralValue());
195            }
196        } else {
197            // should not happen..
198            throw new AssertionError("Unable to convert " + object + " to a value");
199        }
200
201        return value;
202    }
203}