/*
 * Licensed to DuraSpace under one or more contributor license agreements.
 * See the NOTICE file distributed with this work for additional information
 * regarding copyright ownership.
 *
 * DuraSpace licenses this file to you under the Apache License,
 * Version 2.0 (the "License"); you may not use this file except in
 * compliance with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.fcrepo.kernel.modeshape.services;

import static java.time.Instant.now;
import static java.util.stream.Collectors.toSet;
import static com.google.common.base.Strings.nullToEmpty;
import static org.fcrepo.kernel.modeshape.FedoraSessionImpl.operationTimeout;
import static org.slf4j.LoggerFactory.getLogger;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.fcrepo.kernel.api.FedoraSession;
import org.fcrepo.kernel.api.exception.RepositoryRuntimeException;
import org.fcrepo.kernel.api.exception.SessionMissingException;
import org.fcrepo.kernel.api.services.BatchService;

import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

/**
 * This is part of the strawman implementation for Fedora batch operations.
 * This service implements a simple {@link FedoraSession} service which is able
 * to create/commit/rollback {@link FedoraSession} objects. A {@link Scheduled}
 * annotation is used for removing timed out operations.
 *
 * @author frank asseg
 * @author ajs6f
 * @author acoburn
 */
@Component
public class BatchServiceImpl extends AbstractService implements BatchService {

    private static final Logger LOGGER = getLogger(BatchServiceImpl.class);

    /**
     * Registry of open batch sessions, keyed by the value of
     * {@link #getTxKey(String, String)}.
     *
     * TODO since sessions have to be available on all nodes, they have to
     * be either persisted or written to a distributed map or sth, not just this
     * plain hashmap that follows
     */
    private static final Map<String, FedoraSession> sessions = new ConcurrentHashMap<>();

    /**
     * Interval, in milliseconds, between two runs of {@link #removeExpired()}.
     */
    @VisibleForTesting
    public static final long REAP_INTERVAL = 1000;

    /**
     * Every REAP_INTERVAL milliseconds, check for expired sessions. If the
     * tx is expired, roll it back and remove it from the registry.
     *
     * A session whose rollback fails with a {@link RepositoryRuntimeException}
     * is still removed from the registry; the failure is only logged.
     */
    @Override
    @Scheduled(fixedRate = REAP_INTERVAL)
    public void removeExpired() {
        // Collect the keys first so that the registry is not modified while it is being streamed.
        final Set<String> reapable = sessions.entrySet().stream()
                .filter(entry -> isExpired(entry.getValue()))
                .map(Map.Entry::getKey)
                .collect(toSet());

        reapable.forEach(key -> {
            // The entry may have been committed or aborted since the snapshot was taken.
            final FedoraSession s = sessions.get(key);
            if (s != null) {
                try {
                    s.expire();
                } catch (final RepositoryRuntimeException e) {
                    // The trailing exception argument makes the logger record the stack trace as well.
                    LOGGER.error("Got exception rolling back expired session {}: {}", s, e.getMessage(), e);
                }
            }
            sessions.remove(key);
        });
    }

    /**
     * Register a session so that it can later be looked up by id and user.
     *
     * @param session the session to register
     * @param username the user the session belongs to; {@code null} is treated as the empty string
     */
    @Override
    public void begin(final FedoraSession session, final String username) {
        sessions.put(getTxKey(session.getId(), username), session);
    }

    /**
     * Look up a registered session.
     *
     * @param sessionId the id of the session
     * @param username the user the session belongs to; {@code null} is treated as the empty string
     * @return the registered session
     * @throws SessionMissingException if no session is registered for the given id and user
     */
    @Override
    public FedoraSession getSession(final String sessionId, final String username) {
        final FedoraSession session = sessions.get(getTxKey(sessionId, username));
        if (session == null) {
            throw new SessionMissingException("Batch session with id: " + sessionId + " is not available");
        }
        return session;
    }

    /**
     * Check whether a session is registered for the given id and user.
     *
     * @param sessionId the id of the session
     * @param username the user the session belongs to; {@code null} is treated as the empty string
     * @return {@code true} if such a session is registered
     */
    @Override
    public boolean exists(final String sessionId, final String username) {
        return sessions.containsKey(getTxKey(sessionId, username));
    }

    /**
     * Commit a registered session and remove it from the registry. The session
     * is only removed after {@code commit()} has returned normally.
     *
     * @param sessionId the id of the session
     * @param username the user the session belongs to; {@code null} is treated as the empty string
     * @throws SessionMissingException if no session is registered for the given id and user
     */
    @Override
    public void commit(final String sessionId, final String username) {
        final FedoraSession session = getSession(sessionId, username);
        session.commit();
        sessions.remove(getTxKey(sessionId, username));
    }

    /**
     * Update the expiry of a registered session using the value returned by
     * {@code FedoraSessionImpl.operationTimeout()}.
     *
     * @param sessionId the id of the session
     * @param username the user the session belongs to; {@code null} is treated as the empty string
     * @throws SessionMissingException if no session is registered for the given id and user
     */
    @Override
    public void refresh(final String sessionId, final String username) {
        final FedoraSession session = getSession(sessionId, username);
        session.updateExpiry(operationTimeout());
    }

    /**
     * Expire a registered session and remove it from the registry. The session
     * is only removed after {@code expire()} has returned normally.
     *
     * @param sessionId the id of the session
     * @param username the user the session belongs to; {@code null} is treated as the empty string
     * @throws SessionMissingException if no session is registered for the given id and user
     */
    @Override
    public void abort(final String sessionId, final String username) {
        final FedoraSession session = getSession(sessionId, username);
        session.expire();
        sessions.remove(getTxKey(sessionId, username));
    }

    /**
     * Tell whether a session carries an expiry that lies before the current instant.
     * A session without an expiry is never considered expired.
     */
    private static boolean isExpired(final FedoraSession session) {
        return session.getExpires().isPresent() && session.getExpires().get().isBefore(now());
    }

    /**
     * Build the registry key for a session: the user name (empty when
     * {@code null}), a colon, then the session id.
     */
    private static String getTxKey(final String sessionId, final String username) {
        return nullToEmpty(username) + ":" + sessionId;
    }
}