001/*
002 * Licensed to DuraSpace under one or more contributor license agreements.
003 * See the NOTICE file distributed with this work for additional information
004 * regarding copyright ownership.
005 *
006 * DuraSpace licenses this file to you under the Apache License,
007 * Version 2.0 (the "License"); you may not use this file except in
008 * compliance with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.fcrepo.persistence.ocfl.impl;
019
020import static org.slf4j.LoggerFactory.getLogger;
021
022import java.time.Duration;
023import java.time.Instant;
024import java.util.ArrayList;
025import java.util.Iterator;
026import java.util.List;
027import java.util.concurrent.TimeUnit;
028import java.util.concurrent.atomic.AtomicInteger;
029import java.util.stream.Stream;
030
031import org.fcrepo.common.db.DbTransactionExecutor;
032import org.fcrepo.config.OcflPropsConfig;
033import org.fcrepo.kernel.api.Transaction;
034import org.fcrepo.kernel.api.TransactionManager;
035
036import org.slf4j.Logger;
037
038/**
039 * Class to coordinate the index rebuilding tasks.
040 * @author whikloj
041 * @since 6.0.0
042 */
043public class ReindexManager {
044
045    private static final Logger LOGGER = getLogger(ReindexManager.class);
046
047    private static final long REPORTING_INTERVAL_SECS = 300;
048
049    private final List<ReindexWorker> workers;
050
051    private final Iterator<String> ocflIter;
052
053    private final Stream<String> ocflStream;
054
055    private final AtomicInteger completedCount;
056
057    private final AtomicInteger errorCount;
058
059    private final ReindexService reindexService;
060
061    private final long batchSize;
062
063    private final boolean failOnError;
064
065    private TransactionManager txManager;
066
067    private DbTransactionExecutor dbTransactionExecutor;
068
069    private Transaction transaction = null;
070
071    /**
072     * Basic constructor
073     * @param ids stream of ocfl ids.
074     * @param reindexService the reindexing service.
075     * @param config OCFL property config object.
076     * @param manager the transaction manager object.
077     * @param dbTransactionExecutor manages db transactions
078     */
079    public ReindexManager(final Stream<String> ids,
080                          final ReindexService reindexService,
081                          final OcflPropsConfig config,
082                          final TransactionManager manager,
083                          final DbTransactionExecutor dbTransactionExecutor) {
084        this.ocflStream = ids;
085        this.ocflIter = ocflStream.iterator();
086        this.reindexService = reindexService;
087        this.batchSize = config.getReindexBatchSize();
088        this.failOnError = config.isReindexFailOnError();
089        txManager = manager;
090        this.dbTransactionExecutor = dbTransactionExecutor;
091        workers = new ArrayList<>();
092        completedCount = new AtomicInteger(0);
093        errorCount = new AtomicInteger(0);
094
095        final var workerCount = config.getReindexingThreads();
096
097        if (workerCount < 1) {
098            throw new IllegalStateException(String.format("Reindexing requires at least 1 thread. Found: %s",
099                    workerCount));
100        }
101
102        for (var i = 0; i < workerCount; i += 1) {
103            workers.add(new ReindexWorker("ReindexWorker-" + i, this,
104                    this.reindexService, txManager, this.dbTransactionExecutor, this.failOnError));
105        }
106    }
107
108    /**
109     * Start reindexing.
110     * @throws InterruptedException on an indexing error in a thread.
111     */
112    public void start() throws InterruptedException {
113        final var reporter = startReporter();
114        try {
115            workers.forEach(ReindexWorker::start);
116            for (final var worker : workers) {
117                worker.join();
118            }
119            indexMembership();
120        } catch (final Exception e) {
121            LOGGER.error("Error while rebuilding index", e);
122            stop();
123            throw e;
124        } finally {
125            reporter.interrupt();
126        }
127    }
128
129    /**
130     * Stop all threads.
131     */
132    public void stop() {
133        LOGGER.debug("Stop worker threads");
134        workers.forEach(ReindexWorker::stopThread);
135    }
136
137    /**
138     * Return a batch of OCFL ids to reindex.
139     * @return list of OCFL ids.
140     */
141    public synchronized List<String> getIds() {
142        int counter = 0;
143        final List<String> ids = new ArrayList<>((int) batchSize);
144        while (ocflIter.hasNext() && counter < batchSize) {
145            ids.add(ocflIter.next());
146            counter += 1;
147        }
148        return ids;
149    }
150
151    /**
152     * Update the master list of reindexing states.
153     * @param batchSuccessful how many items were completed successfully in the last batch.
154     * @param batchErrors how many items had an error in the last batch.
155     */
156    public void updateComplete(final int batchSuccessful, final int batchErrors) {
157        completedCount.addAndGet(batchSuccessful);
158        errorCount.addAndGet(batchErrors);
159    }
160
161    /**
162     * @return the count of items that completed successfully.
163     */
164    public int getCompletedCount() {
165        return completedCount.get();
166    }
167
168    /**
169     * @return the count of items that had errors.
170     */
171    public int getErrorCount() {
172        return errorCount.get();
173    }
174
175    /**
176     * Index the membership relationships
177     */
178    private void indexMembership() {
179        final var tx = transaction();
180        LOGGER.info("Starting membership indexing");
181        reindexService.indexMembership(tx);
182        tx.commit();
183        LOGGER.debug("Completed membership indexing");
184    }
185
186    /**
187     * Close stream.
188     */
189    public void shutdown() {
190        ocflStream.close();
191    }
192
193    private Thread startReporter() {
194        final var reporter = new Thread(() -> {
195            final var startTime = Instant.now();
196            try {
197                while (true) {
198                    TimeUnit.SECONDS.sleep(REPORTING_INTERVAL_SECS);
199                    final var complete = completedCount.get();
200                    final var errored = errorCount.get();
201                    final var now = Instant.now();
202                    final var duration = Duration.between(startTime, now);
203                    LOGGER.info("Index rebuild progress: Complete: {}; Errored: {}; Time: {}; Rate: {}/s",
204                            complete, errored, getDurationMessage(duration),
205                            (complete + errored) / duration.getSeconds());
206                }
207            } catch (final InterruptedException e) {
208                // processing has completed exit normally
209            }
210        });
211
212        reporter.start();
213        return reporter;
214    }
215
216    private String getDurationMessage(final Duration duration) {
217        String message = String.format("%d secs", duration.toSecondsPart());
218        if (duration.getSeconds() > 60) {
219            message = String.format("%d mins, ", duration.toMinutesPart()) + message;
220        }
221        if (duration.getSeconds() > 3600) {
222            message = String.format("%d hours, ", duration.getSeconds() / 3600) + message;
223        }
224        return message;
225    }
226
227    private Transaction transaction() {
228        if (transaction == null) {
229            transaction = txManager.create();
230            transaction.setShortLived(true);
231        }
232        return transaction;
233    }
234}