/*
 * Licensed to DuraSpace under one or more contributor license agreements.
 * See the NOTICE file distributed with this work for additional information
 * regarding copyright ownership.
 *
 * DuraSpace licenses this file to you under the Apache License,
 * Version 2.0 (the "License"); you may not use this file except in
 * compliance with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
018package org.fcrepo.kernel.impl.services;
019
020import static org.fcrepo.kernel.api.FedoraTypes.FEDORA_ID_PREFIX;
021import static org.slf4j.LoggerFactory.getLogger;
022
023import java.util.List;
024import java.util.Map;
025import java.util.function.Predicate;
026import java.util.stream.Collectors;
027import java.util.stream.Stream;
028
029import javax.annotation.Nonnull;
030import javax.annotation.PostConstruct;
031import javax.inject.Inject;
032import javax.sql.DataSource;
033
034import org.fcrepo.kernel.api.ContainmentIndex;
035import org.fcrepo.kernel.api.RdfStream;
036import org.fcrepo.kernel.api.Transaction;
037import org.fcrepo.kernel.api.exception.RepositoryRuntimeException;
038import org.fcrepo.kernel.api.identifiers.FedoraId;
039import org.fcrepo.kernel.api.models.FedoraResource;
040import org.fcrepo.kernel.api.models.NonRdfSourceDescription;
041import org.fcrepo.kernel.api.observer.EventAccumulator;
042import org.fcrepo.kernel.api.rdf.DefaultRdfStream;
043import org.fcrepo.kernel.api.services.ReferenceService;
044import org.fcrepo.kernel.impl.operations.ReferenceOperation;
045import org.fcrepo.kernel.impl.operations.ReferenceOperationBuilder;
046
047import org.apache.jena.graph.Node;
048import org.apache.jena.graph.NodeFactory;
049import org.apache.jena.graph.Triple;
050import org.apache.jena.sparql.core.Quad;
051import org.slf4j.Logger;
052import org.springframework.beans.factory.annotation.Autowired;
053import org.springframework.beans.factory.annotation.Qualifier;
054import org.springframework.jdbc.core.RowMapper;
055import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
056import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
057import org.springframework.stereotype.Component;
058import org.springframework.transaction.annotation.Propagation;
059import org.springframework.transaction.annotation.Transactional;
060
061/**
062 * Implementation of reference service.
063 * @author whikloj
064 * @since 6.0.0
065 */
066@Component("referenceServiceImpl")
067public class ReferenceServiceImpl implements ReferenceService {
068
069    private static final Logger LOGGER = getLogger(ReferenceServiceImpl.class);
070
071    @Inject
072    private DataSource dataSource;
073
074    @Inject
075    private EventAccumulator eventAccumulator;
076
077    @Autowired
078    @Qualifier("containmentIndex")
079    private ContainmentIndex containmentIndex;
080
081    private NamedParameterJdbcTemplate jdbcTemplate;
082
083    private static final String TABLE_NAME = "reference";
084
085    private static final String TRANSACTION_TABLE = "reference_transaction_operations";
086
087    private static final String RESOURCE_COLUMN = "fedora_id";
088
089    private static final String SUBJECT_COLUMN = "subject_id";
090
091    private static final String PROPERTY_COLUMN = "property";
092
093    private static final String TARGET_COLUMN = "target_id";
094
095    private static final String OPERATION_COLUMN = "operation";
096
097    private static final String TRANSACTION_COLUMN = "transaction_id";
098
099    private static final String SELECT_INBOUND = "SELECT " + SUBJECT_COLUMN + ", " + PROPERTY_COLUMN + " FROM " +
100            TABLE_NAME + " WHERE " + TARGET_COLUMN + " = :targetId";
101
102    private static final String SELECT_INBOUND_IN_TRANSACTION = "SELECT x." + SUBJECT_COLUMN + ", x." +
103            PROPERTY_COLUMN + " FROM " + "(SELECT " + SUBJECT_COLUMN + ", " + PROPERTY_COLUMN + " FROM " + TABLE_NAME +
104            " WHERE " + TARGET_COLUMN + " = :targetId UNION " + "SELECT " + SUBJECT_COLUMN + ", " + PROPERTY_COLUMN +
105            " FROM " + TRANSACTION_TABLE + " WHERE " + TARGET_COLUMN + " = :targetId AND "
106            + TRANSACTION_COLUMN + " = :transactionId AND " + OPERATION_COLUMN + " = 'add') x WHERE NOT EXISTS " +
107            "(SELECT 1 FROM " + TRANSACTION_TABLE + " WHERE " + TARGET_COLUMN + " = :targetId AND " +
108            OPERATION_COLUMN + " = 'delete')";
109
110    private static final String SELECT_OUTBOUND = "SELECT " + SUBJECT_COLUMN + ", " + TARGET_COLUMN + ", " +
111            PROPERTY_COLUMN + " FROM " + TABLE_NAME + " WHERE " + RESOURCE_COLUMN + " = :resourceId";
112
113    private static final String SELECT_OUTBOUND_IN_TRANSACTION = "SELECT x." + SUBJECT_COLUMN + ", x." + TARGET_COLUMN +
114            ", x." + PROPERTY_COLUMN + " FROM " + "(SELECT " + SUBJECT_COLUMN + ", " + TARGET_COLUMN + ", " +
115            PROPERTY_COLUMN + " FROM " + TABLE_NAME + " WHERE " + RESOURCE_COLUMN + " = :resourceId UNION " +
116            "SELECT " + SUBJECT_COLUMN + ", " + TARGET_COLUMN + ", " + PROPERTY_COLUMN + " FROM " + TRANSACTION_TABLE +
117            " WHERE " + RESOURCE_COLUMN + " = :resourceId " + "AND " + TRANSACTION_COLUMN + " = :transactionId AND " +
118            OPERATION_COLUMN + " = 'add') x WHERE NOT EXISTS (SELECT 1 FROM " + TRANSACTION_TABLE + " WHERE " +
119            RESOURCE_COLUMN + " = :resourceId AND " + OPERATION_COLUMN + " = 'delete')";
120
121    private static final String INSERT_REFERENCE_IN_TRANSACTION = "INSERT INTO " + TRANSACTION_TABLE + "(" +
122            RESOURCE_COLUMN + ", " + SUBJECT_COLUMN + ", " + PROPERTY_COLUMN + ", " + TARGET_COLUMN + ", " +
123            TRANSACTION_COLUMN + ", " + OPERATION_COLUMN + ") VALUES (:resourceId, :subjectId, :property, :targetId, " +
124            ":transactionId, 'add')";
125
126    private static final String INSERT_REFERENCE_DIRECT = "INSERT INTO " + TABLE_NAME + "(" +
127            RESOURCE_COLUMN + ", " + SUBJECT_COLUMN + ", " + PROPERTY_COLUMN + ", " + TARGET_COLUMN +
128            ") VALUES (:resourceId, :subjectId, :property, :targetId)";
129
130    private static final String UNDO_INSERT_REFERENCE_IN_TRANSACTION = "DELETE FROM " + TRANSACTION_TABLE + " WHERE " +
131            RESOURCE_COLUMN + " = :resourceId AND " + SUBJECT_COLUMN + " = :subjectId AND " + PROPERTY_COLUMN +
132            " = :property AND " + TARGET_COLUMN + " = :targetId AND " + TRANSACTION_COLUMN + " = :transactionId AND " +
133            OPERATION_COLUMN + " = 'add'";
134
135    private static final String DELETE_REFERENCE_IN_TRANSACTION = "INSERT INTO " + TRANSACTION_TABLE + "(" +
136            RESOURCE_COLUMN + ", " + SUBJECT_COLUMN + ", " + PROPERTY_COLUMN + ", " + TARGET_COLUMN + ", " +
137            TRANSACTION_COLUMN + ", " + OPERATION_COLUMN + ") VALUES (:resourceId, :subjectId, :property, :targetId, " +
138            ":transactionId, 'delete')";
139
140    private static final String DELETE_REFERENCE_DIRECT = "DELETE FROM reference" +
141            " WHERE fedora_id = :resourceId AND subject_id = :subjectId" +
142            " AND property = :property AND target_id = :targetId";
143
144    private static final String UNDO_DELETE_REFERENCE_IN_TRANSACTION = "DELETE FROM " + TRANSACTION_TABLE + " WHERE " +
145            RESOURCE_COLUMN + " = :resourceId AND " + SUBJECT_COLUMN + " = :subjectId AND " + PROPERTY_COLUMN +
146            " = :property AND " + TARGET_COLUMN + " = :targetId AND " + TRANSACTION_COLUMN + " = :transactionId AND " +
147            OPERATION_COLUMN + " = 'delete'";
148
149    private static final String IS_REFERENCE_ADDED_IN_TRANSACTION = "SELECT TRUE FROM " + TRANSACTION_TABLE + " WHERE "
150            + RESOURCE_COLUMN + " = :resourceId AND " + SUBJECT_COLUMN + " = :subjectId AND " + PROPERTY_COLUMN +
151            " = :property AND " + TARGET_COLUMN + " = :targetId AND " + TRANSACTION_COLUMN + " = :transactionId AND " +
152            OPERATION_COLUMN + " = 'add'";
153
154    private static final String IS_REFERENCE_DELETED_IN_TRANSACTION = "SELECT TRUE FROM " + TRANSACTION_TABLE +
155            " WHERE " + RESOURCE_COLUMN + " = :resourceId AND " + SUBJECT_COLUMN + " = :subjectId AND " +
156            PROPERTY_COLUMN + " = :property AND " + TARGET_COLUMN + " = :targetId AND " + TRANSACTION_COLUMN +
157            " = :transactionId AND " + OPERATION_COLUMN + " = 'delete'";
158
159    private static final String COMMIT_ADD_RECORDS = "INSERT INTO " + TABLE_NAME + " ( " + RESOURCE_COLUMN + ", " +
160            SUBJECT_COLUMN + ", " + PROPERTY_COLUMN + ", " + TARGET_COLUMN + " ) SELECT " + RESOURCE_COLUMN + ", " +
161            SUBJECT_COLUMN + ", " + PROPERTY_COLUMN + ", " + TARGET_COLUMN + " FROM " + TRANSACTION_TABLE + " WHERE " +
162            TRANSACTION_COLUMN + " = :transactionId AND " + OPERATION_COLUMN + " = 'add'";
163
164    private static final String COMMIT_DELETE_RECORDS = "DELETE FROM " + TABLE_NAME + " WHERE " +
165            "EXISTS (SELECT * FROM " + TRANSACTION_TABLE + " t WHERE t." +
166            TRANSACTION_COLUMN + " = :transactionId AND t." +  OPERATION_COLUMN + " = 'delete' AND" +
167            " t." + RESOURCE_COLUMN + " = " + TABLE_NAME + "." + RESOURCE_COLUMN + " AND" +
168            " t." + SUBJECT_COLUMN + " = " + TABLE_NAME + "." + SUBJECT_COLUMN +
169            " AND t." + PROPERTY_COLUMN + " = " + TABLE_NAME + "." + PROPERTY_COLUMN +
170            " AND t." + TARGET_COLUMN + " = " + TABLE_NAME + "." + TARGET_COLUMN + ")";
171
172    private static final String DELETE_TRANSACTION = "DELETE FROM " + TRANSACTION_TABLE + " WHERE " +
173            TRANSACTION_COLUMN + " = :transactionId";
174
175    private static final String TRUNCATE_TABLE = "TRUNCATE TABLE " + TABLE_NAME;
176
177    @PostConstruct
178    public void setUp() {
179        jdbcTemplate = new NamedParameterJdbcTemplate(getDataSource());
180    }
181
182    @Override
183    public RdfStream getInboundReferences(@Nonnull final Transaction tx, final FedoraResource resource) {
184        final String resourceId = resource.getFedoraId().getFullId();
185        final Node subject = NodeFactory.createURI(resourceId);
186        final Stream<Triple> stream = getReferencesInternal(tx, resourceId);
187        if (resource instanceof NonRdfSourceDescription) {
188            final Stream<Triple> stream2 = getReferencesInternal(tx, resource.getFedoraId().getBaseId());
189            return new DefaultRdfStream(subject, Stream.concat(stream, stream2));
190        }
191        return new DefaultRdfStream(subject, stream);
192    }
193
194    /**
195     * Get the inbound references for the resource Id and the transaction id.
196     * @param tx transaction or null for none.
197     * @param targetId the id that will be the target of references.
198     * @return RDF stream of inbound references
199     */
200    private Stream<Triple> getReferencesInternal(final Transaction tx, final String targetId) {
201        final MapSqlParameterSource parameterSource = new MapSqlParameterSource();
202        parameterSource.addValue("targetId", targetId);
203        final Node targetNode = NodeFactory.createURI(targetId);
204
205        final RowMapper<Triple> inboundMapper = (rs, rowNum) ->
206                Triple.create(NodeFactory.createURI(rs.getString(SUBJECT_COLUMN)),
207                        NodeFactory.createURI(rs.getString(PROPERTY_COLUMN)),
208                        targetNode);
209
210        final String query;
211
212        if (tx.isOpenLongRunning()) {
213            // we are in a transaction
214            parameterSource.addValue("transactionId", tx.getId());
215            query = SELECT_INBOUND_IN_TRANSACTION;
216        } else {
217            // not in a transaction
218            query = SELECT_INBOUND;
219        }
220
221        final var references = jdbcTemplate.query(query, parameterSource, inboundMapper);
222
223        LOGGER.debug("getInboundReferences for {} in transaction {} found {} references",
224                targetId, tx, references.size());
225        return references.stream();
226    }
227
228    @Override
229    public void deleteAllReferences(@Nonnull final Transaction tx, final FedoraId resourceId) {
230        final List<Quad> deleteReferences = getOutboundReferences(tx, resourceId);
231        if (resourceId.isDescription()) {
232            // Also get the binary references
233            deleteReferences.addAll(getOutboundReferences(tx, resourceId.asBaseId()));
234        }
235        // Remove all the existing references.
236        deleteReferences.forEach(t -> removeReference(tx, t));
237    }
238
239    /**
240     * Get a stream of quads of resources being referenced from the provided resource, the graph of the quad is the
241     * URI of the resource the reference is from.
242     * @param tx transaction Id or null if none.
243     * @param resourceId the resource Id.
244     * @return list of Quads
245     */
246    private List<Quad> getOutboundReferences(final Transaction tx, final FedoraId resourceId) {
247        final MapSqlParameterSource parameterSource = new MapSqlParameterSource();
248        parameterSource.addValue("resourceId", resourceId.getFullId());
249        final Node subjectNode = NodeFactory.createURI(resourceId.getFullId());
250
251        final RowMapper<Quad> outboundMapper = (rs, rowNum) ->
252                Quad.create(subjectNode,
253                        NodeFactory.createURI(rs.getString(SUBJECT_COLUMN)),
254                        NodeFactory.createURI(rs.getString(PROPERTY_COLUMN)),
255                        NodeFactory.createURI(rs.getString(TARGET_COLUMN)));
256
257        final String query;
258
259        if (tx.isOpenLongRunning()) {
260            // we are in a long-running transaction
261            parameterSource.addValue("transactionId", tx.getId());
262            query = SELECT_OUTBOUND_IN_TRANSACTION;
263        } else {
264            // not in a transaction or in a short-lived transaction
265            query = SELECT_OUTBOUND;
266        }
267
268        final var references = jdbcTemplate.query(query, parameterSource, outboundMapper);
269
270        LOGGER.debug("getOutboundReferences for {} in transaction {} found {} references",
271                resourceId, tx, references.size());
272        return references;
273    }
274
275    @Override
276    public void updateReferences(@Nonnull final Transaction tx, final FedoraId resourceId, final String userPrincipal,
277                                 final RdfStream rdfStream) {
278        try {
279            final List<Triple> addReferences = getReferencesFromRdf(rdfStream).collect(Collectors.toList());
280            // This predicate checks for items we are adding, so we don't bother to delete and then re-add them.
281            final Predicate<Quad> notInAdds = q -> !addReferences.contains(q.asTriple());
282            // References from this resource.
283            final List<Quad> existingReferences = getOutboundReferences(tx, resourceId);
284            if (resourceId.isDescription()) {
285                // Resource is a binary description so also get the binary references.
286                existingReferences.addAll(getOutboundReferences(tx, resourceId.asBaseId()));
287            }
288            // Remove any existing references not being re-added.
289            existingReferences.stream().filter(notInAdds).forEach(t -> removeReference(tx, t));
290            final Node resourceNode = NodeFactory.createURI(resourceId.getFullId());
291            // This predicate checks for references that didn't already exist in the database.
292            final Predicate<Triple> alreadyExists = t -> !existingReferences.contains(Quad.create(resourceNode, t));
293            // Add the new references.
294            addReferences.stream().filter(alreadyExists).forEach(r ->
295                    addReference(tx, Quad.create(resourceNode, r), userPrincipal));
296        } catch (final Exception e) {
297            LOGGER.warn("Unable to update reference index for resource {} in transaction {}: {}",
298                    resourceId.getFullId(), tx.getId(), e.getMessage());
299            throw new RepositoryRuntimeException("Unable to update reference index", e);
300        }
301    }
302
303    @Override
304    public void commitTransaction(final Transaction tx) {
305        if (!tx.isShortLived()) {
306            tx.ensureCommitting();
307            try {
308                final Map<String, String> parameterSource = Map.of("transactionId", tx.getId());
309                jdbcTemplate.update(COMMIT_DELETE_RECORDS, parameterSource);
310                jdbcTemplate.update(COMMIT_ADD_RECORDS, parameterSource);
311                jdbcTemplate.update(DELETE_TRANSACTION, parameterSource);
312            } catch (final Exception e) {
313                LOGGER.warn("Unable to commit reference index transaction {}: {}", tx, e.getMessage());
314                throw new RepositoryRuntimeException("Unable to commit reference index transaction", e);
315            }
316        }
317    }
318
319    @Transactional(propagation = Propagation.NOT_SUPPORTED)
320    @Override
321    public void rollbackTransaction(final Transaction tx) {
322        if (!tx.isShortLived()) {
323            try {
324                final Map<String, String> parameterSource = Map.of("transactionId", tx.getId());
325                jdbcTemplate.update(DELETE_TRANSACTION, parameterSource);
326            } catch (final Exception e) {
327                LOGGER.warn("Unable to rollback reference index transaction {}: {}", tx, e.getMessage());
328                throw new RepositoryRuntimeException("Unable to rollback reference index transaction", e);
329            }
330        }
331    }
332
333    @Override
334    public void reset() {
335        try {
336            jdbcTemplate.update(TRUNCATE_TABLE, Map.of());
337        } catch (final Exception e) {
338            LOGGER.warn("Unable to reset reference index: {}", e.getMessage());
339            throw new RepositoryRuntimeException("Unable to reset reference index", e);
340        }
341    }
342
343    /**
344     * Remove a reference.
345     * @param tx the transaction
346     * @param reference the quad with the reference, is Quad(resourceId, subjectId, propertyId, targetId)
347     */
348    private void removeReference(final Transaction tx, final Quad reference) {
349        tx.doInTx(() -> {
350            final var parameterSource = new MapSqlParameterSource();
351            parameterSource.addValue("resourceId", reference.getGraph().getURI());
352            parameterSource.addValue("subjectId", reference.getSubject().getURI());
353            parameterSource.addValue("property", reference.getPredicate().getURI());
354            parameterSource.addValue("targetId", reference.getObject().getURI());
355
356            if (!tx.isShortLived()) {
357                parameterSource.addValue("transactionId", tx.getId());
358                final boolean addedInTx = !jdbcTemplate.queryForList(IS_REFERENCE_ADDED_IN_TRANSACTION, parameterSource)
359                        .isEmpty();
360                if (addedInTx) {
361                    jdbcTemplate.update(UNDO_INSERT_REFERENCE_IN_TRANSACTION, parameterSource);
362                } else {
363                    jdbcTemplate.update(DELETE_REFERENCE_IN_TRANSACTION, parameterSource);
364                }
365            } else {
366                jdbcTemplate.update(DELETE_REFERENCE_DIRECT, parameterSource);
367            }
368        });
369    }
370
371    /**
372     * Add a reference
373     * @param transaction the transaction Id.
374     * @param reference the quad with the reference, is is Quad(resourceId, subjectId, propertyId, targetId)
375     * @param userPrincipal the user adding the reference.
376     */
377    private void addReference(@Nonnull final Transaction transaction, final Quad reference,
378                              final String userPrincipal) {
379        transaction.doInTx(() -> {
380            final String targetId = reference.getObject().getURI();
381
382            final var parameterSource = new MapSqlParameterSource();
383            parameterSource.addValue("resourceId", reference.getGraph().getURI());
384            parameterSource.addValue("subjectId", reference.getSubject().getURI());
385            parameterSource.addValue("property", reference.getPredicate().getURI());
386            parameterSource.addValue("targetId", targetId);
387
388            if (!transaction.isShortLived()) {
389                parameterSource.addValue("transactionId", transaction.getId());
390                final boolean addedInTx = !jdbcTemplate.queryForList(
391                        IS_REFERENCE_DELETED_IN_TRANSACTION, parameterSource)
392                        .isEmpty();
393                if (addedInTx) {
394                    jdbcTemplate.update(UNDO_DELETE_REFERENCE_IN_TRANSACTION, parameterSource);
395                } else {
396                    jdbcTemplate.update(INSERT_REFERENCE_IN_TRANSACTION, parameterSource);
397                    recordEvent(transaction, targetId, userPrincipal);
398                }
399            } else {
400                jdbcTemplate.update(INSERT_REFERENCE_DIRECT, parameterSource);
401                recordEvent(transaction, targetId, userPrincipal);
402            }
403        });
404    }
405
406    /**
407     * Record the inbound reference event if the target exists.
408     * @param transaction the transaction.
409     * @param resourceId the id of the target of the inbound reference.
410     * @param userPrincipal the user making the reference.
411     */
412    private void recordEvent(final Transaction transaction, final String resourceId, final String userPrincipal) {
413        final FedoraId fedoraId = FedoraId.create(resourceId);
414        if (this.containmentIndex.resourceExists(transaction, fedoraId, false)) {
415            this.eventAccumulator.recordEventForOperation(transaction, fedoraId, getOperation(transaction, fedoraId,
416                    userPrincipal));
417        }
418    }
419
420    /**
421     * Create a ReferenceOperation for the current add.
422     * @param tx the transaction for the current operation.
423     * @param id the target resource of the reference.
424     * @param user the user making the change
425     * @return a ReferenceOperation
426     */
427    private static ReferenceOperation getOperation(final Transaction tx, final FedoraId id, final String user) {
428        final ReferenceOperationBuilder builder = new ReferenceOperationBuilder(tx, id);
429        builder.userPrincipal(user);
430        return builder.build();
431    }
432
433    /**
434     * Utility to filter a RDFStream to just the URIs from subjects and objects within the repository.
435     * @param stream the provided stream
436     * @return stream of triples with internal references.
437     */
438    private Stream<Triple> getReferencesFromRdf(final RdfStream stream) {
439        final Predicate<Triple> isInternalReference = t -> {
440            final Node s = t.getSubject();
441            final Node o = t.getObject();
442            return (s.isURI() && s.getURI().startsWith(FEDORA_ID_PREFIX) && o.isURI() &&
443                    o.getURI().startsWith(FEDORA_ID_PREFIX));
444        };
445        return stream.filter(isInternalReference);
446    }
447
448    /**
449     * Set the JDBC datastore.
450     * @param dataSource the dataStore.
451     */
452    public void setDataSource(final DataSource dataSource) {
453        this.dataSource = dataSource;
454    }
455
456    /**
457     * Get the JDBC datastore.
458     * @return the dataStore.
459     */
460    public DataSource getDataSource() {
461        return dataSource;
462    }
463}