001/* 002 * Licensed to DuraSpace under one or more contributor license agreements. 003 * See the NOTICE file distributed with this work for additional information 004 * regarding copyright ownership. 005 * 006 * DuraSpace licenses this file to you under the Apache License, 007 * Version 2.0 (the "License"); you may not use this file except in 008 * compliance with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.fcrepo.persistence.ocfl.impl; 019 020import static org.slf4j.LoggerFactory.getLogger; 021 022import java.util.List; 023import java.util.concurrent.TimeUnit; 024 025import org.fcrepo.common.db.DbTransactionExecutor; 026import org.fcrepo.kernel.api.Transaction; 027import org.fcrepo.kernel.api.TransactionManager; 028 029import org.slf4j.Logger; 030 031import com.google.common.base.Stopwatch; 032 033/** 034 * A reindexing worker thread. 035 * @author whikloj 036 */ 037public class ReindexWorker implements Runnable { 038 039 private static final Logger LOGGER = getLogger(ReindexWorker.class); 040 041 private static final long REPORTING_INTERVAL_SECS = 30; 042 043 private Thread t; 044 private ReindexManager manager; 045 private ReindexService service; 046 private boolean running = true; 047 private boolean failOnError; 048 private TransactionManager txManager; 049 private DbTransactionExecutor dbTransactionExecutor; 050 051 /** 052 * Basic Constructor 053 * @param name the name of the worker -- used in logging 054 * @param reindexManager the manager service. 055 * @param reindexService the reindexing service. 056 * @param transactionManager a transaction manager to generate 057 * @param dbTransactionExecutor manages db transactions 058 * @param failOnError whether the thread should fail on an error or log and continue. 059 */ 060 public ReindexWorker(final String name, 061 final ReindexManager reindexManager, 062 final ReindexService reindexService, 063 final TransactionManager transactionManager, 064 final DbTransactionExecutor dbTransactionExecutor, 065 final boolean failOnError) { 066 manager = reindexManager; 067 service = reindexService; 068 txManager = transactionManager; 069 this.dbTransactionExecutor = dbTransactionExecutor; 070 this.failOnError = failOnError; 071 t = new Thread(this, name); 072 } 073 074 /** 075 * Join the thread. 076 * @throws InterruptedException if the current thread is interrupted. 077 */ 078 public void join() throws InterruptedException { 079 t.join(); 080 } 081 082 /** 083 * Start the thread with this Runnable 084 */ 085 public void start() { 086 t.start(); 087 } 088 089 @Override 090 public void run() { 091 final var stopwatch = Stopwatch.createStarted(); 092 while (running) { 093 final List<String> ids = manager.getIds(); 094 if (ids.isEmpty()) { 095 LOGGER.debug("No more objects found to process. Stopping..."); 096 stopThread(); 097 break; 098 } 099 100 int completed = 0; 101 int errors = 0; 102 103 for (final var id : ids) { 104 if (!running) { 105 break; 106 } 107 108 final Transaction tx = txManager.create(); 109 tx.setShortLived(true); 110 if (stopwatch.elapsed(TimeUnit.SECONDS) > REPORTING_INTERVAL_SECS) { 111 manager.updateComplete(completed, errors); 112 completed = 0; 113 errors = 0; 114 stopwatch.reset().start(); 115 } 116 try { 117 dbTransactionExecutor.doInTxWithRetry(() -> { 118 service.indexOcflObject(tx, id); 119 tx.commit(); 120 }); 121 completed += 1; 122 } catch (final Exception e) { 123 LOGGER.error("Reindexing of OCFL id {} failed", id, e); 124 tx.rollback(); 125 errors += 1; 126 if (failOnError) { 127 manager.updateComplete(completed, errors); 128 manager.stop(); 129 service.cleanupSession(tx.getId()); 130 throw e; 131 } 132 } 133 service.cleanupSession(tx.getId()); 134 } 135 manager.updateComplete(completed, errors); 136 } 137 } 138 139 /** 140 * Stop this thread from running once it has completed its current batch. 141 */ 142 public void stopThread() { 143 this.running = false; 144 } 145 146}