001/*
002 * Licensed to DuraSpace under one or more contributor license agreements.
003 * See the NOTICE file distributed with this work for additional information
004 * regarding copyright ownership.
005 *
006 * DuraSpace licenses this file to you under the Apache License,
007 * Version 2.0 (the "License"); you may not use this file except in
008 * compliance with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.fcrepo.http.api;
020
021import static org.slf4j.LoggerFactory.getLogger;
022
023import java.util.ArrayList;
024import java.util.HashMap;
025import java.util.List;
026import java.util.Map;
027import java.util.concurrent.locks.Lock;
028import java.util.concurrent.locks.ReadWriteLock;
029import java.util.concurrent.locks.ReentrantReadWriteLock;
030
031import javax.jcr.Session;
032
033import org.fcrepo.kernel.api.exception.InterruptedRuntimeException;
034import org.fcrepo.kernel.api.services.NodeService;
035import org.slf4j.Logger;
036import org.springframework.stereotype.Component;
037
038import com.google.common.annotations.VisibleForTesting;
039
040/**
041 * A class that serves as a pool lockable paths to guarantee synchronized
042 * accesses to the resources at those paths.  Because there may be an
043 * extremely high number of paths in the repository, this implementation
044 * is complicated by a need to ensure that for a given path, only one set
045 * of locks exists and only until all the locks have been acquired and released.
046 *
047 * Because this is very complex code, extensive logging is produced at
048 * the TRACE level.
049 *
050 * @author Mike Durbin
051 */
052
053@Component
054public class DefaultPathLockManager implements PathLockManager {
055
056    private static final Logger LOGGER = getLogger(DefaultPathLockManager.class);
057
058    /**
059     * A map of all the paths for which requests to lock are underway.  This
060     * is an exhaustive set of all paths that may be accessed at this time.
061     * Changes to the contents of this map must be synchronized on the instance
062     * of this class, though blocking acquisition of locks against those paths
063     * must NOT be, lest we degrade to an essentially single-threaded
064     * application.
065     */
066    @VisibleForTesting
067    Map<String, ActivePath> activePaths = new HashMap<>();
068
069    /**
070     * A list of paths for which delete operations are underway.  Attempts to
071     * acquire locks on resources that would be deleted with those paths will
072     * block until the delete lock is released.  This is a handy shortcut that
073     * allows this class to meet the locking requirements for delete operations
074     * without actually discovering (and locking) all the descendant nodes.
075     */
076    @VisibleForTesting
077    List<String> activeDeletePaths = new ArrayList<>();
078
079    /**
080     * A class that represents a path that can be locked for reading or writing
081     * and is the subject of a currently active lock request (though locks may
082     * not necessarily have been granted yet).
083     */
084    private class ActivePath {
085
086        private String path;
087
088        private ReadWriteLock rwLock;
089
090        /**
091         * A list with references to every thread that has requested
092         * (though not necessary acquired) a read or write lock through
093         * the mechanism of the outer class' contract and not yet
094         * relinquished it by invoking LockManager.release().  This
095         * list is actually managed by methods outside of ActivePath.
096         */
097        private List<Thread> threads;
098
099        private ActivePath(final String path) {
100            this.path = path;
101            rwLock = new ReentrantReadWriteLock();
102            threads = new ArrayList<>();
103        }
104
105        public PathScopedLock getReadLock() {
106            threads.add(Thread.currentThread());
107            LOGGER.trace("Thread {} requesting read lock on {}.", Thread.currentThread().getId(), path);
108            return new PathScopedLock(rwLock.readLock());
109        }
110
111        public PathScopedLock getWriteLock() {
112            threads.add(Thread.currentThread());
113            LOGGER.trace("Thread {} requesting write lock on {}.", Thread.currentThread().getId(), path);
114            return new PathScopedLock(rwLock.writeLock());
115        }
116
117        /**
118         * Wraps a lock (read or write) to expose a subset of its
119         * functionality and to add special handling for our "delete"
120         * locks.
121         */
122        private class PathScopedLock {
123
124            final private Lock lock;
125
126            public PathScopedLock(final Lock l) {
127                lock = l;
128            }
129
130            public ActivePath getPath() {
131                return ActivePath.this;
132            }
133
134            public boolean tryLock() {
135                for (final String deletePath : activeDeletePaths) {
136                    if (isOrIsDescendantOf(path, deletePath)) {
137                        LOGGER.trace("Thread {} could not be granted lock on {} because that path is being deleted.",
138                                Thread.currentThread().getId(), path);
139                        return false;
140                    }
141                }
142                return lock.tryLock();
143            }
144
145            public void unlock() {
146                lock.unlock();
147            }
148        }
149    }
150
151    /**
152     * The AcquiredLock implementation that's returned by the surrounding class.
153     * Two main constructors exist, one that accepts a bunch of locks, all of which
154     * must be acquired and a second representing a "delete" lock, for which at
155     * acquisition time the locks necessary are determined from the current pool of
156     * active locks.
157     *
158     * Never, outside of a block of code synchronized with the surrounding class
159     * instance, does this class hold an incomplete subset of the locks required:
160     * in other words, it gets all of the locks or none of them, never blocking
161     * while holding locks.
162     */
163    private class AcquiredMultiPathLock implements AcquiredLock {
164
165        private String deletePath;
166
167        private List<ActivePath.PathScopedLock> locks;
168
169        /**
170         * Instantiates and initializes an AcquiredMultiPathLock.  This
171         * constructor blocks until all of the necessary locks have been
172         * acquired, but to avoid possible deadlocks releases all acquired
173         * locks when it fails to acquire even one of them.
174         * @param locks each PathLock that must be acquired
175         * @throws InterruptedException
176         */
177        private AcquiredMultiPathLock(final List<ActivePath.PathScopedLock> locks) throws InterruptedException {
178            this.locks = locks;
179
180            boolean success = false;
181            while (!success) {
182                synchronized (DefaultPathLockManager.this) {
183                    success = tryAcquireAll();
184                    if (!success) {
185                        LOGGER.debug("Failed to acquire all necessary path locks: waiting.  (Thread {})",
186                                Thread.currentThread().getId());
187                        DefaultPathLockManager.this.wait();
188                    }
189                }
190
191            }
192            LOGGER.debug("Acquired all necessary path locks  (Thread {})", Thread.currentThread().getId());
193
194        }
195
196        /**
197         * Instantiates and initializes an AcquiredMultiPathLock that requires
198         * locks on the given path and all active paths that are descendants of
199         * the given path.  This constructor blocks until all of the necessary
200         * locks have been acquired, but to avoid possible deadlocks releases
201         * all acquired locks when it fails to acquire even one of them.
202         * @param deletePath the path for which all descendant paths must also be
203         *        write locked.
204         * @throws InterruptedException
205         */
206        private AcquiredMultiPathLock(final String deletePath) throws InterruptedException {
207            this.deletePath = deletePath;
208
209            boolean success = false;
210            while (!success) {
211                synchronized (DefaultPathLockManager.this) {
212                    this.locks = new ArrayList<>();
213
214                    // find all paths to lock
215                    activePaths.forEach((path, lock) -> {
216                        if (isOrIsDescendantOf(path, deletePath)) {
217                            locks.add(lock.getWriteLock());
218                        }
219                        });
220
221                    success = tryAcquireAll();
222                    if (!success) {
223                        LOGGER.debug("Failed to acquire all necessary path locks: waiting.  (Thread {})",
224                                Thread.currentThread().getId());
225                        DefaultPathLockManager.this.wait();
226                    } else {
227                        // So, we have acquired locks on every currently active path that is
228                        // the target of the DELETE operation or its ancestor... but what if
229                        // one is added once we fall out of this synchronized block?
230                        //
231                        // ...well, in that case we set a special note that this path is being
232                        // deleted so that whenever a new lock is attempted on a to-be-deleted
233                        // path, those locks fail to acquire.
234                        LOGGER.trace("Thread {} acquired delete lock on path {}.",
235                                Thread.currentThread().getId(), deletePath);
236                        activeDeletePaths.add(deletePath);
237                    }
238                }
239
240            }
241            LOGGER.debug("Acquired all necessary path locks  (Thread {})", Thread.currentThread().getId());
242
243        }
244
245        private boolean tryAcquireAll() {
246            final List<ActivePath.PathScopedLock> acquired = new ArrayList<>();
247            for (final ActivePath.PathScopedLock lock : locks) {
248                if (lock.tryLock()) {
249                    acquired.add(lock);
250                } else {
251                    // roll back
252                    acquired.forEach(ActivePath.PathScopedLock::unlock);
253                    return false;
254                }
255            }
256            return true;
257        }
258
259        /*
260         * This is the only method that removes paths from the pool
261         * of currently active paths that can be locked.
262         */
263        @Override
264        public void release() {
265            synchronized (DefaultPathLockManager.this) {
266                for (final ActivePath.PathScopedLock lock : locks) {
267                    lock.unlock();
268                    lock.getPath().threads.remove(Thread.currentThread());
269                    if (lock.getPath().threads.isEmpty()) {
270                        activePaths.remove(lock.getPath().path);
271                    }
272                }
273                if (deletePath != null) {
274                    LOGGER.trace("Thread {} releasing delete lock on path {}.",
275                            Thread.currentThread().getId(), deletePath);
276                    activeDeletePaths.remove(deletePath);
277                }
278                LOGGER.trace("Thread {} released locks.", Thread.currentThread().getId());
279                DefaultPathLockManager.this.notify();
280            }
281        }
282
283    }
284
285    /*
286     * This is the only method that adds paths to the pool of
287     * currently active paths that can be locked.
288     */
289    private synchronized ActivePath getActivePath(final String path) {
290        ActivePath activePath = activePaths.get(path);
291        if (activePath == null) {
292            activePath = new ActivePath(path);
293            activePaths.put(path, activePath);
294        }
295        return activePath;
296    }
297
298    private boolean isOrIsDescendantOf(final String possibleDescendant, final String path) {
299        return path.equals(possibleDescendant) || possibleDescendant.startsWith(path + "/");
300    }
301
302    private String getParentPath(final String path) {
303        if (path.indexOf('/') == -1) {
304            return null;
305        }
306        return path.substring(0, path.lastIndexOf('/'));
307    }
308
309    @VisibleForTesting
310    static String normalizePath(final String path) {
311        if (path.endsWith("/")) {
312            return path.substring(0, path.length() - 1);
313        } else {
314            return path;
315        }
316    }
317
318    @Override
319    public AcquiredLock lockForRead(final String path) {
320        final List<ActivePath.PathScopedLock> locks = new ArrayList<>();
321
322        synchronized (this) {
323            locks.add(getActivePath(normalizePath(path)).getReadLock());
324        }
325
326        try {
327            return new AcquiredMultiPathLock(locks);
328        } catch (InterruptedException e) {
329            throw new InterruptedRuntimeException(e);
330        }
331    }
332
333    @Override
334    public AcquiredLock lockForWrite(final String path, final Session session, final NodeService nodeService) {
335        final List<ActivePath.PathScopedLock> locks = new ArrayList<>();
336
337        synchronized (this) {
338            // lock the specified path while iterating through the path's
339            // ancestry to also lock each path that would be created implicitly
340            // by this write (ie, non-existent ancestral paths)
341            final String startingPath = normalizePath(path);
342            for (String currentPath = startingPath ;
343                    currentPath == null || currentPath.length() > 0;
344                    currentPath = getParentPath(currentPath)) {
345                if (currentPath == null || (currentPath != startingPath && nodeService.exists(session, currentPath))) {
346                    // either we've followed the path back to the root, or we've found an ancestor that exists...
347                    // so there are no more locks to create.
348                    break;
349                }
350                locks.add(getActivePath(currentPath).getWriteLock());
351            }
352        }
353
354        try {
355            return new AcquiredMultiPathLock(locks);
356        } catch (InterruptedException e) {
357            throw new InterruptedRuntimeException(e);
358        }
359    }
360
361    @Override
362    public AcquiredLock lockForDelete(final String path) {
363        try {
364            return new AcquiredMultiPathLock(normalizePath(path));
365        } catch (InterruptedException e) {
366            throw new InterruptedRuntimeException(e);
367        }
368    }
369}