001/*
002 * The contents of this file are subject to the license and copyright
003 * detailed in the LICENSE and NOTICE files at the root of the source
004 * tree.
005 */
006package org.fcrepo.kernel.impl;
007
008import static java.util.stream.Collectors.toList;
009
010import java.time.Duration;
011import java.time.Instant;
012import java.util.Arrays;
013import java.util.List;
014import java.util.concurrent.Phaser;
015
016import org.fcrepo.common.db.DbTransactionExecutor;
017import org.fcrepo.common.lang.CheckedRunnable;
018import org.fcrepo.kernel.api.ContainmentIndex;
019import org.fcrepo.kernel.api.Transaction;
020import org.fcrepo.kernel.api.TransactionState;
021import org.fcrepo.kernel.api.cache.UserTypesCache;
022import org.fcrepo.kernel.api.exception.RepositoryRuntimeException;
023import org.fcrepo.kernel.api.exception.TransactionClosedException;
024import org.fcrepo.kernel.api.exception.TransactionRuntimeException;
025import org.fcrepo.kernel.api.identifiers.FedoraId;
026import org.fcrepo.kernel.api.lock.ResourceLockManager;
027import org.fcrepo.kernel.api.observer.EventAccumulator;
028import org.fcrepo.kernel.api.services.MembershipService;
029import org.fcrepo.kernel.api.services.ReferenceService;
030import org.fcrepo.persistence.api.PersistentStorageSession;
031import org.fcrepo.search.api.SearchIndex;
032
033import org.slf4j.Logger;
034import org.slf4j.LoggerFactory;
035
036/**
037 * The Fedora Transaction implementation
038 *
039 * @author mohideen
040 */
041public class TransactionImpl implements Transaction {
042
043    private static final Logger log = LoggerFactory.getLogger(TransactionImpl.class);
044
045    private final String id;
046
047    private final TransactionManagerImpl txManager;
048
049    private TransactionState state;
050
051    private boolean shortLived = true;
052
053    private Instant expiration;
054
055    private boolean expired = false;
056
057    private String baseUri;
058
059    private String userAgent;
060
061    private final Duration sessionTimeout;
062
063    private final Phaser operationPhaser;
064
065    private boolean suppressEvents = false;
066
067    protected TransactionImpl(final String id,
068                              final TransactionManagerImpl txManager,
069                              final Duration sessionTimeout) {
070        if (id == null || id.isEmpty()) {
071            throw new IllegalArgumentException("Transaction id should not be empty!");
072        }
073        this.id = id;
074        this.txManager = txManager;
075        this.sessionTimeout = sessionTimeout;
076        this.expiration = Instant.now().plus(sessionTimeout);
077        this.state = TransactionState.OPEN;
078        this.operationPhaser = new Phaser();
079    }
080
081    @Override
082    public synchronized void commit() {
083        if (state == TransactionState.COMMITTED) {
084            return;
085        }
086        failIfNotOpen();
087        failIfExpired();
088
089        updateState(TransactionState.COMMITTING);
090
091        log.debug("Waiting for operations in transaction {} to complete before committing", id);
092
093        operationPhaser.register();
094        operationPhaser.awaitAdvance(operationPhaser.arriveAndDeregister());
095
096        log.debug("Committing transaction {}", id);
097
098        try {
099            if (isShortLived()) {
100                doCommitShortLived();
101            } else {
102                doCommitLongRunning();
103            }
104
105            updateState(TransactionState.COMMITTED);
106            if (!this.suppressEvents) {
107                this.getEventAccumulator().emitEvents(this, baseUri, userAgent);
108            } else {
109                this.getEventAccumulator().clearEvents(this);
110            }
111
112            releaseLocks();
113            log.debug("Committed transaction {}", id);
114        } catch (final Exception ex) {
115            log.error("Failed to commit transaction: {}", id, ex);
116
117            // Rollback on commit failure
118            log.info("Rolling back transaction {}", id);
119            rollback();
120            throw new RepositoryRuntimeException("Failed to commit transaction " + id, ex);
121        }
122    }
123
124    @Override
125    public boolean isCommitted() {
126        return state == TransactionState.COMMITTED;
127    }
128
129    @Override
130    public synchronized void rollback() {
131        if (state == TransactionState.ROLLEDBACK || state == TransactionState.ROLLINGBACK) {
132            return;
133        }
134
135        failIfCommitted();
136
137        updateState(TransactionState.ROLLINGBACK);
138
139        log.debug("Waiting for operations in transaction {} to complete before rolling back", id);
140
141        operationPhaser.register();
142        operationPhaser.awaitAdvance(operationPhaser.arriveAndDeregister());
143
144        execQuietly("Failed to rollback storage in transaction " + id, () -> {
145            this.getPersistentSession().rollback();
146        });
147        execQuietly("Failed to rollback index in transaction " + id, () -> {
148            this.getContainmentIndex().rollbackTransaction(this);
149        });
150        execQuietly("Failed to rollback reference index in transaction " + id, () -> {
151            this.getReferenceService().rollbackTransaction(this);
152        });
153        execQuietly("Failed to rollback membership index in transaction " + id, () -> {
154            this.getMembershipService().rollbackTransaction(this);
155        });
156        execQuietly("Failed to rollback search index in transaction " + id, () -> {
157            this.getSearchIndex().rollbackTransaction(this);
158        });
159
160        execQuietly("Failed to rollback events in transaction " + id, () -> {
161            this.getEventAccumulator().clearEvents(this);
162        });
163
164        execQuietly("Failed to clear user rdf types cache in transaction " + id, () -> {
165            this.getUserTypesCache().dropSessionCache(id);
166        });
167
168        updateState(TransactionState.ROLLEDBACK);
169
170        releaseLocks();
171    }
172
173    @Override
174    public void doInTx(final Runnable runnable) {
175        operationPhaser.register();
176
177        try {
178            failIfNotOpen();
179            failIfExpired();
180
181            runnable.run();
182        } finally {
183            operationPhaser.arriveAndDeregister();
184        }
185    }
186
187    @Override
188    public synchronized void fail() {
189        if (state != TransactionState.OPEN) {
190            log.error("Transaction {} is in state {} and may not be marked as FAILED", id, state);
191        } else {
192            updateState(TransactionState.FAILED);
193        }
194    }
195
196    @Override
197    public boolean isRolledBack() {
198        return state == TransactionState.ROLLEDBACK;
199    }
200
201    @Override
202    public String getId() {
203        return id;
204    }
205
206    @Override
207    public void setShortLived(final boolean shortLived) {
208        this.shortLived = shortLived;
209    }
210
211    @Override
212    public boolean isShortLived() {
213        return this.shortLived;
214    }
215
216    @Override
217    public boolean isOpenLongRunning() {
218        return !this.isShortLived() && !hasExpired()
219                && !(state == TransactionState.COMMITTED
220                || state == TransactionState.ROLLEDBACK
221                || state == TransactionState.FAILED);
222    }
223
224    @Override
225    public boolean isOpen() {
226        return state == TransactionState.OPEN && !hasExpired();
227    }
228
229    @Override
230    public void ensureCommitting() {
231        if (state != TransactionState.COMMITTING) {
232            throw new TransactionRuntimeException(
233                    String.format("Transaction %s must be in state COMMITTING, but was %s", id, state));
234        }
235    }
236
237    @Override
238    public boolean isReadOnly() {
239        return false;
240    }
241
242    @Override
243    public void expire() {
244        this.expiration = Instant.now();
245        this.expired = true;
246    }
247
248    @Override
249    public boolean hasExpired() {
250        if (this.expired) {
251            return true;
252        }
253        this.expired = this.expiration.isBefore(Instant.now());
254        return this.expired;
255    }
256
257    @Override
258    public synchronized Instant updateExpiry(final Duration amountToAdd) {
259        failIfExpired();
260        failIfCommitted();
261        failIfNotOpen();
262        this.expiration = this.expiration.plus(amountToAdd);
263        return this.expiration;
264    }
265
266    @Override
267    public Instant getExpires() {
268        return this.expiration;
269    }
270
271    @Override
272    public void commitIfShortLived() {
273       if (this.isShortLived()) {
274           this.commit();
275       }
276    }
277
278    @Override
279    public void refresh() {
280        updateExpiry(sessionTimeout);
281    }
282
283    @Override
284    public void lockResource(final FedoraId resourceId) {
285        getResourceLockManger().acquire(getId(), resourceId);
286    }
287
288    /**
289     * If you create an object with ghost nodes above it, we need to lock those paths as well to ensure
290     * no other operation alters them while the current transaction is in process.
291     *
292     * @param resourceId the resource we are creating
293     */
294    @Override
295    public void lockResourceAndGhostNodes(final FedoraId resourceId) {
296        getResourceLockManger().acquire(getId(), resourceId);
297        final var resourceIdStr = resourceId.getResourceId();
298        final String estimateParentPath = resourceIdStr.indexOf('/') > -1 ?
299                resourceIdStr.substring(0,resourceIdStr.lastIndexOf('/')) : resourceIdStr;
300        final var actualParent = getContainmentIndex().getContainerIdByPath(this, resourceId, false);
301        if (!estimateParentPath.equals(actualParent.getResourceId())) {
302            // If the expected parent does not match the actual parent, then we have ghost nodes.
303            // Lock them too.
304            final List<String> ghostPaths = Arrays.stream(estimateParentPath
305                    .replace(actualParent.getResourceId(), "")
306                    .split("/")).filter(a -> !a.isBlank()).collect(toList());
307            FedoraId tempParent = actualParent;
308            for (final String part : ghostPaths) {
309                tempParent = tempParent.resolve(part);
310                getResourceLockManger().acquire(getId(), tempParent);
311            }
312        }
313    }
314
315    @Override
316    public void releaseResourceLocksIfShortLived() {
317        if (isShortLived()) {
318            releaseLocks();
319        }
320    }
321
322    @Override
323    public void setBaseUri(final String baseUri) {
324        this.baseUri = baseUri;
325    }
326
327    @Override
328    public void setUserAgent(final String userAgent) {
329        this.userAgent = userAgent;
330    }
331
332    @Override
333    public void suppressEvents() {
334        this.suppressEvents = true;
335    }
336
337    private void doCommitShortLived() {
338        // short-lived txs do not write to tx tables and do not need to commit db indexes.
339        this.getPersistentSession().prepare();
340        this.getPersistentSession().commit();
341        this.getUserTypesCache().mergeSessionCache(id);
342    }
343
344    private void doCommitLongRunning() {
345        getDbTransactionExecutor().doInTxWithRetry(() -> {
346            this.getContainmentIndex().commitTransaction(this);
347            this.getReferenceService().commitTransaction(this);
348            this.getMembershipService().commitTransaction(this);
349            this.getSearchIndex().commitTransaction(this);
350            this.getPersistentSession().prepare();
351            // The storage session must be committed last because mutable head changes cannot be rolled back.
352            // The db transaction will remain open until all changes have been written to OCFL. If the changes
353            // are large, or are going to S3, this could take some time. In which case, it is possible the
354            // db's connection timeout may need to be adjusted so that the connection is not closed while
355            // waiting for the OCFL changes to be committed.
356            this.getPersistentSession().commit();
357            this.getUserTypesCache().mergeSessionCache(id);
358        });
359    }
360
361    private void updateState(final TransactionState newState) {
362        this.state = newState;
363    }
364
365    private PersistentStorageSession getPersistentSession() {
366        return this.txManager.getPersistentStorageSessionManager().getSession(this);
367    }
368
369    private void failIfExpired() {
370        if (hasExpired()) {
371            throw new TransactionClosedException("Transaction " + id + " expired!");
372        }
373    }
374
375    private void failIfCommitted() {
376        if (state == TransactionState.COMMITTED) {
377            throw new TransactionClosedException(
378                    String.format("Transaction %s cannot be transitioned because it is already committed!", id));
379        }
380    }
381
382    private void failIfNotOpen() {
383        if (state == TransactionState.FAILED) {
384            throw new TransactionRuntimeException(
385                    String.format("Transaction %s cannot be committed because it is in a failed state!", id));
386        } else if (state != TransactionState.OPEN) {
387            throw new TransactionClosedException(
388                    String.format("Transaction %s cannot be committed because it is in state %s!", id, state));
389        }
390    }
391
392    private void releaseLocks() {
393        execQuietly("Failed to release resource locks cleanly. You may need to restart Fedora.", () -> {
394            getResourceLockManger().releaseAll(getId());
395        });
396    }
397
398    /**
399     * Executes the closure, capturing all exceptions, and logging them as errors.
400     *
401     * @param failureMessage what to print if the closure fails
402     * @param callable closure to execute
403     */
404    private void execQuietly(final String failureMessage, final CheckedRunnable callable) {
405        try {
406            callable.run();
407        } catch (final Exception e) {
408            log.error(failureMessage, e);
409        }
410    }
411
412    private ContainmentIndex getContainmentIndex() {
413        return this.txManager.getContainmentIndex();
414    }
415
416    private EventAccumulator getEventAccumulator() {
417        return this.txManager.getEventAccumulator();
418    }
419
420    private ReferenceService getReferenceService() {
421        return this.txManager.getReferenceService();
422    }
423
424    private MembershipService getMembershipService() {
425        return this.txManager.getMembershipService();
426    }
427
428    private SearchIndex getSearchIndex() {
429        return this.txManager.getSearchIndex();
430    }
431
432    private DbTransactionExecutor getDbTransactionExecutor() {
433        return this.txManager.getDbTransactionExecutor();
434    }
435
436    private ResourceLockManager getResourceLockManger() {
437        return this.txManager.getResourceLockManager();
438    }
439
440    private UserTypesCache getUserTypesCache() {
441        return this.txManager.getUserTypesCache();
442    }
443
444    @Override
445    public String toString() {
446        return id;
447    }
448}