/*
 * Licensed to DuraSpace under one or more contributor license agreements.
 * See the NOTICE file distributed with this work for additional information
 * regarding copyright ownership.
 *
 * DuraSpace licenses this file to you under the Apache License,
 * Version 2.0 (the "License"); you may not use this file except in
 * compliance with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.fcrepo.persistence.ocfl.impl;

import static org.slf4j.LoggerFactory.getLogger;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

import org.fcrepo.common.db.DbTransactionExecutor;
import org.fcrepo.config.OcflPropsConfig;
import org.fcrepo.kernel.api.Transaction;
import org.fcrepo.kernel.api.TransactionManager;

import org.slf4j.Logger;

/**
 * Class to coordinate the index rebuilding tasks.
 * <p>
 * The manager creates the configured number of {@link ReindexWorker}s, hands them batches of OCFL ids
 * through {@link #getIds()}, accumulates their success/error counts, and logs a progress line every
 * {@value #REPORTING_INTERVAL_SECS} seconds while {@link #start()} is running.
 *
 * @author whikloj
 * @since 6.0.0
 */
public class ReindexManager {

    private static final Logger LOGGER = getLogger(ReindexManager.class);

    /** Seconds between two progress log lines. */
    private static final long REPORTING_INTERVAL_SECS = 300;

    private static final long SECS_PER_MINUTE = 60;

    private static final long SECS_PER_HOUR = 3600;

    private final List<ReindexWorker> workers;

    private final Iterator<String> ocflIter;

    private final Stream<String> ocflStream;

    private final AtomicInteger completedCount;

    private final AtomicInteger errorCount;

    private final ReindexService reindexService;

    private final long batchSize;

    private final boolean failOnError;

    private final TransactionManager txManager;

    private final DbTransactionExecutor dbTransactionExecutor;

    /** Lazily created by {@link #transaction()}; used for membership indexing. */
    private Transaction transaction = null;

    /**
     * Basic constructor
     * @param ids stream of ocfl ids.
     * @param reindexService the reindexing service.
     * @param config OCFL property config object.
     * @param manager the transaction manager object.
     * @param dbTransactionExecutor manages db transactions
     * @throws IllegalStateException if the configured number of reindexing threads is less than 1.
     */
    public ReindexManager(final Stream<String> ids,
                          final ReindexService reindexService,
                          final OcflPropsConfig config,
                          final TransactionManager manager,
                          final DbTransactionExecutor dbTransactionExecutor) {
        this.ocflStream = ids;
        this.ocflIter = ocflStream.iterator();
        this.reindexService = reindexService;
        this.batchSize = config.getReindexBatchSize();
        this.failOnError = config.isReindexFailOnError();
        this.txManager = manager;
        this.dbTransactionExecutor = dbTransactionExecutor;
        this.workers = new ArrayList<>();
        this.completedCount = new AtomicInteger(0);
        this.errorCount = new AtomicInteger(0);

        final var workerCount = config.getReindexingThreads();

        if (workerCount < 1) {
            throw new IllegalStateException(String.format("Reindexing requires at least 1 thread. Found: %s",
                    workerCount));
        }

        for (var i = 0; i < workerCount; i += 1) {
            workers.add(new ReindexWorker("ReindexWorker-" + i, this,
                    this.reindexService, this.txManager, this.dbTransactionExecutor, this.failOnError));
        }
    }

    /**
     * Start reindexing: starts every worker, waits for all of them to finish and then indexes membership.
     * The progress reporter is interrupted when this method exits, whether normally or with an exception.
     * On any exception the workers are asked to stop and the exception is rethrown.
     *
     * @throws InterruptedException presumably when the calling thread is interrupted while waiting for a
     *         worker to finish (raised by {@code ReindexWorker#join()}).
     */
    public void start() throws InterruptedException {
        final var reporter = startReporter();
        try {
            workers.forEach(ReindexWorker::start);
            for (final var worker : workers) {
                worker.join();
            }
            indexMembership();
        } catch (final Exception e) {
            LOGGER.error("Error while rebuilding index", e);
            stop();
            throw e;
        } finally {
            reporter.interrupt();
        }
    }

    /**
     * Stop all threads.
     */
    public void stop() {
        LOGGER.debug("Stop worker threads");
        workers.forEach(ReindexWorker::stopThread);
    }

    /**
     * Return a batch of OCFL ids to reindex. The batch holds at most the configured batch size and is
     * empty once the id stream is exhausted. Synchronized so that concurrent callers never receive the
     * same id twice.
     *
     * @return list of OCFL ids.
     */
    public synchronized List<String> getIds() {
        final List<String> ids = new ArrayList<>((int) batchSize);
        // Check the size bound first so a full batch does not trigger an extra look-ahead on the iterator.
        while (ids.size() < batchSize && ocflIter.hasNext()) {
            ids.add(ocflIter.next());
        }
        return ids;
    }

    /**
     * Update the master list of reindexing states.
     * @param batchSuccessful how many items were completed successfully in the last batch.
     * @param batchErrors how many items had an error in the last batch.
     */
    public void updateComplete(final int batchSuccessful, final int batchErrors) {
        completedCount.addAndGet(batchSuccessful);
        errorCount.addAndGet(batchErrors);
    }

    /**
     * @return the count of items that completed successfully.
     */
    public int getCompletedCount() {
        return completedCount.get();
    }

    /**
     * @return the count of items that had errors.
     */
    public int getErrorCount() {
        return errorCount.get();
    }

    /**
     * Index the membership relationships
     */
    private void indexMembership() {
        final var tx = transaction();
        LOGGER.info("Starting membership indexing");
        reindexService.indexMembership(tx);
        tx.commit();
        LOGGER.debug("Completed membership indexing");
    }

    /**
     * Close stream.
     */
    public void shutdown() {
        ocflStream.close();
    }

    /**
     * Start the background thread that logs progress every {@value #REPORTING_INTERVAL_SECS} seconds
     * until it is interrupted.
     *
     * @return the started reporter thread.
     */
    private Thread startReporter() {
        final var reporter = new Thread(() -> {
            final var startTime = Instant.now();
            try {
                while (true) {
                    TimeUnit.SECONDS.sleep(REPORTING_INTERVAL_SECS);
                    final var complete = completedCount.get();
                    final var errored = errorCount.get();
                    final var duration = Duration.between(startTime, Instant.now());
                    // Never divide by zero, even if the elapsed time is below one second.
                    final var elapsedSecs = Math.max(1L, duration.getSeconds());
                    final var rate = ((long) complete + errored) / elapsedSecs;
                    LOGGER.info("Index rebuild progress: Complete: {}; Errored: {}; Time: {}; Rate: {}/s",
                            complete, errored, getDurationMessage(duration), rate);
                }
            } catch (final InterruptedException ignored) {
                // start() interrupts this thread once processing has completed; exit normally.
            }
        });

        reporter.start();
        return reporter;
    }

    /**
     * Format a duration as {@code "H hours, M mins, S secs"}, omitting the hours and minutes parts while
     * the duration is shorter than one hour / one minute respectively.
     *
     * @param duration the elapsed time to format.
     * @return the human readable message.
     */
    private String getDurationMessage(final Duration duration) {
        final long totalSecs = duration.getSeconds();
        String message = String.format("%d secs", duration.toSecondsPart());
        // Inclusive bounds: exactly 60s must read "1 mins, 0 secs", not "0 secs".
        if (totalSecs >= SECS_PER_MINUTE) {
            message = String.format("%d mins, ", duration.toMinutesPart()) + message;
        }
        // Inclusive bounds: exactly 3600s must read "1 hours, 0 mins, 0 secs", not "0 mins, 0 secs".
        if (totalSecs >= SECS_PER_HOUR) {
            message = String.format("%d hours, ", duration.toHours()) + message;
        }
        return message;
    }

    /**
     * Return the transaction used by this manager, creating a short-lived one on first use.
     *
     * @return the shared transaction.
     */
    private Transaction transaction() {
        if (transaction == null) {
            transaction = txManager.create();
            transaction.setShortLived(true);
        }
        return transaction;
    }
}