001/** 002 * Copyright 2015 DuraSpace, Inc. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016/** 017 * 018 */ 019 020package org.fcrepo.kernel.impl.services; 021 022import static java.lang.System.currentTimeMillis; 023import static org.slf4j.LoggerFactory.getLogger; 024 025import java.util.Iterator; 026import java.util.Map; 027import java.util.Map.Entry; 028import java.util.concurrent.ConcurrentHashMap; 029 030import javax.jcr.RepositoryException; 031import javax.jcr.Session; 032 033import com.google.common.collect.ImmutableSet; 034import org.fcrepo.kernel.Transaction; 035import org.fcrepo.kernel.exception.RepositoryRuntimeException; 036import org.fcrepo.kernel.impl.TransactionImpl; 037import org.fcrepo.kernel.TxSession; 038import org.fcrepo.kernel.exception.TransactionMissingException; 039import org.fcrepo.kernel.services.TransactionService; 040import org.slf4j.Logger; 041import org.springframework.scheduling.annotation.Scheduled; 042import org.springframework.stereotype.Component; 043 044/** 045 * This is part of the strawman implementation for Fedora transactions This 046 * service implements a simple {@link Transaction} service which is able to 047 * create/commit/rollback {@link Transaction} objects A {@link Scheduled} 048 * annotation is used for removing timed out Transactions 049 * 050 * @author frank asseg 051 */ 052@Component 053public class TransactionServiceImpl extends AbstractService implements TransactionService { 054 055 private static final Logger LOGGER = getLogger(TransactionService.class); 056 057 /** 058 * A key for looking up the transaction id in a session key-value pair 059 */ 060 static final String FCREPO4_TX_ID = "fcrepo4.tx.id"; 061 062 /** 063 * TODO since transactions have to be available on all nodes, they have to 064 * be either persisted or written to a distributed map or sth, not just this 065 * plain hashmap that follows 066 */ 067 private static Map<String, Transaction> transactions = new ConcurrentHashMap<>(); 068 069 public static final long REAP_INTERVAL = 1000; 070 071 /** 072 * Check if a session is possibly within a transaction 073 * @param session the session 074 * @return whether the session is possibly within a transaction 075 */ 076 public static boolean isInTransaction(final Session session) { 077 try { 078 return ImmutableSet.copyOf(session.getNamespacePrefixes()).contains(FCREPO4_TX_ID); 079 } catch (final RepositoryException e) { 080 throw new RepositoryRuntimeException(e); 081 } 082 } 083 084 /** 085 * Every REAP_INTERVAL milliseconds, check for expired transactions. If the 086 * tx is expired, roll it back and remove it from the registry. 087 */ 088 /* 089 * (non-Javadoc) 090 * @see 091 * org.fcrepo.kernel.services.TransactionService#removeAndRollbackExpired() 092 */ 093 @Override 094 @Scheduled(fixedRate = REAP_INTERVAL) 095 public void removeAndRollbackExpired() { 096 synchronized (transactions) { 097 final Iterator<Entry<String, Transaction>> txs = 098 transactions.entrySet().iterator(); 099 while (txs.hasNext()) { 100 final Transaction tx = txs.next().getValue(); 101 if (tx.getExpires().getTime() <= currentTimeMillis()) { 102 try { 103 tx.rollback(); 104 } catch (final RepositoryRuntimeException e) { 105 LOGGER.error( 106 "Got exception rolling back expired" + 107 " transaction {}: {}", 108 tx, e); 109 } 110 txs.remove(); 111 } 112 } 113 } 114 } 115 116 /** 117 * Create a new Transaction and add it to the currently open ones 118 * 119 * @param sess The session to use for this Transaction 120 * @return the {@link Transaction} 121 */ 122 @Override 123 public Transaction beginTransaction(final Session sess, final String userName) { 124 final Transaction tx = new TransactionImpl(sess, userName); 125 final String txId = tx.getId(); 126 transactions.put(txId, tx); 127 try { 128 sess.setNamespacePrefix(FCREPO4_TX_ID, txId); 129 } catch (final RepositoryException e) { 130 throw new RepositoryRuntimeException(e); 131 } 132 return tx; 133 } 134 135 @Override 136 public Transaction getTransaction(final String txId, final String userName) { 137 final Transaction tx = transactions.get(txId); 138 139 if (tx == null) { 140 throw new TransactionMissingException( 141 "Transaction is not available"); 142 } 143 144 if (!tx.isAssociatedWithUser(userName)) { 145 throw new TransactionMissingException("Transaction with id " + 146 txId + " is not available for user " + userName); 147 } 148 return tx; 149 } 150 151 /** 152 * Get the current Transaction for a session 153 * 154 * @param session the session 155 * @return the given session's current Transaction 156 * @throws TransactionMissingException if transaction missing exception occurred 157 */ 158 @Override 159 public Transaction getTransaction(final Session session) { 160 final String txId = getCurrentTransactionId(session); 161 162 if (txId == null) { 163 throw new TransactionMissingException( 164 "Transaction is not available"); 165 } 166 final Transaction tx = transactions.get(txId); 167 168 if (tx == null) { 169 throw new TransactionMissingException( 170 "Transaction is not available"); 171 } 172 173 return tx; 174 } 175 176 /** 177 * Get the current Transaction ID for a session 178 * 179 * @param session the session 180 * @return the current Transaction ID for the given session 181 */ 182 public static String getCurrentTransactionId(final Session session) { 183 try { 184 if (session instanceof TxSession) { 185 return ((TxSession) session).getTxId(); 186 } 187 return session.getNamespaceURI(FCREPO4_TX_ID); 188 } catch (final RepositoryException e) { 189 LOGGER.trace("Unable to retrieve current transaction ID from session", e); 190 return null; 191 } 192 } 193 194 /** 195 * Check if a Transaction exists 196 * 197 * @param txid the Id of the {@link Transaction} 198 * @return the {@link Transaction} 199 */ 200 @Override 201 public boolean exists(final String txid) { 202 return transactions.containsKey(txid); 203 } 204 205 /** 206 * Commit a {@link Transaction} with the given id 207 * 208 * @param txid the id of the {@link Transaction} 209 */ 210 @Override 211 public Transaction commit(final String txid) { 212 final Transaction tx = transactions.remove(txid); 213 if (tx == null) { 214 throw new TransactionMissingException("Transaction with id " + txid + 215 " is not available"); 216 } 217 tx.commit(); 218 return tx; 219 } 220 221 /** 222 * Roll a {@link Transaction} back 223 * 224 * @param txid the id of the {@link Transaction} 225 * @return the {@link Transaction} object 226 */ 227 @Override 228 public Transaction rollback(final String txid) { 229 final Transaction tx = transactions.remove(txid); 230 if (tx == null) { 231 throw new TransactionMissingException("Transaction with id " + txid + 232 " is not available"); 233 } 234 tx.rollback(); 235 return tx; 236 } 237 238}