001/*
002 * Licensed to DuraSpace under one or more contributor license agreements.
003 * See the NOTICE file distributed with this work for additional information
004 * regarding copyright ownership.
005 *
006 * DuraSpace licenses this file to you under the Apache License,
007 * Version 2.0 (the "License"); you may not use this file except in
008 * compliance with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.fcrepo.kernel.impl;
019
020import java.time.Duration;
021import java.time.Instant;
022import java.util.concurrent.Phaser;
023
024import org.fcrepo.common.db.DbTransactionExecutor;
025import org.fcrepo.common.lang.CheckedRunnable;
026import org.fcrepo.kernel.api.ContainmentIndex;
027import org.fcrepo.kernel.api.Transaction;
028import org.fcrepo.kernel.api.TransactionState;
029import org.fcrepo.kernel.api.exception.RepositoryRuntimeException;
030import org.fcrepo.kernel.api.exception.TransactionClosedException;
031import org.fcrepo.kernel.api.exception.TransactionRuntimeException;
032import org.fcrepo.kernel.api.identifiers.FedoraId;
033import org.fcrepo.kernel.api.lock.ResourceLockManager;
034import org.fcrepo.kernel.api.observer.EventAccumulator;
035import org.fcrepo.kernel.api.services.MembershipService;
036import org.fcrepo.kernel.api.services.ReferenceService;
037import org.fcrepo.persistence.api.PersistentStorageSession;
038import org.fcrepo.search.api.SearchIndex;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042/**
043 * The Fedora Transaction implementation
044 *
045 * @author mohideen
046 */
047public class TransactionImpl implements Transaction {
048
049    private static final Logger log = LoggerFactory.getLogger(TransactionImpl.class);
050
051    private final String id;
052
053    private final TransactionManagerImpl txManager;
054
055    private TransactionState state;
056
057    private boolean shortLived = true;
058
059    private Instant expiration;
060
061    private boolean expired = false;
062
063    private String baseUri;
064
065    private String userAgent;
066
067    private final Duration sessionTimeout;
068
069    private final Phaser operationPhaser;
070
071    protected TransactionImpl(final String id,
072                              final TransactionManagerImpl txManager,
073                              final Duration sessionTimeout) {
074        if (id == null || id.isEmpty()) {
075            throw new IllegalArgumentException("Transaction id should not be empty!");
076        }
077        this.id = id;
078        this.txManager = txManager;
079        this.sessionTimeout = sessionTimeout;
080        this.expiration = Instant.now().plus(sessionTimeout);
081        this.state = TransactionState.OPEN;
082        this.operationPhaser = new Phaser();
083    }
084
085    @Override
086    public synchronized void commit() {
087        if (state == TransactionState.COMMITTED) {
088            return;
089        }
090        failIfNotOpen();
091        failIfExpired();
092
093        updateState(TransactionState.COMMITTING);
094
095        log.debug("Waiting for operations in transaction {} to complete before committing", id);
096
097        operationPhaser.register();
098        operationPhaser.awaitAdvance(operationPhaser.arriveAndDeregister());
099
100        log.debug("Committing transaction {}", id);
101
102        try {
103            if (isShortLived()) {
104                doCommitShortLived();
105            } else {
106                doCommitLongRunning();
107            }
108
109            updateState(TransactionState.COMMITTED);
110            this.getEventAccumulator().emitEvents(this, baseUri, userAgent);
111            releaseLocks();
112            log.debug("Committed transaction {}", id);
113        } catch (final Exception ex) {
114            log.error("Failed to commit transaction: {}", id, ex);
115
116            // Rollback on commit failure
117            rollback();
118            throw new RepositoryRuntimeException("Failed to commit transaction " + id, ex);
119        }
120    }
121
122    @Override
123    public boolean isCommitted() {
124        return state == TransactionState.COMMITTED;
125    }
126
127    @Override
128    public synchronized void rollback() {
129        if (state == TransactionState.ROLLEDBACK || state == TransactionState.ROLLINGBACK) {
130            return;
131        }
132
133        failIfCommitted();
134
135        log.info("Rolling back transaction {}", id);
136
137        updateState(TransactionState.ROLLINGBACK);
138
139        log.debug("Waiting for operations in transaction {} to complete before rolling back", id);
140
141        operationPhaser.register();
142        operationPhaser.awaitAdvance(operationPhaser.arriveAndDeregister());
143
144        execQuietly("Failed to rollback storage in transaction " + id, () -> {
145            this.getPersistentSession().rollback();
146        });
147        execQuietly("Failed to rollback index in transaction " + id, () -> {
148            this.getContainmentIndex().rollbackTransaction(this);
149        });
150        execQuietly("Failed to rollback reference index in transaction " + id, () -> {
151            this.getReferenceService().rollbackTransaction(this);
152        });
153        execQuietly("Failed to rollback membership index in transaction " + id, () -> {
154            this.getMembershipService().rollbackTransaction(this);
155        });
156        execQuietly("Failed to rollback search index in transaction " + id, () -> {
157            this.getSearchIndex().rollbackTransaction(this);
158        });
159
160        execQuietly("Failed to rollback events in transaction " + id, () -> {
161            this.getEventAccumulator().clearEvents(this);
162        });
163
164        updateState(TransactionState.ROLLEDBACK);
165
166        releaseLocks();
167    }
168
169    @Override
170    public void doInTx(final Runnable runnable) {
171        operationPhaser.register();
172
173        try {
174            failIfNotOpen();
175            failIfExpired();
176
177            runnable.run();
178        } finally {
179            operationPhaser.arriveAndDeregister();
180        }
181    }
182
183    @Override
184    public synchronized void fail() {
185        if (state != TransactionState.OPEN) {
186            log.error("Transaction {} is in state {} and may not be marked as FAILED", id, state);
187        } else {
188            updateState(TransactionState.FAILED);
189        }
190    }
191
192    @Override
193    public boolean isRolledBack() {
194        return state == TransactionState.ROLLEDBACK;
195    }
196
197    @Override
198    public String getId() {
199        return id;
200    }
201
202    @Override
203    public void setShortLived(final boolean shortLived) {
204        this.shortLived = shortLived;
205    }
206
207    @Override
208    public boolean isShortLived() {
209        return this.shortLived;
210    }
211
212    @Override
213    public boolean isOpenLongRunning() {
214        return !this.isShortLived() && !hasExpired()
215                && !(state == TransactionState.COMMITTED
216                || state == TransactionState.ROLLEDBACK
217                || state == TransactionState.FAILED);
218    }
219
220    @Override
221    public boolean isOpen() {
222        return state == TransactionState.OPEN && !hasExpired();
223    }
224
225    @Override
226    public void ensureCommitting() {
227        if (state != TransactionState.COMMITTING) {
228            throw new TransactionRuntimeException(
229                    String.format("Transaction %s must be in state COMMITTING, but was %s", id, state));
230        }
231    }
232
233    @Override
234    public boolean isReadOnly() {
235        return false;
236    }
237
238    @Override
239    public void expire() {
240        this.expiration = Instant.now();
241        this.expired = true;
242    }
243
244    @Override
245    public boolean hasExpired() {
246        if (this.expired) {
247            return true;
248        }
249        this.expired = this.expiration.isBefore(Instant.now());
250        return this.expired;
251    }
252
253    @Override
254    public synchronized Instant updateExpiry(final Duration amountToAdd) {
255        failIfExpired();
256        failIfCommitted();
257        failIfNotOpen();
258        this.expiration = this.expiration.plus(amountToAdd);
259        return this.expiration;
260    }
261
262    @Override
263    public Instant getExpires() {
264        return this.expiration;
265    }
266
267    @Override
268    public void commitIfShortLived() {
269       if (this.isShortLived()) {
270           this.commit();
271       }
272    }
273
274    @Override
275    public void refresh() {
276        updateExpiry(sessionTimeout);
277    }
278
279    @Override
280    public void lockResource(final FedoraId resourceId) {
281        getResourceLockManger().acquire(getId(), resourceId);
282    }
283
284    @Override
285    public void releaseResourceLocksIfShortLived() {
286        if (isShortLived()) {
287            releaseLocks();
288        }
289    }
290
291    @Override
292    public void setBaseUri(final String baseUri) {
293        this.baseUri = baseUri;
294    }
295
296    @Override
297    public void setUserAgent(final String userAgent) {
298        this.userAgent = userAgent;
299    }
300
301    private void doCommitShortLived() {
302        // short-lived txs do not write to tx tables and do not need to commit db indexes.
303        this.getPersistentSession().prepare();
304        this.getPersistentSession().commit();
305    }
306
307    private void doCommitLongRunning() {
308        getDbTransactionExecutor().doInTxWithRetry(() -> {
309            this.getContainmentIndex().commitTransaction(this);
310            this.getReferenceService().commitTransaction(this);
311            this.getMembershipService().commitTransaction(this);
312            this.getSearchIndex().commitTransaction(this);
313            this.getPersistentSession().prepare();
314            // The storage session must be committed last because mutable head changes cannot be rolled back.
315            // The db transaction will remain open until all changes have been written to OCFL. If the changes
316            // are large, or are going to S3, this could take some time. In which case, it is possible the
317            // db's connection timeout may need to be adjusted so that the connection is not closed while
318            // waiting for the OCFL changes to be committed.
319            this.getPersistentSession().commit();
320        });
321    }
322
323    private void updateState(final TransactionState newState) {
324        this.state = newState;
325    }
326
327    private PersistentStorageSession getPersistentSession() {
328        return this.txManager.getPersistentStorageSessionManager().getSession(this);
329    }
330
331    private void failIfExpired() {
332        if (hasExpired()) {
333            throw new TransactionClosedException("Transaction " + id + " expired!");
334        }
335    }
336
337    private void failIfCommitted() {
338        if (state == TransactionState.COMMITTED) {
339            throw new TransactionClosedException(
340                    String.format("Transaction %s cannot be transitioned because it is already committed!", id));
341        }
342    }
343
344    private void failIfNotOpen() {
345        if (state == TransactionState.FAILED) {
346            throw new TransactionRuntimeException(
347                    String.format("Transaction %s cannot be committed because it is in a failed state!", id));
348        } else if (state != TransactionState.OPEN) {
349            throw new TransactionClosedException(
350                    String.format("Transaction %s cannot be committed because it is in state %s!", id, state));
351        }
352    }
353
354    private void releaseLocks() {
355        execQuietly("Failed to release resource locks cleanly. You may need to restart Fedora.", () -> {
356            getResourceLockManger().releaseAll(getId());
357        });
358    }
359
360    /**
361     * Executes the closure, capturing all exceptions, and logging them as errors.
362     *
363     * @param failureMessage what to print if the closure fails
364     * @param callable closure to execute
365     */
366    private void execQuietly(final String failureMessage, final CheckedRunnable callable) {
367        try {
368            callable.run();
369        } catch (final Exception e) {
370            log.error(failureMessage, e);
371        }
372    }
373
374    private ContainmentIndex getContainmentIndex() {
375        return this.txManager.getContainmentIndex();
376    }
377
378    private EventAccumulator getEventAccumulator() {
379        return this.txManager.getEventAccumulator();
380    }
381
382    private ReferenceService getReferenceService() {
383        return this.txManager.getReferenceService();
384    }
385
386    private MembershipService getMembershipService() {
387        return this.txManager.getMembershipService();
388    }
389
390    private SearchIndex getSearchIndex() {
391        return this.txManager.getSearchIndex();
392    }
393
394    private DbTransactionExecutor getDbTransactionExecutor() {
395        return this.txManager.getDbTransactionExecutor();
396    }
397
398    private ResourceLockManager getResourceLockManger() {
399        return this.txManager.getResourceLockManager();
400    }
401
402    @Override
403    public String toString() {
404        return id;
405    }
406}