/*
 * The contents of this file are subject to the license and copyright
 * detailed in the LICENSE and NOTICE files at the root of the source
 * tree.
 */
package org.fcrepo.kernel.impl;

import static java.util.stream.Collectors.toList;

import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Phaser;

import org.fcrepo.common.db.DbTransactionExecutor;
import org.fcrepo.common.lang.CheckedRunnable;
import org.fcrepo.kernel.api.ContainmentIndex;
import org.fcrepo.kernel.api.Transaction;
import org.fcrepo.kernel.api.TransactionState;
import org.fcrepo.kernel.api.cache.UserTypesCache;
import org.fcrepo.kernel.api.exception.RepositoryRuntimeException;
import org.fcrepo.kernel.api.exception.TransactionClosedException;
import org.fcrepo.kernel.api.exception.TransactionRuntimeException;
import org.fcrepo.kernel.api.identifiers.FedoraId;
import org.fcrepo.kernel.api.lock.ResourceLockManager;
import org.fcrepo.kernel.api.observer.EventAccumulator;
import org.fcrepo.kernel.api.services.MembershipService;
import org.fcrepo.kernel.api.services.ReferenceService;
import org.fcrepo.persistence.api.PersistentStorageSession;
import org.fcrepo.search.api.SearchIndex;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The Fedora Transaction implementation
 *
 * <p>State transitions ({@link #commit()}, {@link #rollback()}, {@link #fail()}) and
 * {@link #updateExpiry(Duration)} are synchronized on this instance. Operations run through
 * {@link #doInTx(Runnable)} are tracked with a {@link Phaser} so that commit and rollback can wait
 * for them to finish before touching storage and indexes.
 *
 * @author mohideen
 */
public class TransactionImpl implements Transaction {

    private static final Logger log = LoggerFactory.getLogger(TransactionImpl.class);

    private final String id;

    private final TransactionManagerImpl txManager;

    private TransactionState state;

    private boolean shortLived = true;

    private Instant expiration;

    private boolean expired = false;

    private String baseUri;

    private String userAgent;

    private final Duration sessionTimeout;

    /** Tracks operations currently executing inside {@link #doInTx(Runnable)}. */
    private final Phaser operationPhaser;

    private boolean suppressEvents = false;

    /**
     * Creates a new transaction in the OPEN state that expires one session timeout from now.
     *
     * @param id the transaction id, must not be null or empty
     * @param txManager the manager used to look up the storage session, indexes, caches and locks
     * @param sessionTimeout the initial time to live, also the amount added by {@link #refresh()}
     * @throws IllegalArgumentException if the id is null or empty
     */
    protected TransactionImpl(final String id,
                              final TransactionManagerImpl txManager,
                              final Duration sessionTimeout) {
        if (id == null || id.isEmpty()) {
            throw new IllegalArgumentException("Transaction id should not be empty!");
        }
        this.id = id;
        this.txManager = txManager;
        this.sessionTimeout = sessionTimeout;
        this.expiration = Instant.now().plus(sessionTimeout);
        this.state = TransactionState.OPEN;
        // A plain Phaser terminates as soon as a deregistration drops the number of registered
        // parties to zero. Once terminated, register() has no effect and awaitAdvance() returns
        // immediately, so after the first completed operation commit/rollback would no longer wait
        // for in-flight operations. Returning false from onAdvance keeps the phaser reusable.
        this.operationPhaser = new Phaser() {
            @Override
            protected boolean onAdvance(final int phase, final int registeredParties) {
                return false;
            }
        };
    }

    /**
     * Commits the transaction. A transaction that is already committed is left untouched.
     *
     * <p>The state is moved to COMMITTING, in-flight {@link #doInTx(Runnable)} operations are awaited,
     * and the short-lived or long-running commit path is executed. On success the accumulated events
     * are emitted (or cleared when suppressed) and the resource locks are released.
     *
     * @throws RepositoryRuntimeException if the commit fails; the transaction is rolled back first
     *         unless the state had already reached COMMITTED
     */
    @Override
    public synchronized void commit() {
        if (state == TransactionState.COMMITTED) {
            return;
        }
        failIfNotOpen();
        failIfExpired();

        updateState(TransactionState.COMMITTING);

        log.debug("Waiting for operations in transaction {} to complete before committing", id);

        awaitInFlightOperations();

        log.debug("Committing transaction {}", id);

        try {
            if (isShortLived()) {
                doCommitShortLived();
            } else {
                doCommitLongRunning();
            }

            updateState(TransactionState.COMMITTED);
            if (!this.suppressEvents) {
                this.getEventAccumulator().emitEvents(this, baseUri, userAgent);
            } else {
                this.getEventAccumulator().clearEvents(this);
            }

            releaseLocks();
            log.debug("Committed transaction {}", id);
        } catch (final Exception ex) {
            log.error("Failed to commit transaction: {}", id, ex);

            if (state == TransactionState.COMMITTED) {
                // The failure happened after the state was set to COMMITTED (event handling).
                // rollback() rejects committed transactions, which would mask the original cause
                // and skip the lock release, so only release the locks here.
                releaseLocks();
            } else {
                // Rollback on commit failure
                log.info("Rolling back transaction {}", id);
                rollback();
            }
            throw new RepositoryRuntimeException("Failed to commit transaction " + id, ex);
        }
    }

    @Override
    public boolean isCommitted() {
        return state == TransactionState.COMMITTED;
    }

    /**
     * Rolls the transaction back. Does nothing if it is already rolled back or rolling back.
     *
     * <p>Every rollback step is attempted even when an earlier one fails; failures are logged rather
     * than thrown. The resource locks are released at the end.
     *
     * @throws TransactionClosedException if the transaction is already committed
     */
    @Override
    public synchronized void rollback() {
        if (state == TransactionState.ROLLEDBACK || state == TransactionState.ROLLINGBACK) {
            return;
        }

        failIfCommitted();

        updateState(TransactionState.ROLLINGBACK);

        log.debug("Waiting for operations in transaction {} to complete before rolling back", id);

        awaitInFlightOperations();

        execQuietly("Failed to rollback storage in transaction " + id, () -> {
            this.getPersistentSession().rollback();
        });
        execQuietly("Failed to rollback index in transaction " + id, () -> {
            this.getContainmentIndex().rollbackTransaction(this);
        });
        execQuietly("Failed to rollback reference index in transaction " + id, () -> {
            this.getReferenceService().rollbackTransaction(this);
        });
        execQuietly("Failed to rollback membership index in transaction " + id, () -> {
            this.getMembershipService().rollbackTransaction(this);
        });
        execQuietly("Failed to rollback search index in transaction " + id, () -> {
            this.getSearchIndex().rollbackTransaction(this);
        });

        execQuietly("Failed to rollback events in transaction " + id, () -> {
            this.getEventAccumulator().clearEvents(this);
        });

        execQuietly("Failed to clear user rdf types cache in transaction " + id, () -> {
            this.getUserTypesCache().dropSessionCache(id);
        });

        updateState(TransactionState.ROLLEDBACK);

        releaseLocks();
    }

    /**
     * Runs the given operation as part of this transaction. The operation is registered with the
     * phaser for its whole duration so that {@link #commit()} and {@link #rollback()} wait for it.
     *
     * @param runnable the operation to execute
     * @throws TransactionRuntimeException if the transaction is in the FAILED state
     * @throws TransactionClosedException if the transaction is not open or has expired
     */
    @Override
    public void doInTx(final Runnable runnable) {
        // Register before the state checks so a concurrent commit/rollback cannot slip in between
        // the check and the start of the operation without waiting for it.
        operationPhaser.register();

        try {
            failIfNotOpen();
            failIfExpired();

            runnable.run();
        } finally {
            operationPhaser.arriveAndDeregister();
        }
    }

    /**
     * Marks an OPEN transaction as FAILED. In any other state the request is only logged.
     */
    @Override
    public synchronized void fail() {
        if (state != TransactionState.OPEN) {
            log.error("Transaction {} is in state {} and may not be marked as FAILED", id, state);
        } else {
            updateState(TransactionState.FAILED);
        }
    }

    @Override
    public boolean isRolledBack() {
        return state == TransactionState.ROLLEDBACK;
    }

    @Override
    public String getId() {
        return id;
    }

    @Override
    public void setShortLived(final boolean shortLived) {
        this.shortLived = shortLived;
    }

    @Override
    public boolean isShortLived() {
        return this.shortLived;
    }

    /**
     * @return true if the transaction is long-running, has not expired, and is not in the COMMITTED,
     *         ROLLEDBACK or FAILED state
     */
    @Override
    public boolean isOpenLongRunning() {
        return !this.isShortLived() && !hasExpired()
                && !(state == TransactionState.COMMITTED
                || state == TransactionState.ROLLEDBACK
                || state == TransactionState.FAILED);
    }

    @Override
    public boolean isOpen() {
        return state == TransactionState.OPEN && !hasExpired();
    }

    /**
     * @throws TransactionRuntimeException if the transaction is not in the COMMITTING state
     */
    @Override
    public void ensureCommitting() {
        if (state != TransactionState.COMMITTING) {
            throw new TransactionRuntimeException(
                    String.format("Transaction %s must be in state COMMITTING, but was %s", id, state));
        }
    }

    @Override
    public boolean isReadOnly() {
        return false;
    }

    /**
     * Expires the transaction immediately by moving the expiration to now and latching the flag.
     */
    @Override
    public void expire() {
        this.expiration = Instant.now();
        this.expired = true;
    }

    /**
     * @return true if the transaction was expired explicitly or its expiration lies in the past; once
     *         true has been returned the result is remembered and stays true
     */
    @Override
    public boolean hasExpired() {
        if (this.expired) {
            return true;
        }
        this.expired = this.expiration.isBefore(Instant.now());
        return this.expired;
    }

    /**
     * Extends the expiration of an open, unexpired transaction.
     *
     * @param amountToAdd the duration added to the current expiration
     * @return the new expiration
     * @throws TransactionClosedException if the transaction has expired, is committed, or is not open
     * @throws TransactionRuntimeException if the transaction is in the FAILED state
     */
    @Override
    public synchronized Instant updateExpiry(final Duration amountToAdd) {
        failIfExpired();
        failIfCommitted();
        failIfNotOpen();
        this.expiration = this.expiration.plus(amountToAdd);
        return this.expiration;
    }

    @Override
    public Instant getExpires() {
        return this.expiration;
    }

    @Override
    public void commitIfShortLived() {
        if (this.isShortLived()) {
            this.commit();
        }
    }

    /**
     * Extends the expiration by the session timeout this transaction was created with.
     */
    @Override
    public void refresh() {
        updateExpiry(sessionTimeout);
    }

    @Override
    public void lockResource(final FedoraId resourceId) {
        getResourceLockManager().acquire(getId(), resourceId);
    }

    /**
     * If you create an object with ghost nodes above it, we need to lock those paths as well to ensure
     * no other operation alters them while the current transaction is in process.
     *
     * @param resourceId the resource we are creating
     */
    @Override
    public void lockResourceAndGhostNodes(final FedoraId resourceId) {
        getResourceLockManager().acquire(getId(), resourceId);
        final var resourceIdStr = resourceId.getResourceId();
        // The parent implied by the path alone: everything before the last slash.
        final String estimateParentPath = resourceIdStr.indexOf('/') > -1 ?
                resourceIdStr.substring(0, resourceIdStr.lastIndexOf('/')) : resourceIdStr;
        final var actualParent = getContainmentIndex().getContainerIdByPath(this, resourceId, false);
        if (!estimateParentPath.equals(actualParent.getResourceId())) {
            // If the expected parent does not match the actual parent, then we have ghost nodes.
            // Lock them too.
            final List<String> ghostPaths = Arrays.stream(estimateParentPath
                    .replace(actualParent.getResourceId(), "")
                    .split("/")).filter(a -> !a.isBlank()).collect(toList());
            // Walk down from the actual parent, locking each intermediate path segment.
            FedoraId tempParent = actualParent;
            for (final String part : ghostPaths) {
                tempParent = tempParent.resolve(part);
                getResourceLockManager().acquire(getId(), tempParent);
            }
        }
    }

    @Override
    public void releaseResourceLocksIfShortLived() {
        if (isShortLived()) {
            releaseLocks();
        }
    }

    @Override
    public void setBaseUri(final String baseUri) {
        this.baseUri = baseUri;
    }

    @Override
    public void setUserAgent(final String userAgent) {
        this.userAgent = userAgent;
    }

    @Override
    public void suppressEvents() {
        this.suppressEvents = true;
    }

    /**
     * Blocks until every operation currently registered through {@link #doInTx(Runnable)} has
     * finished. Returns immediately when no operation is in flight.
     */
    private void awaitInFlightOperations() {
        // Register and immediately arrive: the returned phase only advances once all other
        // registered parties (the in-flight operations) have arrived as well.
        operationPhaser.register();
        operationPhaser.awaitAdvance(operationPhaser.arriveAndDeregister());
    }

    private void doCommitShortLived() {
        // short-lived txs do not write to tx tables and do not need to commit db indexes.
        this.getPersistentSession().prepare();
        this.getPersistentSession().commit();
        this.getUserTypesCache().mergeSessionCache(id);
    }

    private void doCommitLongRunning() {
        getDbTransactionExecutor().doInTxWithRetry(() -> {
            this.getContainmentIndex().commitTransaction(this);
            this.getReferenceService().commitTransaction(this);
            this.getMembershipService().commitTransaction(this);
            this.getSearchIndex().commitTransaction(this);
            this.getPersistentSession().prepare();
            // The storage session must be committed last because mutable head changes cannot be rolled back.
            // The db transaction will remain open until all changes have been written to OCFL. If the changes
            // are large, or are going to S3, this could take some time. In which case, it is possible the
            // db's connection timeout may need to be adjusted so that the connection is not closed while
            // waiting for the OCFL changes to be committed.
            this.getPersistentSession().commit();
            this.getUserTypesCache().mergeSessionCache(id);
        });
    }

    private void updateState(final TransactionState newState) {
        this.state = newState;
    }

    private PersistentStorageSession getPersistentSession() {
        return this.txManager.getPersistentStorageSessionManager().getSession(this);
    }

    private void failIfExpired() {
        if (hasExpired()) {
            throw new TransactionClosedException("Transaction " + id + " expired!");
        }
    }

    private void failIfCommitted() {
        if (state == TransactionState.COMMITTED) {
            throw new TransactionClosedException(
                    String.format("Transaction %s cannot be transitioned because it is already committed!", id));
        }
    }

    /**
     * Rejects any state other than OPEN; FAILED is reported with a different exception type than the
     * remaining closed states.
     */
    private void failIfNotOpen() {
        if (state == TransactionState.FAILED) {
            throw new TransactionRuntimeException(
                    String.format("Transaction %s cannot be committed because it is in a failed state!", id));
        } else if (state != TransactionState.OPEN) {
            throw new TransactionClosedException(
                    String.format("Transaction %s cannot be committed because it is in state %s!", id, state));
        }
    }

    private void releaseLocks() {
        execQuietly("Failed to release resource locks cleanly. You may need to restart Fedora.", () -> {
            getResourceLockManager().releaseAll(getId());
        });
    }

    /**
     * Executes the closure, capturing all exceptions, and logging them as errors.
     *
     * @param failureMessage what to print if the closure fails
     * @param callable closure to execute
     */
    private void execQuietly(final String failureMessage, final CheckedRunnable callable) {
        try {
            callable.run();
        } catch (final Exception e) {
            log.error(failureMessage, e);
        }
    }

    private ContainmentIndex getContainmentIndex() {
        return this.txManager.getContainmentIndex();
    }

    private EventAccumulator getEventAccumulator() {
        return this.txManager.getEventAccumulator();
    }

    private ReferenceService getReferenceService() {
        return this.txManager.getReferenceService();
    }

    private MembershipService getMembershipService() {
        return this.txManager.getMembershipService();
    }

    private SearchIndex getSearchIndex() {
        return this.txManager.getSearchIndex();
    }

    private DbTransactionExecutor getDbTransactionExecutor() {
        return this.txManager.getDbTransactionExecutor();
    }

    private ResourceLockManager getResourceLockManager() {
        return this.txManager.getResourceLockManager();
    }

    private UserTypesCache getUserTypesCache() {
        return this.txManager.getUserTypesCache();
    }

    @Override
    public String toString() {
        return id;
    }
}