001/** 002 * Copyright 2014 DuraSpace, Inc. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016/** 017 * 018 */ 019 020package org.fcrepo.kernel.impl.services; 021 022import static java.lang.System.currentTimeMillis; 023import static org.slf4j.LoggerFactory.getLogger; 024 025import java.util.Iterator; 026import java.util.Map; 027import java.util.Map.Entry; 028import java.util.concurrent.ConcurrentHashMap; 029 030import javax.jcr.RepositoryException; 031import javax.jcr.Session; 032 033import com.google.common.collect.ImmutableSet; 034import org.fcrepo.kernel.Transaction; 035import org.fcrepo.kernel.exception.RepositoryRuntimeException; 036import org.fcrepo.kernel.impl.TransactionImpl; 037import org.fcrepo.kernel.TxSession; 038import org.fcrepo.kernel.exception.TransactionMissingException; 039import org.fcrepo.kernel.services.TransactionService; 040import org.slf4j.Logger; 041import org.springframework.scheduling.annotation.Scheduled; 042import org.springframework.stereotype.Component; 043 044/** 045 * This is part of the strawman implementation for Fedora transactions This 046 * service implements a simple {@link Transaction} service which is able to 047 * create/commit/rollback {@link Transaction} objects A {@link Scheduled} 048 * annotation is used for removing timed out Transactions 049 * 050 * @author frank asseg 051 */ 052@Component 053public class TransactionServiceImpl extends AbstractService implements TransactionService { 054 055 private static final Logger LOGGER = getLogger(TransactionService.class); 056 057 /** 058 * A key for looking up the transaction id in a session key-value pair 059 */ 060 static final String FCREPO4_TX_ID = "fcrepo4.tx.id"; 061 062 /** 063 * TODO since transactions have to be available on all nodes, they have to 064 * be either persisted or written to a distributed map or sth, not just this 065 * plain hashmap that follows 066 */ 067 private static Map<String, Transaction> transactions = new ConcurrentHashMap<>(); 068 069 public static final long REAP_INTERVAL = 1000; 070 071 /** 072 * Check if a session is possibly within a transaction 073 * @param session 074 * @return 075 */ 076 public static boolean isInTransaction(final Session session) { 077 try { 078 return ImmutableSet.copyOf(session.getNamespacePrefixes()).contains(FCREPO4_TX_ID); 079 } catch (final RepositoryException e) { 080 throw new RepositoryRuntimeException(e); 081 } 082 } 083 084 /** 085 * Every REAP_INTERVAL milliseconds, check for expired transactions. If the 086 * tx is expired, roll it back and remove it from the registry. 087 */ 088 /* 089 * (non-Javadoc) 090 * @see 091 * org.fcrepo.kernel.services.TransactionService#removeAndRollbackExpired() 092 */ 093 @Override 094 @Scheduled(fixedRate = REAP_INTERVAL) 095 public void removeAndRollbackExpired() { 096 synchronized (transactions) { 097 final Iterator<Entry<String, Transaction>> txs = 098 transactions.entrySet().iterator(); 099 while (txs.hasNext()) { 100 final Transaction tx = txs.next().getValue(); 101 if (tx.getExpires().getTime() <= currentTimeMillis()) { 102 try { 103 tx.rollback(); 104 } catch (final RepositoryRuntimeException e) { 105 LOGGER.error( 106 "Got exception rolling back expired" + 107 " transaction {}: {}", 108 tx, e); 109 } 110 txs.remove(); 111 } 112 } 113 } 114 } 115 116 /** 117 * Create a new Transaction and add it to the currently open ones 118 * 119 * @param sess The session to use for this Transaction 120 * @return the {@link Transaction} 121 */ 122 @Override 123 public Transaction beginTransaction(final Session sess, final String userName) { 124 final Transaction tx = new TransactionImpl(sess, userName); 125 final String txId = tx.getId(); 126 transactions.put(txId, tx); 127 try { 128 sess.setNamespacePrefix(FCREPO4_TX_ID, txId); 129 } catch (final RepositoryException e) { 130 throw new RepositoryRuntimeException(e); 131 } 132 return tx; 133 } 134 135 @Override 136 public Transaction getTransaction(final String txId, final String userName) 137 throws TransactionMissingException { 138 139 final Transaction tx = transactions.get(txId); 140 141 if (tx == null) { 142 throw new TransactionMissingException( 143 "Transaction is not available"); 144 } 145 146 if (!tx.isAssociatedWithUser(userName)) { 147 throw new TransactionMissingException("Transaction with id " + 148 txId + " is not available for user " + userName); 149 } 150 return tx; 151 } 152 153 /** 154 * Get the current Transaction for a session 155 * 156 * @param session 157 * @return the given session's current Transaction 158 * @throws TransactionMissingException 159 */ 160 @Override 161 public Transaction getTransaction(final Session session) 162 throws TransactionMissingException { 163 164 final String txId = getCurrentTransactionId(session); 165 166 if (txId == null) { 167 throw new TransactionMissingException( 168 "Transaction is not available"); 169 } 170 final Transaction tx = transactions.get(txId); 171 172 if (tx == null) { 173 throw new TransactionMissingException( 174 "Transaction is not available"); 175 } 176 177 return tx; 178 } 179 180 /** 181 * Get the current Transaction ID for a session 182 * 183 * @param session 184 * @return the current Transaction ID for the given session 185 */ 186 public static String getCurrentTransactionId(final Session session) { 187 try { 188 if (session instanceof TxSession) { 189 return ((TxSession) session).getTxId(); 190 } 191 return session.getNamespaceURI(FCREPO4_TX_ID); 192 } catch (final RepositoryException e) { 193 LOGGER.trace("Unable to retrieve current transaction ID from session", e); 194 return null; 195 } 196 } 197 198 /** 199 * Check if a Transaction exists 200 * 201 * @param txid the Id of the {@link Transaction} 202 * @return the {@link Transaction} 203 */ 204 @Override 205 public boolean exists(final String txid) { 206 return transactions.containsKey(txid); 207 } 208 209 /** 210 * Commit a {@link Transaction} with the given id 211 * 212 * @param txid the id of the {@link Transaction} 213 */ 214 @Override 215 public Transaction commit(final String txid) { 216 final Transaction tx = transactions.remove(txid); 217 if (tx == null) { 218 throw new TransactionMissingException("Transaction with id " + txid + 219 " is not available"); 220 } 221 tx.commit(); 222 return tx; 223 } 224 225 /** 226 * Roll a {@link Transaction} back 227 * 228 * @param txid the id of the {@link Transaction} 229 * @return the {@link Transaction} object 230 */ 231 @Override 232 public Transaction rollback(final String txid) { 233 final Transaction tx = transactions.remove(txid); 234 if (tx == null) { 235 throw new TransactionMissingException("Transaction with id " + txid + 236 " is not available"); 237 } 238 tx.rollback(); 239 return tx; 240 } 241 242}