001/*
002 * Licensed to DuraSpace under one or more contributor license agreements.
003 * See the NOTICE file distributed with this work for additional information
004 * regarding copyright ownership.
005 *
006 * DuraSpace licenses this file to you under the Apache License,
007 * Version 2.0 (the "License"); you may not use this file except in
008 * compliance with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.fcrepo.kernel.modeshape.services;
020
021import static java.time.Instant.now;
022import static java.util.stream.Collectors.toSet;
023import static com.google.common.base.Strings.nullToEmpty;
024import static org.fcrepo.kernel.modeshape.FedoraSessionImpl.operationTimeout;
025import static org.slf4j.LoggerFactory.getLogger;
026
027import java.util.Map;
028import java.util.Set;
029import java.util.concurrent.ConcurrentHashMap;
030
031import org.fcrepo.kernel.api.FedoraSession;
032import org.fcrepo.kernel.api.exception.RepositoryRuntimeException;
033import org.fcrepo.kernel.api.exception.SessionMissingException;
034import org.fcrepo.kernel.api.services.BatchService;
035
036import com.google.common.annotations.VisibleForTesting;
037import org.slf4j.Logger;
038import org.springframework.scheduling.annotation.Scheduled;
039import org.springframework.stereotype.Component;
040
041/**
042 * This is part of the strawman implementation for Fedora batch operations This
043 * service implements a simple {@link FedoraSession} service which is able to
044 * create/commit/rollback {@link FedoraSession} objects. A {@link Scheduled}
045 * annotation is used for removing timed out operations
046 *
047 * @author frank asseg
048 * @author ajs6f
049 * @author acoburn
050 */
051@Component
052public class BatchServiceImpl extends AbstractService implements BatchService {
053
054    private static final Logger LOGGER = getLogger(BatchServiceImpl.class);
055
056    /**
057     * TODO since sessions have to be available on all nodes, they have to
058     * be either persisted or written to a distributed map or sth, not just this
059     * plain hashmap that follows
060     */
061    private static final Map<String, FedoraSession> sessions = new ConcurrentHashMap<>();
062
063    @VisibleForTesting
064    public static final long REAP_INTERVAL = 1000;
065
066    /**
067     * Every REAP_INTERVAL milliseconds, check for expired sessions. If the
068     * tx is expired, roll it back and remove it from the registry.
069     */
070    @Override
071    @Scheduled(fixedRate = REAP_INTERVAL)
072    public void removeExpired() {
073        final Set<String> reapable = sessions.entrySet().stream()
074                .filter(e -> e.getValue().getExpires().isPresent())
075                .filter(e -> e.getValue().getExpires().get().isBefore(now()))
076                .map(Map.Entry::getKey).collect(toSet());
077        reapable.forEach(key -> {
078            final FedoraSession s = sessions.get(key);
079            if (s != null) {
080                try {
081                    s.expire();
082                } catch (final RepositoryRuntimeException e) {
083                    LOGGER.error("Got exception rolling back expired session {}: {}", s, e.getMessage());
084                }
085            }
086            sessions.remove(key);
087        });
088    }
089
090    @Override
091    public void begin(final FedoraSession session, final String username) {
092        sessions.put(getTxKey(session.getId(), username), session);
093    }
094
095    @Override
096    public FedoraSession getSession(final String sessionId, final String username) {
097        final FedoraSession session = sessions.get(getTxKey(sessionId, username));
098        if (session == null) {
099            throw new SessionMissingException("Batch session with id: " + sessionId + " is not available");
100        }
101        return session;
102    }
103
104    @Override
105    public boolean exists(final String sessionId, final String username) {
106        return sessions.containsKey(getTxKey(sessionId, username));
107    }
108
109    @Override
110    public void commit(final String sessionId, final String username) {
111        final FedoraSession session = getSession(sessionId, username);
112        session.commit();
113        sessions.remove(getTxKey(sessionId, username));
114    }
115
116    @Override
117    public void refresh(final String sessionId, final String username) {
118        final FedoraSession session = getSession(sessionId, username);
119        session.updateExpiry(operationTimeout());
120    }
121
122    @Override
123    public void abort(final String sessionId, final String username) {
124        final FedoraSession session = getSession(sessionId, username);
125        session.expire();
126        sessions.remove(getTxKey(sessionId, username));
127    }
128
129    private static String getTxKey(final String sessionId, final String username) {
130        return nullToEmpty(username) + ":" + sessionId;
131    }
132}