001/* 002 * The contents of this file are subject to the license and copyright 003 * detailed in the LICENSE and NOTICE files at the root of the source 004 * tree. 005 */ 006package org.fcrepo.kernel.impl.services; 007 008import static org.slf4j.LoggerFactory.getLogger; 009 010import java.sql.ResultSet; 011import java.sql.SQLException; 012import java.sql.Timestamp; 013import java.time.Instant; 014import java.util.Map; 015import java.util.NoSuchElementException; 016import java.util.Queue; 017import java.util.Spliterator; 018import java.util.Spliterators; 019import java.util.concurrent.ConcurrentLinkedQueue; 020import java.util.function.Consumer; 021import java.util.stream.Stream; 022import java.util.stream.StreamSupport; 023 024import javax.annotation.PostConstruct; 025import javax.inject.Inject; 026import javax.sql.DataSource; 027 028import org.fcrepo.common.db.DbPlatform; 029import org.fcrepo.kernel.api.Transaction; 030import org.fcrepo.kernel.api.identifiers.FedoraId; 031 032import org.apache.jena.graph.Node; 033import org.apache.jena.graph.NodeFactory; 034import org.apache.jena.graph.Triple; 035import org.slf4j.Logger; 036import org.springframework.jdbc.core.RowCallbackHandler; 037import org.springframework.jdbc.core.RowMapper; 038import org.springframework.jdbc.core.namedparam.MapSqlParameterSource; 039import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; 040import org.springframework.stereotype.Component; 041import org.springframework.transaction.annotation.Propagation; 042import org.springframework.transaction.annotation.Transactional; 043 044/** 045 * Manager for the membership index 046 * 047 * @author bbpennel 048 */ 049@Component 050public class MembershipIndexManager { 051 private static final Logger log = getLogger(MembershipIndexManager.class); 052 053 private static final Timestamp NO_END_TIMESTAMP = Timestamp.from(MembershipServiceImpl.NO_END_INSTANT); 054 private static final Timestamp NO_START_TIMESTAMP = Timestamp.from(Instant.parse("1000-01-01T00:00:00.000Z")); 055 056 private static final String ADD_OPERATION = "add"; 057 private static final String DELETE_OPERATION = "delete"; 058 private static final String FORCE_FLAG = "force"; 059 060 private static final String TX_ID_PARAM = "txId"; 061 private static final String SUBJECT_ID_PARAM = "subjectId"; 062 private static final String NO_END_TIME_PARAM = "noEndTime"; 063 private static final String ADD_OP_PARAM = "addOp"; 064 private static final String DELETE_OP_PARAM = "deleteOp"; 065 private static final String MEMENTO_TIME_PARAM = "mementoTime"; 066 private static final String PROPERTY_PARAM = "property"; 067 private static final String TARGET_ID_PARAM = "targetId"; 068 private static final String SOURCE_ID_PARAM = "sourceId"; 069 private static final String PROXY_ID_PARAM = "proxyId"; 070 private static final String START_TIME_PARAM = "startTime"; 071 private static final String END_TIME_PARAM = "endTime"; 072 private static final String LAST_UPDATED_PARAM = "lastUpdated"; 073 private static final String OPERATION_PARAM = "operation"; 074 private static final String FORCE_PARAM = "forceFlag"; 075 private static final String LIMIT_PARAM = "limit"; 076 private static final String OFFSET_PARAM = "offSet"; 077 078 private static final String SELECT_ALL_MEMBERSHIP = "SELECT * FROM membership"; 079 080 private static final String SELECT_ALL_OPERATIONS = "SELECT * FROM membership_tx_operations"; 081 082 private static final String SELECT_MEMBERSHIP_IN_TX = 083 "SELECT property, object_id" + 084 " FROM membership m" + 085 " WHERE subject_id = :subjectId" + 086 " AND end_time = :noEndTime" + 087 " AND NOT EXISTS (" + 088 " SELECT 1" + 089 " FROM membership_tx_operations mto" + 090 " WHERE mto.subject_id = :subjectId" + 091 " AND mto.source_id = m.source_id" + 092 " AND mto.object_id = m.object_id" + 093 " AND mto.tx_id = :txId" + 094 " AND mto.operation = :deleteOp)" + 095 " UNION" + 096 " SELECT property, object_id" + 097 " FROM membership_tx_operations" + 098 " WHERE subject_id = :subjectId" + 099 " AND tx_id = :txId" + 100 " AND end_time = :noEndTime" + 101 " AND operation = :addOp" + 102 " ORDER BY property, object_id" + 103 " LIMIT :limit OFFSET :offSet"; 104 105 private static final String DIRECT_SELECT_MEMBERSHIP = 106 "SELECT property, object_id" + 107 " FROM membership" + 108 " WHERE subject_id = :subjectId" + 109 " AND end_time = :noEndTime" + 110 " ORDER BY property, object_id" + 111 " LIMIT :limit OFFSET :offSet"; 112 113 private static final String SELECT_MEMBERSHIP_MEMENTO_IN_TX = 114 "SELECT property, object_id" + 115 " FROM membership m" + 116 " WHERE m.subject_id = :subjectId" + 117 " AND m.start_time <= :mementoTime" + 118 " AND m.end_time > :mementoTime" + 119 " AND NOT EXISTS (" + 120 " SELECT 1" + 121 " FROM membership_tx_operations mto" + 122 " WHERE mto.subject_id = :subjectId" + 123 " AND mto.source_id = m.source_id" + 124 " AND mto.property = m.property" + 125 " AND mto.object_id = m.object_id" + 126 " AND mto.end_time <= :mementoTime" + 127 " AND mto.tx_id = :txId" + 128 " AND mto.operation = :deleteOp)" + 129 " UNION" + 130 " SELECT property, object_id" + 131 " FROM membership_tx_operations" + 132 " WHERE subject_id = :subjectId" + 133 " AND tx_id = :txId" + 134 " AND start_time <= :mementoTime" + 135 " AND end_time > :mementoTime" + 136 " AND operation = :addOp" + 137 " ORDER BY property, object_id" + 138 " LIMIT :limit OFFSET :offSet"; 139 140 private static final String DIRECT_SELECT_MEMBERSHIP_MEMENTO = 141 "SELECT property, object_id" + 142 " FROM membership" + 143 " WHERE subject_id = :subjectId" + 144 " AND start_time <= :mementoTime" + 145 " AND end_time > :mementoTime" + 146 " ORDER BY property, object_id" + 147 " LIMIT :limit OFFSET :offSet"; 148 149 private static final String SELECT_LAST_UPDATED = 150 "SELECT max(last_updated) as last_updated" + 151 " FROM membership" + 152 " WHERE subject_id = :subjectId"; 153 154 // For mementos, use the start_time instead of last_updated as the 155 // end_time reflects when the next version starts 156 private static final String SELECT_LAST_UPDATED_MEMENTO = 157 "SELECT max(start_time)" + 158 " FROM membership" + 159 " WHERE subject_id = :subjectId" + 160 " AND start_time <= :mementoTime" + 161 " AND end_time > :mementoTime"; 162 163 private static final String SELECT_LAST_UPDATED_IN_TX = 164 "SELECT max(combined.updated) as last_updated" + 165 " FROM (" + 166 " SELECT max(last_updated) as updated" + 167 " FROM membership m" + 168 " WHERE subject_id = :subjectId" + 169 " AND NOT EXISTS (" + 170 " SELECT 1" + 171 " FROM membership_tx_operations mto" + 172 " WHERE mto.subject_id = :subjectId" + 173 " AND mto.source_id = m.source_id" + 174 " AND mto.object_id = m.object_id" + 175 " AND mto.tx_id = :txId" + 176 " AND mto.operation = :deleteOp)" + 177 " UNION" + 178 " SELECT max(last_updated) as updated" + 179 " FROM membership_tx_operations" + 180 " WHERE subject_id = :subjectId" + 181 " AND tx_id = :txId" + 182 ") combined"; 183 184 private static final String INSERT_MEMBERSHIP_IN_TX = 185 "INSERT INTO membership_tx_operations (subject_id, property, object_id, source_id," + 186 " proxy_id, start_time, end_time, last_updated, tx_id, operation)" + 187 " VALUES (:subjectId, :property, :targetId, :sourceId," + 188 " :proxyId, :startTime, :endTime, :lastUpdated, :txId, :operation)"; 189 190 private static final String DIRECT_INSERT_MEMBERSHIP = 191 "INSERT INTO membership (subject_id, property, object_id, source_id," + 192 " proxy_id, start_time, end_time, last_updated)" + 193 " VALUES (:subjectId, :property, :targetId, :sourceId," + 194 " :proxyId, :startTime, :endTime, :lastUpdated)"; 195 196 private static final String END_EXISTING_MEMBERSHIP = 197 "INSERT INTO membership_tx_operations (subject_id, property, object_id, source_id," + 198 " proxy_id, start_time, end_time, last_updated, tx_id, operation)" + 199 " SELECT m.subject_id, m.property, m.object_id, m.source_id, m.proxy_id, m.start_time," + 200 " :endTime, :endTime, :txId, :deleteOp" + 201 " FROM membership m" + 202 " WHERE m.source_id = :sourceId" + 203 " AND m.proxy_id = :proxyId" + 204 " AND m.end_time = :noEndTime"; 205 206 private static final String DIRECT_END_EXISTING_MEMBERSHIP = 207 "UPDATE membership SET end_time = :endTime, last_updated = :endTime" + 208 " WHERE source_id = :sourceId" + 209 " AND proxy_id = :proxyId" + 210 " AND end_time = :noEndTime"; 211 212 private static final String CLEAR_FOR_PROXY_IN_TX = 213 "DELETE FROM membership_tx_operations" + 214 " WHERE source_id = :sourceId" + 215 " AND tx_id = :txId" + 216 " AND proxy_id = :proxyId" + 217 " AND force_flag IS NULL"; 218 219 private static final String CLEAR_ALL_ADDED_FOR_SOURCE_IN_TX = 220 "DELETE FROM membership_tx_operations" + 221 " WHERE source_id = :sourceId" + 222 " AND tx_id = :txId" + 223 " AND operation = :addOp"; 224 225 // Add "delete" entries for all existing membership from the given source, if not already deleted 226 private static final String END_EXISTING_FOR_SOURCE = 227 "INSERT INTO membership_tx_operations (subject_id, property, object_id, source_id," + 228 " proxy_id, start_time, end_time, last_updated, tx_id, operation)" + 229 " SELECT subject_id, property, object_id, source_id," + 230 " proxy_id, start_time, :endTime, :endTime, :txId, :deleteOp" + 231 " FROM membership m" + 232 " WHERE source_id = :sourceId" + 233 " AND end_time = :noEndTime" + 234 " AND NOT EXISTS (" + 235 " SELECT TRUE" + 236 " FROM membership_tx_operations mtx" + 237 " WHERE mtx.subject_id = m.subject_id" + 238 " AND mtx.property = m.property" + 239 " AND mtx.object_id = m.object_id" + 240 " AND mtx.source_id = m.source_id" + 241 " AND mtx.proxy_id = m.proxy_id" + 242 " AND mtx.operation = :deleteOp" + 243 ")"; 244 245 private static final String DIRECT_END_EXISTING_FOR_SOURCE = 246 "UPDATE membership SET end_time = :endTime, last_updated = :endTime" + 247 " WHERE source_id = :sourceId" + 248 " AND end_time = :noEndTime"; 249 250 private static final String DELETE_EXISTING_FOR_SOURCE_AFTER = 251 "INSERT INTO membership_tx_operations(subject_id, property, object_id, source_id," + 252 " proxy_id, start_time, end_time, last_updated, tx_id, operation, force_flag)" + 253 " SELECT subject_id, property, object_id, source_id, proxy_id, start_time, end_time," + 254 " last_updated, :txId, :deleteOp, :forceFlag" + 255 " FROM membership m" + 256 " WHERE m.source_id = :sourceId" + 257 " AND (m.start_time >= :startTime" + 258 " OR m.end_time >= :startTime)"; 259 260 private static final String DIRECT_DELETE_EXISTING_FOR_SOURCE_AFTER = 261 "DELETE FROM membership" + 262 " WHERE source_id = :sourceId" + 263 " AND (start_time >= :startTime" + 264 " OR end_time >= :startTime)"; 265 266 private static final String DELETE_EXISTING_FOR_PROXY_AFTER = 267 "INSERT INTO membership_tx_operations(subject_id, property, object_id, source_id," + 268 " proxy_id, start_time, end_time, last_updated, tx_id, operation, force_flag)" + 269 " SELECT subject_id, property, object_id, source_id, proxy_id, start_time, end_time," + 270 " last_updated, :txId, :deleteOp, :forceFlag" + 271 " FROM membership m" + 272 " WHERE m.proxy_id = :proxyId" + 273 " AND (m.start_time >= :startTime" + 274 " OR m.end_time >= :startTime)"; 275 276 private static final String DIRECT_DELETE_EXISTING_FOR_PROXY_AFTER = 277 "UPDATE membership SET end_time = :endTime, last_updated = :endTime" + 278 " WHERE proxy_id = :proxyId" + 279 " AND (start_time >= :endTime" + 280 " OR end_time >= :endTime)"; 281 282 private static final String PURGE_ALL_REFERENCES_MEMBERSHIP = 283 "DELETE from membership" + 284 " where source_id = :targetId" + 285 " OR subject_id = :targetId" + 286 " OR object_id = :targetId"; 287 288 private static final String PURGE_ALL_REFERENCES_TRANSACTION = 289 "DELETE from membership_tx_operations" + 290 " WHERE tx_id = :txId" + 291 " AND (source_id = :targetId" + 292 " OR subject_id = :targetId" + 293 " OR object_id = :targetId)"; 294 295 private static final String COMMIT_DELETES = 296 "DELETE from membership" + 297 " WHERE EXISTS (" + 298 " SELECT TRUE" + 299 " FROM membership_tx_operations mto" + 300 " WHERE mto.tx_id = :txId" + 301 " AND mto.operation = :deleteOp" + 302 " AND mto.force_flag = :forceFlag" + 303 " AND membership.source_id = mto.source_id" + 304 " AND membership.proxy_id = mto.proxy_id" + 305 " AND membership.subject_id = mto.subject_id" + 306 " AND membership.property = mto.property" + 307 " AND membership.object_id = mto.object_id" + 308 " )"; 309 310 private static final String COMMIT_ENDS_H2 = 311 "UPDATE membership m" + 312 " SET end_time = (" + 313 " SELECT mto.end_time" + 314 " FROM membership_tx_operations mto" + 315 " WHERE mto.tx_id = :txId" + 316 " AND m.source_id = mto.source_id" + 317 " AND m.proxy_id = mto.proxy_id" + 318 " AND m.subject_id = mto.subject_id" + 319 " AND m.property = mto.property" + 320 " AND m.object_id = mto.object_id" + 321 " AND mto.operation = :deleteOp" + 322 " )," + 323 " last_updated = (" + 324 " SELECT mto.end_time" + 325 " FROM membership_tx_operations mto" + 326 " WHERE mto.tx_id = :txId" + 327 " AND m.source_id = mto.source_id" + 328 " AND m.proxy_id = mto.proxy_id" + 329 " AND m.subject_id = mto.subject_id" + 330 " AND m.property = mto.property" + 331 " AND m.object_id = mto.object_id" + 332 " AND mto.operation = :deleteOp" + 333 " )" + 334 " WHERE EXISTS (" + 335 "SELECT TRUE" + 336 " FROM membership_tx_operations mto" + 337 " WHERE mto.tx_id = :txId" + 338 " AND mto.operation = :deleteOp" + 339 " AND m.source_id = mto.source_id" + 340 " AND m.proxy_id = mto.proxy_id" + 341 " AND m.subject_id = mto.subject_id" + 342 " AND m.property = mto.property" + 343 " AND m.object_id = mto.object_id" + 344 " )"; 345 346 private static final String COMMIT_ENDS_POSTGRES = 347 "UPDATE membership" + 348 " SET end_time = mto.end_time, last_updated = mto.end_time" + 349 " FROM membership_tx_operations mto" + 350 " WHERE mto.tx_id = :txId" + 351 " AND mto.operation = :deleteOp" + 352 " AND membership.source_id = mto.source_id" + 353 " AND membership.proxy_id = mto.proxy_id" + 354 " AND membership.subject_id = mto.subject_id" + 355 " AND membership.property = mto.property" + 356 " AND membership.object_id = mto.object_id"; 357 358 private static final String COMMIT_ENDS_MYSQL = 359 "UPDATE membership m" + 360 " INNER JOIN membership_tx_operations mto ON" + 361 " m.source_id = mto.source_id" + 362 " AND m.proxy_id = mto.proxy_id" + 363 " AND m.subject_id = mto.subject_id" + 364 " AND m.property = mto.property" + 365 " AND m.object_id = mto.object_id" + 366 " SET m.end_time = mto.end_time, m.last_updated = mto.end_time" + 367 " WHERE mto.tx_id = :txId" + 368 " AND mto.operation = :deleteOp"; 369 370 private static final Map<DbPlatform, String> COMMIT_ENDS_MAP = Map.of( 371 DbPlatform.MYSQL, COMMIT_ENDS_MYSQL, 372 DbPlatform.MARIADB, COMMIT_ENDS_MYSQL, 373 DbPlatform.POSTGRESQL, COMMIT_ENDS_POSTGRES, 374 DbPlatform.H2, COMMIT_ENDS_H2 375 ); 376 377 // Transfer all "add" operations from tx to committed membership, unless the entry already exists 378 private static final String COMMIT_ADDS = 379 "INSERT INTO membership" + 380 " (subject_id, property, object_id, source_id, proxy_id, start_time, end_time, last_updated)" + 381 " SELECT subject_id, property, object_id, source_id, proxy_id, start_time, end_time, last_updated" + 382 " FROM membership_tx_operations mto" + 383 " WHERE mto.tx_id = :txId" + 384 " AND mto.operation = :addOp" + 385 " AND NOT EXISTS (" + 386 " SELECT TRUE" + 387 " FROM membership m" + 388 " WHERE m.source_id = mto.source_id" + 389 " AND m.proxy_id = mto.proxy_id" + 390 " AND m.subject_id = mto.subject_id" + 391 " AND m.property = mto.property" + 392 " AND m.object_id = mto.object_id" + 393 " AND m.start_time = mto.start_time" + 394 " AND m.end_time = mto.end_time" + 395 " )"; 396 397 private static final String DELETE_TRANSACTION = 398 "DELETE FROM membership_tx_operations" + 399 " WHERE tx_id = :txId"; 400 401 private static final String TRUNCATE_MEMBERSHIP = "TRUNCATE TABLE membership"; 402 403 private static final String TRUNCATE_MEMBERSHIP_TX = "TRUNCATE TABLE membership_tx_operations"; 404 405 @Inject 406 private DataSource dataSource; 407 408 private NamedParameterJdbcTemplate jdbcTemplate; 409 410 private DbPlatform dbPlatform; 411 412 private static final int MEMBERSHIP_LIMIT = 50000; 413 414 @PostConstruct 415 public void setUp() { 416 jdbcTemplate = new NamedParameterJdbcTemplate(getDataSource()); 417 dbPlatform = DbPlatform.fromDataSource(dataSource); 418 } 419 420 /** 421 * End a membership from the child of a Direct/IndirectContainer, setting an end time if committed, 422 * or clearing from the current tx if it was newly added. 423 * 424 * @param tx transaction 425 * @param sourceId ID of the direct/indirect container whose membership should be ended 426 * @param proxyId ID of the proxy producing this membership, when applicable 427 * @param endTime the time the resource was deleted, generally its last modified 428 */ 429 public void endMembershipFromChild(final Transaction tx, final FedoraId sourceId, final FedoraId proxyId, 430 final Instant endTime) { 431 tx.doInTx(() -> { 432 if (!tx.isShortLived()) { 433 final MapSqlParameterSource parameterSource = new MapSqlParameterSource(); 434 parameterSource.addValue(TX_ID_PARAM, tx.getId()); 435 parameterSource.addValue(SOURCE_ID_PARAM, sourceId.getFullId()); 436 parameterSource.addValue(PROXY_ID_PARAM, proxyId.getFullId()); 437 438 final int affected = jdbcTemplate.update(CLEAR_FOR_PROXY_IN_TX, parameterSource); 439 440 // If no rows were deleted, then assume we need to delete permanent entry 441 if (affected == 0) { 442 final MapSqlParameterSource parameterSource2 = new MapSqlParameterSource(); 443 parameterSource2.addValue(TX_ID_PARAM, tx.getId()); 444 parameterSource2.addValue(SOURCE_ID_PARAM, sourceId.getFullId()); 445 parameterSource2.addValue(PROXY_ID_PARAM, proxyId.getFullId()); 446 parameterSource2.addValue(END_TIME_PARAM, formatInstant(endTime)); 447 parameterSource2.addValue(NO_END_TIME_PARAM, NO_END_TIMESTAMP); 448 parameterSource2.addValue(DELETE_OP_PARAM, DELETE_OPERATION); 449 jdbcTemplate.update(END_EXISTING_MEMBERSHIP, parameterSource2); 450 } 451 } else { 452 final MapSqlParameterSource parameterSource = new MapSqlParameterSource(); 453 parameterSource.addValue(SOURCE_ID_PARAM, sourceId.getFullId()); 454 parameterSource.addValue(PROXY_ID_PARAM, proxyId.getFullId()); 455 parameterSource.addValue(END_TIME_PARAM, formatInstant(endTime)); 456 parameterSource.addValue(NO_END_TIME_PARAM, NO_END_TIMESTAMP); 457 jdbcTemplate.update(DIRECT_END_EXISTING_MEMBERSHIP, parameterSource); 458 } 459 }); 460 } 461 462 public void deleteMembershipForProxyAfter(final Transaction tx, 463 final FedoraId sourceId, 464 final FedoraId proxyId, 465 final Instant afterTime) { 466 tx.doInTx(() -> { 467 final var afterTimestamp = afterTime == null ? NO_START_TIMESTAMP : formatInstant(afterTime); 468 469 if (!tx.isShortLived()) { 470 // Clear all membership added in this transaction 471 final var parameterSource = Map.of( 472 TX_ID_PARAM, tx.getId(), 473 SOURCE_ID_PARAM, sourceId.getFullId(), 474 PROXY_ID_PARAM, proxyId.getFullId(), 475 OPERATION_PARAM, ADD_OPERATION); 476 477 jdbcTemplate.update(CLEAR_FOR_PROXY_IN_TX, parameterSource); 478 479 // Delete all existing membership entries that start after or end after the given timestamp 480 final Map<String, Object> parameterSource2 = Map.of( 481 TX_ID_PARAM, tx.getId(), 482 PROXY_ID_PARAM, proxyId.getFullId(), 483 START_TIME_PARAM, afterTimestamp, 484 FORCE_PARAM, FORCE_FLAG, 485 DELETE_OP_PARAM, DELETE_OPERATION); 486 jdbcTemplate.update(DELETE_EXISTING_FOR_PROXY_AFTER, parameterSource2); 487 } else { 488 final Map<String, Object> parameterSource = Map.of( 489 PROXY_ID_PARAM, proxyId.getFullId(), 490 END_TIME_PARAM, afterTimestamp); 491 jdbcTemplate.update(DIRECT_DELETE_EXISTING_FOR_PROXY_AFTER, parameterSource); 492 } 493 }); 494 } 495 496 /** 497 * End all membership properties resulting from the specified source container 498 * @param tx transaction 499 * @param sourceId ID of the direct/indirect container whose membership should be ended 500 * @param endTime the time the resource was deleted, generally its last modified 501 */ 502 public void endMembershipForSource(final Transaction tx, final FedoraId sourceId, final Instant endTime) { 503 tx.doInTx(() -> { 504 if (!tx.isShortLived()) { 505 final Map<String, Object> parameterSource = Map.of( 506 TX_ID_PARAM, tx.getId(), 507 SOURCE_ID_PARAM, sourceId.getFullId(), 508 ADD_OP_PARAM, ADD_OPERATION); 509 510 jdbcTemplate.update(CLEAR_ALL_ADDED_FOR_SOURCE_IN_TX, parameterSource); 511 512 final Map<String, Object> parameterSource2 = Map.of( 513 TX_ID_PARAM, tx.getId(), 514 SOURCE_ID_PARAM, sourceId.getFullId(), 515 END_TIME_PARAM, formatInstant(endTime), 516 NO_END_TIME_PARAM, NO_END_TIMESTAMP, 517 DELETE_OP_PARAM, DELETE_OPERATION); 518 jdbcTemplate.update(END_EXISTING_FOR_SOURCE, parameterSource2); 519 } else { 520 final Map<String, Object> parameterSource = Map.of( 521 SOURCE_ID_PARAM, sourceId.getFullId(), 522 END_TIME_PARAM, formatInstant(endTime), 523 NO_END_TIME_PARAM, NO_END_TIMESTAMP); 524 jdbcTemplate.update(DIRECT_END_EXISTING_FOR_SOURCE, parameterSource); 525 } 526 }); 527 } 528 529 /** 530 * Delete membership entries that are active at or after the given timestamp for the specified source 531 * @param tx transaction 532 * @param sourceId ID of the direct/indirect container 533 * @param afterTime time at or after which membership should be removed 534 */ 535 public void deleteMembershipForSourceAfter(final Transaction tx, final FedoraId sourceId, final Instant afterTime) { 536 tx.doInTx(() -> { 537 final var afterTimestamp = afterTime == null ? NO_START_TIMESTAMP : formatInstant(afterTime); 538 539 if (!tx.isShortLived()) { 540 // Clear all membership added in this transaction 541 final Map<String, Object> parameterSource = Map.of( 542 TX_ID_PARAM, tx.getId(), 543 SOURCE_ID_PARAM, sourceId.getFullId(), 544 ADD_OP_PARAM, ADD_OPERATION); 545 546 jdbcTemplate.update(CLEAR_ALL_ADDED_FOR_SOURCE_IN_TX, parameterSource); 547 548 // Delete all existing membership entries that start after or end after the given timestamp 549 final Map<String, Object> parameterSource2 = Map.of( 550 TX_ID_PARAM, tx.getId(), 551 SOURCE_ID_PARAM, sourceId.getFullId(), 552 START_TIME_PARAM, afterTimestamp, 553 FORCE_PARAM, FORCE_FLAG, 554 DELETE_OP_PARAM, DELETE_OPERATION); 555 jdbcTemplate.update(DELETE_EXISTING_FOR_SOURCE_AFTER, parameterSource2); 556 } else { 557 final Map<String, Object> parameterSource = Map.of( 558 SOURCE_ID_PARAM, sourceId.getFullId(), 559 START_TIME_PARAM, afterTimestamp); 560 jdbcTemplate.update(DIRECT_DELETE_EXISTING_FOR_SOURCE_AFTER, parameterSource); 561 } 562 }); 563 } 564 565 /** 566 * Clean up any references to the target id, in transactions and outside 567 * @param txId transaction id 568 * @param targetId identifier of the resource to cleanup membership references for 569 */ 570 public void deleteMembershipReferences(final String txId, final FedoraId targetId) { 571 final Map<String, Object> parameterSource = Map.of( 572 TARGET_ID_PARAM, targetId.getFullId(), 573 TX_ID_PARAM, txId); 574 575 jdbcTemplate.update(PURGE_ALL_REFERENCES_TRANSACTION, parameterSource); 576 jdbcTemplate.update(PURGE_ALL_REFERENCES_MEMBERSHIP, parameterSource); 577 } 578 579 /** 580 * Add new membership property to the index, clearing any delete 581 * operations for the property if necessary. 582 * @param tx transaction 583 * @param sourceId ID of the direct/indirect container which produced the membership 584 * @param proxyId ID of the proxy producing this membership, when applicable 585 * @param membership membership triple 586 * @param startTime time the membership triple was added 587 */ 588 public void addMembership(final Transaction tx, final FedoraId sourceId, final FedoraId proxyId, 589 final Triple membership, final Instant startTime) { 590 if (membership == null) { 591 return; 592 } 593 addMembership(tx, sourceId, proxyId, membership, startTime, null); 594 } 595 596 /** 597 * Add new membership property to the index 598 * @param tx transaction 599 * @param sourceId ID of the direct/indirect container which produced the membership 600 * @param proxyId ID of the proxy producing this membership, when applicable 601 * @param membership membership triple 602 * @param startTime time the membership triple was added 603 * @param endTime time the membership triple ends, or never if not provided 604 */ 605 public void addMembership(final Transaction tx, final FedoraId sourceId, final FedoraId proxyId, 606 final Triple membership, final Instant startTime, final Instant endTime) { 607 tx.doInTx(() -> { 608 final Timestamp endTimestamp; 609 final Timestamp lastUpdated; 610 final Timestamp startTimestamp = formatInstant(startTime); 611 if (endTime == null) { 612 endTimestamp = NO_END_TIMESTAMP; 613 lastUpdated = startTimestamp; 614 } else { 615 endTimestamp = formatInstant(endTime); 616 lastUpdated = endTimestamp; 617 } 618 // Add the new membership operation 619 final MapSqlParameterSource parameterSource = new MapSqlParameterSource(); 620 parameterSource.addValue(SUBJECT_ID_PARAM, membership.getSubject().getURI()); 621 parameterSource.addValue(PROPERTY_PARAM, membership.getPredicate().getURI()); 622 parameterSource.addValue(TARGET_ID_PARAM, membership.getObject().getURI()); 623 parameterSource.addValue(SOURCE_ID_PARAM, sourceId.getFullId()); 624 parameterSource.addValue(PROXY_ID_PARAM, proxyId.getFullId()); 625 parameterSource.addValue(START_TIME_PARAM, startTimestamp); 626 parameterSource.addValue(END_TIME_PARAM, endTimestamp); 627 parameterSource.addValue(LAST_UPDATED_PARAM, lastUpdated); 628 629 if (!tx.isShortLived()) { 630 parameterSource.addValue(TX_ID_PARAM, tx.getId()); 631 parameterSource.addValue(OPERATION_PARAM, ADD_OPERATION); 632 jdbcTemplate.update(INSERT_MEMBERSHIP_IN_TX, parameterSource); 633 } else { 634 jdbcTemplate.update(DIRECT_INSERT_MEMBERSHIP, parameterSource); 635 } 636 }); 637 } 638 639 /** 640 * Get a stream of membership triples with 641 * @param tx transaction from which membership will be retrieved, or null for no transaction 642 * @param subjectId ID of the subject 643 * @return Stream of membership triples 644 */ 645 public Stream<Triple> getMembership(final Transaction tx, final FedoraId subjectId) { 646 final Node subjectNode = NodeFactory.createURI(subjectId.getBaseId()); 647 648 final RowMapper<Triple> membershipMapper = (rs, rowNum) -> 649 Triple.create(subjectNode, 650 NodeFactory.createURI(rs.getString("property")), 651 NodeFactory.createURI(rs.getString("object_id"))); 652 653 final MapSqlParameterSource parameterSource = new MapSqlParameterSource(); 654 final String query; 655 656 if (subjectId.isMemento()) { 657 parameterSource.addValue(SUBJECT_ID_PARAM, subjectId.getBaseId()); 658 parameterSource.addValue(MEMENTO_TIME_PARAM, formatInstant(subjectId.getMementoInstant())); 659 } else { 660 parameterSource.addValue(SUBJECT_ID_PARAM, subjectId.getFullId()); 661 parameterSource.addValue(NO_END_TIME_PARAM, NO_END_TIMESTAMP); 662 } 663 664 if (tx.isOpenLongRunning()) { 665 parameterSource.addValue(TX_ID_PARAM, tx.getId()); 666 667 if (subjectId.isMemento()) { 668 query = SELECT_MEMBERSHIP_MEMENTO_IN_TX; 669 } else { 670 query = SELECT_MEMBERSHIP_IN_TX; 671 } 672 } else { 673 if (subjectId.isMemento()) { 674 query = DIRECT_SELECT_MEMBERSHIP_MEMENTO; 675 } else { 676 query = DIRECT_SELECT_MEMBERSHIP; 677 } 678 } 679 680 return StreamSupport.stream(new MembershipIterator(query, parameterSource, membershipMapper), false); 681 } 682 683 public Instant getLastUpdated(final Transaction transaction, final FedoraId subjectId) { 684 final MapSqlParameterSource parameterSource = new MapSqlParameterSource(); 685 686 parameterSource.addValue(NO_END_TIME_PARAM, NO_END_TIMESTAMP); 687 final String lastUpdatedQuery; 688 if (subjectId.isMemento()) { 689 lastUpdatedQuery = SELECT_LAST_UPDATED_MEMENTO; 690 parameterSource.addValue(SUBJECT_ID_PARAM, subjectId.getBaseId()); 691 parameterSource.addValue(MEMENTO_TIME_PARAM, formatInstant(subjectId.getMementoInstant())); 692 } else if (transaction.isOpenLongRunning()) { 693 lastUpdatedQuery = SELECT_LAST_UPDATED_IN_TX; 694 parameterSource.addValue(SUBJECT_ID_PARAM, subjectId.getFullId()); 695 parameterSource.addValue(TX_ID_PARAM, transaction.getId()); 696 parameterSource.addValue(DELETE_OP_PARAM, DELETE_OPERATION); 697 } else { 698 lastUpdatedQuery = SELECT_LAST_UPDATED; 699 parameterSource.addValue(SUBJECT_ID_PARAM, subjectId.getFullId()); 700 } 701 702 final var updated = jdbcTemplate.queryForObject(lastUpdatedQuery, parameterSource, Timestamp.class); 703 if (updated != null) { 704 return updated.toInstant(); 705 } 706 return null; 707 } 708 709 /** 710 * Perform a commit of operations stored in the specified transaction 711 * @param tx transaction 712 */ 713 public void commitTransaction(final Transaction tx) { 714 if (!tx.isShortLived()) { 715 tx.ensureCommitting(); 716 final Map<String, String> parameterSource = Map.of( 717 TX_ID_PARAM, tx.getId(), 718 ADD_OP_PARAM, ADD_OPERATION, 719 DELETE_OP_PARAM, DELETE_OPERATION, 720 FORCE_PARAM, FORCE_FLAG); 721 722 jdbcTemplate.update(COMMIT_DELETES, parameterSource); 723 final int ends = jdbcTemplate.update(COMMIT_ENDS_MAP.get(this.dbPlatform), parameterSource); 724 final int adds = jdbcTemplate.update(COMMIT_ADDS, parameterSource); 725 final int cleaned = jdbcTemplate.update(DELETE_TRANSACTION, parameterSource); 726 727 log.debug("Completed commit, {} ended, {} adds, {} operations", ends, adds, cleaned); 728 } 729 } 730 731 /** 732 * Delete all entries related to a transaction 733 * @param tx transaction 734 */ 735 @Transactional(propagation = Propagation.NOT_SUPPORTED) 736 public void deleteTransaction(final Transaction tx) { 737 if (!tx.isShortLived()) { 738 final Map<String, String> parameterSource = Map.of(TX_ID_PARAM, tx.getId()); 739 jdbcTemplate.update(DELETE_TRANSACTION, parameterSource); 740 } 741 } 742 743 /** 744 * Format an instant to a timestamp without milliseconds, due to precision 745 * issues with memento datetimes. 746 * @param instant the instant 747 * @return a Timestamp 748 */ 749 private Timestamp formatInstant(final Instant instant) { 750 final var timestamp = Timestamp.from(instant); 751 timestamp.setNanos(0); 752 return timestamp; 753 } 754 755 /** 756 * Clear all entries from the index 757 */ 758 public void clearIndex() { 759 jdbcTemplate.update(TRUNCATE_MEMBERSHIP, Map.of()); 760 jdbcTemplate.update(TRUNCATE_MEMBERSHIP_TX, Map.of()); 761 } 762 763 /** 764 * Log all membership entries, for debugging usage only 765 */ 766 protected void logMembership() { 767 log.info("source_id, proxy_id, subject_id, property, object_id, start_time, end_time, last_updated"); 768 jdbcTemplate.query(SELECT_ALL_MEMBERSHIP, new RowCallbackHandler() { 769 @Override 770 public void processRow(final ResultSet rs) throws SQLException { 771 log.info("{}, {}, {}, {}, {}, {}, {}, {}", 772 rs.getString("source_id"), rs.getString("proxy_id"), rs.getString("subject_id"), 773 rs.getString("property"), rs.getString("object_id"), rs.getTimestamp("start_time"), 774 rs.getTimestamp("end_time"), rs.getTimestamp("last_updated")); 775 } 776 }); 777 } 778 779 /** 780 * Log all membership operations, for debugging usage only 781 */ 782 protected void logOperations() { 783 log.info("source_id, proxy_id, subject_id, property, object_id, start_time, end_time," 784 + " last_updated, tx_id, operation, force_flag"); 785 jdbcTemplate.query(SELECT_ALL_OPERATIONS, new RowCallbackHandler() { 786 @Override 787 public void processRow(final ResultSet rs) throws SQLException { 788 log.info("{}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}", 789 rs.getString("source_id"), rs.getString("proxy_id"), rs.getString("subject_id"), 790 rs.getString("property"), rs.getString("object_id"), rs.getTimestamp("start_time"), 791 rs.getTimestamp("end_time"), rs.getTimestamp("last_updated"), rs.getString("tx_id"), 792 rs.getString("operation"), rs.getString("force_flag")); 793 } 794 }); 795 } 796 797 /** 798 * Set the JDBC datastore. 799 * @param dataSource the dataStore. 800 */ 801 public void setDataSource(final DataSource dataSource) { 802 this.dataSource = dataSource; 803 } 804 805 /** 806 * Get the JDBC datastore. 807 * @return the dataStore. 808 */ 809 public DataSource getDataSource() { 810 return dataSource; 811 } 812 813 /** 814 * Private class to back a stream with a paged DB query. 815 * 816 * If this needs to be run in parallel we will have to override trySplit() and determine a good method to split on. 817 */ 818 private class MembershipIterator extends Spliterators.AbstractSpliterator<Triple> { 819 final Queue<Triple> children = new ConcurrentLinkedQueue<>(); 820 int numOffsets = 0; 821 final String queryToUse; 822 final MapSqlParameterSource parameterSource; 823 final RowMapper<Triple> rowMapper; 824 825 public MembershipIterator(final String query, final MapSqlParameterSource parameters, 826 final RowMapper<Triple> mapper) { 827 super(Long.MAX_VALUE, Spliterator.ORDERED); 828 queryToUse = query; 829 parameterSource = parameters; 830 rowMapper = mapper; 831 parameterSource.addValue(ADD_OP_PARAM, ADD_OPERATION); 832 parameterSource.addValue(DELETE_OP_PARAM, DELETE_OPERATION); 833 parameterSource.addValue(LIMIT_PARAM, MEMBERSHIP_LIMIT); 834 } 835 836 @Override 837 public boolean tryAdvance(final Consumer<? super Triple> action) { 838 try { 839 action.accept(children.remove()); 840 } catch (final NoSuchElementException e) { 841 parameterSource.addValue(OFFSET_PARAM, numOffsets * MEMBERSHIP_LIMIT); 842 numOffsets += 1; 843 children.addAll(jdbcTemplate.query(queryToUse, parameterSource, rowMapper)); 844 if (children.size() == 0) { 845 // no more elements. 846 return false; 847 } 848 action.accept(children.remove()); 849 } 850 return true; 851 } 852 } 853}