001/* 002 * Licensed to DuraSpace under one or more contributor license agreements. 003 * See the NOTICE file distributed with this work for additional information 004 * regarding copyright ownership. 005 * 006 * DuraSpace licenses this file to you under the Apache License, 007 * Version 2.0 (the "License"); you may not use this file except in 008 * compliance with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.fcrepo.kernel.impl; 019 020import java.time.Duration; 021import java.time.Instant; 022import java.util.concurrent.Phaser; 023 024import org.fcrepo.common.db.DbTransactionExecutor; 025import org.fcrepo.common.lang.CheckedRunnable; 026import org.fcrepo.kernel.api.ContainmentIndex; 027import org.fcrepo.kernel.api.Transaction; 028import org.fcrepo.kernel.api.TransactionState; 029import org.fcrepo.kernel.api.exception.RepositoryRuntimeException; 030import org.fcrepo.kernel.api.exception.TransactionClosedException; 031import org.fcrepo.kernel.api.exception.TransactionRuntimeException; 032import org.fcrepo.kernel.api.identifiers.FedoraId; 033import org.fcrepo.kernel.api.lock.ResourceLockManager; 034import org.fcrepo.kernel.api.observer.EventAccumulator; 035import org.fcrepo.kernel.api.services.MembershipService; 036import org.fcrepo.kernel.api.services.ReferenceService; 037import org.fcrepo.persistence.api.PersistentStorageSession; 038import org.fcrepo.search.api.SearchIndex; 039import org.slf4j.Logger; 040import org.slf4j.LoggerFactory; 041 042/** 043 * The Fedora Transaction implementation 044 * 045 * @author mohideen 046 */ 047public class TransactionImpl implements Transaction { 048 049 private static final Logger log = LoggerFactory.getLogger(TransactionImpl.class); 050 051 private final String id; 052 053 private final TransactionManagerImpl txManager; 054 055 private TransactionState state; 056 057 private boolean shortLived = true; 058 059 private Instant expiration; 060 061 private boolean expired = false; 062 063 private String baseUri; 064 065 private String userAgent; 066 067 private final Duration sessionTimeout; 068 069 private final Phaser operationPhaser; 070 071 protected TransactionImpl(final String id, 072 final TransactionManagerImpl txManager, 073 final Duration sessionTimeout) { 074 if (id == null || id.isEmpty()) { 075 throw new IllegalArgumentException("Transaction id should not be empty!"); 076 } 077 this.id = id; 078 this.txManager = txManager; 079 this.sessionTimeout = sessionTimeout; 080 this.expiration = Instant.now().plus(sessionTimeout); 081 this.state = TransactionState.OPEN; 082 this.operationPhaser = new Phaser(); 083 } 084 085 @Override 086 public synchronized void commit() { 087 if (state == TransactionState.COMMITTED) { 088 return; 089 } 090 failIfNotOpen(); 091 failIfExpired(); 092 093 updateState(TransactionState.COMMITTING); 094 095 log.debug("Waiting for operations in transaction {} to complete before committing", id); 096 097 operationPhaser.register(); 098 operationPhaser.awaitAdvance(operationPhaser.arriveAndDeregister()); 099 100 log.debug("Committing transaction {}", id); 101 102 try { 103 if (isShortLived()) { 104 doCommitShortLived(); 105 } else { 106 doCommitLongRunning(); 107 } 108 109 updateState(TransactionState.COMMITTED); 110 this.getEventAccumulator().emitEvents(this, baseUri, userAgent); 111 releaseLocks(); 112 log.debug("Committed transaction {}", id); 113 } catch (final Exception ex) { 114 log.error("Failed to commit transaction: {}", id, ex); 115 116 // Rollback on commit failure 117 rollback(); 118 throw new RepositoryRuntimeException("Failed to commit transaction " + id, ex); 119 } 120 } 121 122 @Override 123 public boolean isCommitted() { 124 return state == TransactionState.COMMITTED; 125 } 126 127 @Override 128 public synchronized void rollback() { 129 if (state == TransactionState.ROLLEDBACK || state == TransactionState.ROLLINGBACK) { 130 return; 131 } 132 133 failIfCommitted(); 134 135 log.info("Rolling back transaction {}", id); 136 137 updateState(TransactionState.ROLLINGBACK); 138 139 log.debug("Waiting for operations in transaction {} to complete before rolling back", id); 140 141 operationPhaser.register(); 142 operationPhaser.awaitAdvance(operationPhaser.arriveAndDeregister()); 143 144 execQuietly("Failed to rollback storage in transaction " + id, () -> { 145 this.getPersistentSession().rollback(); 146 }); 147 execQuietly("Failed to rollback index in transaction " + id, () -> { 148 this.getContainmentIndex().rollbackTransaction(this); 149 }); 150 execQuietly("Failed to rollback reference index in transaction " + id, () -> { 151 this.getReferenceService().rollbackTransaction(this); 152 }); 153 execQuietly("Failed to rollback membership index in transaction " + id, () -> { 154 this.getMembershipService().rollbackTransaction(this); 155 }); 156 execQuietly("Failed to rollback search index in transaction " + id, () -> { 157 this.getSearchIndex().rollbackTransaction(this); 158 }); 159 160 execQuietly("Failed to rollback events in transaction " + id, () -> { 161 this.getEventAccumulator().clearEvents(this); 162 }); 163 164 updateState(TransactionState.ROLLEDBACK); 165 166 releaseLocks(); 167 } 168 169 @Override 170 public void doInTx(final Runnable runnable) { 171 operationPhaser.register(); 172 173 try { 174 failIfNotOpen(); 175 failIfExpired(); 176 177 runnable.run(); 178 } finally { 179 operationPhaser.arriveAndDeregister(); 180 } 181 } 182 183 @Override 184 public synchronized void fail() { 185 if (state != TransactionState.OPEN) { 186 log.error("Transaction {} is in state {} and may not be marked as FAILED", id, state); 187 } else { 188 updateState(TransactionState.FAILED); 189 } 190 } 191 192 @Override 193 public boolean isRolledBack() { 194 return state == TransactionState.ROLLEDBACK; 195 } 196 197 @Override 198 public String getId() { 199 return id; 200 } 201 202 @Override 203 public void setShortLived(final boolean shortLived) { 204 this.shortLived = shortLived; 205 } 206 207 @Override 208 public boolean isShortLived() { 209 return this.shortLived; 210 } 211 212 @Override 213 public boolean isOpenLongRunning() { 214 return !this.isShortLived() && !hasExpired() 215 && !(state == TransactionState.COMMITTED 216 || state == TransactionState.ROLLEDBACK 217 || state == TransactionState.FAILED); 218 } 219 220 @Override 221 public boolean isOpen() { 222 return state == TransactionState.OPEN && !hasExpired(); 223 } 224 225 @Override 226 public void ensureCommitting() { 227 if (state != TransactionState.COMMITTING) { 228 throw new TransactionRuntimeException( 229 String.format("Transaction %s must be in state COMMITTING, but was %s", id, state)); 230 } 231 } 232 233 @Override 234 public boolean isReadOnly() { 235 return false; 236 } 237 238 @Override 239 public void expire() { 240 this.expiration = Instant.now(); 241 this.expired = true; 242 } 243 244 @Override 245 public boolean hasExpired() { 246 if (this.expired) { 247 return true; 248 } 249 this.expired = this.expiration.isBefore(Instant.now()); 250 return this.expired; 251 } 252 253 @Override 254 public synchronized Instant updateExpiry(final Duration amountToAdd) { 255 failIfExpired(); 256 failIfCommitted(); 257 failIfNotOpen(); 258 this.expiration = this.expiration.plus(amountToAdd); 259 return this.expiration; 260 } 261 262 @Override 263 public Instant getExpires() { 264 return this.expiration; 265 } 266 267 @Override 268 public void commitIfShortLived() { 269 if (this.isShortLived()) { 270 this.commit(); 271 } 272 } 273 274 @Override 275 public void refresh() { 276 updateExpiry(sessionTimeout); 277 } 278 279 @Override 280 public void lockResource(final FedoraId resourceId) { 281 getResourceLockManger().acquire(getId(), resourceId); 282 } 283 284 @Override 285 public void releaseResourceLocksIfShortLived() { 286 if (isShortLived()) { 287 releaseLocks(); 288 } 289 } 290 291 @Override 292 public void setBaseUri(final String baseUri) { 293 this.baseUri = baseUri; 294 } 295 296 @Override 297 public void setUserAgent(final String userAgent) { 298 this.userAgent = userAgent; 299 } 300 301 private void doCommitShortLived() { 302 // short-lived txs do not write to tx tables and do not need to commit db indexes. 303 this.getPersistentSession().prepare(); 304 this.getPersistentSession().commit(); 305 } 306 307 private void doCommitLongRunning() { 308 getDbTransactionExecutor().doInTxWithRetry(() -> { 309 this.getContainmentIndex().commitTransaction(this); 310 this.getReferenceService().commitTransaction(this); 311 this.getMembershipService().commitTransaction(this); 312 this.getSearchIndex().commitTransaction(this); 313 this.getPersistentSession().prepare(); 314 // The storage session must be committed last because mutable head changes cannot be rolled back. 315 // The db transaction will remain open until all changes have been written to OCFL. If the changes 316 // are large, or are going to S3, this could take some time. In which case, it is possible the 317 // db's connection timeout may need to be adjusted so that the connection is not closed while 318 // waiting for the OCFL changes to be committed. 319 this.getPersistentSession().commit(); 320 }); 321 } 322 323 private void updateState(final TransactionState newState) { 324 this.state = newState; 325 } 326 327 private PersistentStorageSession getPersistentSession() { 328 return this.txManager.getPersistentStorageSessionManager().getSession(this); 329 } 330 331 private void failIfExpired() { 332 if (hasExpired()) { 333 throw new TransactionClosedException("Transaction " + id + " expired!"); 334 } 335 } 336 337 private void failIfCommitted() { 338 if (state == TransactionState.COMMITTED) { 339 throw new TransactionClosedException( 340 String.format("Transaction %s cannot be transitioned because it is already committed!", id)); 341 } 342 } 343 344 private void failIfNotOpen() { 345 if (state == TransactionState.FAILED) { 346 throw new TransactionRuntimeException( 347 String.format("Transaction %s cannot be committed because it is in a failed state!", id)); 348 } else if (state != TransactionState.OPEN) { 349 throw new TransactionClosedException( 350 String.format("Transaction %s cannot be committed because it is in state %s!", id, state)); 351 } 352 } 353 354 private void releaseLocks() { 355 execQuietly("Failed to release resource locks cleanly. You may need to restart Fedora.", () -> { 356 getResourceLockManger().releaseAll(getId()); 357 }); 358 } 359 360 /** 361 * Executes the closure, capturing all exceptions, and logging them as errors. 362 * 363 * @param failureMessage what to print if the closure fails 364 * @param callable closure to execute 365 */ 366 private void execQuietly(final String failureMessage, final CheckedRunnable callable) { 367 try { 368 callable.run(); 369 } catch (final Exception e) { 370 log.error(failureMessage, e); 371 } 372 } 373 374 private ContainmentIndex getContainmentIndex() { 375 return this.txManager.getContainmentIndex(); 376 } 377 378 private EventAccumulator getEventAccumulator() { 379 return this.txManager.getEventAccumulator(); 380 } 381 382 private ReferenceService getReferenceService() { 383 return this.txManager.getReferenceService(); 384 } 385 386 private MembershipService getMembershipService() { 387 return this.txManager.getMembershipService(); 388 } 389 390 private SearchIndex getSearchIndex() { 391 return this.txManager.getSearchIndex(); 392 } 393 394 private DbTransactionExecutor getDbTransactionExecutor() { 395 return this.txManager.getDbTransactionExecutor(); 396 } 397 398 private ResourceLockManager getResourceLockManger() { 399 return this.txManager.getResourceLockManager(); 400 } 401 402 @Override 403 public String toString() { 404 return id; 405 } 406}