001/* 002 * Licensed to DuraSpace under one or more contributor license agreements. 003 * See the NOTICE file distributed with this work for additional information 004 * regarding copyright ownership. 005 * 006 * DuraSpace licenses this file to you under the Apache License, 007 * Version 2.0 (the "License"); you may not use this file except in 008 * compliance with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.fcrepo.persistence.ocfl.impl; 019 020import static org.apache.jena.graph.NodeFactory.createURI; 021import static org.apache.jena.rdf.model.ModelFactory.createDefaultModel; 022import static org.fcrepo.kernel.api.RdfLexicon.NON_RDF_SOURCE; 023import static org.fcrepo.persistence.ocfl.impl.OcflPersistentStorageUtils.getRdfFormat; 024import static org.slf4j.LoggerFactory.getLogger; 025 026import java.io.IOException; 027import java.io.InputStream; 028import java.util.ArrayList; 029import java.util.List; 030import java.util.Optional; 031import java.util.concurrent.Callable; 032import java.util.concurrent.atomic.AtomicReference; 033 034import javax.inject.Inject; 035import javax.validation.constraints.NotNull; 036 037import org.fcrepo.config.FedoraPropsConfig; 038import org.fcrepo.kernel.api.ContainmentIndex; 039import org.fcrepo.kernel.api.RdfLexicon; 040import org.fcrepo.kernel.api.RdfStream; 041import org.fcrepo.kernel.api.Transaction; 042import org.fcrepo.kernel.api.exception.RepositoryRuntimeException; 043import org.fcrepo.kernel.api.identifiers.FedoraId; 044import org.fcrepo.kernel.api.models.ResourceHeaders; 045import 
org.fcrepo.kernel.api.rdf.DefaultRdfStream;
import org.fcrepo.kernel.api.services.MembershipService;
import org.fcrepo.kernel.api.services.ReferenceService;
import org.fcrepo.persistence.api.PersistentStorageSessionManager;
import org.fcrepo.persistence.ocfl.api.FedoraToOcflObjectIndex;
import org.fcrepo.search.api.Condition;
import org.fcrepo.search.api.InvalidQueryException;
import org.fcrepo.search.api.SearchIndex;
import org.fcrepo.search.api.SearchParameters;
import org.fcrepo.storage.ocfl.OcflObjectSessionFactory;
import org.fcrepo.storage.ocfl.validation.ObjectValidator;

import org.apache.jena.rdf.model.Model;
import org.apache.jena.riot.RDFDataMgr;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;

/**
 * Service that does the reindexing for one OCFL object.
 * @author whikloj
 */
@Component
public class ReindexService {

    @Inject
    private PersistentStorageSessionManager persistentStorageSessionManager;

    @Inject
    private OcflObjectSessionFactory ocflObjectSessionFactory;

    @Autowired
    @Qualifier("ocflIndex")
    private FedoraToOcflObjectIndex ocflIndex;

    @Autowired
    @Qualifier("containmentIndex")
    private ContainmentIndex containmentIndex;

    @Autowired
    @Qualifier("searchIndex")
    private SearchIndex searchIndex;

    @Autowired
    @Qualifier("referenceService")
    private ReferenceService referenceService;

    @Inject
    private MembershipService membershipService;

    @Inject
    private ObjectValidator objectValidator;

    @Inject
    private FedoraPropsConfig config;

    private static final Logger LOGGER = getLogger(ReindexService.class);

    /** Number of Direct containers requested per search page in {@link #indexMembership(Transaction)}. */
    private int membershipPageSize = 500;

    /**
     * Rebuild the index entries for every resource stored in a single OCFL object.
     *
     * <p>The object is first validated (with a fixity check when
     * {@link FedoraPropsConfig#isRebuildFixityCheck()} is enabled). Then, for each resource header in the object:</p>
     * <ul>
     *   <li>non-deleted resources whose interaction model is not NonRdfSource have their content parsed as RDF
     *       and passed to the reference service;</li>
     *   <li>every resource except the repository root is added to the containment index (deleted resources use
     *       their last-modified date as the deleted date);</li>
     *   <li>every resource is mapped to the object's root resource and {@code ocflId} in the OCFL index;</li>
     *   <li>non-deleted resources are added to the search index.</li>
     * </ul>
     *
     * @param tx the transaction the index updates are made in.
     * @param ocflId the id of the OCFL object to index.
     * @throws IllegalStateException if a resource that is neither the repository root nor an object root has no
     *         parent, or if the object contains no archival group / object root resource.
     * @throws RepositoryRuntimeException if closing a resource's content stream fails.
     */
    public void indexOcflObject(final Transaction tx, final String ocflId) {
        LOGGER.debug("Indexing ocflId {} in transaction {}", ocflId, tx.getId());

        objectValidator.validate(ocflId, config.isRebuildFixityCheck());

        try (final var session = ocflObjectSessionFactory.newSession(ocflId)) {
            // Collected inside the lambda below, so the root id needs a mutable holder.
            final var rootId = new AtomicReference<FedoraId>();
            final var fedoraIds = new ArrayList<FedoraId>();
            final var headersList = new ArrayList<ResourceHeaders>();

            session.streamResourceHeaders().forEach(storageHeaders -> {
                final var headers = new ResourceHeadersAdapter(storageHeaders);

                final var fedoraId = headers.getId();
                fedoraIds.add(fedoraId);
                if (headers.isArchivalGroup() || headers.isObjectRoot()) {
                    rootId.set(fedoraId);
                }

                // The repository root gets no containment, reference or search entries here.
                if (fedoraId.isRepositoryRoot()) {
                    return;
                }

                final var parentId = resolveParentId(headers, fedoraId);
                final var created = headers.getCreatedDate();

                if (headers.isDeleted()) {
                    final var deleted = headers.getLastModifiedDate();
                    containmentIndex.addContainedBy(tx, parentId, fedoraId, created, deleted);
                    return;
                }

                if (!headers.getInteractionModel().equals(NON_RDF_SOURCE.toString())) {
                    final Optional<InputStream> content = session.readContent(fedoraId.getFullId())
                            .getContentStream();
                    content.ifPresent(stream -> indexReferences(tx, fedoraId, stream));
                }

                containmentIndex.addContainedBy(tx, parentId, fedoraId, created, null);
                headersList.add(headers.asKernelHeaders());
            });

            final var rootFedoraIdentifier = rootId.get();
            if (rootFedoraIdentifier == null) {
                throw new IllegalStateException(String.format("Failed to find the root resource in object " +
                        "identified by %s. Please ensure that the object ID you are attempting to index " +
                        "refers to a corresponding valid Fedora-flavored object in the OCFL repository. Additionally " +
                        "be sure that the object ID corresponds with the object root resource (as opposed to child " +
                        "resources within the object).", ocflId));
            }

            fedoraIds.forEach(fedoraIdentifier -> {
                ocflIndex.addMapping(tx, fedoraIdentifier, rootFedoraIdentifier, ocflId);
                LOGGER.debug("Rebuilt fedora-to-ocfl object index entry for {}", fedoraIdentifier);
            });

            headersList.forEach(headers -> {
                searchIndex.addUpdateIndex(tx, headers);
                LOGGER.debug("Rebuilt searchIndex for {}", headers.getId());
            });
        }
    }

    /**
     * Determine the containment parent of a resource that is not the repository root.
     *
     * @param headers the resource's headers.
     * @param fedoraId the resource's id, used in the error message.
     * @return the recorded parent, or the repository root id for an object root with no recorded parent.
     * @throws IllegalStateException if the resource has no parent and is not an object root.
     */
    private static FedoraId resolveParentId(final ResourceHeadersAdapter headers, final FedoraId fedoraId) {
        final var parentId = headers.getParent();
        if (parentId != null) {
            return parentId;
        }
        if (headers.isObjectRoot()) {
            // An object root without a recorded parent is treated as a child of the repository root.
            return FedoraId.getRepositoryRootId();
        }
        throw new IllegalStateException(
                String.format("Resource %s must have a parent defined", fedoraId.getFullId()));
    }

    /**
     * Parse an RDF resource's content and record its references with the reference service.
     * The content stream is always closed.
     *
     * @param tx the transaction the reference updates are made in.
     * @param fedoraId the resource whose content is being parsed.
     * @param content the resource's serialized RDF.
     * @throws RepositoryRuntimeException if closing the stream fails. This propagates out of
     *         {@link #indexOcflObject(Transaction, String)} and aborts indexing of the object.
     */
    private void indexReferences(final Transaction tx, final FedoraId fedoraId, final InputStream content) {
        try (final var stream = content) {
            final RdfStream rdf = parseRdf(fedoraId, stream);
            referenceService.updateReferences(tx, fedoraId, null, rdf);
        } catch (final IOException e) {
            LOGGER.warn("Content stream for {} closed prematurely, inbound references skipped.",
                    fedoraId.getFullId());
            throw new RepositoryRuntimeException(e.getMessage(), e);
        }
    }

    /**
     * Remove persistent sessions for a transaction to avoid memory leaks.
     * @param transactionId the transaction id.
     */
    public void cleanupSession(final String transactionId) {
        persistentStorageSessionManager.removeSession(transactionId);
    }

    /**
     * Set the number of Direct containers requested per search page by {@link #indexMembership(Transaction)}.
     *
     * @param pageSize the new page size; must be at least 1.
     * @throws IllegalArgumentException if {@code pageSize} is less than 1. With a page size of 0 the paging loop
     *         in {@link #indexMembership(Transaction)} would never advance its offset, assuming the search returns
     *         an empty page for a zero limit. A negative value is not a meaningful result limit.
     */
    public void setMembershipPageSize(final int pageSize) {
        if (pageSize < 1) {
            throw new IllegalArgumentException("Membership page size must be at least 1, got " + pageSize);
        }
        membershipPageSize = pageSize;
    }

    /**
     * Reset all the indexes: OCFL mapping, containment, search, reference and membership.
     */
    public void reset() {
        ocflIndex.reset();
        containmentIndex.reset();
        searchIndex.reset();
        referenceService.reset();
        membershipService.reset();
    }

    /**
     * Index all membership properties by querying for Direct containers, and then
     * trying population of the membership index for each one.
     *
     * @param transaction the transaction the membership updates are made in.
     * @throws RepositoryRuntimeException if the search index rejects the Direct container query.
     */
    public void indexMembership(final Transaction transaction) {
        LOGGER.debug("Starting indexMembership for transaction {}", transaction.getId());
        final var fields = List.of(Condition.Field.FEDORA_ID);
        final var conditions = List.of(Condition.fromEnums(Condition.Field.RDF_TYPE, Condition.Operator.EQ,
                RdfLexicon.DIRECT_CONTAINER.getURI()));
        // Read once so the limit and the offset step stay identical for every page of this run.
        final int pageSize = membershipPageSize;
        int offset = 0;

        try {
            int numResults;
            do {
                final var params = new SearchParameters(fields, conditions, pageSize,
                        offset, Condition.Field.FEDORA_ID, "asc", false);

                final var resultList = searchIndex.doSearch(params).getItems();
                numResults = resultList.size();

                resultList.stream()
                        .map(entry -> FedoraId.create((String) entry.get(Condition.Field.FEDORA_ID.toString())))
                        .forEach(containerId -> membershipService.populateMembershipHistory(transaction, containerId));

                // Results are paged: a full page means there may be more, a short page is the last one.
                offset += pageSize;
            } while (numResults == pageSize);

        } catch (final InvalidQueryException e) {
            throw new RepositoryRuntimeException("Failed to repopulate membership history", e);
        }
        LOGGER.debug("Finished indexMembership for transaction {}", transaction.getId());
    }

    /**
     * Rollback membership index changes in the transaction. Any failure is logged, not thrown.
     * @param tx the transaction
     */
    public void rollbackMembership(@NotNull final Transaction tx) {
        execQuietly("Failed to rollback membership index transaction " + tx.getId(), () -> {
            membershipService.rollbackTransaction(tx);
            return null;
        });
    }

    /**
     * Executes the closure, capturing all exceptions, and logging them as errors.
     *
     * @param failureMessage what to print if the closure fails
     * @param callable closure to execute
     */
    private void execQuietly(final String failureMessage, final Callable<Void> callable) {
        try {
            callable.call();
        } catch (final Exception e) {
            LOGGER.error(failureMessage, e);
        }
    }

    /**
     * Parse the inputstream from an RDF resource into an RdfStream.
     *
     * @param fedoraIdentifier the resource identifier. For a description resource, the described
     *        (base) resource is used as the stream's topic.
     * @param inputStream the inputstream, in the repository's configured RDF format.
     * @return an RdfStream of the resource triples.
     */
    private static RdfStream parseRdf(final FedoraId fedoraIdentifier, final InputStream inputStream) {
        final Model model = createDefaultModel();
        RDFDataMgr.read(model, inputStream, getRdfFormat().getLang());
        final FedoraId topic = (fedoraIdentifier.isDescription() ? fedoraIdentifier.asBaseId() : fedoraIdentifier);
        return DefaultRdfStream.fromModel(createURI(topic.getFullId()), model);
    }
}