/*
 * Licensed to DuraSpace under one or more contributor license agreements.
 * See the NOTICE file distributed with this work for additional information
 * regarding copyright ownership.
 *
 * DuraSpace licenses this file to you under the Apache License,
 * Version 2.0 (the "License"); you may not use this file except in
 * compliance with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.fcrepo.http.api;

import static org.slf4j.LoggerFactory.getLogger;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import javax.jcr.Session;

import org.fcrepo.kernel.api.exception.InterruptedRuntimeException;
import org.fcrepo.kernel.api.services.NodeService;
import org.slf4j.Logger;
import org.springframework.stereotype.Component;

import com.google.common.annotations.VisibleForTesting;

/**
 * A class that serves as a pool of lockable paths to guarantee synchronized
 * accesses to the resources at those paths.  Because there may be an
 * extremely high number of paths in the repository, this implementation
 * is complicated by a need to ensure that for a given path, only one set
 * of locks exists and only until all the locks have been acquired and released.
 *
 * Because this is very complex code, extensive logging is produced at
 * the TRACE level.
 *
 * @author Mike Durbin
 */

@Component
public class DefaultPathLockManager implements PathLockManager {

    private static final Logger LOGGER = getLogger(DefaultPathLockManager.class);

    /**
     * A map of all the paths for which requests to lock are underway.  This
     * is an exhaustive set of all paths that may be accessed at this time.
     * Changes to the contents of this map must be synchronized on the instance
     * of this class, though blocking acquisition of locks against those paths
     * must NOT be, lest we degrade to an essentially single-threaded
     * application.
     */
    @VisibleForTesting
    Map<String, ActivePath> activePaths = new HashMap<>();

    /**
     * A list of paths for which delete operations are underway.  Attempts to
     * acquire locks on resources that would be deleted with those paths will
     * block until the delete lock is released.  This is a handy shortcut that
     * allows this class to meet the locking requirements for delete operations
     * without actually discovering (and locking) all the descendant nodes.
     */
    @VisibleForTesting
    List<String> activeDeletePaths = new ArrayList<>();

    /**
     * A class that represents a path that can be locked for reading or writing
     * and is the subject of a currently active lock request (though locks may
     * not necessarily have been granted yet).
     */
    private class ActivePath {

        private final String path;

        private final ReadWriteLock rwLock;

        /**
         * A list with references to every thread that has requested
         * (though not necessarily acquired) a read or write lock through
         * the mechanism of the outer class' contract and not yet
         * relinquished it by invoking AcquiredLock.release().  Entries are
         * added by {@link #getReadLock()} and {@link #getWriteLock()};
         * they are removed by code outside of ActivePath.
         */
        private final List<Thread> threads;

        private ActivePath(final String path) {
            this.path = path;
            this.rwLock = new ReentrantReadWriteLock();
            this.threads = new ArrayList<>();
        }

        /**
         * Registers the current thread as interested in this path and returns
         * a (not yet acquired) wrapper around the path's read lock.
         *
         * @return a wrapper for the read lock on this path
         */
        public PathScopedLock getReadLock() {
            threads.add(Thread.currentThread());
            LOGGER.trace("Thread {} requesting read lock on {}.", Thread.currentThread().getId(), path);
            return new PathScopedLock(rwLock.readLock());
        }

        /**
         * Registers the current thread as interested in this path and returns
         * a (not yet acquired) wrapper around the path's write lock.
         *
         * @return a wrapper for the write lock on this path
         */
        public PathScopedLock getWriteLock() {
            threads.add(Thread.currentThread());
            LOGGER.trace("Thread {} requesting write lock on {}.", Thread.currentThread().getId(), path);
            return new PathScopedLock(rwLock.writeLock());
        }

        /**
         * Wraps a lock (read or write) to expose a subset of its
         * functionality and to add special handling for our "delete"
         * locks.
         */
        private class PathScopedLock {

            private final Lock lock;

            public PathScopedLock(final Lock l) {
                lock = l;
            }

            public ActivePath getPath() {
                return ActivePath.this;
            }

            /**
             * Attempts to acquire the wrapped lock without blocking.  The
             * attempt is refused outright when the path is, or lies beneath,
             * a path with an active delete lock.
             *
             * @return true if the lock was acquired, false otherwise
             */
            public boolean tryLock() {
                for (final String deletePath : activeDeletePaths) {
                    if (isOrIsDescendantOf(path, deletePath)) {
                        LOGGER.trace("Thread {} could not be granted lock on {} because that path is being deleted.",
                                Thread.currentThread().getId(), path);
                        return false;
                    }
                }
                return lock.tryLock();
            }

            public void unlock() {
                lock.unlock();
            }
        }
    }

    /**
     * The AcquiredLock implementation that's returned by the surrounding class.
     * Two main constructors exist, one that accepts a bunch of locks, all of which
     * must be acquired and a second representing a "delete" lock, for which at
     * acquisition time the locks necessary are determined from the current pool of
     * active locks.
     *
     * Never, outside of a block of code synchronized with the surrounding class
     * instance, does this class hold an incomplete subset of the locks required:
     * in other words, it gets all of the locks or none of them, never blocking
     * while holding locks.
     */
    private class AcquiredMultiPathLock implements AcquiredLock {

        /** The path being deleted, or null when this is not a delete lock. */
        private final String deletePath;

        private final List<ActivePath.PathScopedLock> locks;

        /**
         * Instantiates and initializes an AcquiredMultiPathLock.  This
         * constructor blocks until all of the necessary locks have been
         * acquired, but to avoid possible deadlocks releases all acquired
         * locks when it fails to acquire even one of them.
         * @param locks each PathLock that must be acquired
         * @throws InterruptedException if the thread is interrupted while
         *         waiting; the lock requests are withdrawn before this
         *         exception propagates
         */
        private AcquiredMultiPathLock(final List<ActivePath.PathScopedLock> locks) throws InterruptedException {
            this.deletePath = null;
            this.locks = locks;

            synchronized (DefaultPathLockManager.this) {
                try {
                    while (!tryAcquireAll()) {
                        LOGGER.debug("Failed to acquire all necessary path locks: waiting. (Thread {})",
                                Thread.currentThread().getId());
                        DefaultPathLockManager.this.wait();
                    }
                } catch (final InterruptedException e) {
                    // release() will never be called for this object, so the
                    // registrations made when the locks were requested must be
                    // undone here or the paths would stay in the pool forever.
                    withdrawRequests();
                    throw e;
                }
            }
            LOGGER.debug("Acquired all necessary path locks (Thread {})", Thread.currentThread().getId());
        }

        /**
         * Instantiates and initializes an AcquiredMultiPathLock that requires
         * locks on the given path and all active paths that are descendants of
         * the given path.  This constructor blocks until all of the necessary
         * locks have been acquired, but to avoid possible deadlocks releases
         * all acquired locks when it fails to acquire even one of them.
         * @param deletePath the path for which all descendant paths must also be
         *        write locked.
         * @throws InterruptedException if the thread is interrupted while waiting
         */
        private AcquiredMultiPathLock(final String deletePath) throws InterruptedException {
            this.deletePath = deletePath;
            this.locks = new ArrayList<>();

            synchronized (DefaultPathLockManager.this) {
                while (true) {
                    // find all paths to lock; the pool may have changed while
                    // we were waiting, so it is recomputed on every attempt
                    for (final Map.Entry<String, ActivePath> entry : activePaths.entrySet()) {
                        if (isOrIsDescendantOf(entry.getKey(), deletePath)) {
                            locks.add(entry.getValue().getWriteLock());
                        }
                    }

                    if (tryAcquireAll()) {
                        break;
                    }

                    // Each getWriteLock() call above registered this thread on
                    // its path.  Undo that before waiting: the next attempt
                    // registers afresh, and stale registrations would keep the
                    // paths in the pool after release().
                    withdrawRequests();
                    locks.clear();

                    LOGGER.debug("Failed to acquire all necessary path locks: waiting. (Thread {})",
                            Thread.currentThread().getId());
                    DefaultPathLockManager.this.wait();
                }

                // So, we have acquired locks on every currently active path that is
                // the target of the DELETE operation or its descendant... but what if
                // one is added once we fall out of this synchronized block?
                //
                // ...well, in that case we set a special note that this path is being
                // deleted so that whenever a new lock is attempted on a to-be-deleted
                // path, those locks fail to acquire.
                LOGGER.trace("Thread {} acquired delete lock on path {}.",
                        Thread.currentThread().getId(), deletePath);
                activeDeletePaths.add(deletePath);
            }
            LOGGER.debug("Acquired all necessary path locks (Thread {})", Thread.currentThread().getId());
        }

        /**
         * Attempts, without blocking, to acquire every lock in {@link #locks}.
         * If any one attempt fails, the locks acquired so far are released.
         *
         * @return true if every lock was acquired, false if none is held
         */
        private boolean tryAcquireAll() {
            final List<ActivePath.PathScopedLock> acquired = new ArrayList<>();
            for (final ActivePath.PathScopedLock lock : locks) {
                if (lock.tryLock()) {
                    acquired.add(lock);
                } else {
                    // roll back
                    acquired.forEach(ActivePath.PathScopedLock::unlock);
                    return false;
                }
            }
            return true;
        }

        /**
         * Removes the current thread's registration from the path of every
         * lock in {@link #locks}, dropping from the pool any path that no
         * thread is registered on any more.  None of the locks is unlocked
         * here, so this is only for requests that were never granted.  The
         * caller must hold the monitor of the enclosing instance.
         */
        private void withdrawRequests() {
            final Thread current = Thread.currentThread();
            for (final ActivePath.PathScopedLock lock : locks) {
                final ActivePath activePath = lock.getPath();
                activePath.threads.remove(current);
                if (activePath.threads.isEmpty()) {
                    activePaths.remove(activePath.path);
                }
            }
        }

        /*
         * Unlocks every held lock, removes paths that no thread is registered
         * on any more from the pool of currently active paths, clears the
         * delete marker (if any) and wakes the waiting threads.
         */
        @Override
        public void release() {
            synchronized (DefaultPathLockManager.this) {
                final Thread current = Thread.currentThread();
                for (final ActivePath.PathScopedLock lock : locks) {
                    lock.unlock();
                    final ActivePath activePath = lock.getPath();
                    activePath.threads.remove(current);
                    if (activePath.threads.isEmpty()) {
                        activePaths.remove(activePath.path);
                    }
                }
                if (deletePath != null) {
                    LOGGER.trace("Thread {} releasing delete lock on path {}.",
                            Thread.currentThread().getId(), deletePath);
                    activeDeletePaths.remove(deletePath);
                }
                LOGGER.trace("Thread {} released locks.", Thread.currentThread().getId());
                // Waiters block on different paths, so waking a single
                // arbitrary thread could pick one that still cannot proceed
                // while a thread whose locks are now free keeps sleeping.
                // Every waiter re-checks in a loop, so waking all is safe.
                DefaultPathLockManager.this.notifyAll();
            }
        }

    }

    /*
     * This is the only method that adds paths to the pool of
     * currently active paths that can be locked.
     */
    private synchronized ActivePath getActivePath(final String path) {
        return activePaths.computeIfAbsent(path, p -> new ActivePath(p));
    }

    /**
     * Tests whether one path equals another or lies beneath it.
     *
     * @param possibleDescendant the path to test
     * @param path the candidate ancestor (without trailing slash)
     * @return true if the paths are equal or possibleDescendant starts with
     *         path followed by "/"
     */
    private static boolean isOrIsDescendantOf(final String possibleDescendant, final String path) {
        return path.equals(possibleDescendant) || possibleDescendant.startsWith(path + "/");
    }

    /**
     * Strips the last path segment.
     *
     * @param path the path whose parent is wanted
     * @return everything before the last "/", which is the empty string for
     *         a top-level path such as "/a", or null if path contains no "/"
     */
    private static String getParentPath(final String path) {
        final int lastSlash = path.lastIndexOf('/');
        if (lastSlash == -1) {
            return null;
        }
        return path.substring(0, lastSlash);
    }

    /**
     * Removes a single trailing slash, if present.
     *
     * @param path the path to normalize
     * @return path without its final "/" character, or path itself if it
     *         does not end with one
     */
    @VisibleForTesting
    static String normalizePath(final String path) {
        if (path.endsWith("/")) {
            return path.substring(0, path.length() - 1);
        } else {
            return path;
        }
    }

    /**
     * Blocks until a read lock on the given path has been acquired.
     *
     * @param path the path to lock; one trailing slash is ignored
     * @return the acquired lock
     * @throws InterruptedRuntimeException if the thread is interrupted while
     *         waiting for the lock
     */
    @Override
    public AcquiredLock lockForRead(final String path) {
        final List<ActivePath.PathScopedLock> locks = new ArrayList<>();

        synchronized (this) {
            locks.add(getActivePath(normalizePath(path)).getReadLock());
        }

        try {
            return new AcquiredMultiPathLock(locks);
        } catch (InterruptedException e) {
            throw new InterruptedRuntimeException(e);
        }
    }

    /**
     * Blocks until write locks have been acquired on the given path and on
     * each of its ancestors for which nodeService reports that it does not
     * exist, stopping at the first ancestor that exists.
     *
     * @param path the path to lock; one trailing slash is ignored
     * @param session the session passed to nodeService for existence checks
     * @param nodeService used to test whether ancestor paths exist
     * @return the acquired lock
     * @throws InterruptedRuntimeException if the thread is interrupted while
     *         waiting for the locks
     */
    @Override
    public AcquiredLock lockForWrite(final String path, final Session session, final NodeService nodeService) {
        final List<ActivePath.PathScopedLock> locks = new ArrayList<>();

        synchronized (this) {
            // lock the specified path while iterating through the path's
            // ancestry to also lock each path that would be created implicitly
            // by this write (ie, non-existent ancestral paths)
            final String startingPath = normalizePath(path);
            String currentPath = startingPath;
            while (currentPath != null && !currentPath.isEmpty()) {
                // the starting path is always locked, whether it exists or not;
                // a parent is strictly shorter, so equals() only matches it
                if (!currentPath.equals(startingPath) && nodeService.exists(session, currentPath)) {
                    // we've found an ancestor that exists...
                    // so there are no more locks to create.
                    break;
                }
                locks.add(getActivePath(currentPath).getWriteLock());
                currentPath = getParentPath(currentPath);
            }
        }

        try {
            return new AcquiredMultiPathLock(locks);
        } catch (InterruptedException e) {
            throw new InterruptedRuntimeException(e);
        }
    }

    /**
     * Blocks until write locks have been acquired on every currently active
     * path that is the given path or one of its descendants, then marks the
     * path as being deleted so that new lock attempts at or beneath it fail
     * until the returned lock is released.
     *
     * @param path the path to be deleted; one trailing slash is ignored
     * @return the acquired lock
     * @throws InterruptedRuntimeException if the thread is interrupted while
     *         waiting for the locks
     */
    @Override
    public AcquiredLock lockForDelete(final String path) {
        try {
            return new AcquiredMultiPathLock(normalizePath(path));
        } catch (InterruptedException e) {
            throw new InterruptedRuntimeException(e);
        }
    }
}