001/*
002 * Licensed to DuraSpace under one or more contributor license agreements.
003 * See the NOTICE file distributed with this work for additional information
004 * regarding copyright ownership.
005 *
006 * DuraSpace licenses this file to you under the Apache License,
007 * Version 2.0 (the "License"); you may not use this file except in
008 * compliance with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018/**
019 *
020 */
021
022package org.fcrepo.kernel.modeshape.services;
023
024import static com.google.common.collect.Maps.filterValues;
025import static java.lang.System.currentTimeMillis;
026import static org.slf4j.LoggerFactory.getLogger;
027
028import java.util.Map;
029import java.util.concurrent.ConcurrentHashMap;
030
031import javax.jcr.RepositoryException;
032import javax.jcr.Session;
033
034import com.google.common.collect.ImmutableSet;
035import org.fcrepo.kernel.api.Transaction;
036import org.fcrepo.kernel.api.TxSession;
037import org.fcrepo.kernel.api.exception.RepositoryRuntimeException;
038import org.fcrepo.kernel.api.exception.SessionMissingException;
039import org.fcrepo.kernel.api.services.TransactionService;
040import org.fcrepo.kernel.modeshape.TransactionImpl;
041
042import org.slf4j.Logger;
043import org.springframework.scheduling.annotation.Scheduled;
044import org.springframework.stereotype.Component;
045
046/**
047 * This is part of the strawman implementation for Fedora transactions This
048 * service implements a simple {@link Transaction} service which is able to
049 * create/commit/rollback {@link Transaction} objects A {@link Scheduled}
050 * annotation is used for removing timed out Transactions
051 *
052 * @author frank asseg
053 * @author ajs6f
054 */
055@Component
056public class TransactionServiceImpl extends AbstractService implements TransactionService {
057
058    private static final Logger LOGGER = getLogger(TransactionServiceImpl.class);
059
060    /**
061     * A key for looking up the transaction id in a session key-value pair
062     */
063    static final String FCREPO4_TX_ID = "fcrepo4.tx.id";
064
065    /**
066     * TODO since transactions have to be available on all nodes, they have to
067     * be either persisted or written to a distributed map or sth, not just this
068     * plain hashmap that follows
069     */
070    private static Map<String, Transaction> transactions = new ConcurrentHashMap<>();
071
072    public static final long REAP_INTERVAL = 1000;
073
074    /**
075     * Check if a session is possibly within a transaction
076     * @param session the session
077     * @return whether the session is possibly within a transaction
078     */
079    public static boolean isInTransaction(final Session session) {
080        try {
081            return ImmutableSet.copyOf(session.getNamespacePrefixes()).contains(FCREPO4_TX_ID);
082        } catch (final RepositoryException e) {
083            throw new RepositoryRuntimeException(e);
084        }
085    }
086
087    /**
088     * Every REAP_INTERVAL milliseconds, check for expired transactions. If the
089     * tx is expired, roll it back and remove it from the registry.
090     */
091    /*
092     * (non-Javadoc)
093     * @see
094     * org.fcrepo.kernel.api.services.TransactionService#removeAndRollbackExpired()
095     */
096    @Override
097    @Scheduled(fixedRate = REAP_INTERVAL)
098    public void removeAndRollbackExpired() {
099        synchronized (transactions) {
100            filterValues(transactions, tx -> tx.getExpires().getTime() <= currentTimeMillis())
101                    .forEach((key, tx) -> {
102                        try {
103                            tx.rollback();
104                        } catch (final RepositoryRuntimeException e) {
105                            LOGGER.error("Got exception rolling back expired transaction {}: {}", tx, e.getMessage());
106                        }
107                        transactions.remove(key);
108                    });
109        }
110    }
111
112    /**
113     * Create a new Transaction and add it to the currently open ones
114     *
115     * @param sess The session to use for this Transaction
116     * @return the {@link Transaction}
117     */
118    @Override
119    public Transaction beginTransaction(final Session sess, final String userName) {
120        final Transaction tx = new TransactionImpl(sess, userName);
121        final String txId = tx.getId();
122        transactions.put(txId, tx);
123        try {
124            sess.setNamespacePrefix(FCREPO4_TX_ID, txId);
125        } catch (final RepositoryException e) {
126            throw new RepositoryRuntimeException(e);
127        }
128        return tx;
129    }
130
131    @Override
132    public Transaction getTransaction(final String txId, final String userName) {
133        final Transaction tx = transactions.computeIfAbsent(txId, s -> {
134            throw new SessionMissingException("Transaction with id: " + s + " is not available");
135        });
136        if (!tx.isAssociatedWithUser(userName)) {
137            throw new SessionMissingException("Transaction with id " +
138                        txId + " is not available for user " + userName);
139        }
140        return tx;
141    }
142
143    /**
144     * Get the current Transaction for a session
145     *
146     * @param session the session
147     * @return the given session's current Transaction
148     * @throws SessionMissingException if transaction missing exception occurred
149     */
150    @Override
151    public Transaction getTransaction(final Session session) {
152        final String txId = getCurrentTransactionId(session);
153
154        if (txId == null) {
155            throw new SessionMissingException(
156                    "Transaction is not available");
157        }
158        return transactions.computeIfAbsent(txId, s -> {
159            throw new SessionMissingException("Transaction with id: " + s + " is not available");
160        });
161    }
162
163    /**
164     * Get the current Transaction ID for a session
165     *
166     * @param session the session
167     * @return the current Transaction ID for the given session
168     */
169    public static String getCurrentTransactionId(final Session session) {
170        try {
171            if (session instanceof TxSession) {
172                return ((TxSession) session).getTxId();
173            }
174            return session.getNamespaceURI(FCREPO4_TX_ID);
175        } catch (final RepositoryException e) {
176            LOGGER.trace("Unable to retrieve current transaction ID from session: {}", e.getMessage());
177            return null;
178        }
179    }
180
181    /**
182     * Check if a Transaction exists
183     *
184     * @param txid the Id of the {@link Transaction}
185     * @return the {@link Transaction}
186     */
187    @Override
188    public boolean exists(final String txid) {
189        return transactions.containsKey(txid);
190    }
191
192    /**
193     * Commit a {@link Transaction} with the given id
194     *
195     * @param txid the id of the {@link Transaction}
196     */
197    @Override
198    public Transaction commit(final String txid) {
199        final Transaction tx = transactions.remove(txid);
200        if (tx == null) {
201            throw new SessionMissingException("Transaction with id " + txid +
202                    " is not available");
203        }
204        tx.commit();
205        return tx;
206    }
207
208    /**
209     * Roll a {@link Transaction} back
210     *
211     * @param txid the id of the {@link Transaction}
212     * @return the {@link Transaction} object
213     */
214    @Override
215    public Transaction rollback(final String txid) {
216        final Transaction tx = transactions.remove(txid);
217        if (tx == null) {
218            throw new SessionMissingException("Transaction with id " + txid +
219                    " is not available");
220        }
221        tx.rollback();
222        return tx;
223    }
224
225}