001/* 002 * Licensed to DuraSpace under one or more contributor license agreements. 003 * See the NOTICE file distributed with this work for additional information 004 * regarding copyright ownership. 005 * 006 * DuraSpace licenses this file to you under the Apache License, 007 * Version 2.0 (the "License"); you may not use this file except in 008 * compliance with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.fcrepo.http.api; 020 021import static org.slf4j.LoggerFactory.getLogger; 022 023import java.util.ArrayList; 024import java.util.HashMap; 025import java.util.List; 026import java.util.Map; 027import java.util.Objects; 028import java.util.concurrent.locks.Lock; 029import java.util.concurrent.locks.ReadWriteLock; 030import java.util.concurrent.locks.ReentrantReadWriteLock; 031 032import org.fcrepo.kernel.api.FedoraSession; 033import org.fcrepo.kernel.api.exception.InterruptedRuntimeException; 034import org.fcrepo.kernel.api.services.NodeService; 035import org.slf4j.Logger; 036import org.springframework.stereotype.Component; 037 038import com.google.common.annotations.VisibleForTesting; 039 040/** 041 * A class that serves as a pool lockable paths to guarantee synchronized 042 * accesses to the resources at those paths. Because there may be an 043 * extremely high number of paths in the repository, this implementation 044 * is complicated by a need to ensure that for a given path, only one set 045 * of locks exists and only until all the locks have been acquired and released. 046 * 047 * Because this is very complex code, extensive logging is produced at 048 * the TRACE level. 049 * 050 * @author Mike Durbin 051 */ 052 053@Component 054public class DefaultPathLockManager implements PathLockManager { 055 056 private static final Logger LOGGER = getLogger(DefaultPathLockManager.class); 057 058 /** 059 * A map of all the paths for which requests to lock are underway. This 060 * is an exhaustive set of all paths that may be accessed at this time. 061 * Changes to the contents of this map must be synchronized on the instance 062 * of this class, though blocking acquisition of locks against those paths 063 * must NOT be, lest we degrade to an essentially single-threaded 064 * application. 065 */ 066 @VisibleForTesting 067 final 068 Map<String, ActivePath> activePaths = new HashMap<>(); 069 070 /** 071 * A list of paths for which delete operations are underway. Attempts to 072 * acquire locks on resources that would be deleted with those paths will 073 * block until the delete lock is released. This is a handy shortcut that 074 * allows this class to meet the locking requirements for delete operations 075 * without actually discovering (and locking) all the descendant nodes. 076 */ 077 @VisibleForTesting 078 final 079 List<String> activeDeletePaths = new ArrayList<>(); 080 081 /** 082 * A class that represents a path that can be locked for reading or writing 083 * and is the subject of a currently active lock request (though locks may 084 * not necessarily have been granted yet). 085 */ 086 private class ActivePath { 087 088 private final String path; 089 090 private final ReadWriteLock rwLock; 091 092 /** 093 * A list with references to every thread that has requested 094 * (though not necessary acquired) a read or write lock through 095 * the mechanism of the outer class' contract and not yet 096 * relinquished it by invoking LockManager.release(). This 097 * list is actually managed by methods outside of ActivePath. 098 */ 099 private final List<Thread> threads; 100 101 private ActivePath(final String path) { 102 this.path = path; 103 rwLock = new ReentrantReadWriteLock(); 104 threads = new ArrayList<>(); 105 } 106 107 public PathScopedLock getReadLock() { 108 threads.add(Thread.currentThread()); 109 LOGGER.trace("Thread {} requesting read lock on {}.", Thread.currentThread().getId(), path); 110 return new PathScopedLock(rwLock.readLock()); 111 } 112 113 public PathScopedLock getWriteLock() { 114 threads.add(Thread.currentThread()); 115 LOGGER.trace("Thread {} requesting write lock on {}.", Thread.currentThread().getId(), path); 116 return new PathScopedLock(rwLock.writeLock()); 117 } 118 119 /** 120 * Wraps a lock (read or write) to expose a subset of its 121 * functionality and to add special handling for our "delete" 122 * locks. 123 */ 124 private class PathScopedLock { 125 126 final private Lock lock; 127 128 public PathScopedLock(final Lock l) { 129 lock = l; 130 } 131 132 public ActivePath getPath() { 133 return ActivePath.this; 134 } 135 136 public boolean tryLock() { 137 for (final String deletePath : activeDeletePaths) { 138 if (isOrIsDescendantOf(path, deletePath)) { 139 LOGGER.trace("Thread {} could not be granted lock on {} because that path is being deleted.", 140 Thread.currentThread().getId(), path); 141 return false; 142 } 143 } 144 return lock.tryLock(); 145 } 146 147 public void unlock() { 148 lock.unlock(); 149 } 150 } 151 } 152 153 /** 154 * The AcquiredLock implementation that's returned by the surrounding class. 155 * Two main constructors exist, one that accepts a bunch of locks, all of which 156 * must be acquired and a second representing a "delete" lock, for which at 157 * acquisition time the locks necessary are determined from the current pool of 158 * active locks. 159 * 160 * Never, outside of a block of code synchronized with the surrounding class 161 * instance, does this class hold an incomplete subset of the locks required: 162 * in other words, it gets all of the locks or none of them, never blocking 163 * while holding locks. 164 */ 165 private class AcquiredMultiPathLock implements AcquiredLock { 166 167 private String deletePath; 168 169 private List<ActivePath.PathScopedLock> locks; 170 171 /** 172 * Instantiates and initializes an AcquiredMultiPathLock. This 173 * constructor blocks until all of the necessary locks have been 174 * acquired, but to avoid possible deadlocks releases all acquired 175 * locks when it fails to acquire even one of them. 176 * @param locks each PathLock that must be acquired 177 * @throws InterruptedException on error 178 */ 179 private AcquiredMultiPathLock(final List<ActivePath.PathScopedLock> locks) throws InterruptedException { 180 this.locks = locks; 181 182 boolean success = false; 183 while (!success) { 184 synchronized (DefaultPathLockManager.this) { 185 success = tryAcquireAll(); 186 if (!success) { 187 LOGGER.debug("Failed to acquire all necessary path locks: waiting. (Thread {})", 188 Thread.currentThread().getId()); 189 DefaultPathLockManager.this.wait(); 190 } 191 } 192 193 } 194 LOGGER.debug("Acquired all necessary path locks (Thread {})", Thread.currentThread().getId()); 195 196 } 197 198 /** 199 * Instantiates and initializes an AcquiredMultiPathLock that requires 200 * locks on the given path and all active paths that are descendants of 201 * the given path. This constructor blocks until all of the necessary 202 * locks have been acquired, but to avoid possible deadlocks releases 203 * all acquired locks when it fails to acquire even one of them. 204 * @param deletePath the path for which all descendant paths must also be 205 * write locked. 206 * @throws InterruptedException on error 207 */ 208 private AcquiredMultiPathLock(final String deletePath) throws InterruptedException { 209 this.deletePath = deletePath; 210 211 boolean success = false; 212 while (!success) { 213 synchronized (DefaultPathLockManager.this) { 214 this.locks = new ArrayList<>(); 215 216 // find all paths to lock 217 activePaths.forEach((path, lock) -> { 218 if (isOrIsDescendantOf(path, deletePath)) { 219 locks.add(lock.getWriteLock()); 220 } 221 }); 222 223 success = tryAcquireAll(); 224 if (!success) { 225 LOGGER.debug("Failed to acquire all necessary path locks: waiting. (Thread {})", 226 Thread.currentThread().getId()); 227 DefaultPathLockManager.this.wait(); 228 } else { 229 // So, we have acquired locks on every currently active path that is 230 // the target of the DELETE operation or its ancestor... but what if 231 // one is added once we fall out of this synchronized block? 232 // 233 // ...well, in that case we set a special note that this path is being 234 // deleted so that whenever a new lock is attempted on a to-be-deleted 235 // path, those locks fail to acquire. 236 LOGGER.trace("Thread {} acquired delete lock on path {}.", 237 Thread.currentThread().getId(), deletePath); 238 activeDeletePaths.add(deletePath); 239 } 240 } 241 242 } 243 LOGGER.debug("Acquired all necessary path locks (Thread {})", Thread.currentThread().getId()); 244 245 } 246 247 private boolean tryAcquireAll() { 248 final List<ActivePath.PathScopedLock> acquired = new ArrayList<>(); 249 for (final ActivePath.PathScopedLock lock : locks) { 250 if (lock.tryLock()) { 251 acquired.add(lock); 252 } else { 253 // roll back 254 acquired.forEach(ActivePath.PathScopedLock::unlock); 255 return false; 256 } 257 } 258 return true; 259 } 260 261 /* 262 * This is the only method that removes paths from the pool 263 * of currently active paths that can be locked. 264 */ 265 @Override 266 public void release() { 267 synchronized (DefaultPathLockManager.this) { 268 for (final ActivePath.PathScopedLock lock : locks) { 269 lock.unlock(); 270 lock.getPath().threads.remove(Thread.currentThread()); 271 if (lock.getPath().threads.isEmpty()) { 272 activePaths.remove(lock.getPath().path); 273 } 274 } 275 if (deletePath != null) { 276 LOGGER.trace("Thread {} releasing delete lock on path {}.", 277 Thread.currentThread().getId(), deletePath); 278 activeDeletePaths.remove(deletePath); 279 } 280 LOGGER.trace("Thread {} released locks.", Thread.currentThread().getId()); 281 DefaultPathLockManager.this.notify(); 282 } 283 } 284 285 } 286 287 /* 288 * This is the only method that adds paths to the pool of 289 * currently active paths that can be locked. 290 */ 291 private synchronized ActivePath getActivePath(final String path) { 292 ActivePath activePath = activePaths.get(path); 293 if (activePath == null) { 294 activePath = new ActivePath(path); 295 activePaths.put(path, activePath); 296 } 297 return activePath; 298 } 299 300 private boolean isOrIsDescendantOf(final String possibleDescendant, final String path) { 301 return path.equals(possibleDescendant) || possibleDescendant.startsWith(path + "/"); 302 } 303 304 private String getParentPath(final String path) { 305 if (path.indexOf('/') == -1) { 306 return null; 307 } 308 return path.substring(0, path.lastIndexOf('/')); 309 } 310 311 @VisibleForTesting 312 private static String normalizePath(final String path) { 313 if (path.endsWith("/")) { 314 return path.substring(0, path.length() - 1); 315 } else { 316 return path; 317 } 318 } 319 320 @Override 321 public AcquiredLock lockForRead(final String path) { 322 final List<ActivePath.PathScopedLock> locks = new ArrayList<>(); 323 324 synchronized (this) { 325 locks.add(getActivePath(normalizePath(path)).getReadLock()); 326 } 327 328 try { 329 return new AcquiredMultiPathLock(locks); 330 } catch (InterruptedException e) { 331 throw new InterruptedRuntimeException(e); 332 } 333 } 334 335 @Override 336 public AcquiredLock lockForWrite(final String path, final FedoraSession session, final NodeService nodeService) { 337 final List<ActivePath.PathScopedLock> locks = new ArrayList<>(); 338 339 synchronized (this) { 340 // lock the specified path while iterating through the path's 341 // ancestry to also lock each path that would be created implicitly 342 // by this write (ie, non-existent ancestral paths) 343 final String startingPath = normalizePath(path); 344 for (String currentPath = startingPath ; 345 currentPath == null || currentPath.length() > 0; 346 currentPath = getParentPath(currentPath)) { 347 if (currentPath == null || 348 (!Objects.equals(currentPath, startingPath) && nodeService.exists(session, currentPath))) { 349 // either we've followed the path back to the root, or we've found an ancestor that exists... 350 // so there are no more locks to create. 351 break; 352 } 353 locks.add(getActivePath(currentPath).getWriteLock()); 354 } 355 } 356 357 try { 358 return new AcquiredMultiPathLock(locks); 359 } catch (InterruptedException e) { 360 throw new InterruptedRuntimeException(e); 361 } 362 } 363 364 @Override 365 public AcquiredLock lockForDelete(final String path) { 366 try { 367 return new AcquiredMultiPathLock(normalizePath(path)); 368 } catch (InterruptedException e) { 369 throw new InterruptedRuntimeException(e); 370 } 371 } 372}