001/* 002 * Licensed to DuraSpace under one or more contributor license agreements. 003 * See the NOTICE file distributed with this work for additional information 004 * regarding copyright ownership. 005 * 006 * DuraSpace licenses this file to you under the Apache License, 007 * Version 2.0 (the "License"); you may not use this file except in 008 * compliance with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018/** 019 * 020 */ 021 022package org.fcrepo.kernel.modeshape.services; 023 024import static java.time.Instant.now; 025import static java.util.stream.Collectors.toSet; 026import static com.google.common.base.Strings.nullToEmpty; 027import static org.fcrepo.kernel.modeshape.FedoraSessionImpl.operationTimeout; 028import static org.slf4j.LoggerFactory.getLogger; 029 030import java.util.Map; 031import java.util.Set; 032import java.util.concurrent.ConcurrentHashMap; 033 034import org.fcrepo.kernel.api.FedoraSession; 035import org.fcrepo.kernel.api.exception.RepositoryRuntimeException; 036import org.fcrepo.kernel.api.exception.SessionMissingException; 037import org.fcrepo.kernel.api.services.BatchService; 038 039import com.google.common.annotations.VisibleForTesting; 040import org.slf4j.Logger; 041import org.springframework.scheduling.annotation.Scheduled; 042import org.springframework.stereotype.Component; 043 044/** 045 * This is part of the strawman implementation for Fedora batch operations This 046 * service implements a simple {@link FedoraSession} service which is able to 047 * create/commit/rollback {@link FedoraSession} objects. A {@link Scheduled} 048 * annotation is used for removing timed out operations 049 * 050 * @author frank asseg 051 * @author ajs6f 052 * @author acoburn 053 */ 054@Component 055public class BatchServiceImpl extends AbstractService implements BatchService { 056 057 private static final Logger LOGGER = getLogger(BatchServiceImpl.class); 058 059 /** 060 * TODO since sessions have to be available on all nodes, they have to 061 * be either persisted or written to a distributed map or sth, not just this 062 * plain hashmap that follows 063 */ 064 private static Map<String, FedoraSession> sessions = new ConcurrentHashMap<>(); 065 066 @VisibleForTesting 067 public static final long REAP_INTERVAL = 1000; 068 069 /** 070 * Every REAP_INTERVAL milliseconds, check for expired sessions. If the 071 * tx is expired, roll it back and remove it from the registry. 072 */ 073 @Override 074 @Scheduled(fixedRate = REAP_INTERVAL) 075 public void removeExpired() { 076 final Set<String> reapable = sessions.entrySet().stream() 077 .filter(e -> e.getValue().getExpires().isPresent()) 078 .filter(e -> e.getValue().getExpires().get().isBefore(now())) 079 .map(Map.Entry::getKey).collect(toSet()); 080 reapable.forEach(key -> { 081 final FedoraSession s = sessions.get(key); 082 if (s != null) { 083 try { 084 s.expire(); 085 } catch (final RepositoryRuntimeException e) { 086 LOGGER.error("Got exception rolling back expired session {}: {}", s, e.getMessage()); 087 } 088 } 089 sessions.remove(key); 090 }); 091 } 092 093 @Override 094 public void begin(final FedoraSession session, final String username) { 095 sessions.put(getTxKey(session.getId(), username), session); 096 } 097 098 @Override 099 public FedoraSession getSession(final String sessionId, final String username) { 100 final FedoraSession session = sessions.get(getTxKey(sessionId, username)); 101 if (session == null) { 102 throw new SessionMissingException("Batch session with id: " + sessionId + " is not available"); 103 } 104 return session; 105 } 106 107 @Override 108 public boolean exists(final String sessionId, final String username) { 109 return sessions.containsKey(getTxKey(sessionId, username)); 110 } 111 112 @Override 113 public void commit(final String sessionId, final String username) { 114 final FedoraSession session = getSession(sessionId, username); 115 session.commit(); 116 sessions.remove(getTxKey(sessionId, username)); 117 } 118 119 @Override 120 public void refresh(final String sessionId, final String username) { 121 final FedoraSession session = getSession(sessionId, username); 122 session.updateExpiry(operationTimeout()); 123 } 124 125 @Override 126 public void abort(final String sessionId, final String username) { 127 final FedoraSession session = getSession(sessionId, username); 128 session.expire(); 129 sessions.remove(getTxKey(sessionId, username)); 130 } 131 132 private static String getTxKey(final String sessionId, final String username) { 133 return nullToEmpty(username) + ":" + sessionId; 134 } 135}