001/*
002 * The contents of this file are subject to the license and copyright
003 * detailed in the LICENSE and NOTICE files at the root of the source
004 * tree.
005 */
006package org.fcrepo.kernel.impl.services;
007
008import static org.fcrepo.kernel.api.FedoraTypes.FEDORA_ID_PREFIX;
009import static org.slf4j.LoggerFactory.getLogger;
010
011import java.util.List;
012import java.util.Map;
013import java.util.function.Predicate;
014import java.util.stream.Collectors;
015import java.util.stream.Stream;
016
017import javax.annotation.Nonnull;
018import javax.annotation.PostConstruct;
019import javax.inject.Inject;
020import javax.sql.DataSource;
021
022import org.fcrepo.kernel.api.ContainmentIndex;
023import org.fcrepo.kernel.api.RdfStream;
024import org.fcrepo.kernel.api.Transaction;
025import org.fcrepo.kernel.api.exception.RepositoryRuntimeException;
026import org.fcrepo.kernel.api.identifiers.FedoraId;
027import org.fcrepo.kernel.api.models.FedoraResource;
028import org.fcrepo.kernel.api.models.NonRdfSourceDescription;
029import org.fcrepo.kernel.api.observer.EventAccumulator;
030import org.fcrepo.kernel.api.rdf.DefaultRdfStream;
031import org.fcrepo.kernel.api.services.ReferenceService;
032import org.fcrepo.kernel.impl.operations.ReferenceOperation;
033import org.fcrepo.kernel.impl.operations.ReferenceOperationBuilder;
034
035import org.apache.jena.graph.Node;
036import org.apache.jena.graph.NodeFactory;
037import org.apache.jena.graph.Triple;
038import org.apache.jena.sparql.core.Quad;
039import org.slf4j.Logger;
040import org.springframework.beans.factory.annotation.Autowired;
041import org.springframework.beans.factory.annotation.Qualifier;
042import org.springframework.jdbc.core.RowMapper;
043import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
044import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
045import org.springframework.stereotype.Component;
046import org.springframework.transaction.annotation.Propagation;
047import org.springframework.transaction.annotation.Transactional;
048
049/**
050 * Implementation of reference service.
051 * @author whikloj
052 * @since 6.0.0
053 */
054@Component("referenceServiceImpl")
055public class ReferenceServiceImpl implements ReferenceService {
056
057    private static final Logger LOGGER = getLogger(ReferenceServiceImpl.class);
058
059    @Inject
060    private DataSource dataSource;
061
062    @Inject
063    private EventAccumulator eventAccumulator;
064
065    @Autowired
066    @Qualifier("containmentIndex")
067    private ContainmentIndex containmentIndex;
068
069    private NamedParameterJdbcTemplate jdbcTemplate;
070
071    private static final String TABLE_NAME = "reference";
072
073    private static final String TRANSACTION_TABLE = "reference_transaction_operations";
074
075    private static final String RESOURCE_COLUMN = "fedora_id";
076
077    private static final String SUBJECT_COLUMN = "subject_id";
078
079    private static final String PROPERTY_COLUMN = "property";
080
081    private static final String TARGET_COLUMN = "target_id";
082
083    private static final String OPERATION_COLUMN = "operation";
084
085    private static final String TRANSACTION_COLUMN = "transaction_id";
086
087    private static final String SELECT_INBOUND = "SELECT " + SUBJECT_COLUMN + ", " + PROPERTY_COLUMN + " FROM " +
088            TABLE_NAME + " WHERE " + TARGET_COLUMN + " = :targetId";
089
090    private static final String SELECT_INBOUND_IN_TRANSACTION = "SELECT x." + SUBJECT_COLUMN + ", x." +
091            PROPERTY_COLUMN + " FROM " + "(SELECT " + SUBJECT_COLUMN + ", " + PROPERTY_COLUMN + " FROM " + TABLE_NAME +
092            " WHERE " + TARGET_COLUMN + " = :targetId UNION " + "SELECT " + SUBJECT_COLUMN + ", " + PROPERTY_COLUMN +
093            " FROM " + TRANSACTION_TABLE + " WHERE " + TARGET_COLUMN + " = :targetId AND "
094            + TRANSACTION_COLUMN + " = :transactionId AND " + OPERATION_COLUMN + " = 'add') x WHERE NOT EXISTS " +
095            "(SELECT 1 FROM " + TRANSACTION_TABLE + " WHERE " + TARGET_COLUMN + " = :targetId AND " +
096            OPERATION_COLUMN + " = 'delete')";
097
098    private static final String SELECT_OUTBOUND = "SELECT " + SUBJECT_COLUMN + ", " + TARGET_COLUMN + ", " +
099            PROPERTY_COLUMN + " FROM " + TABLE_NAME + " WHERE " + RESOURCE_COLUMN + " = :resourceId";
100
101    private static final String SELECT_OUTBOUND_IN_TRANSACTION = "SELECT x." + SUBJECT_COLUMN + ", x." + TARGET_COLUMN +
102            ", x." + PROPERTY_COLUMN + " FROM " + "(SELECT " + SUBJECT_COLUMN + ", " + TARGET_COLUMN + ", " +
103            PROPERTY_COLUMN + " FROM " + TABLE_NAME + " WHERE " + RESOURCE_COLUMN + " = :resourceId UNION " +
104            "SELECT " + SUBJECT_COLUMN + ", " + TARGET_COLUMN + ", " + PROPERTY_COLUMN + " FROM " + TRANSACTION_TABLE +
105            " WHERE " + RESOURCE_COLUMN + " = :resourceId " + "AND " + TRANSACTION_COLUMN + " = :transactionId AND " +
106            OPERATION_COLUMN + " = 'add') x WHERE NOT EXISTS (SELECT 1 FROM " + TRANSACTION_TABLE + " WHERE " +
107            RESOURCE_COLUMN + " = :resourceId AND " + OPERATION_COLUMN + " = 'delete')";
108
109    private static final String INSERT_REFERENCE_IN_TRANSACTION = "INSERT INTO " + TRANSACTION_TABLE + "(" +
110            RESOURCE_COLUMN + ", " + SUBJECT_COLUMN + ", " + PROPERTY_COLUMN + ", " + TARGET_COLUMN + ", " +
111            TRANSACTION_COLUMN + ", " + OPERATION_COLUMN + ") VALUES (:resourceId, :subjectId, :property, :targetId, " +
112            ":transactionId, 'add')";
113
114    private static final String INSERT_REFERENCE_DIRECT = "INSERT INTO " + TABLE_NAME + "(" +
115            RESOURCE_COLUMN + ", " + SUBJECT_COLUMN + ", " + PROPERTY_COLUMN + ", " + TARGET_COLUMN +
116            ") VALUES (:resourceId, :subjectId, :property, :targetId)";
117
118    private static final String UNDO_INSERT_REFERENCE_IN_TRANSACTION = "DELETE FROM " + TRANSACTION_TABLE + " WHERE " +
119            RESOURCE_COLUMN + " = :resourceId AND " + SUBJECT_COLUMN + " = :subjectId AND " + PROPERTY_COLUMN +
120            " = :property AND " + TARGET_COLUMN + " = :targetId AND " + TRANSACTION_COLUMN + " = :transactionId AND " +
121            OPERATION_COLUMN + " = 'add'";
122
123    private static final String DELETE_REFERENCE_IN_TRANSACTION = "INSERT INTO " + TRANSACTION_TABLE + "(" +
124            RESOURCE_COLUMN + ", " + SUBJECT_COLUMN + ", " + PROPERTY_COLUMN + ", " + TARGET_COLUMN + ", " +
125            TRANSACTION_COLUMN + ", " + OPERATION_COLUMN + ") VALUES (:resourceId, :subjectId, :property, :targetId, " +
126            ":transactionId, 'delete')";
127
128    private static final String DELETE_REFERENCE_DIRECT = "DELETE FROM reference" +
129            " WHERE fedora_id = :resourceId AND subject_id = :subjectId" +
130            " AND property = :property AND target_id = :targetId";
131
132    private static final String UNDO_DELETE_REFERENCE_IN_TRANSACTION = "DELETE FROM " + TRANSACTION_TABLE + " WHERE " +
133            RESOURCE_COLUMN + " = :resourceId AND " + SUBJECT_COLUMN + " = :subjectId AND " + PROPERTY_COLUMN +
134            " = :property AND " + TARGET_COLUMN + " = :targetId AND " + TRANSACTION_COLUMN + " = :transactionId AND " +
135            OPERATION_COLUMN + " = 'delete'";
136
137    private static final String IS_REFERENCE_ADDED_IN_TRANSACTION = "SELECT TRUE FROM " + TRANSACTION_TABLE + " WHERE "
138            + RESOURCE_COLUMN + " = :resourceId AND " + SUBJECT_COLUMN + " = :subjectId AND " + PROPERTY_COLUMN +
139            " = :property AND " + TARGET_COLUMN + " = :targetId AND " + TRANSACTION_COLUMN + " = :transactionId AND " +
140            OPERATION_COLUMN + " = 'add'";
141
142    private static final String IS_REFERENCE_DELETED_IN_TRANSACTION = "SELECT TRUE FROM " + TRANSACTION_TABLE +
143            " WHERE " + RESOURCE_COLUMN + " = :resourceId AND " + SUBJECT_COLUMN + " = :subjectId AND " +
144            PROPERTY_COLUMN + " = :property AND " + TARGET_COLUMN + " = :targetId AND " + TRANSACTION_COLUMN +
145            " = :transactionId AND " + OPERATION_COLUMN + " = 'delete'";
146
147    private static final String COMMIT_ADD_RECORDS = "INSERT INTO " + TABLE_NAME + " ( " + RESOURCE_COLUMN + ", " +
148            SUBJECT_COLUMN + ", " + PROPERTY_COLUMN + ", " + TARGET_COLUMN + " ) SELECT " + RESOURCE_COLUMN + ", " +
149            SUBJECT_COLUMN + ", " + PROPERTY_COLUMN + ", " + TARGET_COLUMN + " FROM " + TRANSACTION_TABLE + " WHERE " +
150            TRANSACTION_COLUMN + " = :transactionId AND " + OPERATION_COLUMN + " = 'add'";
151
152    private static final String COMMIT_DELETE_RECORDS = "DELETE FROM " + TABLE_NAME + " WHERE " +
153            "EXISTS (SELECT * FROM " + TRANSACTION_TABLE + " t WHERE t." +
154            TRANSACTION_COLUMN + " = :transactionId AND t." +  OPERATION_COLUMN + " = 'delete' AND" +
155            " t." + RESOURCE_COLUMN + " = " + TABLE_NAME + "." + RESOURCE_COLUMN + " AND" +
156            " t." + SUBJECT_COLUMN + " = " + TABLE_NAME + "." + SUBJECT_COLUMN +
157            " AND t." + PROPERTY_COLUMN + " = " + TABLE_NAME + "." + PROPERTY_COLUMN +
158            " AND t." + TARGET_COLUMN + " = " + TABLE_NAME + "." + TARGET_COLUMN + ")";
159
160    private static final String DELETE_TRANSACTION = "DELETE FROM " + TRANSACTION_TABLE + " WHERE " +
161            TRANSACTION_COLUMN + " = :transactionId";
162
163    private static final String TRUNCATE_TABLE = "TRUNCATE TABLE " + TABLE_NAME;
164
165    @PostConstruct
166    public void setUp() {
167        jdbcTemplate = new NamedParameterJdbcTemplate(getDataSource());
168    }
169
170    @Override
171    public RdfStream getInboundReferences(@Nonnull final Transaction tx, final FedoraResource resource) {
172        final String resourceId = resource.getFedoraId().getFullId();
173        final Node subject = NodeFactory.createURI(resourceId);
174        final Stream<Triple> stream = getReferencesInternal(tx, resourceId);
175        if (resource instanceof NonRdfSourceDescription) {
176            final Stream<Triple> stream2 = getReferencesInternal(tx, resource.getFedoraId().getBaseId());
177            return new DefaultRdfStream(subject, Stream.concat(stream, stream2));
178        }
179        return new DefaultRdfStream(subject, stream);
180    }
181
182    /**
183     * Get the inbound references for the resource Id and the transaction id.
184     * @param tx transaction or null for none.
185     * @param targetId the id that will be the target of references.
186     * @return RDF stream of inbound references
187     */
188    private Stream<Triple> getReferencesInternal(final Transaction tx, final String targetId) {
189        final MapSqlParameterSource parameterSource = new MapSqlParameterSource();
190        parameterSource.addValue("targetId", targetId);
191        final Node targetNode = NodeFactory.createURI(targetId);
192
193        final RowMapper<Triple> inboundMapper = (rs, rowNum) ->
194                Triple.create(NodeFactory.createURI(rs.getString(SUBJECT_COLUMN)),
195                        NodeFactory.createURI(rs.getString(PROPERTY_COLUMN)),
196                        targetNode);
197
198        final String query;
199
200        if (tx.isOpenLongRunning()) {
201            // we are in a transaction
202            parameterSource.addValue("transactionId", tx.getId());
203            query = SELECT_INBOUND_IN_TRANSACTION;
204        } else {
205            // not in a transaction
206            query = SELECT_INBOUND;
207        }
208
209        final var references = jdbcTemplate.query(query, parameterSource, inboundMapper);
210
211        LOGGER.debug("getInboundReferences for {} in transaction {} found {} references",
212                targetId, tx, references.size());
213        return references.stream();
214    }
215
216    @Override
217    public void deleteAllReferences(@Nonnull final Transaction tx, final FedoraId resourceId) {
218        final List<Quad> deleteReferences = getOutboundReferences(tx, resourceId);
219        if (resourceId.isDescription()) {
220            // Also get the binary references
221            deleteReferences.addAll(getOutboundReferences(tx, resourceId.asBaseId()));
222        }
223        // Remove all the existing references.
224        deleteReferences.forEach(t -> removeReference(tx, t));
225    }
226
227    /**
228     * Get a stream of quads of resources being referenced from the provided resource, the graph of the quad is the
229     * URI of the resource the reference is from.
230     * @param tx transaction Id or null if none.
231     * @param resourceId the resource Id.
232     * @return list of Quads
233     */
234    private List<Quad> getOutboundReferences(final Transaction tx, final FedoraId resourceId) {
235        final MapSqlParameterSource parameterSource = new MapSqlParameterSource();
236        parameterSource.addValue("resourceId", resourceId.getFullId());
237        final Node subjectNode = NodeFactory.createURI(resourceId.getFullId());
238
239        final RowMapper<Quad> outboundMapper = (rs, rowNum) ->
240                Quad.create(subjectNode,
241                        NodeFactory.createURI(rs.getString(SUBJECT_COLUMN)),
242                        NodeFactory.createURI(rs.getString(PROPERTY_COLUMN)),
243                        NodeFactory.createURI(rs.getString(TARGET_COLUMN)));
244
245        final String query;
246
247        if (tx.isOpenLongRunning()) {
248            // we are in a long-running transaction
249            parameterSource.addValue("transactionId", tx.getId());
250            query = SELECT_OUTBOUND_IN_TRANSACTION;
251        } else {
252            // not in a transaction or in a short-lived transaction
253            query = SELECT_OUTBOUND;
254        }
255
256        final var references = jdbcTemplate.query(query, parameterSource, outboundMapper);
257
258        LOGGER.debug("getOutboundReferences for {} in transaction {} found {} references",
259                resourceId, tx, references.size());
260        return references;
261    }
262
263    @Override
264    public void updateReferences(@Nonnull final Transaction tx, final FedoraId resourceId, final String userPrincipal,
265                                 final RdfStream rdfStream) {
266        try {
267            final List<Triple> addReferences = getReferencesFromRdf(rdfStream).collect(Collectors.toList());
268            // This predicate checks for items we are adding, so we don't bother to delete and then re-add them.
269            final Predicate<Quad> notInAdds = q -> !addReferences.contains(q.asTriple());
270            // References from this resource.
271            final List<Quad> existingReferences = getOutboundReferences(tx, resourceId);
272            if (resourceId.isDescription()) {
273                // Resource is a binary description so also get the binary references.
274                existingReferences.addAll(getOutboundReferences(tx, resourceId.asBaseId()));
275            }
276            // Remove any existing references not being re-added.
277            existingReferences.stream().filter(notInAdds).forEach(t -> removeReference(tx, t));
278            final Node resourceNode = NodeFactory.createURI(resourceId.getFullId());
279            // This predicate checks for references that didn't already exist in the database.
280            final Predicate<Triple> alreadyExists = t -> !existingReferences.contains(Quad.create(resourceNode, t));
281            // Add the new references.
282            addReferences.stream().filter(alreadyExists).forEach(r ->
283                    addReference(tx, Quad.create(resourceNode, r), userPrincipal));
284        } catch (final Exception e) {
285            LOGGER.warn("Unable to update reference index for resource {} in transaction {}: {}",
286                    resourceId.getFullId(), tx.getId(), e.getMessage());
287            throw new RepositoryRuntimeException("Unable to update reference index", e);
288        }
289    }
290
291    @Override
292    public void commitTransaction(final Transaction tx) {
293        if (!tx.isShortLived()) {
294            tx.ensureCommitting();
295            try {
296                final Map<String, String> parameterSource = Map.of("transactionId", tx.getId());
297                jdbcTemplate.update(COMMIT_DELETE_RECORDS, parameterSource);
298                jdbcTemplate.update(COMMIT_ADD_RECORDS, parameterSource);
299                jdbcTemplate.update(DELETE_TRANSACTION, parameterSource);
300            } catch (final Exception e) {
301                LOGGER.warn("Unable to commit reference index transaction {}: {}", tx, e.getMessage());
302                throw new RepositoryRuntimeException("Unable to commit reference index transaction", e);
303            }
304        }
305    }
306
307    @Transactional(propagation = Propagation.NOT_SUPPORTED)
308    @Override
309    public void rollbackTransaction(final Transaction tx) {
310        if (!tx.isShortLived()) {
311            try {
312                final Map<String, String> parameterSource = Map.of("transactionId", tx.getId());
313                jdbcTemplate.update(DELETE_TRANSACTION, parameterSource);
314            } catch (final Exception e) {
315                LOGGER.warn("Unable to rollback reference index transaction {}: {}", tx, e.getMessage());
316                throw new RepositoryRuntimeException("Unable to rollback reference index transaction", e);
317            }
318        }
319    }
320
321    @Override
322    public void reset() {
323        try {
324            jdbcTemplate.update(TRUNCATE_TABLE, Map.of());
325        } catch (final Exception e) {
326            LOGGER.warn("Unable to reset reference index: {}", e.getMessage());
327            throw new RepositoryRuntimeException("Unable to reset reference index", e);
328        }
329    }
330
331    /**
332     * Remove a reference.
333     * @param tx the transaction
334     * @param reference the quad with the reference, is Quad(resourceId, subjectId, propertyId, targetId)
335     */
336    private void removeReference(final Transaction tx, final Quad reference) {
337        tx.doInTx(() -> {
338            final var parameterSource = new MapSqlParameterSource();
339            parameterSource.addValue("resourceId", reference.getGraph().getURI());
340            parameterSource.addValue("subjectId", reference.getSubject().getURI());
341            parameterSource.addValue("property", reference.getPredicate().getURI());
342            parameterSource.addValue("targetId", reference.getObject().getURI());
343
344            if (!tx.isShortLived()) {
345                parameterSource.addValue("transactionId", tx.getId());
346                final boolean addedInTx = !jdbcTemplate.queryForList(IS_REFERENCE_ADDED_IN_TRANSACTION, parameterSource)
347                        .isEmpty();
348                if (addedInTx) {
349                    jdbcTemplate.update(UNDO_INSERT_REFERENCE_IN_TRANSACTION, parameterSource);
350                } else {
351                    jdbcTemplate.update(DELETE_REFERENCE_IN_TRANSACTION, parameterSource);
352                }
353            } else {
354                jdbcTemplate.update(DELETE_REFERENCE_DIRECT, parameterSource);
355            }
356        });
357    }
358
359    /**
360     * Add a reference
361     * @param transaction the transaction Id.
362     * @param reference the quad with the reference, is is Quad(resourceId, subjectId, propertyId, targetId)
363     * @param userPrincipal the user adding the reference.
364     */
365    private void addReference(@Nonnull final Transaction transaction, final Quad reference,
366                              final String userPrincipal) {
367        transaction.doInTx(() -> {
368            final String targetId = reference.getObject().getURI();
369
370            final var parameterSource = new MapSqlParameterSource();
371            parameterSource.addValue("resourceId", reference.getGraph().getURI());
372            parameterSource.addValue("subjectId", reference.getSubject().getURI());
373            parameterSource.addValue("property", reference.getPredicate().getURI());
374            parameterSource.addValue("targetId", targetId);
375
376            if (!transaction.isShortLived()) {
377                parameterSource.addValue("transactionId", transaction.getId());
378                final boolean addedInTx = !jdbcTemplate.queryForList(
379                        IS_REFERENCE_DELETED_IN_TRANSACTION, parameterSource)
380                        .isEmpty();
381                if (addedInTx) {
382                    jdbcTemplate.update(UNDO_DELETE_REFERENCE_IN_TRANSACTION, parameterSource);
383                } else {
384                    jdbcTemplate.update(INSERT_REFERENCE_IN_TRANSACTION, parameterSource);
385                    recordEvent(transaction, targetId, userPrincipal);
386                }
387            } else {
388                jdbcTemplate.update(INSERT_REFERENCE_DIRECT, parameterSource);
389                recordEvent(transaction, targetId, userPrincipal);
390            }
391        });
392    }
393
394    /**
395     * Record the inbound reference event if the target exists.
396     * @param transaction the transaction.
397     * @param resourceId the id of the target of the inbound reference.
398     * @param userPrincipal the user making the reference.
399     */
400    private void recordEvent(final Transaction transaction, final String resourceId, final String userPrincipal) {
401        final FedoraId fedoraId = FedoraId.create(resourceId);
402        if (this.containmentIndex.resourceExists(transaction, fedoraId, false)) {
403            this.eventAccumulator.recordEventForOperation(transaction, fedoraId, getOperation(transaction, fedoraId,
404                    userPrincipal));
405        }
406    }
407
408    /**
409     * Create a ReferenceOperation for the current add.
410     * @param tx the transaction for the current operation.
411     * @param id the target resource of the reference.
412     * @param user the user making the change
413     * @return a ReferenceOperation
414     */
415    private static ReferenceOperation getOperation(final Transaction tx, final FedoraId id, final String user) {
416        final ReferenceOperationBuilder builder = new ReferenceOperationBuilder(tx, id);
417        builder.userPrincipal(user);
418        return builder.build();
419    }
420
421    /**
422     * Utility to filter a RDFStream to just the URIs from subjects and objects within the repository.
423     * @param stream the provided stream
424     * @return stream of triples with internal references.
425     */
426    private Stream<Triple> getReferencesFromRdf(final RdfStream stream) {
427        final Predicate<Triple> isInternalReference = t -> {
428            final Node s = t.getSubject();
429            final Node o = t.getObject();
430            return (s.isURI() && s.getURI().startsWith(FEDORA_ID_PREFIX) && o.isURI() &&
431                    o.getURI().startsWith(FEDORA_ID_PREFIX));
432        };
433        return stream.filter(isInternalReference);
434    }
435
436    /**
437     * Set the JDBC datastore.
438     * @param dataSource the dataStore.
439     */
440    public void setDataSource(final DataSource dataSource) {
441        this.dataSource = dataSource;
442    }
443
444    /**
445     * Get the JDBC datastore.
446     * @return the dataStore.
447     */
448    public DataSource getDataSource() {
449        return dataSource;
450    }
451}