/*
 * Licensed to DuraSpace under one or more contributor license agreements.
 * See the NOTICE file distributed with this work for additional information
 * regarding copyright ownership.
 *
 * DuraSpace licenses this file to you under the Apache License,
 * Version 2.0 (the "License"); you may not use this file except in
 * compliance with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.fcrepo.kernel.impl.lock;

import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.collect.Sets;
import org.fcrepo.kernel.api.exception.ConcurrentUpdateException;
import org.fcrepo.kernel.api.identifiers.FedoraId;
import org.fcrepo.kernel.api.lock.ResourceLockManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

/**
 * In memory resource lock manager.
 * <p>
 * A resource is locked by at most one transaction at a time. Locks are tracked purely in
 * memory: per transaction in {@code transactionLocks} and globally in {@code lockedResources}.
 *
 * @author pwinckles
 */
@Component
public class InMemoryResourceLockManager implements ResourceLockManager {

    private static final Logger LOG = LoggerFactory.getLogger(InMemoryResourceLockManager.class);

    /** Transaction id to the ids of the resources that transaction has locked. */
    private final Map<String, Set<String>> transactionLocks;

    /** Ids of every resource currently locked by any transaction. */
    private final Set<String> lockedResources;

    /**
     * Per-resource monitor objects used to serialize acquire/release of a single resource.
     * Backed by a cache, so a monitor may be dropped after 10 minutes without access.
     */
    private final Map<String, Object> internalResourceLocks;

    /**
     * Creates a lock manager with no locks held.
     */
    public InMemoryResourceLockManager() {
        transactionLocks = new ConcurrentHashMap<>();
        lockedResources = Sets.newConcurrentHashSet();
        internalResourceLocks = Caffeine.newBuilder()
                .expireAfterAccess(10, TimeUnit.MINUTES)
                .<String, Object>build()
                .asMap();
    }

    /**
     * Acquires the lock on a resource on behalf of a transaction. Does nothing when the
     * transaction already holds the lock on that resource.
     *
     * @param txId the id of the transaction acquiring the lock
     * @param resourceId the resource to lock
     * @throws ConcurrentUpdateException when the resource is already locked by another transaction
     */
    @Override
    public void acquire(final String txId, final FedoraId resourceId) {
        final var resourceIdStr = resourceId.getResourceId();

        // Fast path: re-entrant acquire by the owning transaction needs no synchronization.
        if (transactionHoldsLock(txId, resourceIdStr)) {
            return;
        }

        synchronized (acquireInternalLock(resourceIdStr)) {
            // Re-check under the monitor: another thread of the same transaction may have
            // acquired the lock between the fast-path check and entering this block.
            if (transactionHoldsLock(txId, resourceIdStr)) {
                return;
            }

            // add() on the concurrent set is an atomic test-and-set. A separate
            // contains()/add() pair is only safe while every thread synchronizes on the same
            // monitor, but releaseAll() discards the monitor (as can cache expiry), so two
            // transactions holding different monitors could otherwise both pass the check.
            if (!lockedResources.add(resourceIdStr)) {
                throw new ConcurrentUpdateException(
                        String.format("Cannot update %s because it is being updated by another transaction.",
                                resourceIdStr));
            }

            LOG.debug("Transaction {} acquiring lock on {}", txId, resourceIdStr);

            transactionLocks.computeIfAbsent(txId, key -> Sets.newConcurrentHashSet())
                    .add(resourceIdStr);
        }
    }

    /**
     * Releases every lock held by a transaction. Does nothing when the transaction holds no locks.
     *
     * @param txId the id of the transaction whose locks are released
     */
    @Override
    public void releaseAll(final String txId) {
        // Detach the transaction's lock set first so it stops counting as the holder.
        final var locks = transactionLocks.remove(txId);
        if (locks != null) {
            locks.forEach(resourceId -> {
                LOG.debug("Transaction {} releasing lock on {}", txId, resourceId);
                synchronized (acquireInternalLock(resourceId)) {
                    lockedResources.remove(resourceId);
                    // Drop the monitor eagerly rather than waiting for cache expiry.
                    internalResourceLocks.remove(resourceId);
                }
            });
        }
    }

    /**
     * Returns the monitor object for a resource, creating it when none is currently cached.
     *
     * @param resourceId the id of the resource
     * @return the object to synchronize on while changing the resource's lock state
     */
    private Object acquireInternalLock(final String resourceId) {
        return internalResourceLocks.computeIfAbsent(resourceId, key -> new Object());
    }

    /**
     * Tests whether a transaction currently holds the lock on a resource.
     *
     * @param txId the id of the transaction
     * @param resourceId the id of the resource
     * @return true when the transaction's recorded locks contain the resource
     */
    private boolean transactionHoldsLock(final String txId, final String resourceId) {
        final var locks = transactionLocks.get(txId);
        return locks != null && locks.contains(resourceId);
    }

}