001/*
002 * Licensed to DuraSpace under one or more contributor license agreements.
003 * See the NOTICE file distributed with this work for additional information
004 * regarding copyright ownership.
005 *
006 * DuraSpace licenses this file to you under the Apache License,
007 * Version 2.0 (the "License"); you may not use this file except in
008 * compliance with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.fcrepo.kernel.impl;
019
020import org.fcrepo.common.db.DbTransactionExecutor;
021import org.fcrepo.config.FedoraPropsConfig;
022import org.fcrepo.kernel.api.ContainmentIndex;
023import org.fcrepo.kernel.api.Transaction;
024import org.fcrepo.kernel.api.TransactionManager;
025import org.fcrepo.kernel.api.exception.TransactionClosedException;
026import org.fcrepo.kernel.api.exception.TransactionNotFoundException;
027import org.fcrepo.kernel.api.lock.ResourceLockManager;
028import org.fcrepo.kernel.api.observer.EventAccumulator;
029import org.fcrepo.kernel.api.services.MembershipService;
030import org.fcrepo.kernel.api.services.ReferenceService;
031import org.fcrepo.persistence.api.PersistentStorageSessionManager;
032import org.fcrepo.search.api.SearchIndex;
033import org.slf4j.Logger;
034import org.slf4j.LoggerFactory;
035import org.springframework.beans.factory.annotation.Autowired;
036import org.springframework.beans.factory.annotation.Qualifier;
037import org.springframework.scheduling.annotation.Scheduled;
038import org.springframework.stereotype.Component;
039
040import javax.inject.Inject;
041import java.util.Map;
042import java.util.concurrent.ConcurrentHashMap;
043
044import static java.util.UUID.randomUUID;
045
046/**
047 * The Fedora Transaction Manager implementation
048 *
049 * @author mohideen
050 */
051@Component
052public class TransactionManagerImpl implements TransactionManager {
053
054    private static final Logger LOGGER = LoggerFactory.getLogger(TransactionManagerImpl.class);
055
056    private final Map<String, Transaction> transactions;
057
058    @Autowired
059    @Qualifier("containmentIndex")
060    private ContainmentIndex containmentIndex;
061
062    @Inject
063    private PersistentStorageSessionManager pSessionManager;
064
065    @Inject
066    private EventAccumulator eventAccumulator;
067
068    @Autowired
069    @Qualifier("referenceService")
070    private ReferenceService referenceService;
071
072    @Inject
073    private MembershipService membershipService;
074
075    @Inject
076    @Qualifier("searchIndex")
077    private SearchIndex searchIndex;
078
079    @Inject
080    private ResourceLockManager resourceLockManager;
081
082    @Inject
083    private FedoraPropsConfig fedoraPropsConfig;
084
085    @Inject
086    private DbTransactionExecutor dbTransactionExecutor;
087
088    TransactionManagerImpl() {
089        transactions = new ConcurrentHashMap<>();
090    }
091
092    /**
093     * Periodically scan for closed transactions for cleanup
094     */
095    @Scheduled(fixedDelayString = "#{fedoraPropsConfig.sessionTimeout}")
096    public void cleanupClosedTransactions() {
097        LOGGER.debug("Cleaning up expired transactions");
098
099        final var txIt = transactions.entrySet().iterator();
100        while (txIt.hasNext()) {
101            final var txEntry = txIt.next();
102            final var tx = txEntry.getValue();
103
104            // Cleanup if transaction is closed and past its expiration time
105            if (tx.isCommitted() || tx.isRolledBack()) {
106                if (tx.hasExpired()) {
107                    txIt.remove();
108                }
109            } else if (tx.hasExpired()) {
110                LOGGER.debug("Rolling back expired transaction {}", tx.getId());
111                try {
112                    // If the tx has expired but is not already closed, then rollback
113                    // but don't immediately remove it from the list of transactions
114                    // so that the rolled back status can be checked
115                    tx.rollback();
116                } catch (final RuntimeException e) {
117                    LOGGER.error("Failed to rollback expired transaction {}", tx.getId(), e);
118                }
119            }
120
121            if (tx.hasExpired()) {
122                // By this point the session as already been committed or rolledback by the transaction
123                pSessionManager.removeSession(tx.getId());
124            }
125        }
126    }
127
128    @Override
129    public synchronized Transaction create() {
130        String txId = randomUUID().toString();
131        while (transactions.containsKey(txId)) {
132            txId = randomUUID().toString();
133        }
134        final Transaction tx = new TransactionImpl(txId, this, fedoraPropsConfig.getSessionTimeout());
135        transactions.put(txId, tx);
136        return tx;
137    }
138
139    @Override
140    public Transaction get(final String transactionId) {
141        if (transactions.containsKey(transactionId)) {
142            final Transaction transaction = transactions.get(transactionId);
143            if (transaction.hasExpired()) {
144                transaction.rollback();
145                throw new TransactionClosedException("Transaction with transactionId: " + transactionId +
146                    " expired at " + transaction.getExpires() + "!");
147            }
148            if (transaction.isCommitted()) {
149                throw new TransactionClosedException("Transaction with transactionId: " + transactionId +
150                        " has already been committed.");
151            }
152            if (transaction.isRolledBack()) {
153                throw new TransactionClosedException("Transaction with transactionId: " + transactionId +
154                        " has already been rolled back.");
155            }
156            return transaction;
157        } else {
158            throw new TransactionNotFoundException("No Transaction found with transactionId: " + transactionId);
159        }
160    }
161
162    protected PersistentStorageSessionManager getPersistentStorageSessionManager() {
163        return pSessionManager;
164    }
165
166    protected ContainmentIndex getContainmentIndex() {
167        return containmentIndex;
168    }
169
170    protected SearchIndex getSearchIndex() {
171        return searchIndex;
172    }
173
174
175    protected EventAccumulator getEventAccumulator() {
176        return eventAccumulator;
177    }
178
179    protected ReferenceService getReferenceService() {
180        return referenceService;
181    }
182
183    protected MembershipService getMembershipService() {
184        return membershipService;
185    }
186
187    protected ResourceLockManager getResourceLockManager() {
188        return resourceLockManager;
189    }
190
191    public DbTransactionExecutor getDbTransactionExecutor() {
192        return dbTransactionExecutor;
193    }
194}