/*
 * Licensed to DuraSpace under one or more contributor license agreements.
 * See the NOTICE file distributed with this work for additional information
 * regarding copyright ownership.
 *
 * DuraSpace licenses this file to you under the Apache License,
 * Version 2.0 (the "License"); you may not use this file except in
 * compliance with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
/**
 *
 */

package org.fcrepo.kernel.modeshape.services;

import static com.google.common.collect.Maps.filterValues;
import static java.lang.System.currentTimeMillis;
import static org.slf4j.LoggerFactory.getLogger;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import javax.jcr.RepositoryException;
import javax.jcr.Session;

import com.google.common.collect.ImmutableSet;
import org.fcrepo.kernel.api.Transaction;
import org.fcrepo.kernel.api.TxSession;
import org.fcrepo.kernel.api.exception.RepositoryRuntimeException;
import org.fcrepo.kernel.api.exception.SessionMissingException;
import org.fcrepo.kernel.api.services.TransactionService;
import org.fcrepo.kernel.modeshape.TransactionImpl;

import org.slf4j.Logger;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

/**
 * This is part of the strawman implementation for Fedora transactions. This
 * service implements a simple {@link Transaction} service which is able to
 * create/commit/rollback {@link Transaction} objects. A {@link Scheduled}
 * annotation is used for removing timed out Transactions.
 * <p>
 * Open transactions are kept in a JVM-local {@link ConcurrentHashMap} keyed by
 * transaction id; a transaction is removed from that registry before it is
 * committed, rolled back or reaped.
 *
 * @author frank asseg
 * @author ajs6f
 */
@Component
public class TransactionServiceImpl extends AbstractService implements TransactionService {

    private static final Logger LOGGER = getLogger(TransactionServiceImpl.class);

    /**
     * A key for looking up the transaction id in a session key-value pair
     */
    static final String FCREPO4_TX_ID = "fcrepo4.tx.id";

    /**
     * TODO since transactions have to be available on all nodes, they have to
     * be either persisted or written to a distributed map or sth, not just this
     * plain hashmap that follows
     */
    private static Map<String, Transaction> transactions = new ConcurrentHashMap<>();

    /**
     * Interval, in milliseconds, between two sweeps for expired transactions.
     */
    public static final long REAP_INTERVAL = 1000;

    /**
     * Check if a session is possibly within a transaction
     *
     * @param session the session
     * @return whether the session is possibly within a transaction, i.e. whether
     *         {@link #FCREPO4_TX_ID} is among the session's namespace prefixes
     * @throws RepositoryRuntimeException if the namespace prefixes cannot be read
     */
    public static boolean isInTransaction(final Session session) {
        try {
            return ImmutableSet.copyOf(session.getNamespacePrefixes()).contains(FCREPO4_TX_ID);
        } catch (final RepositoryException e) {
            throw new RepositoryRuntimeException(e);
        }
    }

    /**
     * Every REAP_INTERVAL milliseconds, check for expired transactions. If the
     * tx is expired, remove it from the registry and roll it back.
     * <p>
     * Each expired transaction is first claimed with an atomic conditional
     * remove and only rolled back when this sweep won the claim, so a
     * transaction that is concurrently committed or rolled back through this
     * service is not rolled back a second time here.
     *
     * @see org.fcrepo.kernel.api.services.TransactionService#removeAndRollbackExpired()
     */
    @Override
    @Scheduled(fixedRate = REAP_INTERVAL)
    public void removeAndRollbackExpired() {
        // Read the clock once so every transaction in this sweep is judged against the same instant.
        final long now = currentTimeMillis();
        filterValues(transactions, tx -> tx.getExpires().getTime() <= now)
                .forEach((key, tx) -> {
                    if (!transactions.remove(key, tx)) {
                        // Somebody else already took this transaction out of the registry.
                        return;
                    }
                    try {
                        tx.rollback();
                    } catch (final RuntimeException e) {
                        // One failing rollback must not stop the sweep; the trailing argument
                        // makes the logger record the stack trace as well as the message.
                        LOGGER.error("Got exception rolling back expired transaction {}: {}", tx, e.getMessage(), e);
                    }
                });
    }

    /**
     * Create a new Transaction and add it to the currently open ones
     *
     * @param sess The session to use for this Transaction
     * @param userName the name of the user the Transaction is created for
     * @return the {@link Transaction}
     * @throws RepositoryRuntimeException if the transaction id cannot be recorded on the session;
     *         in that case the transaction is not registered
     */
    @Override
    public Transaction beginTransaction(final Session sess, final String userName) {
        final Transaction tx = new TransactionImpl(sess, userName);
        final String txId = tx.getId();
        try {
            sess.setNamespacePrefix(FCREPO4_TX_ID, txId);
        } catch (final RepositoryException e) {
            throw new RepositoryRuntimeException(e);
        }
        // Register only after the session has been tagged, so a failure above
        // cannot leave behind a registered transaction the caller never received.
        transactions.put(txId, tx);
        return tx;
    }

    /**
     * Get the open Transaction with the given id on behalf of a user
     *
     * @param txId the id of the {@link Transaction}
     * @param userName the name of the user asking for the Transaction
     * @return the {@link Transaction}
     * @throws SessionMissingException if no such Transaction is registered, or if it is not
     *         associated with the given user
     */
    @Override
    public Transaction getTransaction(final String txId, final String userName) {
        final Transaction tx = findTransaction(txId);
        if (!tx.isAssociatedWithUser(userName)) {
            throw new SessionMissingException("Transaction with id " +
                    txId + " is not available for user " + userName);
        }
        return tx;
    }

    /**
     * Get the current Transaction for a session
     *
     * @param session the session
     * @return the given session's current Transaction
     * @throws SessionMissingException if transaction missing exception occurred
     */
    @Override
    public Transaction getTransaction(final Session session) {
        final String txId = getCurrentTransactionId(session);

        if (txId == null) {
            throw new SessionMissingException(
                    "Transaction is not available");
        }
        return findTransaction(txId);
    }

    /**
     * Get the current Transaction ID for a session
     *
     * @param session the session
     * @return the current Transaction ID for the given session, or {@code null} if the
     *         lookup on the session fails with a {@link RepositoryException}
     */
    public static String getCurrentTransactionId(final Session session) {
        try {
            if (session instanceof TxSession) {
                return ((TxSession) session).getTxId();
            }
            return session.getNamespaceURI(FCREPO4_TX_ID);
        } catch (final RepositoryException e) {
            LOGGER.trace("Unable to retrieve current transaction ID from session: {}", e.getMessage());
            return null;
        }
    }

    /**
     * Check if a Transaction exists
     *
     * @param txid the Id of the {@link Transaction}
     * @return {@code true} if a {@link Transaction} with that id is currently registered;
     *         {@code false} otherwise, including when {@code txid} is {@code null}
     */
    @Override
    public boolean exists(final String txid) {
        // ConcurrentHashMap rejects null keys, so answer for null without asking the map.
        return txid != null && transactions.containsKey(txid);
    }

    /**
     * Commit a {@link Transaction} with the given id
     *
     * @param txid the id of the {@link Transaction}
     * @return the {@link Transaction} object
     * @throws SessionMissingException if no Transaction with that id is registered
     */
    @Override
    public Transaction commit(final String txid) {
        final Transaction tx = removeTransaction(txid);
        tx.commit();
        return tx;
    }

    /**
     * Roll a {@link Transaction} back
     *
     * @param txid the id of the {@link Transaction}
     * @return the {@link Transaction} object
     * @throws SessionMissingException if no Transaction with that id is registered
     */
    @Override
    public Transaction rollback(final String txid) {
        final Transaction tx = removeTransaction(txid);
        tx.rollback();
        return tx;
    }

    /**
     * Look up a registered Transaction without modifying the registry.
     *
     * @param txId the id of the {@link Transaction}, may be {@code null}
     * @return the registered {@link Transaction}
     * @throws SessionMissingException if the id is {@code null} or not registered
     */
    private static Transaction findTransaction(final String txId) {
        // ConcurrentHashMap rejects null keys; treat a null id like any other unknown id.
        final Transaction tx = txId == null ? null : transactions.get(txId);
        if (tx == null) {
            throw new SessionMissingException("Transaction with id: " + txId + " is not available");
        }
        return tx;
    }

    /**
     * Atomically take a Transaction out of the registry.
     *
     * @param txid the id of the {@link Transaction}, may be {@code null}
     * @return the {@link Transaction} that was registered under the id
     * @throws SessionMissingException if the id is {@code null} or not registered
     */
    private static Transaction removeTransaction(final String txid) {
        final Transaction tx = txid == null ? null : transactions.remove(txid);
        if (tx == null) {
            throw new SessionMissingException("Transaction with id " + txid +
                    " is not available");
        }
        return tx;
    }

}