001/*
002 * Licensed to DuraSpace under one or more contributor license agreements.
003 * See the NOTICE file distributed with this work for additional information
004 * regarding copyright ownership.
005 *
006 * DuraSpace licenses this file to you under the Apache License,
007 * Version 2.0 (the "License"); you may not use this file except in
008 * compliance with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.fcrepo.http.api;
020
021import static org.slf4j.LoggerFactory.getLogger;
022
023import java.util.ArrayList;
024import java.util.HashMap;
025import java.util.List;
026import java.util.Map;
027import java.util.Objects;
028import java.util.concurrent.locks.Lock;
029import java.util.concurrent.locks.ReadWriteLock;
030import java.util.concurrent.locks.ReentrantReadWriteLock;
031
032import org.fcrepo.kernel.api.FedoraSession;
033import org.fcrepo.kernel.api.exception.InterruptedRuntimeException;
034import org.fcrepo.kernel.api.services.NodeService;
035import org.slf4j.Logger;
036import org.springframework.stereotype.Component;
037
038import com.google.common.annotations.VisibleForTesting;
039
040/**
041 * A class that serves as a pool lockable paths to guarantee synchronized
042 * accesses to the resources at those paths.  Because there may be an
043 * extremely high number of paths in the repository, this implementation
044 * is complicated by a need to ensure that for a given path, only one set
045 * of locks exists and only until all the locks have been acquired and released.
046 *
047 * Because this is very complex code, extensive logging is produced at
048 * the TRACE level.
049 *
050 * @author Mike Durbin
051 */
052
053@Component
054public class DefaultPathLockManager implements PathLockManager {
055
056    private static final Logger LOGGER = getLogger(DefaultPathLockManager.class);
057
058    /**
059     * A map of all the paths for which requests to lock are underway.  This
060     * is an exhaustive set of all paths that may be accessed at this time.
061     * Changes to the contents of this map must be synchronized on the instance
062     * of this class, though blocking acquisition of locks against those paths
063     * must NOT be, lest we degrade to an essentially single-threaded
064     * application.
065     */
066    @VisibleForTesting
067    final
068    Map<String, ActivePath> activePaths = new HashMap<>();
069
070    /**
071     * A list of paths for which delete operations are underway.  Attempts to
072     * acquire locks on resources that would be deleted with those paths will
073     * block until the delete lock is released.  This is a handy shortcut that
074     * allows this class to meet the locking requirements for delete operations
075     * without actually discovering (and locking) all the descendant nodes.
076     */
077    @VisibleForTesting
078    final
079    List<String> activeDeletePaths = new ArrayList<>();
080
081    /**
082     * A class that represents a path that can be locked for reading or writing
083     * and is the subject of a currently active lock request (though locks may
084     * not necessarily have been granted yet).
085     */
086    private class ActivePath {
087
088        private final String path;
089
090        private final ReadWriteLock rwLock;
091
092        /**
093         * A list with references to every thread that has requested
094         * (though not necessary acquired) a read or write lock through
095         * the mechanism of the outer class' contract and not yet
096         * relinquished it by invoking LockManager.release().  This
097         * list is actually managed by methods outside of ActivePath.
098         */
099        private final List<Thread> threads;
100
101        private ActivePath(final String path) {
102            this.path = path;
103            rwLock = new ReentrantReadWriteLock();
104            threads = new ArrayList<>();
105        }
106
107        public PathScopedLock getReadLock() {
108            threads.add(Thread.currentThread());
109            LOGGER.trace("Thread {} requesting read lock on {}.", Thread.currentThread().getId(), path);
110            return new PathScopedLock(rwLock.readLock());
111        }
112
113        public PathScopedLock getWriteLock() {
114            threads.add(Thread.currentThread());
115            LOGGER.trace("Thread {} requesting write lock on {}.", Thread.currentThread().getId(), path);
116            return new PathScopedLock(rwLock.writeLock());
117        }
118
119        /**
120         * Wraps a lock (read or write) to expose a subset of its
121         * functionality and to add special handling for our "delete"
122         * locks.
123         */
124        private class PathScopedLock {
125
126            final private Lock lock;
127
128            public PathScopedLock(final Lock l) {
129                lock = l;
130            }
131
132            public ActivePath getPath() {
133                return ActivePath.this;
134            }
135
136            public boolean tryLock() {
137                for (final String deletePath : activeDeletePaths) {
138                    if (isOrIsDescendantOf(path, deletePath)) {
139                        LOGGER.trace("Thread {} could not be granted lock on {} because that path is being deleted.",
140                                Thread.currentThread().getId(), path);
141                        return false;
142                    }
143                }
144                return lock.tryLock();
145            }
146
147            public void unlock() {
148                lock.unlock();
149            }
150        }
151    }
152
153    /**
154     * The AcquiredLock implementation that's returned by the surrounding class.
155     * Two main constructors exist, one that accepts a bunch of locks, all of which
156     * must be acquired and a second representing a "delete" lock, for which at
157     * acquisition time the locks necessary are determined from the current pool of
158     * active locks.
159     *
160     * Never, outside of a block of code synchronized with the surrounding class
161     * instance, does this class hold an incomplete subset of the locks required:
162     * in other words, it gets all of the locks or none of them, never blocking
163     * while holding locks.
164     */
165    private class AcquiredMultiPathLock implements AcquiredLock {
166
167        private String deletePath;
168
169        private List<ActivePath.PathScopedLock> locks;
170
171        /**
172         * Instantiates and initializes an AcquiredMultiPathLock.  This
173         * constructor blocks until all of the necessary locks have been
174         * acquired, but to avoid possible deadlocks releases all acquired
175         * locks when it fails to acquire even one of them.
176         * @param locks each PathLock that must be acquired
177         * @throws InterruptedException on error
178         */
179        private AcquiredMultiPathLock(final List<ActivePath.PathScopedLock> locks) throws InterruptedException {
180            this.locks = locks;
181
182            boolean success = false;
183            while (!success) {
184                synchronized (DefaultPathLockManager.this) {
185                    success = tryAcquireAll();
186                    if (!success) {
187                        LOGGER.debug("Failed to acquire all necessary path locks: waiting.  (Thread {})",
188                                Thread.currentThread().getId());
189                        DefaultPathLockManager.this.wait();
190                    }
191                }
192
193            }
194            LOGGER.debug("Acquired all necessary path locks  (Thread {})", Thread.currentThread().getId());
195
196        }
197
198        /**
199         * Instantiates and initializes an AcquiredMultiPathLock that requires
200         * locks on the given path and all active paths that are descendants of
201         * the given path.  This constructor blocks until all of the necessary
202         * locks have been acquired, but to avoid possible deadlocks releases
203         * all acquired locks when it fails to acquire even one of them.
204         * @param deletePath the path for which all descendant paths must also be
205         *        write locked.
206         * @throws InterruptedException on error
207         */
208        private AcquiredMultiPathLock(final String deletePath) throws InterruptedException {
209            this.deletePath = deletePath;
210
211            boolean success = false;
212            while (!success) {
213                synchronized (DefaultPathLockManager.this) {
214                    this.locks = new ArrayList<>();
215
216                    // find all paths to lock
217                    activePaths.forEach((path, lock) -> {
218                        if (isOrIsDescendantOf(path, deletePath)) {
219                            locks.add(lock.getWriteLock());
220                        }
221                        });
222
223                    success = tryAcquireAll();
224                    if (!success) {
225                        LOGGER.debug("Failed to acquire all necessary path locks: waiting.  (Thread {})",
226                                Thread.currentThread().getId());
227                        DefaultPathLockManager.this.wait();
228                    } else {
229                        // So, we have acquired locks on every currently active path that is
230                        // the target of the DELETE operation or its ancestor... but what if
231                        // one is added once we fall out of this synchronized block?
232                        //
233                        // ...well, in that case we set a special note that this path is being
234                        // deleted so that whenever a new lock is attempted on a to-be-deleted
235                        // path, those locks fail to acquire.
236                        LOGGER.trace("Thread {} acquired delete lock on path {}.",
237                                Thread.currentThread().getId(), deletePath);
238                        activeDeletePaths.add(deletePath);
239                    }
240                }
241
242            }
243            LOGGER.debug("Acquired all necessary path locks  (Thread {})", Thread.currentThread().getId());
244
245        }
246
247        private boolean tryAcquireAll() {
248            final List<ActivePath.PathScopedLock> acquired = new ArrayList<>();
249            for (final ActivePath.PathScopedLock lock : locks) {
250                if (lock.tryLock()) {
251                    acquired.add(lock);
252                } else {
253                    // roll back
254                    acquired.forEach(ActivePath.PathScopedLock::unlock);
255                    return false;
256                }
257            }
258            return true;
259        }
260
261        /*
262         * This is the only method that removes paths from the pool
263         * of currently active paths that can be locked.
264         */
265        @Override
266        public void release() {
267            synchronized (DefaultPathLockManager.this) {
268                for (final ActivePath.PathScopedLock lock : locks) {
269                    lock.unlock();
270                    lock.getPath().threads.remove(Thread.currentThread());
271                    if (lock.getPath().threads.isEmpty()) {
272                        activePaths.remove(lock.getPath().path);
273                    }
274                }
275                if (deletePath != null) {
276                    LOGGER.trace("Thread {} releasing delete lock on path {}.",
277                            Thread.currentThread().getId(), deletePath);
278                    activeDeletePaths.remove(deletePath);
279                }
280                LOGGER.trace("Thread {} released locks.", Thread.currentThread().getId());
281                DefaultPathLockManager.this.notify();
282            }
283        }
284
285    }
286
287    /*
288     * This is the only method that adds paths to the pool of
289     * currently active paths that can be locked.
290     */
291    private synchronized ActivePath getActivePath(final String path) {
292        ActivePath activePath = activePaths.get(path);
293        if (activePath == null) {
294            activePath = new ActivePath(path);
295            activePaths.put(path, activePath);
296        }
297        return activePath;
298    }
299
300    private boolean isOrIsDescendantOf(final String possibleDescendant, final String path) {
301        return path.equals(possibleDescendant) || possibleDescendant.startsWith(path + "/");
302    }
303
304    private String getParentPath(final String path) {
305        if (path.indexOf('/') == -1) {
306            return null;
307        }
308        return path.substring(0, path.lastIndexOf('/'));
309    }
310
311    @VisibleForTesting
312    private static String normalizePath(final String path) {
313        if (path.endsWith("/")) {
314            return path.substring(0, path.length() - 1);
315        } else {
316            return path;
317        }
318    }
319
320    @Override
321    public AcquiredLock lockForRead(final String path) {
322        final List<ActivePath.PathScopedLock> locks = new ArrayList<>();
323
324        synchronized (this) {
325            locks.add(getActivePath(normalizePath(path)).getReadLock());
326        }
327
328        try {
329            return new AcquiredMultiPathLock(locks);
330        } catch (InterruptedException e) {
331            throw new InterruptedRuntimeException(e);
332        }
333    }
334
335    @Override
336    public AcquiredLock lockForWrite(final String path, final FedoraSession session, final NodeService nodeService) {
337        final List<ActivePath.PathScopedLock> locks = new ArrayList<>();
338
339        synchronized (this) {
340            // lock the specified path while iterating through the path's
341            // ancestry to also lock each path that would be created implicitly
342            // by this write (ie, non-existent ancestral paths)
343            final String startingPath = normalizePath(path);
344            for (String currentPath = startingPath ;
345                    currentPath == null || currentPath.length() > 0;
346                    currentPath = getParentPath(currentPath)) {
347                if (currentPath == null ||
348                        (!Objects.equals(currentPath, startingPath) && nodeService.exists(session, currentPath))) {
349                    // either we've followed the path back to the root, or we've found an ancestor that exists...
350                    // so there are no more locks to create.
351                    break;
352                }
353                locks.add(getActivePath(currentPath).getWriteLock());
354            }
355        }
356
357        try {
358            return new AcquiredMultiPathLock(locks);
359        } catch (InterruptedException e) {
360            throw new InterruptedRuntimeException(e);
361        }
362    }
363
364    @Override
365    public AcquiredLock lockForDelete(final String path) {
366        try {
367            return new AcquiredMultiPathLock(normalizePath(path));
368        } catch (InterruptedException e) {
369            throw new InterruptedRuntimeException(e);
370        }
371    }
372}