001/*
002 * Licensed to DuraSpace under one or more contributor license agreements.
003 * See the NOTICE file distributed with this work for additional information
004 * regarding copyright ownership.
005 *
006 * DuraSpace licenses this file to you under the Apache License,
007 * Version 2.0 (the "License"); you may not use this file except in
008 * compliance with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.fcrepo.persistence.ocfl.impl;
019
020import static org.slf4j.LoggerFactory.getLogger;
021
022import java.util.List;
023import java.util.concurrent.TimeUnit;
024
025import org.fcrepo.common.db.DbTransactionExecutor;
026import org.fcrepo.kernel.api.Transaction;
027import org.fcrepo.kernel.api.TransactionManager;
028
029import org.slf4j.Logger;
030
031import com.google.common.base.Stopwatch;
032
033/**
034 * A reindexing worker thread.
035 * @author whikloj
036 */
037public class ReindexWorker implements Runnable {
038
039    private static final Logger LOGGER = getLogger(ReindexWorker.class);
040
041    private static final long REPORTING_INTERVAL_SECS = 30;
042
043    private Thread t;
044    private ReindexManager manager;
045    private ReindexService service;
046    private boolean running = true;
047    private boolean failOnError;
048    private TransactionManager txManager;
049    private DbTransactionExecutor dbTransactionExecutor;
050
051    /**
052     * Basic Constructor
053     * @param name the name of the worker -- used in logging
054     * @param reindexManager the manager service.
055     * @param reindexService the reindexing service.
056     * @param transactionManager a transaction manager to generate
057     * @param dbTransactionExecutor manages db transactions
058     * @param failOnError whether the thread should fail on an error or log and continue.
059     */
060    public ReindexWorker(final String name,
061                         final ReindexManager reindexManager,
062                         final ReindexService reindexService,
063                         final TransactionManager transactionManager,
064                         final DbTransactionExecutor dbTransactionExecutor,
065                         final boolean failOnError) {
066        manager = reindexManager;
067        service = reindexService;
068        txManager = transactionManager;
069        this.dbTransactionExecutor = dbTransactionExecutor;
070        this.failOnError = failOnError;
071        t = new Thread(this, name);
072    }
073
074    /**
075     * Join the thread.
076     * @throws InterruptedException if the current thread is interrupted.
077     */
078    public void join() throws InterruptedException {
079        t.join();
080    }
081
082    /**
083     * Start the thread with this Runnable
084     */
085    public void start() {
086        t.start();
087    }
088
089    @Override
090    public void run() {
091        final var stopwatch = Stopwatch.createStarted();
092        while (running) {
093            final List<String> ids = manager.getIds();
094            if (ids.isEmpty()) {
095                LOGGER.debug("No more objects found to process. Stopping...");
096                stopThread();
097                break;
098            }
099
100            int completed = 0;
101            int errors = 0;
102
103            for (final var id : ids) {
104                if (!running) {
105                    break;
106                }
107
108                final Transaction tx = txManager.create();
109                tx.setShortLived(true);
110                if (stopwatch.elapsed(TimeUnit.SECONDS) > REPORTING_INTERVAL_SECS) {
111                    manager.updateComplete(completed, errors);
112                    completed = 0;
113                    errors = 0;
114                    stopwatch.reset().start();
115                }
116                try {
117                    dbTransactionExecutor.doInTxWithRetry(() -> {
118                        service.indexOcflObject(tx, id);
119                        tx.commit();
120                    });
121                    completed += 1;
122                } catch (final Exception e) {
123                    LOGGER.error("Reindexing of OCFL id {} failed", id, e);
124                    tx.rollback();
125                    errors += 1;
126                    if (failOnError) {
127                        manager.updateComplete(completed, errors);
128                        manager.stop();
129                        service.cleanupSession(tx.getId());
130                        throw e;
131                    }
132                }
133                service.cleanupSession(tx.getId());
134            }
135            manager.updateComplete(completed, errors);
136        }
137    }
138
139    /**
140     * Stop this thread from running once it has completed its current batch.
141     */
142    public void stopThread() {
143        this.running = false;
144    }
145
146}