001/* 002 * Licensed to DuraSpace under one or more contributor license agreements. 003 * See the NOTICE file distributed with this work for additional information 004 * regarding copyright ownership. 005 * 006 * DuraSpace licenses this file to you under the Apache License, 007 * Version 2.0 (the "License"); you may not use this file except in 008 * compliance with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.fcrepo.kernel.impl.services; 019 020import static org.fcrepo.kernel.api.FedoraTypes.FEDORA_ID_PREFIX; 021import static org.slf4j.LoggerFactory.getLogger; 022 023import java.util.List; 024import java.util.Map; 025import java.util.function.Predicate; 026import java.util.stream.Collectors; 027import java.util.stream.Stream; 028 029import javax.annotation.Nonnull; 030import javax.annotation.PostConstruct; 031import javax.inject.Inject; 032import javax.sql.DataSource; 033 034import org.fcrepo.kernel.api.ContainmentIndex; 035import org.fcrepo.kernel.api.RdfStream; 036import org.fcrepo.kernel.api.Transaction; 037import org.fcrepo.kernel.api.exception.RepositoryRuntimeException; 038import org.fcrepo.kernel.api.identifiers.FedoraId; 039import org.fcrepo.kernel.api.models.FedoraResource; 040import org.fcrepo.kernel.api.models.NonRdfSourceDescription; 041import org.fcrepo.kernel.api.observer.EventAccumulator; 042import org.fcrepo.kernel.api.rdf.DefaultRdfStream; 043import org.fcrepo.kernel.api.services.ReferenceService; 044import org.fcrepo.kernel.impl.operations.ReferenceOperation; 045import org.fcrepo.kernel.impl.operations.ReferenceOperationBuilder; 046 047import org.apache.jena.graph.Node; 048import org.apache.jena.graph.NodeFactory; 049import org.apache.jena.graph.Triple; 050import org.apache.jena.sparql.core.Quad; 051import org.slf4j.Logger; 052import org.springframework.beans.factory.annotation.Autowired; 053import org.springframework.beans.factory.annotation.Qualifier; 054import org.springframework.jdbc.core.RowMapper; 055import org.springframework.jdbc.core.namedparam.MapSqlParameterSource; 056import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; 057import org.springframework.stereotype.Component; 058import org.springframework.transaction.annotation.Propagation; 059import org.springframework.transaction.annotation.Transactional; 060 061/** 062 * Implementation of reference service. 063 * @author whikloj 064 * @since 6.0.0 065 */ 066@Component("referenceServiceImpl") 067public class ReferenceServiceImpl implements ReferenceService { 068 069 private static final Logger LOGGER = getLogger(ReferenceServiceImpl.class); 070 071 @Inject 072 private DataSource dataSource; 073 074 @Inject 075 private EventAccumulator eventAccumulator; 076 077 @Autowired 078 @Qualifier("containmentIndex") 079 private ContainmentIndex containmentIndex; 080 081 private NamedParameterJdbcTemplate jdbcTemplate; 082 083 private static final String TABLE_NAME = "reference"; 084 085 private static final String TRANSACTION_TABLE = "reference_transaction_operations"; 086 087 private static final String RESOURCE_COLUMN = "fedora_id"; 088 089 private static final String SUBJECT_COLUMN = "subject_id"; 090 091 private static final String PROPERTY_COLUMN = "property"; 092 093 private static final String TARGET_COLUMN = "target_id"; 094 095 private static final String OPERATION_COLUMN = "operation"; 096 097 private static final String TRANSACTION_COLUMN = "transaction_id"; 098 099 private static final String SELECT_INBOUND = "SELECT " + SUBJECT_COLUMN + ", " + PROPERTY_COLUMN + " FROM " + 100 TABLE_NAME + " WHERE " + TARGET_COLUMN + " = :targetId"; 101 102 private static final String SELECT_INBOUND_IN_TRANSACTION = "SELECT x." + SUBJECT_COLUMN + ", x." + 103 PROPERTY_COLUMN + " FROM " + "(SELECT " + SUBJECT_COLUMN + ", " + PROPERTY_COLUMN + " FROM " + TABLE_NAME + 104 " WHERE " + TARGET_COLUMN + " = :targetId UNION " + "SELECT " + SUBJECT_COLUMN + ", " + PROPERTY_COLUMN + 105 " FROM " + TRANSACTION_TABLE + " WHERE " + TARGET_COLUMN + " = :targetId AND " 106 + TRANSACTION_COLUMN + " = :transactionId AND " + OPERATION_COLUMN + " = 'add') x WHERE NOT EXISTS " + 107 "(SELECT 1 FROM " + TRANSACTION_TABLE + " WHERE " + TARGET_COLUMN + " = :targetId AND " + 108 OPERATION_COLUMN + " = 'delete')"; 109 110 private static final String SELECT_OUTBOUND = "SELECT " + SUBJECT_COLUMN + ", " + TARGET_COLUMN + ", " + 111 PROPERTY_COLUMN + " FROM " + TABLE_NAME + " WHERE " + RESOURCE_COLUMN + " = :resourceId"; 112 113 private static final String SELECT_OUTBOUND_IN_TRANSACTION = "SELECT x." + SUBJECT_COLUMN + ", x." + TARGET_COLUMN + 114 ", x." + PROPERTY_COLUMN + " FROM " + "(SELECT " + SUBJECT_COLUMN + ", " + TARGET_COLUMN + ", " + 115 PROPERTY_COLUMN + " FROM " + TABLE_NAME + " WHERE " + RESOURCE_COLUMN + " = :resourceId UNION " + 116 "SELECT " + SUBJECT_COLUMN + ", " + TARGET_COLUMN + ", " + PROPERTY_COLUMN + " FROM " + TRANSACTION_TABLE + 117 " WHERE " + RESOURCE_COLUMN + " = :resourceId " + "AND " + TRANSACTION_COLUMN + " = :transactionId AND " + 118 OPERATION_COLUMN + " = 'add') x WHERE NOT EXISTS (SELECT 1 FROM " + TRANSACTION_TABLE + " WHERE " + 119 RESOURCE_COLUMN + " = :resourceId AND " + OPERATION_COLUMN + " = 'delete')"; 120 121 private static final String INSERT_REFERENCE_IN_TRANSACTION = "INSERT INTO " + TRANSACTION_TABLE + "(" + 122 RESOURCE_COLUMN + ", " + SUBJECT_COLUMN + ", " + PROPERTY_COLUMN + ", " + TARGET_COLUMN + ", " + 123 TRANSACTION_COLUMN + ", " + OPERATION_COLUMN + ") VALUES (:resourceId, :subjectId, :property, :targetId, " + 124 ":transactionId, 'add')"; 125 126 private static final String INSERT_REFERENCE_DIRECT = "INSERT INTO " + TABLE_NAME + "(" + 127 RESOURCE_COLUMN + ", " + SUBJECT_COLUMN + ", " + PROPERTY_COLUMN + ", " + TARGET_COLUMN + 128 ") VALUES (:resourceId, :subjectId, :property, :targetId)"; 129 130 private static final String UNDO_INSERT_REFERENCE_IN_TRANSACTION = "DELETE FROM " + TRANSACTION_TABLE + " WHERE " + 131 RESOURCE_COLUMN + " = :resourceId AND " + SUBJECT_COLUMN + " = :subjectId AND " + PROPERTY_COLUMN + 132 " = :property AND " + TARGET_COLUMN + " = :targetId AND " + TRANSACTION_COLUMN + " = :transactionId AND " + 133 OPERATION_COLUMN + " = 'add'"; 134 135 private static final String DELETE_REFERENCE_IN_TRANSACTION = "INSERT INTO " + TRANSACTION_TABLE + "(" + 136 RESOURCE_COLUMN + ", " + SUBJECT_COLUMN + ", " + PROPERTY_COLUMN + ", " + TARGET_COLUMN + ", " + 137 TRANSACTION_COLUMN + ", " + OPERATION_COLUMN + ") VALUES (:resourceId, :subjectId, :property, :targetId, " + 138 ":transactionId, 'delete')"; 139 140 private static final String DELETE_REFERENCE_DIRECT = "DELETE FROM reference" + 141 " WHERE fedora_id = :resourceId AND subject_id = :subjectId" + 142 " AND property = :property AND target_id = :targetId"; 143 144 private static final String UNDO_DELETE_REFERENCE_IN_TRANSACTION = "DELETE FROM " + TRANSACTION_TABLE + " WHERE " + 145 RESOURCE_COLUMN + " = :resourceId AND " + SUBJECT_COLUMN + " = :subjectId AND " + PROPERTY_COLUMN + 146 " = :property AND " + TARGET_COLUMN + " = :targetId AND " + TRANSACTION_COLUMN + " = :transactionId AND " + 147 OPERATION_COLUMN + " = 'delete'"; 148 149 private static final String IS_REFERENCE_ADDED_IN_TRANSACTION = "SELECT TRUE FROM " + TRANSACTION_TABLE + " WHERE " 150 + RESOURCE_COLUMN + " = :resourceId AND " + SUBJECT_COLUMN + " = :subjectId AND " + PROPERTY_COLUMN + 151 " = :property AND " + TARGET_COLUMN + " = :targetId AND " + TRANSACTION_COLUMN + " = :transactionId AND " + 152 OPERATION_COLUMN + " = 'add'"; 153 154 private static final String IS_REFERENCE_DELETED_IN_TRANSACTION = "SELECT TRUE FROM " + TRANSACTION_TABLE + 155 " WHERE " + RESOURCE_COLUMN + " = :resourceId AND " + SUBJECT_COLUMN + " = :subjectId AND " + 156 PROPERTY_COLUMN + " = :property AND " + TARGET_COLUMN + " = :targetId AND " + TRANSACTION_COLUMN + 157 " = :transactionId AND " + OPERATION_COLUMN + " = 'delete'"; 158 159 private static final String COMMIT_ADD_RECORDS = "INSERT INTO " + TABLE_NAME + " ( " + RESOURCE_COLUMN + ", " + 160 SUBJECT_COLUMN + ", " + PROPERTY_COLUMN + ", " + TARGET_COLUMN + " ) SELECT " + RESOURCE_COLUMN + ", " + 161 SUBJECT_COLUMN + ", " + PROPERTY_COLUMN + ", " + TARGET_COLUMN + " FROM " + TRANSACTION_TABLE + " WHERE " + 162 TRANSACTION_COLUMN + " = :transactionId AND " + OPERATION_COLUMN + " = 'add'"; 163 164 private static final String COMMIT_DELETE_RECORDS = "DELETE FROM " + TABLE_NAME + " WHERE " + 165 "EXISTS (SELECT * FROM " + TRANSACTION_TABLE + " t WHERE t." + 166 TRANSACTION_COLUMN + " = :transactionId AND t." + OPERATION_COLUMN + " = 'delete' AND" + 167 " t." + RESOURCE_COLUMN + " = " + TABLE_NAME + "." + RESOURCE_COLUMN + " AND" + 168 " t." + SUBJECT_COLUMN + " = " + TABLE_NAME + "." + SUBJECT_COLUMN + 169 " AND t." + PROPERTY_COLUMN + " = " + TABLE_NAME + "." + PROPERTY_COLUMN + 170 " AND t." + TARGET_COLUMN + " = " + TABLE_NAME + "." + TARGET_COLUMN + ")"; 171 172 private static final String DELETE_TRANSACTION = "DELETE FROM " + TRANSACTION_TABLE + " WHERE " + 173 TRANSACTION_COLUMN + " = :transactionId"; 174 175 private static final String TRUNCATE_TABLE = "TRUNCATE TABLE " + TABLE_NAME; 176 177 @PostConstruct 178 public void setUp() { 179 jdbcTemplate = new NamedParameterJdbcTemplate(getDataSource()); 180 } 181 182 @Override 183 public RdfStream getInboundReferences(@Nonnull final Transaction tx, final FedoraResource resource) { 184 final String resourceId = resource.getFedoraId().getFullId(); 185 final Node subject = NodeFactory.createURI(resourceId); 186 final Stream<Triple> stream = getReferencesInternal(tx, resourceId); 187 if (resource instanceof NonRdfSourceDescription) { 188 final Stream<Triple> stream2 = getReferencesInternal(tx, resource.getFedoraId().getBaseId()); 189 return new DefaultRdfStream(subject, Stream.concat(stream, stream2)); 190 } 191 return new DefaultRdfStream(subject, stream); 192 } 193 194 /** 195 * Get the inbound references for the resource Id and the transaction id. 196 * @param tx transaction or null for none. 197 * @param targetId the id that will be the target of references. 198 * @return RDF stream of inbound references 199 */ 200 private Stream<Triple> getReferencesInternal(final Transaction tx, final String targetId) { 201 final MapSqlParameterSource parameterSource = new MapSqlParameterSource(); 202 parameterSource.addValue("targetId", targetId); 203 final Node targetNode = NodeFactory.createURI(targetId); 204 205 final RowMapper<Triple> inboundMapper = (rs, rowNum) -> 206 Triple.create(NodeFactory.createURI(rs.getString(SUBJECT_COLUMN)), 207 NodeFactory.createURI(rs.getString(PROPERTY_COLUMN)), 208 targetNode); 209 210 final String query; 211 212 if (tx.isOpenLongRunning()) { 213 // we are in a transaction 214 parameterSource.addValue("transactionId", tx.getId()); 215 query = SELECT_INBOUND_IN_TRANSACTION; 216 } else { 217 // not in a transaction 218 query = SELECT_INBOUND; 219 } 220 221 final var references = jdbcTemplate.query(query, parameterSource, inboundMapper); 222 223 LOGGER.debug("getInboundReferences for {} in transaction {} found {} references", 224 targetId, tx, references.size()); 225 return references.stream(); 226 } 227 228 @Override 229 public void deleteAllReferences(@Nonnull final Transaction tx, final FedoraId resourceId) { 230 final List<Quad> deleteReferences = getOutboundReferences(tx, resourceId); 231 if (resourceId.isDescription()) { 232 // Also get the binary references 233 deleteReferences.addAll(getOutboundReferences(tx, resourceId.asBaseId())); 234 } 235 // Remove all the existing references. 236 deleteReferences.forEach(t -> removeReference(tx, t)); 237 } 238 239 /** 240 * Get a stream of quads of resources being referenced from the provided resource, the graph of the quad is the 241 * URI of the resource the reference is from. 242 * @param tx transaction Id or null if none. 243 * @param resourceId the resource Id. 244 * @return list of Quads 245 */ 246 private List<Quad> getOutboundReferences(final Transaction tx, final FedoraId resourceId) { 247 final MapSqlParameterSource parameterSource = new MapSqlParameterSource(); 248 parameterSource.addValue("resourceId", resourceId.getFullId()); 249 final Node subjectNode = NodeFactory.createURI(resourceId.getFullId()); 250 251 final RowMapper<Quad> outboundMapper = (rs, rowNum) -> 252 Quad.create(subjectNode, 253 NodeFactory.createURI(rs.getString(SUBJECT_COLUMN)), 254 NodeFactory.createURI(rs.getString(PROPERTY_COLUMN)), 255 NodeFactory.createURI(rs.getString(TARGET_COLUMN))); 256 257 final String query; 258 259 if (tx.isOpenLongRunning()) { 260 // we are in a long-running transaction 261 parameterSource.addValue("transactionId", tx.getId()); 262 query = SELECT_OUTBOUND_IN_TRANSACTION; 263 } else { 264 // not in a transaction or in a short-lived transaction 265 query = SELECT_OUTBOUND; 266 } 267 268 final var references = jdbcTemplate.query(query, parameterSource, outboundMapper); 269 270 LOGGER.debug("getOutboundReferences for {} in transaction {} found {} references", 271 resourceId, tx, references.size()); 272 return references; 273 } 274 275 @Override 276 public void updateReferences(@Nonnull final Transaction tx, final FedoraId resourceId, final String userPrincipal, 277 final RdfStream rdfStream) { 278 try { 279 final List<Triple> addReferences = getReferencesFromRdf(rdfStream).collect(Collectors.toList()); 280 // This predicate checks for items we are adding, so we don't bother to delete and then re-add them. 281 final Predicate<Quad> notInAdds = q -> !addReferences.contains(q.asTriple()); 282 // References from this resource. 283 final List<Quad> existingReferences = getOutboundReferences(tx, resourceId); 284 if (resourceId.isDescription()) { 285 // Resource is a binary description so also get the binary references. 286 existingReferences.addAll(getOutboundReferences(tx, resourceId.asBaseId())); 287 } 288 // Remove any existing references not being re-added. 289 existingReferences.stream().filter(notInAdds).forEach(t -> removeReference(tx, t)); 290 final Node resourceNode = NodeFactory.createURI(resourceId.getFullId()); 291 // This predicate checks for references that didn't already exist in the database. 292 final Predicate<Triple> alreadyExists = t -> !existingReferences.contains(Quad.create(resourceNode, t)); 293 // Add the new references. 294 addReferences.stream().filter(alreadyExists).forEach(r -> 295 addReference(tx, Quad.create(resourceNode, r), userPrincipal)); 296 } catch (final Exception e) { 297 LOGGER.warn("Unable to update reference index for resource {} in transaction {}: {}", 298 resourceId.getFullId(), tx.getId(), e.getMessage()); 299 throw new RepositoryRuntimeException("Unable to update reference index", e); 300 } 301 } 302 303 @Override 304 public void commitTransaction(final Transaction tx) { 305 if (!tx.isShortLived()) { 306 tx.ensureCommitting(); 307 try { 308 final Map<String, String> parameterSource = Map.of("transactionId", tx.getId()); 309 jdbcTemplate.update(COMMIT_DELETE_RECORDS, parameterSource); 310 jdbcTemplate.update(COMMIT_ADD_RECORDS, parameterSource); 311 jdbcTemplate.update(DELETE_TRANSACTION, parameterSource); 312 } catch (final Exception e) { 313 LOGGER.warn("Unable to commit reference index transaction {}: {}", tx, e.getMessage()); 314 throw new RepositoryRuntimeException("Unable to commit reference index transaction", e); 315 } 316 } 317 } 318 319 @Transactional(propagation = Propagation.NOT_SUPPORTED) 320 @Override 321 public void rollbackTransaction(final Transaction tx) { 322 if (!tx.isShortLived()) { 323 try { 324 final Map<String, String> parameterSource = Map.of("transactionId", tx.getId()); 325 jdbcTemplate.update(DELETE_TRANSACTION, parameterSource); 326 } catch (final Exception e) { 327 LOGGER.warn("Unable to rollback reference index transaction {}: {}", tx, e.getMessage()); 328 throw new RepositoryRuntimeException("Unable to rollback reference index transaction", e); 329 } 330 } 331 } 332 333 @Override 334 public void reset() { 335 try { 336 jdbcTemplate.update(TRUNCATE_TABLE, Map.of()); 337 } catch (final Exception e) { 338 LOGGER.warn("Unable to reset reference index: {}", e.getMessage()); 339 throw new RepositoryRuntimeException("Unable to reset reference index", e); 340 } 341 } 342 343 /** 344 * Remove a reference. 345 * @param tx the transaction 346 * @param reference the quad with the reference, is Quad(resourceId, subjectId, propertyId, targetId) 347 */ 348 private void removeReference(final Transaction tx, final Quad reference) { 349 tx.doInTx(() -> { 350 final var parameterSource = new MapSqlParameterSource(); 351 parameterSource.addValue("resourceId", reference.getGraph().getURI()); 352 parameterSource.addValue("subjectId", reference.getSubject().getURI()); 353 parameterSource.addValue("property", reference.getPredicate().getURI()); 354 parameterSource.addValue("targetId", reference.getObject().getURI()); 355 356 if (!tx.isShortLived()) { 357 parameterSource.addValue("transactionId", tx.getId()); 358 final boolean addedInTx = !jdbcTemplate.queryForList(IS_REFERENCE_ADDED_IN_TRANSACTION, parameterSource) 359 .isEmpty(); 360 if (addedInTx) { 361 jdbcTemplate.update(UNDO_INSERT_REFERENCE_IN_TRANSACTION, parameterSource); 362 } else { 363 jdbcTemplate.update(DELETE_REFERENCE_IN_TRANSACTION, parameterSource); 364 } 365 } else { 366 jdbcTemplate.update(DELETE_REFERENCE_DIRECT, parameterSource); 367 } 368 }); 369 } 370 371 /** 372 * Add a reference 373 * @param transaction the transaction Id. 374 * @param reference the quad with the reference, is is Quad(resourceId, subjectId, propertyId, targetId) 375 * @param userPrincipal the user adding the reference. 376 */ 377 private void addReference(@Nonnull final Transaction transaction, final Quad reference, 378 final String userPrincipal) { 379 transaction.doInTx(() -> { 380 final String targetId = reference.getObject().getURI(); 381 382 final var parameterSource = new MapSqlParameterSource(); 383 parameterSource.addValue("resourceId", reference.getGraph().getURI()); 384 parameterSource.addValue("subjectId", reference.getSubject().getURI()); 385 parameterSource.addValue("property", reference.getPredicate().getURI()); 386 parameterSource.addValue("targetId", targetId); 387 388 if (!transaction.isShortLived()) { 389 parameterSource.addValue("transactionId", transaction.getId()); 390 final boolean addedInTx = !jdbcTemplate.queryForList( 391 IS_REFERENCE_DELETED_IN_TRANSACTION, parameterSource) 392 .isEmpty(); 393 if (addedInTx) { 394 jdbcTemplate.update(UNDO_DELETE_REFERENCE_IN_TRANSACTION, parameterSource); 395 } else { 396 jdbcTemplate.update(INSERT_REFERENCE_IN_TRANSACTION, parameterSource); 397 recordEvent(transaction, targetId, userPrincipal); 398 } 399 } else { 400 jdbcTemplate.update(INSERT_REFERENCE_DIRECT, parameterSource); 401 recordEvent(transaction, targetId, userPrincipal); 402 } 403 }); 404 } 405 406 /** 407 * Record the inbound reference event if the target exists. 408 * @param transaction the transaction. 409 * @param resourceId the id of the target of the inbound reference. 410 * @param userPrincipal the user making the reference. 411 */ 412 private void recordEvent(final Transaction transaction, final String resourceId, final String userPrincipal) { 413 final FedoraId fedoraId = FedoraId.create(resourceId); 414 if (this.containmentIndex.resourceExists(transaction, fedoraId, false)) { 415 this.eventAccumulator.recordEventForOperation(transaction, fedoraId, getOperation(transaction, fedoraId, 416 userPrincipal)); 417 } 418 } 419 420 /** 421 * Create a ReferenceOperation for the current add. 422 * @param tx the transaction for the current operation. 423 * @param id the target resource of the reference. 424 * @param user the user making the change 425 * @return a ReferenceOperation 426 */ 427 private static ReferenceOperation getOperation(final Transaction tx, final FedoraId id, final String user) { 428 final ReferenceOperationBuilder builder = new ReferenceOperationBuilder(tx, id); 429 builder.userPrincipal(user); 430 return builder.build(); 431 } 432 433 /** 434 * Utility to filter a RDFStream to just the URIs from subjects and objects within the repository. 435 * @param stream the provided stream 436 * @return stream of triples with internal references. 437 */ 438 private Stream<Triple> getReferencesFromRdf(final RdfStream stream) { 439 final Predicate<Triple> isInternalReference = t -> { 440 final Node s = t.getSubject(); 441 final Node o = t.getObject(); 442 return (s.isURI() && s.getURI().startsWith(FEDORA_ID_PREFIX) && o.isURI() && 443 o.getURI().startsWith(FEDORA_ID_PREFIX)); 444 }; 445 return stream.filter(isInternalReference); 446 } 447 448 /** 449 * Set the JDBC datastore. 450 * @param dataSource the dataStore. 451 */ 452 public void setDataSource(final DataSource dataSource) { 453 this.dataSource = dataSource; 454 } 455 456 /** 457 * Get the JDBC datastore. 458 * @return the dataStore. 459 */ 460 public DataSource getDataSource() { 461 return dataSource; 462 } 463}