001/*
002 * Licensed to DuraSpace under one or more contributor license agreements.
003 * See the NOTICE file distributed with this work for additional information
004 * regarding copyright ownership.
005 *
006 * DuraSpace licenses this file to you under the Apache License,
007 * Version 2.0 (the "License"); you may not use this file except in
008 * compliance with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.fcrepo.persistence.ocfl.impl;
019
020import static org.apache.jena.graph.NodeFactory.createURI;
021import static org.apache.jena.rdf.model.ModelFactory.createDefaultModel;
022import static org.fcrepo.kernel.api.RdfLexicon.NON_RDF_SOURCE;
023import static org.fcrepo.persistence.ocfl.impl.OcflPersistentStorageUtils.getRdfFormat;
024import static org.slf4j.LoggerFactory.getLogger;
025
026import java.io.IOException;
027import java.io.InputStream;
028import java.util.ArrayList;
029import java.util.List;
030import java.util.Optional;
031import java.util.concurrent.Callable;
032import java.util.concurrent.atomic.AtomicReference;
033
034import javax.inject.Inject;
035import javax.validation.constraints.NotNull;
036
037import org.fcrepo.config.FedoraPropsConfig;
038import org.fcrepo.kernel.api.ContainmentIndex;
039import org.fcrepo.kernel.api.RdfLexicon;
040import org.fcrepo.kernel.api.RdfStream;
041import org.fcrepo.kernel.api.Transaction;
042import org.fcrepo.kernel.api.exception.RepositoryRuntimeException;
043import org.fcrepo.kernel.api.identifiers.FedoraId;
044import org.fcrepo.kernel.api.models.ResourceHeaders;
045import org.fcrepo.kernel.api.rdf.DefaultRdfStream;
046import org.fcrepo.kernel.api.services.MembershipService;
047import org.fcrepo.kernel.api.services.ReferenceService;
048import org.fcrepo.persistence.api.PersistentStorageSessionManager;
049import org.fcrepo.persistence.ocfl.api.FedoraToOcflObjectIndex;
050import org.fcrepo.search.api.Condition;
051import org.fcrepo.search.api.InvalidQueryException;
052import org.fcrepo.search.api.SearchIndex;
053import org.fcrepo.search.api.SearchParameters;
054import org.fcrepo.storage.ocfl.OcflObjectSessionFactory;
055import org.fcrepo.storage.ocfl.validation.ObjectValidator;
056
057import org.apache.jena.rdf.model.Model;
058import org.apache.jena.riot.RDFDataMgr;
059import org.slf4j.Logger;
060import org.springframework.beans.factory.annotation.Autowired;
061import org.springframework.beans.factory.annotation.Qualifier;
062import org.springframework.stereotype.Component;
063
064/**
065 * Service that does the reindexing for one OCFL object.
066 * @author whikloj
067 */
068@Component
069public class ReindexService {
070
071    @Inject
072    private PersistentStorageSessionManager persistentStorageSessionManager;
073
074    @Inject
075    private OcflObjectSessionFactory ocflObjectSessionFactory;
076
077    @Autowired
078    @Qualifier("ocflIndex")
079    private FedoraToOcflObjectIndex ocflIndex;
080
081    @Autowired
082    @Qualifier("containmentIndex")
083    private ContainmentIndex containmentIndex;
084
085    @Autowired
086    @Qualifier("searchIndex")
087    private SearchIndex searchIndex;
088
089    @Autowired
090    @Qualifier("referenceService")
091    private ReferenceService referenceService;
092
093    @Inject
094    private MembershipService membershipService;
095
096    @Inject
097    private ObjectValidator objectValidator;
098
099    @Inject
100    private FedoraPropsConfig config;
101
102    private static final Logger LOGGER = getLogger(ReindexService.class);
103
104    private int membershipPageSize = 500;
105
106    public void indexOcflObject(final Transaction tx, final String ocflId) {
107        LOGGER.debug("Indexing ocflId {} in transaction {}", ocflId, tx.getId());
108
109        objectValidator.validate(ocflId, config.isRebuildFixityCheck());
110
111        try (final var session = ocflObjectSessionFactory.newSession(ocflId)) {
112            final var rootId = new AtomicReference<FedoraId>();
113            final var fedoraIds = new ArrayList<FedoraId>();
114            final var headersList = new ArrayList<ResourceHeaders>();
115
116            session.streamResourceHeaders().forEach(storageHeaders -> {
117                final var headers = new ResourceHeadersAdapter(storageHeaders);
118
119                final var fedoraId = headers.getId();
120                fedoraIds.add(fedoraId);
121                if (headers.isArchivalGroup() || headers.isObjectRoot()) {
122                    rootId.set(fedoraId);
123                }
124
125                if (!fedoraId.isRepositoryRoot()) {
126                    var parentId = headers.getParent();
127
128                    if (headers.getParent() == null) {
129                        if (headers.isObjectRoot()) {
130                            parentId = FedoraId.getRepositoryRootId();
131                        } else {
132                            throw new IllegalStateException(
133                                    String.format("Resource %s must have a parent defined", fedoraId.getFullId()));
134                        }
135                    }
136                    final var created = headers.getCreatedDate();
137                    if (!headers.isDeleted()) {
138                        if (!headers.getInteractionModel().equals(NON_RDF_SOURCE.toString())) {
139                            final Optional<InputStream> content = session.readContent(fedoraId.getFullId())
140                                    .getContentStream();
141                            if (content.isPresent()) {
142                                try (final var stream = content.get()) {
143                                    final RdfStream rdf = parseRdf(fedoraId, stream);
144                                    this.referenceService.updateReferences(tx, fedoraId, null, rdf);
145                                } catch (final IOException e) {
146                                    LOGGER.warn("Content stream for {} closed prematurely, inbound references skipped.",
147                                            fedoraId.getFullId());
148                                    throw new RepositoryRuntimeException(e.getMessage(), e);
149                                }
150                            }
151                        }
152
153                        this.containmentIndex.addContainedBy(tx, parentId, fedoraId, created, null);
154                        headersList.add(headers.asKernelHeaders());
155                    } else {
156                        final var deleted = headers.getLastModifiedDate();
157                        this.containmentIndex.addContainedBy(tx, parentId, fedoraId, created, deleted);
158                    }
159                }
160            });
161
162            if (rootId.get() == null) {
163                throw new IllegalStateException(String.format("Failed to find the root resource in object " +
164                        "identified by %s. Please ensure that the object ID you are attempting to index " +
165                        "refers to a corresponding valid Fedora-flavored object in the OCFL repository. Additionally " +
166                        "be sure that the object ID corresponds with the object root resource (as opposed to child " +
167                        "resources within the object).", ocflId));
168            }
169
170            fedoraIds.forEach(fedoraIdentifier -> {
171                final var rootFedoraIdentifier = rootId.get();
172                ocflIndex.addMapping(tx, fedoraIdentifier, rootFedoraIdentifier, ocflId);
173                LOGGER.debug("Rebuilt fedora-to-ocfl object index entry for {}", fedoraIdentifier);
174            });
175
176            headersList.forEach(headers -> {
177                searchIndex.addUpdateIndex(tx, headers);
178                LOGGER.debug("Rebuilt searchIndex for {}", headers.getId());
179            });
180        }
181    }
182
183    /**
184     * Remove persistent sessions for a transaction to avoid memory leaks.
185     * @param transactionId the transaction id.
186     */
187    public void cleanupSession(final String transactionId) {
188        persistentStorageSessionManager.removeSession(transactionId);
189    }
190
191    /**
192     * Set the membership page size.
193     * @param pageSize the new page size.
194     */
195    public void setMembershipPageSize(final int pageSize) {
196        membershipPageSize = pageSize;
197    }
198
199    /**
200     * Reset all the indexes.
201     */
202    public void reset() {
203        ocflIndex.reset();
204        containmentIndex.reset();
205        searchIndex.reset();
206        referenceService.reset();
207        membershipService.reset();
208    }
209
210    /**
211     * Index all membership properties by querying for Direct containers, and then
212     * trying population of the membership index for each one
213     * @param transaction the transaction id.
214     */
215    public void indexMembership(final Transaction transaction) {
216        LOGGER.debug("Starting indexMembership for transaction {}", transaction);
217        final var fields = List.of(Condition.Field.FEDORA_ID);
218        final var conditions = List.of(Condition.fromEnums(Condition.Field.RDF_TYPE, Condition.Operator.EQ,
219                RdfLexicon.DIRECT_CONTAINER.getURI()));
220        int offset = 0;
221
222        try {
223            int numResults;
224            do {
225                final var params = new SearchParameters(fields, conditions, membershipPageSize,
226                        offset, Condition.Field.FEDORA_ID, "asc", false);
227
228                final var searchResult = searchIndex.doSearch(params);
229                final var resultList = searchResult.getItems();
230                numResults = resultList.size();
231
232                resultList.stream()
233                        .map(entry -> FedoraId.create((String) entry.get(Condition.Field.FEDORA_ID.toString())))
234                        .forEach(containerId -> membershipService.populateMembershipHistory(transaction, containerId));
235
236                // Results are paged, so step through pages until we reach the last one
237                offset += membershipPageSize;
238            } while (numResults == membershipPageSize);
239
240        } catch (final InvalidQueryException e) {
241            throw new RepositoryRuntimeException("Failed to repopulate membership history", e);
242        }
243        LOGGER.debug("Finished indexMembership for transaction {}", transaction);
244    }
245
246    /**
247     * Rollback changes in the transaction.
248     * @param tx the transaction
249     */
250    public void rollbackMembership(@NotNull final Transaction tx) {
251        execQuietly("Failed to rollback membership index transaction " + tx.getId(), () -> {
252            membershipService.rollbackTransaction(tx);
253            return null;
254        });
255    }
256
257    /**
258     * Executes the closure, capturing all exceptions, and logging them as errors.
259     *
260     * @param failureMessage what to print if the closure fails
261     * @param callable closure to execute
262     */
263    private void execQuietly(final String failureMessage, final Callable<Void> callable) {
264        try {
265            callable.call();
266        } catch (final Exception e) {
267            LOGGER.error(failureMessage, e);
268        }
269    }
270
271    /**
272     * Parse the inputstream from a Rdf resource to a RDFstream.
273     *
274     * @param fedoraIdentifier the resource identifier.
275     * @param inputStream the inputstream.
276     * @return an RdfStream of the resource triples.
277     */
278    private static RdfStream parseRdf(final FedoraId fedoraIdentifier, final InputStream inputStream) {
279        final Model model = createDefaultModel();
280        RDFDataMgr.read(model, inputStream, getRdfFormat().getLang());
281        final FedoraId topic = (fedoraIdentifier.isDescription() ? fedoraIdentifier.asBaseId() : fedoraIdentifier);
282        return DefaultRdfStream.fromModel(createURI(topic.getFullId()), model);
283    }
284}