001/*
002 * Licensed to DuraSpace under one or more contributor license agreements.
003 * See the NOTICE file distributed with this work for additional information
004 * regarding copyright ownership.
005 *
006 * DuraSpace licenses this file to you under the Apache License,
007 * Version 2.0 (the "License"); you may not use this file except in
008 * compliance with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018/**
019 *
020 */
021
022package org.fcrepo.kernel.modeshape.services;
023
024import static java.time.Instant.now;
025import static java.util.stream.Collectors.toSet;
026import static com.google.common.base.Strings.nullToEmpty;
027import static org.fcrepo.kernel.modeshape.FedoraSessionImpl.operationTimeout;
028import static org.slf4j.LoggerFactory.getLogger;
029
030import java.util.Map;
031import java.util.Set;
032import java.util.concurrent.ConcurrentHashMap;
033
034import org.fcrepo.kernel.api.FedoraSession;
035import org.fcrepo.kernel.api.exception.RepositoryRuntimeException;
036import org.fcrepo.kernel.api.exception.SessionMissingException;
037import org.fcrepo.kernel.api.services.BatchService;
038
039import com.google.common.annotations.VisibleForTesting;
040import org.slf4j.Logger;
041import org.springframework.scheduling.annotation.Scheduled;
042import org.springframework.stereotype.Component;
043
044/**
045 * This is part of the strawman implementation for Fedora batch operations This
046 * service implements a simple {@link FedoraSession} service which is able to
047 * create/commit/rollback {@link FedoraSession} objects. A {@link Scheduled}
048 * annotation is used for removing timed out operations
049 *
050 * @author frank asseg
051 * @author ajs6f
052 * @author acoburn
053 */
054@Component
055public class BatchServiceImpl extends AbstractService implements BatchService {
056
057    private static final Logger LOGGER = getLogger(BatchServiceImpl.class);
058
059    /**
060     * TODO since sessions have to be available on all nodes, they have to
061     * be either persisted or written to a distributed map or sth, not just this
062     * plain hashmap that follows
063     */
064    private static Map<String, FedoraSession> sessions = new ConcurrentHashMap<>();
065
066    @VisibleForTesting
067    public static final long REAP_INTERVAL = 1000;
068
069    /**
070     * Every REAP_INTERVAL milliseconds, check for expired sessions. If the
071     * tx is expired, roll it back and remove it from the registry.
072     */
073    @Override
074    @Scheduled(fixedRate = REAP_INTERVAL)
075    public void removeExpired() {
076        final Set<String> reapable = sessions.entrySet().stream()
077                .filter(e -> e.getValue().getExpires().isPresent())
078                .filter(e -> e.getValue().getExpires().get().isBefore(now()))
079                .map(Map.Entry::getKey).collect(toSet());
080        reapable.forEach(key -> {
081            final FedoraSession s = sessions.get(key);
082            if (s != null) {
083                try {
084                    s.expire();
085                } catch (final RepositoryRuntimeException e) {
086                    LOGGER.error("Got exception rolling back expired session {}: {}", s, e.getMessage());
087                }
088            }
089            sessions.remove(key);
090        });
091    }
092
093    @Override
094    public void begin(final FedoraSession session, final String username) {
095        sessions.put(getTxKey(session.getId(), username), session);
096    }
097
098    @Override
099    public FedoraSession getSession(final String sessionId, final String username) {
100        final FedoraSession session = sessions.get(getTxKey(sessionId, username));
101        if (session == null) {
102            throw new SessionMissingException("Batch session with id: " + sessionId + " is not available");
103        }
104        return session;
105    }
106
107    @Override
108    public boolean exists(final String sessionId, final String username) {
109        return sessions.containsKey(getTxKey(sessionId, username));
110    }
111
112    @Override
113    public void commit(final String sessionId, final String username) {
114        final FedoraSession session = getSession(sessionId, username);
115        session.commit();
116        sessions.remove(getTxKey(sessionId, username));
117    }
118
119    @Override
120    public void refresh(final String sessionId, final String username) {
121        final FedoraSession session = getSession(sessionId, username);
122        session.updateExpiry(operationTimeout());
123    }
124
125    @Override
126    public void abort(final String sessionId, final String username) {
127        final FedoraSession session = getSession(sessionId, username);
128        session.expire();
129        sessions.remove(getTxKey(sessionId, username));
130    }
131
132    private static String getTxKey(final String sessionId, final String username) {
133        return nullToEmpty(username) + ":" + sessionId;
134    }
135}