001/* 002 * Licensed to DuraSpace under one or more contributor license agreements. 003 * See the NOTICE file distributed with this work for additional information 004 * regarding copyright ownership. 005 * 006 * DuraSpace licenses this file to you under the Apache License, 007 * Version 2.0 (the "License"); you may not use this file except in 008 * compliance with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.fcrepo.http.api; 020 021import static org.slf4j.LoggerFactory.getLogger; 022 023import java.util.ArrayList; 024import java.util.HashMap; 025import java.util.List; 026import java.util.Map; 027import java.util.concurrent.locks.Lock; 028import java.util.concurrent.locks.ReadWriteLock; 029import java.util.concurrent.locks.ReentrantReadWriteLock; 030 031import org.fcrepo.kernel.api.FedoraSession; 032import org.fcrepo.kernel.api.exception.InterruptedRuntimeException; 033import org.fcrepo.kernel.api.services.NodeService; 034import org.slf4j.Logger; 035import org.springframework.stereotype.Component; 036 037import com.google.common.annotations.VisibleForTesting; 038 039/** 040 * A class that serves as a pool lockable paths to guarantee synchronized 041 * accesses to the resources at those paths. Because there may be an 042 * extremely high number of paths in the repository, this implementation 043 * is complicated by a need to ensure that for a given path, only one set 044 * of locks exists and only until all the locks have been acquired and released. 045 * 046 * Because this is very complex code, extensive logging is produced at 047 * the TRACE level. 048 * 049 * @author Mike Durbin 050 */ 051 052@Component 053public class DefaultPathLockManager implements PathLockManager { 054 055 private static final Logger LOGGER = getLogger(DefaultPathLockManager.class); 056 057 /** 058 * A map of all the paths for which requests to lock are underway. This 059 * is an exhaustive set of all paths that may be accessed at this time. 060 * Changes to the contents of this map must be synchronized on the instance 061 * of this class, though blocking acquisition of locks against those paths 062 * must NOT be, lest we degrade to an essentially single-threaded 063 * application. 064 */ 065 @VisibleForTesting 066 Map<String, ActivePath> activePaths = new HashMap<>(); 067 068 /** 069 * A list of paths for which delete operations are underway. Attempts to 070 * acquire locks on resources that would be deleted with those paths will 071 * block until the delete lock is released. This is a handy shortcut that 072 * allows this class to meet the locking requirements for delete operations 073 * without actually discovering (and locking) all the descendant nodes. 074 */ 075 @VisibleForTesting 076 List<String> activeDeletePaths = new ArrayList<>(); 077 078 /** 079 * A class that represents a path that can be locked for reading or writing 080 * and is the subject of a currently active lock request (though locks may 081 * not necessarily have been granted yet). 082 */ 083 private class ActivePath { 084 085 private String path; 086 087 private ReadWriteLock rwLock; 088 089 /** 090 * A list with references to every thread that has requested 091 * (though not necessary acquired) a read or write lock through 092 * the mechanism of the outer class' contract and not yet 093 * relinquished it by invoking LockManager.release(). This 094 * list is actually managed by methods outside of ActivePath. 095 */ 096 private List<Thread> threads; 097 098 private ActivePath(final String path) { 099 this.path = path; 100 rwLock = new ReentrantReadWriteLock(); 101 threads = new ArrayList<>(); 102 } 103 104 public PathScopedLock getReadLock() { 105 threads.add(Thread.currentThread()); 106 LOGGER.trace("Thread {} requesting read lock on {}.", Thread.currentThread().getId(), path); 107 return new PathScopedLock(rwLock.readLock()); 108 } 109 110 public PathScopedLock getWriteLock() { 111 threads.add(Thread.currentThread()); 112 LOGGER.trace("Thread {} requesting write lock on {}.", Thread.currentThread().getId(), path); 113 return new PathScopedLock(rwLock.writeLock()); 114 } 115 116 /** 117 * Wraps a lock (read or write) to expose a subset of its 118 * functionality and to add special handling for our "delete" 119 * locks. 120 */ 121 private class PathScopedLock { 122 123 final private Lock lock; 124 125 public PathScopedLock(final Lock l) { 126 lock = l; 127 } 128 129 public ActivePath getPath() { 130 return ActivePath.this; 131 } 132 133 public boolean tryLock() { 134 for (final String deletePath : activeDeletePaths) { 135 if (isOrIsDescendantOf(path, deletePath)) { 136 LOGGER.trace("Thread {} could not be granted lock on {} because that path is being deleted.", 137 Thread.currentThread().getId(), path); 138 return false; 139 } 140 } 141 return lock.tryLock(); 142 } 143 144 public void unlock() { 145 lock.unlock(); 146 } 147 } 148 } 149 150 /** 151 * The AcquiredLock implementation that's returned by the surrounding class. 152 * Two main constructors exist, one that accepts a bunch of locks, all of which 153 * must be acquired and a second representing a "delete" lock, for which at 154 * acquisition time the locks necessary are determined from the current pool of 155 * active locks. 156 * 157 * Never, outside of a block of code synchronized with the surrounding class 158 * instance, does this class hold an incomplete subset of the locks required: 159 * in other words, it gets all of the locks or none of them, never blocking 160 * while holding locks. 161 */ 162 private class AcquiredMultiPathLock implements AcquiredLock { 163 164 private String deletePath; 165 166 private List<ActivePath.PathScopedLock> locks; 167 168 /** 169 * Instantiates and initializes an AcquiredMultiPathLock. This 170 * constructor blocks until all of the necessary locks have been 171 * acquired, but to avoid possible deadlocks releases all acquired 172 * locks when it fails to acquire even one of them. 173 * @param locks each PathLock that must be acquired 174 * @throws InterruptedException 175 */ 176 private AcquiredMultiPathLock(final List<ActivePath.PathScopedLock> locks) throws InterruptedException { 177 this.locks = locks; 178 179 boolean success = false; 180 while (!success) { 181 synchronized (DefaultPathLockManager.this) { 182 success = tryAcquireAll(); 183 if (!success) { 184 LOGGER.debug("Failed to acquire all necessary path locks: waiting. (Thread {})", 185 Thread.currentThread().getId()); 186 DefaultPathLockManager.this.wait(); 187 } 188 } 189 190 } 191 LOGGER.debug("Acquired all necessary path locks (Thread {})", Thread.currentThread().getId()); 192 193 } 194 195 /** 196 * Instantiates and initializes an AcquiredMultiPathLock that requires 197 * locks on the given path and all active paths that are descendants of 198 * the given path. This constructor blocks until all of the necessary 199 * locks have been acquired, but to avoid possible deadlocks releases 200 * all acquired locks when it fails to acquire even one of them. 201 * @param deletePath the path for which all descendant paths must also be 202 * write locked. 203 * @throws InterruptedException 204 */ 205 private AcquiredMultiPathLock(final String deletePath) throws InterruptedException { 206 this.deletePath = deletePath; 207 208 boolean success = false; 209 while (!success) { 210 synchronized (DefaultPathLockManager.this) { 211 this.locks = new ArrayList<>(); 212 213 // find all paths to lock 214 activePaths.forEach((path, lock) -> { 215 if (isOrIsDescendantOf(path, deletePath)) { 216 locks.add(lock.getWriteLock()); 217 } 218 }); 219 220 success = tryAcquireAll(); 221 if (!success) { 222 LOGGER.debug("Failed to acquire all necessary path locks: waiting. (Thread {})", 223 Thread.currentThread().getId()); 224 DefaultPathLockManager.this.wait(); 225 } else { 226 // So, we have acquired locks on every currently active path that is 227 // the target of the DELETE operation or its ancestor... but what if 228 // one is added once we fall out of this synchronized block? 229 // 230 // ...well, in that case we set a special note that this path is being 231 // deleted so that whenever a new lock is attempted on a to-be-deleted 232 // path, those locks fail to acquire. 233 LOGGER.trace("Thread {} acquired delete lock on path {}.", 234 Thread.currentThread().getId(), deletePath); 235 activeDeletePaths.add(deletePath); 236 } 237 } 238 239 } 240 LOGGER.debug("Acquired all necessary path locks (Thread {})", Thread.currentThread().getId()); 241 242 } 243 244 private boolean tryAcquireAll() { 245 final List<ActivePath.PathScopedLock> acquired = new ArrayList<>(); 246 for (final ActivePath.PathScopedLock lock : locks) { 247 if (lock.tryLock()) { 248 acquired.add(lock); 249 } else { 250 // roll back 251 acquired.forEach(ActivePath.PathScopedLock::unlock); 252 return false; 253 } 254 } 255 return true; 256 } 257 258 /* 259 * This is the only method that removes paths from the pool 260 * of currently active paths that can be locked. 261 */ 262 @Override 263 public void release() { 264 synchronized (DefaultPathLockManager.this) { 265 for (final ActivePath.PathScopedLock lock : locks) { 266 lock.unlock(); 267 lock.getPath().threads.remove(Thread.currentThread()); 268 if (lock.getPath().threads.isEmpty()) { 269 activePaths.remove(lock.getPath().path); 270 } 271 } 272 if (deletePath != null) { 273 LOGGER.trace("Thread {} releasing delete lock on path {}.", 274 Thread.currentThread().getId(), deletePath); 275 activeDeletePaths.remove(deletePath); 276 } 277 LOGGER.trace("Thread {} released locks.", Thread.currentThread().getId()); 278 DefaultPathLockManager.this.notify(); 279 } 280 } 281 282 } 283 284 /* 285 * This is the only method that adds paths to the pool of 286 * currently active paths that can be locked. 287 */ 288 private synchronized ActivePath getActivePath(final String path) { 289 ActivePath activePath = activePaths.get(path); 290 if (activePath == null) { 291 activePath = new ActivePath(path); 292 activePaths.put(path, activePath); 293 } 294 return activePath; 295 } 296 297 private boolean isOrIsDescendantOf(final String possibleDescendant, final String path) { 298 return path.equals(possibleDescendant) || possibleDescendant.startsWith(path + "/"); 299 } 300 301 private String getParentPath(final String path) { 302 if (path.indexOf('/') == -1) { 303 return null; 304 } 305 return path.substring(0, path.lastIndexOf('/')); 306 } 307 308 @VisibleForTesting 309 static String normalizePath(final String path) { 310 if (path.endsWith("/")) { 311 return path.substring(0, path.length() - 1); 312 } else { 313 return path; 314 } 315 } 316 317 @Override 318 public AcquiredLock lockForRead(final String path) { 319 final List<ActivePath.PathScopedLock> locks = new ArrayList<>(); 320 321 synchronized (this) { 322 locks.add(getActivePath(normalizePath(path)).getReadLock()); 323 } 324 325 try { 326 return new AcquiredMultiPathLock(locks); 327 } catch (InterruptedException e) { 328 throw new InterruptedRuntimeException(e); 329 } 330 } 331 332 @Override 333 public AcquiredLock lockForWrite(final String path, final FedoraSession session, final NodeService nodeService) { 334 final List<ActivePath.PathScopedLock> locks = new ArrayList<>(); 335 336 synchronized (this) { 337 // lock the specified path while iterating through the path's 338 // ancestry to also lock each path that would be created implicitly 339 // by this write (ie, non-existent ancestral paths) 340 final String startingPath = normalizePath(path); 341 for (String currentPath = startingPath ; 342 currentPath == null || currentPath.length() > 0; 343 currentPath = getParentPath(currentPath)) { 344 if (currentPath == null || (currentPath != startingPath && nodeService.exists(session, currentPath))) { 345 // either we've followed the path back to the root, or we've found an ancestor that exists... 346 // so there are no more locks to create. 347 break; 348 } 349 locks.add(getActivePath(currentPath).getWriteLock()); 350 } 351 } 352 353 try { 354 return new AcquiredMultiPathLock(locks); 355 } catch (InterruptedException e) { 356 throw new InterruptedRuntimeException(e); 357 } 358 } 359 360 @Override 361 public AcquiredLock lockForDelete(final String path) { 362 try { 363 return new AcquiredMultiPathLock(normalizePath(path)); 364 } catch (InterruptedException e) { 365 throw new InterruptedRuntimeException(e); 366 } 367 } 368}