001/*
002 * Licensed to DuraSpace under one or more contributor license agreements.
003 * See the NOTICE file distributed with this work for additional information
004 * regarding copyright ownership.
005 *
006 * DuraSpace licenses this file to you under the Apache License,
007 * Version 2.0 (the "License"); you may not use this file except in
008 * compliance with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.fcrepo.http.api;
020
021import static org.slf4j.LoggerFactory.getLogger;
022
023import java.util.ArrayList;
024import java.util.HashMap;
025import java.util.List;
026import java.util.Map;
027import java.util.concurrent.locks.Lock;
028import java.util.concurrent.locks.ReadWriteLock;
029import java.util.concurrent.locks.ReentrantReadWriteLock;
030
031import org.fcrepo.kernel.api.FedoraSession;
032import org.fcrepo.kernel.api.exception.InterruptedRuntimeException;
033import org.fcrepo.kernel.api.services.NodeService;
034import org.slf4j.Logger;
035import org.springframework.stereotype.Component;
036
037import com.google.common.annotations.VisibleForTesting;
038
039/**
040 * A class that serves as a pool lockable paths to guarantee synchronized
041 * accesses to the resources at those paths.  Because there may be an
042 * extremely high number of paths in the repository, this implementation
043 * is complicated by a need to ensure that for a given path, only one set
044 * of locks exists and only until all the locks have been acquired and released.
045 *
046 * Because this is very complex code, extensive logging is produced at
047 * the TRACE level.
048 *
049 * @author Mike Durbin
050 */
051
052@Component
053public class DefaultPathLockManager implements PathLockManager {
054
055    private static final Logger LOGGER = getLogger(DefaultPathLockManager.class);
056
057    /**
058     * A map of all the paths for which requests to lock are underway.  This
059     * is an exhaustive set of all paths that may be accessed at this time.
060     * Changes to the contents of this map must be synchronized on the instance
061     * of this class, though blocking acquisition of locks against those paths
062     * must NOT be, lest we degrade to an essentially single-threaded
063     * application.
064     */
065    @VisibleForTesting
066    Map<String, ActivePath> activePaths = new HashMap<>();
067
068    /**
069     * A list of paths for which delete operations are underway.  Attempts to
070     * acquire locks on resources that would be deleted with those paths will
071     * block until the delete lock is released.  This is a handy shortcut that
072     * allows this class to meet the locking requirements for delete operations
073     * without actually discovering (and locking) all the descendant nodes.
074     */
075    @VisibleForTesting
076    List<String> activeDeletePaths = new ArrayList<>();
077
078    /**
079     * A class that represents a path that can be locked for reading or writing
080     * and is the subject of a currently active lock request (though locks may
081     * not necessarily have been granted yet).
082     */
083    private class ActivePath {
084
085        private String path;
086
087        private ReadWriteLock rwLock;
088
089        /**
090         * A list with references to every thread that has requested
091         * (though not necessary acquired) a read or write lock through
092         * the mechanism of the outer class' contract and not yet
093         * relinquished it by invoking LockManager.release().  This
094         * list is actually managed by methods outside of ActivePath.
095         */
096        private List<Thread> threads;
097
098        private ActivePath(final String path) {
099            this.path = path;
100            rwLock = new ReentrantReadWriteLock();
101            threads = new ArrayList<>();
102        }
103
104        public PathScopedLock getReadLock() {
105            threads.add(Thread.currentThread());
106            LOGGER.trace("Thread {} requesting read lock on {}.", Thread.currentThread().getId(), path);
107            return new PathScopedLock(rwLock.readLock());
108        }
109
110        public PathScopedLock getWriteLock() {
111            threads.add(Thread.currentThread());
112            LOGGER.trace("Thread {} requesting write lock on {}.", Thread.currentThread().getId(), path);
113            return new PathScopedLock(rwLock.writeLock());
114        }
115
116        /**
117         * Wraps a lock (read or write) to expose a subset of its
118         * functionality and to add special handling for our "delete"
119         * locks.
120         */
121        private class PathScopedLock {
122
123            final private Lock lock;
124
125            public PathScopedLock(final Lock l) {
126                lock = l;
127            }
128
129            public ActivePath getPath() {
130                return ActivePath.this;
131            }
132
133            public boolean tryLock() {
134                for (final String deletePath : activeDeletePaths) {
135                    if (isOrIsDescendantOf(path, deletePath)) {
136                        LOGGER.trace("Thread {} could not be granted lock on {} because that path is being deleted.",
137                                Thread.currentThread().getId(), path);
138                        return false;
139                    }
140                }
141                return lock.tryLock();
142            }
143
144            public void unlock() {
145                lock.unlock();
146            }
147        }
148    }
149
150    /**
151     * The AcquiredLock implementation that's returned by the surrounding class.
152     * Two main constructors exist, one that accepts a bunch of locks, all of which
153     * must be acquired and a second representing a "delete" lock, for which at
154     * acquisition time the locks necessary are determined from the current pool of
155     * active locks.
156     *
157     * Never, outside of a block of code synchronized with the surrounding class
158     * instance, does this class hold an incomplete subset of the locks required:
159     * in other words, it gets all of the locks or none of them, never blocking
160     * while holding locks.
161     */
162    private class AcquiredMultiPathLock implements AcquiredLock {
163
164        private String deletePath;
165
166        private List<ActivePath.PathScopedLock> locks;
167
168        /**
169         * Instantiates and initializes an AcquiredMultiPathLock.  This
170         * constructor blocks until all of the necessary locks have been
171         * acquired, but to avoid possible deadlocks releases all acquired
172         * locks when it fails to acquire even one of them.
173         * @param locks each PathLock that must be acquired
174         * @throws InterruptedException
175         */
176        private AcquiredMultiPathLock(final List<ActivePath.PathScopedLock> locks) throws InterruptedException {
177            this.locks = locks;
178
179            boolean success = false;
180            while (!success) {
181                synchronized (DefaultPathLockManager.this) {
182                    success = tryAcquireAll();
183                    if (!success) {
184                        LOGGER.debug("Failed to acquire all necessary path locks: waiting.  (Thread {})",
185                                Thread.currentThread().getId());
186                        DefaultPathLockManager.this.wait();
187                    }
188                }
189
190            }
191            LOGGER.debug("Acquired all necessary path locks  (Thread {})", Thread.currentThread().getId());
192
193        }
194
195        /**
196         * Instantiates and initializes an AcquiredMultiPathLock that requires
197         * locks on the given path and all active paths that are descendants of
198         * the given path.  This constructor blocks until all of the necessary
199         * locks have been acquired, but to avoid possible deadlocks releases
200         * all acquired locks when it fails to acquire even one of them.
201         * @param deletePath the path for which all descendant paths must also be
202         *        write locked.
203         * @throws InterruptedException
204         */
205        private AcquiredMultiPathLock(final String deletePath) throws InterruptedException {
206            this.deletePath = deletePath;
207
208            boolean success = false;
209            while (!success) {
210                synchronized (DefaultPathLockManager.this) {
211                    this.locks = new ArrayList<>();
212
213                    // find all paths to lock
214                    activePaths.forEach((path, lock) -> {
215                        if (isOrIsDescendantOf(path, deletePath)) {
216                            locks.add(lock.getWriteLock());
217                        }
218                        });
219
220                    success = tryAcquireAll();
221                    if (!success) {
222                        LOGGER.debug("Failed to acquire all necessary path locks: waiting.  (Thread {})",
223                                Thread.currentThread().getId());
224                        DefaultPathLockManager.this.wait();
225                    } else {
226                        // So, we have acquired locks on every currently active path that is
227                        // the target of the DELETE operation or its ancestor... but what if
228                        // one is added once we fall out of this synchronized block?
229                        //
230                        // ...well, in that case we set a special note that this path is being
231                        // deleted so that whenever a new lock is attempted on a to-be-deleted
232                        // path, those locks fail to acquire.
233                        LOGGER.trace("Thread {} acquired delete lock on path {}.",
234                                Thread.currentThread().getId(), deletePath);
235                        activeDeletePaths.add(deletePath);
236                    }
237                }
238
239            }
240            LOGGER.debug("Acquired all necessary path locks  (Thread {})", Thread.currentThread().getId());
241
242        }
243
244        private boolean tryAcquireAll() {
245            final List<ActivePath.PathScopedLock> acquired = new ArrayList<>();
246            for (final ActivePath.PathScopedLock lock : locks) {
247                if (lock.tryLock()) {
248                    acquired.add(lock);
249                } else {
250                    // roll back
251                    acquired.forEach(ActivePath.PathScopedLock::unlock);
252                    return false;
253                }
254            }
255            return true;
256        }
257
258        /*
259         * This is the only method that removes paths from the pool
260         * of currently active paths that can be locked.
261         */
262        @Override
263        public void release() {
264            synchronized (DefaultPathLockManager.this) {
265                for (final ActivePath.PathScopedLock lock : locks) {
266                    lock.unlock();
267                    lock.getPath().threads.remove(Thread.currentThread());
268                    if (lock.getPath().threads.isEmpty()) {
269                        activePaths.remove(lock.getPath().path);
270                    }
271                }
272                if (deletePath != null) {
273                    LOGGER.trace("Thread {} releasing delete lock on path {}.",
274                            Thread.currentThread().getId(), deletePath);
275                    activeDeletePaths.remove(deletePath);
276                }
277                LOGGER.trace("Thread {} released locks.", Thread.currentThread().getId());
278                DefaultPathLockManager.this.notify();
279            }
280        }
281
282    }
283
284    /*
285     * This is the only method that adds paths to the pool of
286     * currently active paths that can be locked.
287     */
288    private synchronized ActivePath getActivePath(final String path) {
289        ActivePath activePath = activePaths.get(path);
290        if (activePath == null) {
291            activePath = new ActivePath(path);
292            activePaths.put(path, activePath);
293        }
294        return activePath;
295    }
296
297    private boolean isOrIsDescendantOf(final String possibleDescendant, final String path) {
298        return path.equals(possibleDescendant) || possibleDescendant.startsWith(path + "/");
299    }
300
301    private String getParentPath(final String path) {
302        if (path.indexOf('/') == -1) {
303            return null;
304        }
305        return path.substring(0, path.lastIndexOf('/'));
306    }
307
308    @VisibleForTesting
309    static String normalizePath(final String path) {
310        if (path.endsWith("/")) {
311            return path.substring(0, path.length() - 1);
312        } else {
313            return path;
314        }
315    }
316
317    @Override
318    public AcquiredLock lockForRead(final String path) {
319        final List<ActivePath.PathScopedLock> locks = new ArrayList<>();
320
321        synchronized (this) {
322            locks.add(getActivePath(normalizePath(path)).getReadLock());
323        }
324
325        try {
326            return new AcquiredMultiPathLock(locks);
327        } catch (InterruptedException e) {
328            throw new InterruptedRuntimeException(e);
329        }
330    }
331
332    @Override
333    public AcquiredLock lockForWrite(final String path, final FedoraSession session, final NodeService nodeService) {
334        final List<ActivePath.PathScopedLock> locks = new ArrayList<>();
335
336        synchronized (this) {
337            // lock the specified path while iterating through the path's
338            // ancestry to also lock each path that would be created implicitly
339            // by this write (ie, non-existent ancestral paths)
340            final String startingPath = normalizePath(path);
341            for (String currentPath = startingPath ;
342                    currentPath == null || currentPath.length() > 0;
343                    currentPath = getParentPath(currentPath)) {
344                if (currentPath == null || (currentPath != startingPath && nodeService.exists(session, currentPath))) {
345                    // either we've followed the path back to the root, or we've found an ancestor that exists...
346                    // so there are no more locks to create.
347                    break;
348                }
349                locks.add(getActivePath(currentPath).getWriteLock());
350            }
351        }
352
353        try {
354            return new AcquiredMultiPathLock(locks);
355        } catch (InterruptedException e) {
356            throw new InterruptedRuntimeException(e);
357        }
358    }
359
360    @Override
361    public AcquiredLock lockForDelete(final String path) {
362        try {
363            return new AcquiredMultiPathLock(normalizePath(path));
364        } catch (InterruptedException e) {
365            throw new InterruptedRuntimeException(e);
366        }
367    }
368}