001/*
002 * Licensed to DuraSpace under one or more contributor license agreements.
003 * See the NOTICE file distributed with this work for additional information
004 * regarding copyright ownership.
005 *
006 * DuraSpace licenses this file to you under the Apache License,
007 * Version 2.0 (the "License"); you may not use this file except in
008 * compliance with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.fcrepo.kernel.impl;
019
020import static org.fcrepo.kernel.api.FedoraTypes.FEDORA_ID_PREFIX;
021import static org.slf4j.LoggerFactory.getLogger;
022
023import java.sql.Timestamp;
024import java.time.Instant;
025import java.time.temporal.ChronoUnit;
026import java.util.Collections;
027import java.util.List;
028import java.util.Map;
029import java.util.NoSuchElementException;
030import java.util.Queue;
031import java.util.Spliterator;
032import java.util.Spliterators;
033import java.util.concurrent.ConcurrentLinkedQueue;
034import java.util.concurrent.TimeUnit;
035import java.util.function.Consumer;
036import java.util.stream.Stream;
037import java.util.stream.StreamSupport;
038
039import javax.annotation.Nonnull;
040import javax.annotation.PostConstruct;
041import javax.inject.Inject;
042import javax.sql.DataSource;
043
044import org.fcrepo.common.db.DbPlatform;
045import org.fcrepo.config.FedoraPropsConfig;
046import org.fcrepo.kernel.api.ContainmentIndex;
047import org.fcrepo.kernel.api.Transaction;
048import org.fcrepo.kernel.api.exception.RepositoryRuntimeException;
049import org.fcrepo.kernel.api.identifiers.FedoraId;
050
051import org.slf4j.Logger;
052import org.springframework.dao.EmptyResultDataAccessException;
053import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
054import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
055import org.springframework.stereotype.Component;
056import org.springframework.transaction.annotation.Propagation;
057import org.springframework.transaction.annotation.Transactional;
058
059import com.github.benmanes.caffeine.cache.Cache;
060import com.github.benmanes.caffeine.cache.Caffeine;
061
062/**
063 * @author peichman
064 * @author whikloj
065 * @since 6.0.0
066 */
067@Component("containmentIndexImpl")
068public class ContainmentIndexImpl implements ContainmentIndex {
069
070    private static final Logger LOGGER = getLogger(ContainmentIndexImpl.class);
071
072    private int containsLimit = 50000;
073
074    @Inject
075    private DataSource dataSource;
076
077    private NamedParameterJdbcTemplate jdbcTemplate;
078
079    private DbPlatform dbPlatform;
080
081    public static final String RESOURCES_TABLE = "containment";
082
083    private static final String TRANSACTION_OPERATIONS_TABLE = "containment_transactions";
084
085    public static final String FEDORA_ID_COLUMN = "fedora_id";
086
087    private static final String PARENT_COLUMN = "parent";
088
089    private static final String TRANSACTION_ID_COLUMN = "transaction_id";
090
091    private static final String OPERATION_COLUMN = "operation";
092
093    private static final String START_TIME_COLUMN = "start_time";
094
095    private static final String END_TIME_COLUMN = "end_time";
096
097    private static final String UPDATED_COLUMN = "updated";
098
099    /*
100     * Select children of a resource that are not marked as deleted.
101     */
102    private static final String SELECT_CHILDREN = "SELECT " + FEDORA_ID_COLUMN +
103            " FROM " + RESOURCES_TABLE + " WHERE " + PARENT_COLUMN + " = :parent AND " + END_TIME_COLUMN + " IS NULL" +
104            " ORDER BY " + FEDORA_ID_COLUMN + " LIMIT :containsLimit OFFSET :offSet";
105
106    /*
107     * Select children of a memento of a resource.
108     */
109    private static final String SELECT_CHILDREN_OF_MEMENTO = "SELECT " + FEDORA_ID_COLUMN +
110            " FROM " + RESOURCES_TABLE + " WHERE " + PARENT_COLUMN + " = :parent AND " + START_TIME_COLUMN +
111            " <= :asOfTime AND (" + END_TIME_COLUMN + " > :asOfTime OR " + END_TIME_COLUMN + " IS NULL) ORDER BY " +
112            FEDORA_ID_COLUMN + " LIMIT :containsLimit OFFSET :offSet";
113
114    /*
115     * Select children of a parent from resources table and from the transaction table with an 'add' operation,
116     * but exclude any records that also exist in the transaction table with a 'delete' or 'purge' operation.
117     */
118    private static final String SELECT_CHILDREN_IN_TRANSACTION = "SELECT x." + FEDORA_ID_COLUMN + " FROM" +
119            " (SELECT " + FEDORA_ID_COLUMN + " FROM " + RESOURCES_TABLE + " WHERE " + PARENT_COLUMN + " = :parent" +
120            " AND " + END_TIME_COLUMN + " IS NULL " +
121            " UNION SELECT " + FEDORA_ID_COLUMN + " FROM " + TRANSACTION_OPERATIONS_TABLE +
122            " WHERE " + PARENT_COLUMN + " = :parent AND " + TRANSACTION_ID_COLUMN + " = :transactionId" +
123            " AND " + OPERATION_COLUMN + " = 'add') x" +
124            " WHERE NOT EXISTS " +
125            " (SELECT 1 FROM " + TRANSACTION_OPERATIONS_TABLE +
126            " WHERE " + PARENT_COLUMN + " = :parent AND " + FEDORA_ID_COLUMN + " = x." + FEDORA_ID_COLUMN +
127            " AND " + TRANSACTION_ID_COLUMN + " = :transactionId AND " + OPERATION_COLUMN + " IN ('delete', 'purge'))" +
128            " ORDER BY x." + FEDORA_ID_COLUMN + " LIMIT :containsLimit OFFSET :offSet";
129
130    /*
131     * Select all children of a resource that are marked for deletion.
132     */
133    private static final String SELECT_DELETED_CHILDREN = "SELECT " + FEDORA_ID_COLUMN +
134            " FROM " + RESOURCES_TABLE + " WHERE " + PARENT_COLUMN + " = :parent AND " + END_TIME_COLUMN +
135            " IS NOT NULL ORDER BY " + FEDORA_ID_COLUMN + " LIMIT :containsLimit OFFSET :offSet";
136
137    /*
138     * Select children of a resource plus children 'delete'd in the non-committed transaction, but excluding any
139     * 'add'ed in the non-committed transaction.
140     */
141    private static final String SELECT_DELETED_CHILDREN_IN_TRANSACTION = "SELECT x." + FEDORA_ID_COLUMN +
142            " FROM (SELECT " + FEDORA_ID_COLUMN + " FROM " + RESOURCES_TABLE +
143            " WHERE " + PARENT_COLUMN + " = :parent AND " + END_TIME_COLUMN + " IS NOT NULL UNION" +
144            " SELECT " + FEDORA_ID_COLUMN + " FROM " + TRANSACTION_OPERATIONS_TABLE + " WHERE " +
145            PARENT_COLUMN + " = :parent AND " + TRANSACTION_ID_COLUMN + " = :transactionId AND " +
146            OPERATION_COLUMN + " = 'delete') x" +
147            " WHERE NOT EXISTS " +
148            "(SELECT 1 FROM " + TRANSACTION_OPERATIONS_TABLE + " WHERE " + PARENT_COLUMN + " = :parent AND " +
149            FEDORA_ID_COLUMN + " = x." + FEDORA_ID_COLUMN + " AND " + TRANSACTION_ID_COLUMN + " = :transactionId AND " +
150            OPERATION_COLUMN + " = 'add') ORDER BY x." + FEDORA_ID_COLUMN + " LIMIT :containsLimit OFFSET :offSet";
151
152    /*
153     * Upsert a parent child relationship to the transaction operation table.
154     */
155    private static final String UPSERT_RECORDS_POSTGRESQL = "INSERT INTO " + TRANSACTION_OPERATIONS_TABLE +
156            " ( " + PARENT_COLUMN + ", " + FEDORA_ID_COLUMN + ", " + START_TIME_COLUMN + ", " + END_TIME_COLUMN + ", " +
157            TRANSACTION_ID_COLUMN + ", " + OPERATION_COLUMN + ") VALUES (:parent, :child, :startTime, :endTime, " +
158            ":transactionId, :operation) ON CONFLICT ( " +  FEDORA_ID_COLUMN + ", " + TRANSACTION_ID_COLUMN + ") " +
159            "DO UPDATE SET " + PARENT_COLUMN + " = EXCLUDED." + PARENT_COLUMN + ", " +
160            START_TIME_COLUMN + " = EXCLUDED." + START_TIME_COLUMN + ", " + END_TIME_COLUMN + " = EXCLUDED." +
161            END_TIME_COLUMN + ", " + OPERATION_COLUMN + " = EXCLUDED." + OPERATION_COLUMN;
162
163    private static final String UPSERT_RECORDS_MYSQL_MARIA = "INSERT INTO " + TRANSACTION_OPERATIONS_TABLE +
164            " (" + PARENT_COLUMN + ", " + FEDORA_ID_COLUMN + ", " + START_TIME_COLUMN + ", " + END_TIME_COLUMN + ", " +
165            TRANSACTION_ID_COLUMN + ", " + OPERATION_COLUMN + ") VALUES (:parent, :child, :startTime, :endTime, " +
166            ":transactionId, :operation) ON DUPLICATE KEY UPDATE " +
167            PARENT_COLUMN + " = VALUES(" + PARENT_COLUMN + "), " + START_TIME_COLUMN + " = VALUES(" +
168            START_TIME_COLUMN + "), " + END_TIME_COLUMN + " = VALUES(" + END_TIME_COLUMN + "), " + OPERATION_COLUMN +
169            " = VALUES(" + OPERATION_COLUMN + ")";
170
171    private static final String UPSERT_RECORDS_H2 = "MERGE INTO " + TRANSACTION_OPERATIONS_TABLE +
172            " (" + PARENT_COLUMN + ", " + FEDORA_ID_COLUMN + ", " + START_TIME_COLUMN + ", " + END_TIME_COLUMN + ", " +
173            TRANSACTION_ID_COLUMN + ", " + OPERATION_COLUMN + ") KEY (" + FEDORA_ID_COLUMN + ", " +
174            TRANSACTION_ID_COLUMN + ") VALUES (:parent, :child, :startTime, :endTime, :transactionId, :operation)";
175
176    private static final String DIRECT_UPDATE_END_TIME = "UPDATE " + RESOURCES_TABLE +
177            " SET " + END_TIME_COLUMN + " = :endTime WHERE " +
178            PARENT_COLUMN + " = :parent AND " + FEDORA_ID_COLUMN + " = :child";
179
180    private static final String DIRECT_INSERT_RECORDS = "INSERT INTO " + RESOURCES_TABLE +
181            " (" + PARENT_COLUMN + ", " + FEDORA_ID_COLUMN + ", " + START_TIME_COLUMN + ", " + END_TIME_COLUMN + ")" +
182            " VALUES (:parent, :child, :startTime, :endTime)";
183
184    private static final Map<DbPlatform, String> UPSERT_MAPPING = Map.of(
185            DbPlatform.H2, UPSERT_RECORDS_H2,
186            DbPlatform.MYSQL, UPSERT_RECORDS_MYSQL_MARIA,
187            DbPlatform.MARIADB, UPSERT_RECORDS_MYSQL_MARIA,
188            DbPlatform.POSTGRESQL, UPSERT_RECORDS_POSTGRESQL
189    );
190
191    private static final String DIRECT_PURGE = "DELETE FROM containment WHERE fedora_id = :child";
192
193    /*
194     * Remove an insert row from the transaction operation table for this parent child relationship.
195     */
196    private static final String UNDO_INSERT_CHILD_IN_TRANSACTION = "DELETE FROM " + TRANSACTION_OPERATIONS_TABLE +
197            " WHERE " + PARENT_COLUMN + " = :parent AND " + FEDORA_ID_COLUMN + " = :child AND " + TRANSACTION_ID_COLUMN
198            + " = :transactionId AND " + OPERATION_COLUMN + " = 'add'";
199
200    /*
201     * Remove a mark as deleted row from the transaction operation table for this child relationship (no parent).
202     */
203    private static final String UNDO_DELETE_CHILD_IN_TRANSACTION_NO_PARENT = "DELETE FROM " +
204            TRANSACTION_OPERATIONS_TABLE + " WHERE " + FEDORA_ID_COLUMN + " = :child AND " + TRANSACTION_ID_COLUMN
205            + " = :transactionId AND " + OPERATION_COLUMN + " = 'delete'";
206
207    /*
208     * Is this parent child relationship being added in this transaction?
209     */
210    private static final String IS_CHILD_ADDED_IN_TRANSACTION = "SELECT TRUE FROM " + TRANSACTION_OPERATIONS_TABLE +
211            " WHERE " + FEDORA_ID_COLUMN + " = :child AND " + PARENT_COLUMN + " = :parent" +
212            " AND " + TRANSACTION_ID_COLUMN + " = :transactionId AND " + OPERATION_COLUMN + " = 'add'";
213
214    /*
215     * Is this child's relationship being marked for deletion in this transaction (no parent)?
216     */
217    private static final String IS_CHILD_DELETED_IN_TRANSACTION_NO_PARENT = "SELECT TRUE FROM " +
218            TRANSACTION_OPERATIONS_TABLE + " WHERE " + FEDORA_ID_COLUMN + " = :child " +
219            " AND " + TRANSACTION_ID_COLUMN + " = :transactionId AND " + OPERATION_COLUMN + " = 'delete'";
220
221   /*
222    * Delete all rows from the transaction operation table for this transaction.
223    */
224    private static final String DELETE_ENTIRE_TRANSACTION = "DELETE FROM " + TRANSACTION_OPERATIONS_TABLE + " WHERE " +
225            TRANSACTION_ID_COLUMN + " = :transactionId";
226
227    /*
228     * Add to the main table all rows from the transaction operation table marked 'add' for this transaction.
229     */
230    private static final String COMMIT_ADD_RECORDS_POSTGRESQL = "INSERT INTO " + RESOURCES_TABLE +
231            " ( " + FEDORA_ID_COLUMN + ", " + PARENT_COLUMN + ", " + START_TIME_COLUMN + ", " + END_TIME_COLUMN + ") " +
232            "SELECT " + FEDORA_ID_COLUMN + ", " + PARENT_COLUMN + ", " + START_TIME_COLUMN + ", " + END_TIME_COLUMN +
233            " FROM " + TRANSACTION_OPERATIONS_TABLE + " WHERE " + OPERATION_COLUMN + " = 'add' AND " +
234            TRANSACTION_ID_COLUMN + " = :transactionId ON CONFLICT ( " +  FEDORA_ID_COLUMN + " )" +
235            " DO UPDATE SET " + PARENT_COLUMN + " = EXCLUDED." + PARENT_COLUMN + ", " +
236            START_TIME_COLUMN + " = EXCLUDED." + START_TIME_COLUMN + ", " + END_TIME_COLUMN + " = EXCLUDED." +
237            END_TIME_COLUMN;
238
239    private static final String COMMIT_ADD_RECORDS_MYSQL_MARIA = "INSERT INTO " + RESOURCES_TABLE +
240            " (" + FEDORA_ID_COLUMN + ", " + PARENT_COLUMN + ", " + START_TIME_COLUMN + ", " + END_TIME_COLUMN + ") " +
241            "SELECT " + FEDORA_ID_COLUMN + ", " + PARENT_COLUMN + ", " + START_TIME_COLUMN + ", " + END_TIME_COLUMN +
242            " FROM " + TRANSACTION_OPERATIONS_TABLE + " WHERE " + OPERATION_COLUMN + " = 'add' AND " +
243            TRANSACTION_ID_COLUMN + " = :transactionId ON DUPLICATE KEY UPDATE " +
244            PARENT_COLUMN + " = VALUES(" + PARENT_COLUMN + "), " + START_TIME_COLUMN + " = VALUES(" +
245            START_TIME_COLUMN + "), " + END_TIME_COLUMN + " = VALUES(" + END_TIME_COLUMN + ")";
246
247    private static final String COMMIT_ADD_RECORDS_H2 = "MERGE INTO " + RESOURCES_TABLE +
248            " (" + FEDORA_ID_COLUMN + ", " + PARENT_COLUMN + ", " + START_TIME_COLUMN + ", " + END_TIME_COLUMN + ") " +
249            "KEY (" + FEDORA_ID_COLUMN + ") SELECT " + FEDORA_ID_COLUMN + ", " + PARENT_COLUMN + ", " +
250            START_TIME_COLUMN + ", " + END_TIME_COLUMN + " FROM " + TRANSACTION_OPERATIONS_TABLE + " WHERE " +
251            OPERATION_COLUMN + " = 'add' AND " + TRANSACTION_ID_COLUMN + " = :transactionId";
252
253    private static final Map<DbPlatform, String> COMMIT_ADD_RECORDS_MAP = Map.of(
254            DbPlatform.H2, COMMIT_ADD_RECORDS_H2,
255            DbPlatform.MYSQL, COMMIT_ADD_RECORDS_MYSQL_MARIA,
256            DbPlatform.MARIADB, COMMIT_ADD_RECORDS_MYSQL_MARIA,
257            DbPlatform.POSTGRESQL, COMMIT_ADD_RECORDS_POSTGRESQL
258    );
259
260    /*
261     * Add an end time to the rows in the main table that match all rows from transaction operation table marked
262     * 'delete' for this transaction.
263     */
264    private static final String COMMIT_DELETE_RECORDS_H2 = "UPDATE " + RESOURCES_TABLE +
265            " r SET r." + END_TIME_COLUMN + " = ( SELECT t." + END_TIME_COLUMN + " FROM " +
266            TRANSACTION_OPERATIONS_TABLE + " t " +
267            " WHERE t." + FEDORA_ID_COLUMN + " = r." + FEDORA_ID_COLUMN + " AND t." + TRANSACTION_ID_COLUMN +
268            " = :transactionId AND t." +  OPERATION_COLUMN +
269            " = 'delete' AND t." + PARENT_COLUMN + " = r." + PARENT_COLUMN + " AND r." +
270            END_TIME_COLUMN + " IS NULL)" +
271            " WHERE EXISTS (SELECT 1 FROM " + TRANSACTION_OPERATIONS_TABLE + " t WHERE t." + FEDORA_ID_COLUMN +
272            " = r." + FEDORA_ID_COLUMN + " AND t." + TRANSACTION_ID_COLUMN + " = :transactionId AND t." +
273            OPERATION_COLUMN + " = 'delete' AND t." + PARENT_COLUMN + " = r." + PARENT_COLUMN + " AND r." +
274            END_TIME_COLUMN + " IS NULL)";
275
276    private static final String COMMIT_DELETE_RECORDS_MYSQL = "UPDATE " + RESOURCES_TABLE +
277            " r INNER JOIN " + TRANSACTION_OPERATIONS_TABLE + " t ON t." + FEDORA_ID_COLUMN + " = r." +
278            FEDORA_ID_COLUMN + " SET r." + END_TIME_COLUMN + " = t." + END_TIME_COLUMN +
279            " WHERE t." + PARENT_COLUMN + " = r." +
280            PARENT_COLUMN + " AND t." + TRANSACTION_ID_COLUMN + " = :transactionId AND t." +  OPERATION_COLUMN +
281            " = 'delete' AND r." + END_TIME_COLUMN + " IS NULL";
282
283    private static final String COMMIT_DELETE_RECORDS_POSTGRES = "UPDATE " + RESOURCES_TABLE + " SET " +
284            END_TIME_COLUMN + " = t." + END_TIME_COLUMN + " FROM " + TRANSACTION_OPERATIONS_TABLE + " t WHERE t." +
285            FEDORA_ID_COLUMN + " = " + RESOURCES_TABLE + "." + FEDORA_ID_COLUMN + " AND t." + PARENT_COLUMN +
286            " = " + RESOURCES_TABLE + "." + PARENT_COLUMN + " AND t." + TRANSACTION_ID_COLUMN +
287            " = :transactionId AND t." + OPERATION_COLUMN + " = 'delete' AND " + RESOURCES_TABLE + "." +
288            END_TIME_COLUMN + " IS NULL";
289
290    private Map<DbPlatform, String> COMMIT_DELETE_RECORDS = Map.of(
291            DbPlatform.H2, COMMIT_DELETE_RECORDS_H2,
292            DbPlatform.MARIADB, COMMIT_DELETE_RECORDS_MYSQL,
293            DbPlatform.MYSQL, COMMIT_DELETE_RECORDS_MYSQL,
294            DbPlatform.POSTGRESQL, COMMIT_DELETE_RECORDS_POSTGRES
295    );
296
297    /*
298     * Remove from the main table all rows from transaction operation table marked 'purge' for this transaction.
299     */
300    private static final String COMMIT_PURGE_RECORDS = "DELETE FROM " + RESOURCES_TABLE + " WHERE " +
301            "EXISTS (SELECT 1 FROM " + TRANSACTION_OPERATIONS_TABLE + " t WHERE t." +
302            TRANSACTION_ID_COLUMN + " = :transactionId AND t." +  OPERATION_COLUMN + " = 'purge' AND" +
303            " t." + FEDORA_ID_COLUMN + " = " + RESOURCES_TABLE + "." + FEDORA_ID_COLUMN +
304            " AND t." + PARENT_COLUMN + " = " + RESOURCES_TABLE + "." + PARENT_COLUMN + ")";
305
306    /*
307     * Query if a resource exists in the main table and is not deleted.
308     */
309    private static final String RESOURCE_EXISTS = "SELECT " + FEDORA_ID_COLUMN + " FROM " + RESOURCES_TABLE +
310            " WHERE " + FEDORA_ID_COLUMN + " = :child AND " + END_TIME_COLUMN + " IS NULL";
311
312    /*
313     * Resource exists as a record in the transaction operations table with an 'add' operation and not also
314     * exists as a 'delete' operation.
315     */
316    private static final String RESOURCE_EXISTS_IN_TRANSACTION = "SELECT x." + FEDORA_ID_COLUMN + " FROM" +
317            " (SELECT " + FEDORA_ID_COLUMN + " FROM " + RESOURCES_TABLE + " WHERE " + FEDORA_ID_COLUMN + " = :child" +
318            "  AND " + END_TIME_COLUMN + " IS NULL UNION SELECT " + FEDORA_ID_COLUMN + " FROM " +
319            TRANSACTION_OPERATIONS_TABLE + " WHERE " + FEDORA_ID_COLUMN + " = :child AND " + TRANSACTION_ID_COLUMN +
320            " = :transactionId" + " AND " + OPERATION_COLUMN + " = 'add') x WHERE NOT EXISTS " +
321            " (SELECT 1 FROM " + TRANSACTION_OPERATIONS_TABLE +
322            " WHERE " + FEDORA_ID_COLUMN + " = :child AND " + TRANSACTION_ID_COLUMN + " = :transactionId" +
323            " AND " + OPERATION_COLUMN + " IN ('delete', 'purge'))";
324
325    /*
326     * Query if a resource exists in the main table even if it is deleted.
327     */
328    private static final String RESOURCE_OR_TOMBSTONE_EXISTS = "SELECT " + FEDORA_ID_COLUMN + " FROM " +
329            RESOURCES_TABLE + " WHERE " + FEDORA_ID_COLUMN + " = :child";
330
331    /*
332     * Resource exists as a record in the main table even if deleted or in the transaction operations table with an
333     * 'add' operation and not also exists as a 'delete' operation.
334     */
335    private static final String RESOURCE_OR_TOMBSTONE_EXISTS_IN_TRANSACTION = "SELECT x." + FEDORA_ID_COLUMN + " FROM" +
336            " (SELECT " + FEDORA_ID_COLUMN + " FROM " + RESOURCES_TABLE + " WHERE " + FEDORA_ID_COLUMN + " = :child" +
337            " UNION SELECT " + FEDORA_ID_COLUMN + " FROM " +
338            TRANSACTION_OPERATIONS_TABLE + " WHERE " + FEDORA_ID_COLUMN + " = :child AND " + TRANSACTION_ID_COLUMN +
339            " = :transactionId" + " AND " + OPERATION_COLUMN + " = 'add') x WHERE NOT EXISTS " +
340            " (SELECT 1 FROM " + TRANSACTION_OPERATIONS_TABLE +
341            " WHERE " + FEDORA_ID_COLUMN + " = :child AND " + TRANSACTION_ID_COLUMN + " = :transactionId" +
342            " AND " + OPERATION_COLUMN + " IN ('delete', 'purge'))";
343
344
345    /*
346     * Get the parent ID for this resource from the main table if not deleted.
347     */
348    private static final String PARENT_EXISTS = "SELECT " + PARENT_COLUMN + " FROM " + RESOURCES_TABLE +
349            " WHERE " + FEDORA_ID_COLUMN + " = :child AND " + END_TIME_COLUMN + " IS NULL";
350
351    /*
352     * Get the parent ID for this resource from the operations table for an 'add' operation in this transaction, but
353     * exclude any 'delete' operations for this resource in this transaction.
354     */
355    private static final String PARENT_EXISTS_IN_TRANSACTION = "SELECT x." + PARENT_COLUMN + " FROM" +
356            " (SELECT " + PARENT_COLUMN + " FROM " + RESOURCES_TABLE + " WHERE " + FEDORA_ID_COLUMN + " = :child" +
357            " AND " + END_TIME_COLUMN + " IS NULL" +
358            " UNION SELECT " + PARENT_COLUMN + " FROM " + TRANSACTION_OPERATIONS_TABLE +
359            " WHERE " + FEDORA_ID_COLUMN + " = :child AND " + TRANSACTION_ID_COLUMN + " = :transactionId" +
360            " AND " + OPERATION_COLUMN + " = 'add') x" +
361            " WHERE NOT EXISTS " +
362            " (SELECT 1 FROM " + TRANSACTION_OPERATIONS_TABLE +
363            " WHERE " + FEDORA_ID_COLUMN + " = :child AND " + TRANSACTION_ID_COLUMN + " = :transactionId" +
364            " AND " + OPERATION_COLUMN + " = 'delete')";
365
366    /*
367     * Get the parent ID for this resource from the main table if deleted.
368     */
369    private static final String PARENT_EXISTS_DELETED = "SELECT " + PARENT_COLUMN + " FROM " + RESOURCES_TABLE +
370            " WHERE " + FEDORA_ID_COLUMN + " = :child AND " + END_TIME_COLUMN + " IS NOT NULL";
371
372    /*
373     * Get the parent ID for this resource from main table and the operations table for a 'delete' operation in this
374     * transaction, excluding any 'add' operations for this resource in this transaction.
375     */
376    private static final String PARENT_EXISTS_DELETED_IN_TRANSACTION = "SELECT x." + PARENT_COLUMN + " FROM" +
377            " (SELECT " + PARENT_COLUMN + " FROM " + RESOURCES_TABLE + " WHERE " + FEDORA_ID_COLUMN + " = :child" +
378            " AND " + END_TIME_COLUMN + " IS NOT NULL UNION SELECT " + PARENT_COLUMN + " FROM " +
379            TRANSACTION_OPERATIONS_TABLE + " WHERE " + FEDORA_ID_COLUMN + " = :child AND " + TRANSACTION_ID_COLUMN +
380            " = :transactionId AND " + OPERATION_COLUMN + " = 'delete') x WHERE NOT EXISTS " +
381            " (SELECT 1 FROM " + TRANSACTION_OPERATIONS_TABLE + " WHERE " + FEDORA_ID_COLUMN + " = :child AND " +
382            TRANSACTION_ID_COLUMN + " = :transactionId AND " + OPERATION_COLUMN + " = 'add')";
383
384    /*
385     * Does this resource exist in the transaction operation table for an 'add' record.
386     */
387    private static final String IS_CHILD_ADDED_IN_TRANSACTION_NO_PARENT = "SELECT TRUE FROM " +
388            TRANSACTION_OPERATIONS_TABLE + " WHERE " + FEDORA_ID_COLUMN + " = :child AND " +
389            TRANSACTION_ID_COLUMN + " = :transactionId AND " + OPERATION_COLUMN + " = 'add'";
390
391    /*
392     * Delete a row from the transaction operation table with this resource and 'add' operation, no parent required.
393     */
394    private static final String UNDO_INSERT_CHILD_IN_TRANSACTION_NO_PARENT = "DELETE FROM " +
395            TRANSACTION_OPERATIONS_TABLE + " WHERE " + FEDORA_ID_COLUMN + " = :child AND " + TRANSACTION_ID_COLUMN
396            + " = :transactionId AND " + OPERATION_COLUMN + " = 'add'";
397
398    private static final String TRUNCATE_TABLE = "TRUNCATE TABLE ";
399
400    /*
401     * Any record tracked in the containment index is either active or a tombstone. Either way it exists for the
402     * purpose of finding ghost nodes.
403     */
404    private static final String SELECT_ID_LIKE = "SELECT " + FEDORA_ID_COLUMN + " FROM " + RESOURCES_TABLE + " WHERE " +
405            FEDORA_ID_COLUMN + " LIKE :resourceId";
406
407    private static final String SELECT_ID_LIKE_IN_TRANSACTION = "SELECT x." + FEDORA_ID_COLUMN + " FROM (SELECT " +
408            FEDORA_ID_COLUMN + " FROM " + RESOURCES_TABLE + " WHERE " + FEDORA_ID_COLUMN + " LIKE :resourceId" +
409            " UNION SELECT " + FEDORA_ID_COLUMN + " FROM " + TRANSACTION_OPERATIONS_TABLE + " WHERE " +
410            FEDORA_ID_COLUMN + " LIKE :resourceId AND " + TRANSACTION_ID_COLUMN + " = :transactionId AND " +
411            OPERATION_COLUMN + " = 'add') x WHERE NOT EXISTS (SELECT 1 FROM " + TRANSACTION_OPERATIONS_TABLE +
412            " WHERE " + FEDORA_ID_COLUMN + " LIKE :resourceId AND " + TRANSACTION_ID_COLUMN + " = :transactionId AND " +
413            OPERATION_COLUMN + " = 'delete')";
414
415    private static final String SELECT_LAST_UPDATED = "SELECT " + UPDATED_COLUMN + " FROM " + RESOURCES_TABLE +
416            " WHERE " + FEDORA_ID_COLUMN + " = :resourceId";
417
418    private static final String UPDATE_LAST_UPDATED = "UPDATE " + RESOURCES_TABLE + " SET " + UPDATED_COLUMN +
419            " = :updated WHERE " + FEDORA_ID_COLUMN + " = :resourceId";
420
421    private static final String CONDITIONALLY_UPDATE_LAST_UPDATED = "UPDATE " + RESOURCES_TABLE +
422            " SET " + UPDATED_COLUMN + " = :updated WHERE " + FEDORA_ID_COLUMN + " = :resourceId" +
423            " AND (" + UPDATED_COLUMN + " IS NULL OR " + UPDATED_COLUMN + " < :updated)";
424
425    private static final String SELECT_LAST_UPDATED_IN_TX = "SELECT MAX(x.updated)" +
426            " FROM (SELECT " + UPDATED_COLUMN + " as updated FROM " + RESOURCES_TABLE + " WHERE " +
427            FEDORA_ID_COLUMN + " = :resourceId UNION SELECT " + START_TIME_COLUMN +
428            " as updated FROM " + TRANSACTION_OPERATIONS_TABLE + " WHERE " + PARENT_COLUMN + " = :resourceId AND " +
429            OPERATION_COLUMN + " = 'add' AND " + TRANSACTION_ID_COLUMN + " = :transactionId UNION SELECT " +
430            END_TIME_COLUMN + " as updated FROM " + TRANSACTION_OPERATIONS_TABLE + " WHERE " + PARENT_COLUMN +
431            " = :resourceId AND " + OPERATION_COLUMN + " = 'delete' AND " + TRANSACTION_ID_COLUMN +
432            " = :transactionId UNION SELECT " + END_TIME_COLUMN +
433            " as updated FROM " + TRANSACTION_OPERATIONS_TABLE + " WHERE " + PARENT_COLUMN + " = :resourceId AND " +
434            OPERATION_COLUMN + " = 'add' AND " + TRANSACTION_ID_COLUMN + " = :transactionId) x";
435
436    private static final String GET_UPDATED_RESOURCES = "SELECT DISTINCT " + PARENT_COLUMN + " FROM " +
437            TRANSACTION_OPERATIONS_TABLE + " WHERE " + TRANSACTION_ID_COLUMN + " = :transactionId AND " +
438            OPERATION_COLUMN + " in ('add', 'delete')";
439
440    /*
441     * Get the startTime for the specified resource from the main table, if it exists.
442     */
443    private static final String GET_START_TIME = "SELECT " + START_TIME_COLUMN + " FROM " + RESOURCES_TABLE +
444            " WHERE " + FEDORA_ID_COLUMN + " = :child";
445
446    /*
447     * Get all resources deleted in this transaction
448     */
449    private static final String GET_DELETED_RESOURCES = "SELECT " + FEDORA_ID_COLUMN + " FROM " +
450            TRANSACTION_OPERATIONS_TABLE + " WHERE " + TRANSACTION_ID_COLUMN + " = :transactionId AND " +
451            OPERATION_COLUMN + " = 'delete'";
452
453    /*
454     * Get all resources added in this transaction
455     */
456    private static final String GET_ADDED_RESOURCES = "SELECT " + FEDORA_ID_COLUMN + " FROM " +
457            TRANSACTION_OPERATIONS_TABLE + " WHERE " + TRANSACTION_ID_COLUMN + " = :transactionId AND " +
458            OPERATION_COLUMN + " = 'add'";
459
460    @Inject
461    private FedoraPropsConfig fedoraPropsConfig;
462
463    private Cache<String, String> getContainedByCache;
464
465    private Cache<String, Boolean> resourceExistsCache;
466
467    /**
468     * Connect to the database
469     */
470    @PostConstruct
471    private void setup() {
472        jdbcTemplate = getNamedParameterJdbcTemplate();
473        dbPlatform = DbPlatform.fromDataSource(dataSource);
474        this.getContainedByCache = Caffeine.newBuilder()
475                .maximumSize(fedoraPropsConfig.getContainmentCacheSize())
476                .expireAfterAccess(fedoraPropsConfig.getContainmentCacheTimeout(), TimeUnit.MINUTES)
477                .build();
478        this.resourceExistsCache = Caffeine.newBuilder()
479                .maximumSize(fedoraPropsConfig.getContainmentCacheSize())
480                .expireAfterAccess(fedoraPropsConfig.getContainmentCacheTimeout(), TimeUnit.MINUTES)
481                .build();
482    }
483
484    private NamedParameterJdbcTemplate getNamedParameterJdbcTemplate() {
485        return new NamedParameterJdbcTemplate(getDataSource());
486    }
487
488    void setContainsLimit(final int limit) {
489        containsLimit = limit;
490    }
491
492    @Override
493    public Stream<String> getContains(@Nonnull final Transaction tx, final FedoraId fedoraId) {
494        final String resourceId = fedoraId.isMemento() ? fedoraId.getBaseId() : fedoraId.getFullId();
495        final Instant asOfTime = fedoraId.isMemento() ? fedoraId.getMementoInstant() : null;
496        final MapSqlParameterSource parameterSource = new MapSqlParameterSource();
497        parameterSource.addValue("parent", resourceId);
498
499        LOGGER.debug("getContains for {} in transaction {} and instant {}", resourceId, tx, asOfTime);
500
501        final String query;
502        if (asOfTime == null) {
503            if (tx.isOpenLongRunning()) {
504                // we are in a transaction
505                parameterSource.addValue("transactionId", tx.getId());
506                query = SELECT_CHILDREN_IN_TRANSACTION;
507            } else {
508                // not in a transaction
509                query = SELECT_CHILDREN;
510            }
511        } else {
512            parameterSource.addValue("asOfTime", formatInstant(asOfTime));
513            query = SELECT_CHILDREN_OF_MEMENTO;
514        }
515
516        return StreamSupport.stream(new ContainmentIterator(query, parameterSource), false);
517    }
518
519    @Override
520    public Stream<String> getContainsDeleted(@Nonnull final Transaction tx, final FedoraId fedoraId) {
521        final String resourceId = fedoraId.getFullId();
522        final MapSqlParameterSource parameterSource = new MapSqlParameterSource();
523        parameterSource.addValue("parent", resourceId);
524
525        final String query;
526        if (tx.isOpenLongRunning()) {
527            // we are in a transaction
528            parameterSource.addValue("transactionId", tx.getId());
529            query = SELECT_DELETED_CHILDREN_IN_TRANSACTION;
530        } else {
531            // not in a transaction
532            query = SELECT_DELETED_CHILDREN;
533        }
534        LOGGER.debug("getContainsDeleted for {} in transaction {}", resourceId, tx);
535        return StreamSupport.stream(new ContainmentIterator(query, parameterSource), false);
536    }
537
538    @Override
539    public String getContainedBy(@Nonnull final Transaction tx, final FedoraId resource) {
540        final String resourceID = resource.getFullId();
541        final String parentID;
542        if (tx.isOpenLongRunning()) {
543            parentID = jdbcTemplate.queryForList(PARENT_EXISTS_IN_TRANSACTION, Map.of("child", resourceID,
544                    "transactionId", tx.getId()), String.class).stream().findFirst().orElse(null);
545        } else {
546            parentID = this.getContainedByCache.get(resourceID, key ->
547                    jdbcTemplate.queryForList(PARENT_EXISTS, Map.of("child", key), String.class).stream()
548                    .findFirst().orElse(null)
549            );
550        }
551        return parentID;
552    }
553
554    @Override
555    public void addContainedBy(@Nonnull final Transaction tx, final FedoraId parent, final FedoraId child) {
556        addContainedBy(tx, parent, child, Instant.now(), null);
557    }
558
559    @Override
560    public void addContainedBy(@Nonnull final Transaction tx, final FedoraId parent, final FedoraId child,
561                               final Instant startTime, final Instant endTime) {
562        tx.doInTx(() -> {
563            final String parentID = parent.getFullId();
564            final String childID = child.getFullId();
565
566            if (!tx.isShortLived()) {
567                LOGGER.debug("Adding: parent: {}, child: {}, in txn: {}, start time {}, end time {}", parentID, childID,
568                        tx.getId(), formatInstant(startTime), formatInstant(endTime));
569                doUpsert(tx, parentID, childID, startTime, endTime, "add");
570            } else {
571                LOGGER.debug("Adding: parent: {}, child: {}, start time {}, end time {}", parentID, childID,
572                        formatInstant(startTime), formatInstant(endTime));
573                doDirectUpsert(parentID, childID, startTime, endTime);
574            }
575        });
576    }
577
578    @Override
579    public void removeContainedBy(@Nonnull final Transaction tx, final FedoraId parent, final FedoraId child) {
580        tx.doInTx(() -> {
581            final String parentID = parent.getFullId();
582            final String childID = child.getFullId();
583
584            if (!tx.isShortLived()) {
585                final MapSqlParameterSource parameterSource = new MapSqlParameterSource();
586                parameterSource.addValue("parent", parentID);
587                parameterSource.addValue("child", childID);
588                parameterSource.addValue("transactionId", tx.getId());
589                final boolean addedInTxn = !jdbcTemplate.queryForList(IS_CHILD_ADDED_IN_TRANSACTION, parameterSource)
590                        .isEmpty();
591                if (addedInTxn) {
592                    jdbcTemplate.update(UNDO_INSERT_CHILD_IN_TRANSACTION, parameterSource);
593                } else {
594                    doUpsert(tx, parentID, childID, null, Instant.now(), "delete");
595                }
596            } else {
597                doDirectUpsert(parentID, childID, null, Instant.now());
598                this.getContainedByCache.invalidate(childID);
599            }
600        });
601    }
602
603    @Override
604    public void removeResource(@Nonnull final Transaction tx, final FedoraId resource) {
605        tx.doInTx(() -> {
606            final String resourceID = resource.getFullId();
607
608            if (!tx.isShortLived()) {
609                final MapSqlParameterSource parameterSource = new MapSqlParameterSource();
610                parameterSource.addValue("child", resourceID);
611                parameterSource.addValue("transactionId", tx.getId());
612                final boolean addedInTxn = !jdbcTemplate.queryForList(IS_CHILD_ADDED_IN_TRANSACTION_NO_PARENT,
613                        parameterSource).isEmpty();
614                if (addedInTxn) {
615                    jdbcTemplate.update(UNDO_INSERT_CHILD_IN_TRANSACTION_NO_PARENT, parameterSource);
616                } else {
617                    final String parent = getContainedBy(tx, resource);
618                    if (parent != null) {
619                        LOGGER.debug("Marking containment relationship between parent ({}) and child ({}) deleted",
620                                parent, resourceID);
621                        doUpsert(tx, parent, resourceID, null, Instant.now(), "delete");
622                    }
623                }
624            } else {
625                final String parent = getContainedBy(tx, resource);
626                if (parent != null) {
627                    LOGGER.debug("Marking containment relationship between parent ({}) and child ({}) deleted", parent,
628                            resourceID);
629                    doDirectUpsert(parent, resourceID, null, Instant.now());
630                    this.getContainedByCache.invalidate(resourceID);
631                }
632            }
633        });
634    }
635
636    @Override
637    public void purgeResource(@Nonnull final Transaction tx, final FedoraId resource) {
638        tx.doInTx(() -> {
639            final String resourceID = resource.getFullId();
640
641            final String parent = getContainedByDeleted(tx, resource);
642
643            if (parent != null) {
644                LOGGER.debug("Removing containment relationship between parent ({}) and child ({})",
645                        parent, resourceID);
646
647                if (!tx.isShortLived()) {
648                    doUpsert(tx, parent, resourceID, null, null, "purge");
649                } else {
650                    final MapSqlParameterSource parameterSource = new MapSqlParameterSource();
651                    parameterSource.addValue("child", resourceID);
652                    jdbcTemplate.update(DIRECT_PURGE, parameterSource);
653                }
654            }
655        });
656    }
657
658    /**
659     * Do the Upsert action to the transaction table.
660     * @param tx the transaction
661     * @param parentId the containing resource id
662     * @param resourceId the contained resource id
663     * @param startTime the instant the relationship started, if null get the current time from the main table.
664     * @param endTime the instant the relationship ended or null for none.
665     * @param operation the operation to perform.
666     */
667    private void doUpsert(final Transaction tx, final String parentId, final String resourceId, final Instant startTime,
668                          final Instant endTime, final String operation) {
669        final var parameterSource = new MapSqlParameterSource();
670        parameterSource.addValue("child", resourceId);
671        parameterSource.addValue("transactionId", tx.getId());
672        parameterSource.addValue("parent", parentId);
673        if (startTime == null) {
674            parameterSource.addValue("startTime", formatInstant(getCurrentStartTime(resourceId)));
675        } else {
676            parameterSource.addValue("startTime", formatInstant(startTime));
677        }
678        parameterSource.addValue("endTime", formatInstant(endTime));
679        parameterSource.addValue("operation", operation);
680        jdbcTemplate.update(UPSERT_MAPPING.get(dbPlatform), parameterSource);
681    }
682
683    /**
684     * Do the Upsert directly to the containment index; not the tx table
685     *
686     * @param parentId the containing resource id
687     * @param resourceId the contained resource id
688     * @param startTime the instant the relationship started, if null get the current time from the main table.
689     * @param endTime the instant the relationship ended or null for none.
690     */
691    private void doDirectUpsert(final String parentId, final String resourceId, final Instant startTime,
692                                final Instant endTime) {
693        final var parameterSource = new MapSqlParameterSource();
694        parameterSource.addValue("child", resourceId);
695        parameterSource.addValue("parent", parentId);
696        parameterSource.addValue("endTime", formatInstant(endTime));
697
698        final String query;
699
700        if (startTime == null) {
701            // This the case for an update
702            query = DIRECT_UPDATE_END_TIME;
703        } else {
704            // This is the case for a new record
705            parameterSource.addValue("startTime", formatInstant(startTime));
706            query = DIRECT_INSERT_RECORDS;
707        }
708
709        jdbcTemplate.update(query, parameterSource);
710        updateParentTimestamp(parentId, startTime, endTime);
711        resourceExistsCache.invalidate(resourceId);
712    }
713
714    private void updateParentTimestamp(final String parentId, final Instant startTime, final Instant endTime) {
715        final var parameterSource = new MapSqlParameterSource();
716        final var updated = endTime == null ? startTime : endTime;
717        parameterSource.addValue("resourceId", parentId);
718        parameterSource.addValue("updated", formatInstant(updated));
719        jdbcTemplate.update(CONDITIONALLY_UPDATE_LAST_UPDATED, parameterSource);
720    }
721
722    /**
723     * Find parent for a resource using a deleted containment relationship.
724     * @param tx the transaction.
725     * @param resource the child resource id.
726     * @return the parent id.
727     */
728    private String getContainedByDeleted(final Transaction tx, final FedoraId resource) {
729        final String resourceID = resource.getFullId();
730        final MapSqlParameterSource parameterSource = new MapSqlParameterSource();
731        parameterSource.addValue("child", resourceID);
732        final List<String> parentID;
733        if (tx.isOpenLongRunning()) {
734            parameterSource.addValue("transactionId", tx.getId());
735            parentID = jdbcTemplate.queryForList(PARENT_EXISTS_DELETED_IN_TRANSACTION, parameterSource, String.class);
736        } else {
737            parentID = jdbcTemplate.queryForList(PARENT_EXISTS_DELETED, parameterSource, String.class);
738        }
739        return parentID.stream().findFirst().orElse(null);
740    }
741
742    @Override
743    public void commitTransaction(final Transaction tx) {
744        if (!tx.isShortLived()) {
745            tx.ensureCommitting();
746            try {
747                final MapSqlParameterSource parameterSource = new MapSqlParameterSource();
748                parameterSource.addValue("transactionId", tx.getId());
749                final List<String> changedParents = jdbcTemplate.queryForList(GET_UPDATED_RESOURCES, parameterSource,
750                        String.class);
751                final List<String> removedResources = jdbcTemplate.queryForList(GET_DELETED_RESOURCES, parameterSource,
752                        String.class);
753                final List<String> addedResources = jdbcTemplate.queryForList(GET_ADDED_RESOURCES, parameterSource,
754                        String.class);
755                final int purged = jdbcTemplate.update(COMMIT_PURGE_RECORDS, parameterSource);
756                final int deleted = jdbcTemplate.update(COMMIT_DELETE_RECORDS.get(dbPlatform), parameterSource);
757                final int added = jdbcTemplate.update(COMMIT_ADD_RECORDS_MAP.get(dbPlatform), parameterSource);
758                for (final var parent : changedParents) {
759                    final var updated = jdbcTemplate.queryForObject(SELECT_LAST_UPDATED_IN_TX,
760                            Map.of("resourceId", parent, "transactionId", tx.getId()), Timestamp.class);
761                    if (updated != null) {
762                        jdbcTemplate.update(UPDATE_LAST_UPDATED,
763                                Map.of("resourceId", parent, "updated", updated));
764                    }
765                }
766                jdbcTemplate.update(DELETE_ENTIRE_TRANSACTION, parameterSource);
767                this.getContainedByCache.invalidateAll(removedResources);
768                // Add inserted records to removed records list.
769                removedResources.addAll(addedResources);
770                this.resourceExistsCache.invalidateAll(removedResources);
771                LOGGER.debug("Commit of tx {} complete with {} adds, {} deletes and {} purges",
772                        tx.getId(), added, deleted, purged);
773            } catch (final Exception e) {
774                LOGGER.warn("Unable to commit containment index transaction {}: {}", tx, e.getMessage());
775                throw new RepositoryRuntimeException("Unable to commit containment index transaction", e);
776            }
777        }
778    }
779
780    @Transactional(propagation = Propagation.NOT_SUPPORTED)
781    @Override
782    public void rollbackTransaction(final Transaction tx) {
783        if (!tx.isShortLived()) {
784            final MapSqlParameterSource parameterSource = new MapSqlParameterSource();
785            parameterSource.addValue("transactionId", tx.getId());
786            jdbcTemplate.update(DELETE_ENTIRE_TRANSACTION, parameterSource);
787        }
788    }
789
790    @Override
791    public boolean resourceExists(@Nonnull final Transaction tx, final FedoraId fedoraId,
792                                  final boolean includeDeleted) {
793        // Get the containing ID because fcr:metadata will not exist here but MUST exist if the containing resource does
794        final String resourceId = fedoraId.getBaseId();
795        LOGGER.debug("Checking if {} exists in transaction {}", resourceId, tx);
796        if (fedoraId.isRepositoryRoot()) {
797            // Root always exists.
798            return true;
799        }
800        if (tx.isOpenLongRunning()) {
801            final var queryToUse = includeDeleted ? RESOURCE_OR_TOMBSTONE_EXISTS_IN_TRANSACTION :
802                    RESOURCE_EXISTS_IN_TRANSACTION;
803            return !jdbcTemplate.queryForList(queryToUse,
804                    Map.of("child", resourceId, "transactionId", tx.getId()), String.class).isEmpty();
805        } else if (includeDeleted) {
806            final Boolean exists = resourceExistsCache.getIfPresent(resourceId);
807            if (exists != null && exists) {
808                // Only return true, false values might change once deleted resources are included.
809                return true;
810            }
811            return !jdbcTemplate.queryForList(RESOURCE_OR_TOMBSTONE_EXISTS,
812                    Map.of("child", resourceId), String.class).isEmpty();
813        } else {
814            return resourceExistsCache.get(resourceId, key -> !jdbcTemplate.queryForList(RESOURCE_EXISTS,
815                        Map.of("child", resourceId), String.class).isEmpty()
816            );
817        }
818    }
819
820    @Override
821    public FedoraId getContainerIdByPath(final Transaction tx, final FedoraId fedoraId, final boolean checkDeleted) {
822        if (fedoraId.isRepositoryRoot()) {
823            // If we are root then we are the top.
824            return fedoraId;
825        }
826        final String parent = getContainedBy(tx, fedoraId);
827        if (parent != null) {
828            return FedoraId.create(parent);
829        }
830        String fullId = fedoraId.getFullId();
831        while (fullId.contains("/")) {
832            fullId = fedoraId.getResourceId().substring(0, fullId.lastIndexOf("/"));
833            if (fullId.equals(FEDORA_ID_PREFIX)) {
834                return FedoraId.getRepositoryRootId();
835            }
836            final FedoraId testID = FedoraId.create(fullId);
837            if (resourceExists(tx, testID, checkDeleted)) {
838                return testID;
839            }
840        }
841        return FedoraId.getRepositoryRootId();
842    }
843
844    @Override
845    public void reset() {
846        try {
847            jdbcTemplate.update(TRUNCATE_TABLE + RESOURCES_TABLE, Collections.emptyMap());
848            jdbcTemplate.update(TRUNCATE_TABLE + TRANSACTION_OPERATIONS_TABLE, Collections.emptyMap());
849            this.getContainedByCache.invalidateAll();
850        } catch (final Exception e) {
851            throw new RepositoryRuntimeException("Failed to truncate containment tables", e);
852        }
853    }
854
855    @Override
856    public boolean hasResourcesStartingWith(final Transaction tx, final FedoraId fedoraId) {
857        final MapSqlParameterSource parameterSource = new MapSqlParameterSource();
858        parameterSource.addValue("resourceId", fedoraId.getFullId() + "/%");
859        final boolean matchingIds;
860        if (tx.isOpenLongRunning()) {
861            parameterSource.addValue("transactionId", tx.getId());
862            matchingIds = !jdbcTemplate.queryForList(SELECT_ID_LIKE_IN_TRANSACTION, parameterSource, String.class)
863                .isEmpty();
864        } else {
865            matchingIds = !jdbcTemplate.queryForList(SELECT_ID_LIKE, parameterSource, String.class).isEmpty();
866        }
867        return matchingIds;
868    }
869
870    @Override
871    public Instant containmentLastUpdated(final Transaction tx, final FedoraId fedoraId) {
872        final MapSqlParameterSource parameterSource = new MapSqlParameterSource();
873        parameterSource.addValue("resourceId", fedoraId.getFullId());
874        final String queryToUse;
875        if (tx.isOpenLongRunning()) {
876            parameterSource.addValue("transactionId", tx.getId());
877            queryToUse = SELECT_LAST_UPDATED_IN_TX;
878        } else {
879            queryToUse = SELECT_LAST_UPDATED;
880        }
881        try {
882            return fromTimestamp(jdbcTemplate.queryForObject(queryToUse, parameterSource, Timestamp.class));
883        } catch (final EmptyResultDataAccessException e) {
884            return null;
885        }
886    }
887
888    /**
889     * Get the data source backing this containment index
890     * @return data source
891     */
892    public DataSource getDataSource() {
893        return dataSource;
894    }
895
896    /**
897     * Set the data source backing this containment index
898     * @param dataSource data source
899     */
900    public void setDataSource(final DataSource dataSource) {
901        this.dataSource = dataSource;
902    }
903
904    /**
905     * Get the current startTime for the resource
906     * @param resourceId id of the resource
907     * @return start time or null if no committed record.
908     */
909    private Instant getCurrentStartTime(final String resourceId) {
910        return fromTimestamp(jdbcTemplate.queryForObject(GET_START_TIME, Map.of(
911                "child", resourceId
912        ), Timestamp.class));
913    }
914
915    private Instant fromTimestamp(final Timestamp timestamp) {
916        if (timestamp != null) {
917            return timestamp.toInstant();
918        }
919        return null;
920    }
921
922    /**
923     * Format an instant to a timestamp without milliseconds, due to precision
924     * issues with memento datetimes.
925     * @param instant the instant to format.
926     * @return the datetime timestamp
927     */
928    private Timestamp formatInstant(final Instant instant) {
929        if (instant == null) {
930            return null;
931        }
932        return Timestamp.from(instant.truncatedTo(ChronoUnit.SECONDS));
933    }
934
935    /**
936     * Private class to back a stream with a paged DB query.
937     *
938     * If this needs to be run in parallel we will have to override trySplit() and determine a good method to split on.
939     */
940    private class ContainmentIterator extends Spliterators.AbstractSpliterator<String> {
941        final Queue<String> children = new ConcurrentLinkedQueue<>();
942        int numOffsets = 0;
943        final String queryToUse;
944        final MapSqlParameterSource parameterSource;
945
946        public ContainmentIterator(final String query, final MapSqlParameterSource parameters) {
947            super(Long.MAX_VALUE, Spliterator.ORDERED);
948            queryToUse = query;
949            parameterSource = parameters;
950            parameterSource.addValue("containsLimit", containsLimit);
951        }
952
953        @Override
954        public boolean tryAdvance(final Consumer<? super String> action) {
955            try {
956                action.accept(children.remove());
957            } catch (final NoSuchElementException e) {
958                parameterSource.addValue("offSet", numOffsets * containsLimit);
959                numOffsets += 1;
960                children.addAll(jdbcTemplate.queryForList(queryToUse, parameterSource, String.class));
961                if (children.size() == 0) {
962                    // no more elements.
963                    return false;
964                }
965                action.accept(children.remove());
966            }
967            return true;
968        }
969    }
970}