001/*
002 * Copyright 2015 DuraSpace, Inc.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016/**
017 *
018 */
019
020package org.fcrepo.kernel.modeshape.services;
021
022import static com.google.common.collect.Maps.filterValues;
023import static java.lang.System.currentTimeMillis;
024import static org.slf4j.LoggerFactory.getLogger;
025
026import java.util.Map;
027import java.util.concurrent.ConcurrentHashMap;
028
029import javax.jcr.RepositoryException;
030import javax.jcr.Session;
031
032import com.google.common.collect.ImmutableSet;
033import org.fcrepo.kernel.api.Transaction;
034import org.fcrepo.kernel.api.TxSession;
035import org.fcrepo.kernel.api.exception.RepositoryRuntimeException;
036import org.fcrepo.kernel.api.exception.SessionMissingException;
037import org.fcrepo.kernel.api.services.TransactionService;
038import org.fcrepo.kernel.modeshape.TransactionImpl;
039
040import org.slf4j.Logger;
041import org.springframework.scheduling.annotation.Scheduled;
042import org.springframework.stereotype.Component;
043
044/**
045 * This is part of the strawman implementation for Fedora transactions This
046 * service implements a simple {@link Transaction} service which is able to
047 * create/commit/rollback {@link Transaction} objects A {@link Scheduled}
048 * annotation is used for removing timed out Transactions
049 *
050 * @author frank asseg
051 * @author ajs6f
052 */
053@Component
054public class TransactionServiceImpl extends AbstractService implements TransactionService {
055
056    private static final Logger LOGGER = getLogger(TransactionServiceImpl.class);
057
058    /**
059     * A key for looking up the transaction id in a session key-value pair
060     */
061    static final String FCREPO4_TX_ID = "fcrepo4.tx.id";
062
063    /**
064     * TODO since transactions have to be available on all nodes, they have to
065     * be either persisted or written to a distributed map or sth, not just this
066     * plain hashmap that follows
067     */
068    private static Map<String, Transaction> transactions = new ConcurrentHashMap<>();
069
070    public static final long REAP_INTERVAL = 1000;
071
072    /**
073     * Check if a session is possibly within a transaction
074     * @param session the session
075     * @return whether the session is possibly within a transaction
076     */
077    public static boolean isInTransaction(final Session session) {
078        try {
079            return ImmutableSet.copyOf(session.getNamespacePrefixes()).contains(FCREPO4_TX_ID);
080        } catch (final RepositoryException e) {
081            throw new RepositoryRuntimeException(e);
082        }
083    }
084
085    /**
086     * Every REAP_INTERVAL milliseconds, check for expired transactions. If the
087     * tx is expired, roll it back and remove it from the registry.
088     */
089    /*
090     * (non-Javadoc)
091     * @see
092     * org.fcrepo.kernel.api.services.TransactionService#removeAndRollbackExpired()
093     */
094    @Override
095    @Scheduled(fixedRate = REAP_INTERVAL)
096    public void removeAndRollbackExpired() {
097        synchronized (transactions) {
098            filterValues(transactions, tx -> tx.getExpires().getTime() <= currentTimeMillis())
099                    .forEach((key, tx) -> {
100                        try {
101                            tx.rollback();
102                        } catch (final RepositoryRuntimeException e) {
103                            LOGGER.error("Got exception rolling back expired transaction {}: {}", tx, e.getMessage());
104                        }
105                        transactions.remove(key);
106                    });
107        }
108    }
109
110    /**
111     * Create a new Transaction and add it to the currently open ones
112     *
113     * @param sess The session to use for this Transaction
114     * @return the {@link Transaction}
115     */
116    @Override
117    public Transaction beginTransaction(final Session sess, final String userName) {
118        final Transaction tx = new TransactionImpl(sess, userName);
119        final String txId = tx.getId();
120        transactions.put(txId, tx);
121        try {
122            sess.setNamespacePrefix(FCREPO4_TX_ID, txId);
123        } catch (final RepositoryException e) {
124            throw new RepositoryRuntimeException(e);
125        }
126        return tx;
127    }
128
129    @Override
130    public Transaction getTransaction(final String txId, final String userName) {
131        final Transaction tx = transactions.computeIfAbsent(txId, s -> {
132            throw new SessionMissingException("Transaction with id: " + s + " is not available");
133        });
134        if (!tx.isAssociatedWithUser(userName)) {
135            throw new SessionMissingException("Transaction with id " +
136                        txId + " is not available for user " + userName);
137        }
138        return tx;
139    }
140
141    /**
142     * Get the current Transaction for a session
143     *
144     * @param session the session
145     * @return the given session's current Transaction
146     * @throws SessionMissingException if transaction missing exception occurred
147     */
148    @Override
149    public Transaction getTransaction(final Session session) {
150        final String txId = getCurrentTransactionId(session);
151
152        if (txId == null) {
153            throw new SessionMissingException(
154                    "Transaction is not available");
155        }
156        return transactions.computeIfAbsent(txId, s -> {
157            throw new SessionMissingException("Transaction with id: " + s + " is not available");
158        });
159    }
160
161    /**
162     * Get the current Transaction ID for a session
163     *
164     * @param session the session
165     * @return the current Transaction ID for the given session
166     */
167    public static String getCurrentTransactionId(final Session session) {
168        try {
169            if (session instanceof TxSession) {
170                return ((TxSession) session).getTxId();
171            }
172            return session.getNamespaceURI(FCREPO4_TX_ID);
173        } catch (final RepositoryException e) {
174            LOGGER.trace("Unable to retrieve current transaction ID from session: {}", e.getMessage());
175            return null;
176        }
177    }
178
179    /**
180     * Check if a Transaction exists
181     *
182     * @param txid the Id of the {@link Transaction}
183     * @return the {@link Transaction}
184     */
185    @Override
186    public boolean exists(final String txid) {
187        return transactions.containsKey(txid);
188    }
189
190    /**
191     * Commit a {@link Transaction} with the given id
192     *
193     * @param txid the id of the {@link Transaction}
194     */
195    @Override
196    public Transaction commit(final String txid) {
197        final Transaction tx = transactions.remove(txid);
198        if (tx == null) {
199            throw new SessionMissingException("Transaction with id " + txid +
200                    " is not available");
201        }
202        tx.commit();
203        return tx;
204    }
205
206    /**
207     * Roll a {@link Transaction} back
208     *
209     * @param txid the id of the {@link Transaction}
210     * @return the {@link Transaction} object
211     */
212    @Override
213    public Transaction rollback(final String txid) {
214        final Transaction tx = transactions.remove(txid);
215        if (tx == null) {
216            throw new SessionMissingException("Transaction with id " + txid +
217                    " is not available");
218        }
219        tx.rollback();
220        return tx;
221    }
222
223}