001/** 002 * Copyright 2015 DuraSpace, Inc. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016/** 017 * 018 */ 019 020package org.fcrepo.kernel.modeshape.services; 021 022import static com.google.common.collect.Maps.filterValues; 023import static java.lang.System.currentTimeMillis; 024import static org.slf4j.LoggerFactory.getLogger; 025 026import java.util.Map; 027import java.util.concurrent.ConcurrentHashMap; 028 029import javax.jcr.RepositoryException; 030import javax.jcr.Session; 031 032import com.google.common.collect.ImmutableSet; 033import org.fcrepo.kernel.api.Transaction; 034import org.fcrepo.kernel.api.TxSession; 035import org.fcrepo.kernel.api.exception.RepositoryRuntimeException; 036import org.fcrepo.kernel.api.exception.TransactionMissingException; 037import org.fcrepo.kernel.api.services.TransactionService; 038import org.fcrepo.kernel.modeshape.TransactionImpl; 039 040import org.slf4j.Logger; 041import org.springframework.scheduling.annotation.Scheduled; 042import org.springframework.stereotype.Component; 043 044/** 045 * This is part of the strawman implementation for Fedora transactions This 046 * service implements a simple {@link Transaction} service which is able to 047 * create/commit/rollback {@link Transaction} objects A {@link Scheduled} 048 * annotation is used for removing timed out Transactions 049 * 050 * @author frank asseg 051 * @author ajs6f 052 */ 053@Component 054public class TransactionServiceImpl extends AbstractService implements TransactionService { 055 056 private static final Logger LOGGER = getLogger(TransactionServiceImpl.class); 057 058 /** 059 * A key for looking up the transaction id in a session key-value pair 060 */ 061 static final String FCREPO4_TX_ID = "fcrepo4.tx.id"; 062 063 /** 064 * TODO since transactions have to be available on all nodes, they have to 065 * be either persisted or written to a distributed map or sth, not just this 066 * plain hashmap that follows 067 */ 068 private static Map<String, Transaction> transactions = new ConcurrentHashMap<>(); 069 070 public static final long REAP_INTERVAL = 1000; 071 072 /** 073 * Check if a session is possibly within a transaction 074 * @param session the session 075 * @return whether the session is possibly within a transaction 076 */ 077 public static boolean isInTransaction(final Session session) { 078 try { 079 return ImmutableSet.copyOf(session.getNamespacePrefixes()).contains(FCREPO4_TX_ID); 080 } catch (final RepositoryException e) { 081 throw new RepositoryRuntimeException(e); 082 } 083 } 084 085 /** 086 * Every REAP_INTERVAL milliseconds, check for expired transactions. If the 087 * tx is expired, roll it back and remove it from the registry. 088 */ 089 /* 090 * (non-Javadoc) 091 * @see 092 * org.fcrepo.kernel.api.services.TransactionService#removeAndRollbackExpired() 093 */ 094 @Override 095 @Scheduled(fixedRate = REAP_INTERVAL) 096 public void removeAndRollbackExpired() { 097 synchronized (transactions) { 098 filterValues(transactions, tx -> tx.getExpires().getTime() <= currentTimeMillis()) 099 .forEach((key, tx) -> { 100 try { 101 tx.rollback(); 102 } catch (final RepositoryRuntimeException e) { 103 LOGGER.error("Got exception rolling back expired transaction {}: {}", tx, e.getMessage()); 104 } 105 transactions.remove(key); 106 }); 107 } 108 } 109 110 /** 111 * Create a new Transaction and add it to the currently open ones 112 * 113 * @param sess The session to use for this Transaction 114 * @return the {@link Transaction} 115 */ 116 @Override 117 public Transaction beginTransaction(final Session sess, final String userName) { 118 final Transaction tx = new TransactionImpl(sess, userName); 119 final String txId = tx.getId(); 120 transactions.put(txId, tx); 121 try { 122 sess.setNamespacePrefix(FCREPO4_TX_ID, txId); 123 } catch (final RepositoryException e) { 124 throw new RepositoryRuntimeException(e); 125 } 126 return tx; 127 } 128 129 @Override 130 public Transaction getTransaction(final String txId, final String userName) { 131 final Transaction tx = transactions.computeIfAbsent(txId, s -> { 132 throw new TransactionMissingException("Transaction with id: " + s + " is not available"); 133 }); 134 if (!tx.isAssociatedWithUser(userName)) { 135 throw new TransactionMissingException("Transaction with id " + 136 txId + " is not available for user " + userName); 137 } 138 return tx; 139 } 140 141 /** 142 * Get the current Transaction for a session 143 * 144 * @param session the session 145 * @return the given session's current Transaction 146 * @throws TransactionMissingException if transaction missing exception occurred 147 */ 148 @Override 149 public Transaction getTransaction(final Session session) { 150 final String txId = getCurrentTransactionId(session); 151 152 if (txId == null) { 153 throw new TransactionMissingException( 154 "Transaction is not available"); 155 } 156 return transactions.computeIfAbsent(txId, s -> { 157 throw new TransactionMissingException("Transaction with id: " + s + " is not available"); 158 }); 159 } 160 161 /** 162 * Get the current Transaction ID for a session 163 * 164 * @param session the session 165 * @return the current Transaction ID for the given session 166 */ 167 public static String getCurrentTransactionId(final Session session) { 168 try { 169 if (session instanceof TxSession) { 170 return ((TxSession) session).getTxId(); 171 } 172 return session.getNamespaceURI(FCREPO4_TX_ID); 173 } catch (final RepositoryException e) { 174 LOGGER.trace("Unable to retrieve current transaction ID from session: {}", e.getMessage()); 175 return null; 176 } 177 } 178 179 /** 180 * Check if a Transaction exists 181 * 182 * @param txid the Id of the {@link Transaction} 183 * @return the {@link Transaction} 184 */ 185 @Override 186 public boolean exists(final String txid) { 187 return transactions.containsKey(txid); 188 } 189 190 /** 191 * Commit a {@link Transaction} with the given id 192 * 193 * @param txid the id of the {@link Transaction} 194 */ 195 @Override 196 public Transaction commit(final String txid) { 197 final Transaction tx = transactions.remove(txid); 198 if (tx == null) { 199 throw new TransactionMissingException("Transaction with id " + txid + 200 " is not available"); 201 } 202 tx.commit(); 203 return tx; 204 } 205 206 /** 207 * Roll a {@link Transaction} back 208 * 209 * @param txid the id of the {@link Transaction} 210 * @return the {@link Transaction} object 211 */ 212 @Override 213 public Transaction rollback(final String txid) { 214 final Transaction tx = transactions.remove(txid); 215 if (tx == null) { 216 throw new TransactionMissingException("Transaction with id " + txid + 217 " is not available"); 218 } 219 tx.rollback(); 220 return tx; 221 } 222 223}