001/*
002 * The contents of this file are subject to the license and copyright
003 * detailed in the LICENSE and NOTICE files at the root of the source
004 * tree.
005 */
006package org.fcrepo.kernel.impl.services;
007
008import static org.slf4j.LoggerFactory.getLogger;
009
010import java.sql.ResultSet;
011import java.sql.SQLException;
012import java.sql.Timestamp;
013import java.time.Instant;
014import java.util.Map;
015import java.util.NoSuchElementException;
016import java.util.Queue;
017import java.util.Spliterator;
018import java.util.Spliterators;
019import java.util.concurrent.ConcurrentLinkedQueue;
020import java.util.function.Consumer;
021import java.util.stream.Stream;
022import java.util.stream.StreamSupport;
023
024import javax.annotation.PostConstruct;
025import javax.inject.Inject;
026import javax.sql.DataSource;
027
028import org.fcrepo.common.db.DbPlatform;
029import org.fcrepo.kernel.api.Transaction;
030import org.fcrepo.kernel.api.identifiers.FedoraId;
031
032import org.apache.jena.graph.Node;
033import org.apache.jena.graph.NodeFactory;
034import org.apache.jena.graph.Triple;
035import org.slf4j.Logger;
036import org.springframework.jdbc.core.RowCallbackHandler;
037import org.springframework.jdbc.core.RowMapper;
038import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
039import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
040import org.springframework.stereotype.Component;
041import org.springframework.transaction.annotation.Propagation;
042import org.springframework.transaction.annotation.Transactional;
043
044/**
045 * Manager for the membership index
046 *
047 * @author bbpennel
048 */
049@Component
050public class MembershipIndexManager {
051    private static final Logger log = getLogger(MembershipIndexManager.class);
052
053    private static final Timestamp NO_END_TIMESTAMP = Timestamp.from(MembershipServiceImpl.NO_END_INSTANT);
054    private static final Timestamp NO_START_TIMESTAMP = Timestamp.from(Instant.parse("1000-01-01T00:00:00.000Z"));
055
056    private static final String ADD_OPERATION = "add";
057    private static final String DELETE_OPERATION = "delete";
058    private static final String FORCE_FLAG = "force";
059
060    private static final String TX_ID_PARAM = "txId";
061    private static final String SUBJECT_ID_PARAM = "subjectId";
062    private static final String NO_END_TIME_PARAM = "noEndTime";
063    private static final String ADD_OP_PARAM = "addOp";
064    private static final String DELETE_OP_PARAM = "deleteOp";
065    private static final String MEMENTO_TIME_PARAM = "mementoTime";
066    private static final String PROPERTY_PARAM = "property";
067    private static final String TARGET_ID_PARAM = "targetId";
068    private static final String SOURCE_ID_PARAM = "sourceId";
069    private static final String PROXY_ID_PARAM = "proxyId";
070    private static final String START_TIME_PARAM = "startTime";
071    private static final String END_TIME_PARAM = "endTime";
072    private static final String LAST_UPDATED_PARAM = "lastUpdated";
073    private static final String OPERATION_PARAM = "operation";
074    private static final String FORCE_PARAM = "forceFlag";
075    private static final String LIMIT_PARAM = "limit";
076    private static final String OFFSET_PARAM = "offSet";
077
078    private static final String SELECT_ALL_MEMBERSHIP = "SELECT * FROM membership";
079
080    private static final String SELECT_ALL_OPERATIONS = "SELECT * FROM membership_tx_operations";
081
082    private static final String SELECT_MEMBERSHIP_IN_TX =
083            "SELECT property, object_id" +
084            " FROM membership m" +
085            " WHERE subject_id = :subjectId" +
086                " AND end_time = :noEndTime" +
087                " AND NOT EXISTS (" +
088                    " SELECT 1" +
089                    " FROM membership_tx_operations mto" +
090                    " WHERE mto.subject_id = :subjectId" +
091                        " AND mto.source_id = m.source_id" +
092                        " AND mto.object_id = m.object_id" +
093                        " AND mto.tx_id = :txId" +
094                        " AND mto.operation = :deleteOp)" +
095            " UNION" +
096            " SELECT property, object_id" +
097            " FROM membership_tx_operations" +
098            " WHERE subject_id = :subjectId" +
099                " AND tx_id = :txId" +
100                " AND end_time = :noEndTime" +
101                " AND operation = :addOp" +
102            " ORDER BY property, object_id" +
103            " LIMIT :limit OFFSET :offSet";
104
105    private static final String DIRECT_SELECT_MEMBERSHIP =
106            "SELECT property, object_id" +
107            " FROM membership" +
108            " WHERE subject_id = :subjectId" +
109                " AND end_time = :noEndTime" +
110            " ORDER BY property, object_id" +
111            " LIMIT :limit OFFSET :offSet";
112
113    private static final String SELECT_MEMBERSHIP_MEMENTO_IN_TX =
114            "SELECT property, object_id" +
115            " FROM membership m" +
116            " WHERE m.subject_id = :subjectId" +
117                " AND m.start_time <= :mementoTime" +
118                " AND m.end_time > :mementoTime" +
119                " AND NOT EXISTS (" +
120                    " SELECT 1" +
121                    " FROM membership_tx_operations mto" +
122                    " WHERE mto.subject_id = :subjectId" +
123                        " AND mto.source_id = m.source_id" +
124                        " AND mto.property = m.property" +
125                        " AND mto.object_id = m.object_id" +
126                        " AND mto.end_time <= :mementoTime" +
127                        " AND mto.tx_id = :txId" +
128                        " AND mto.operation = :deleteOp)" +
129            " UNION" +
130            " SELECT property, object_id" +
131            " FROM membership_tx_operations" +
132            " WHERE subject_id = :subjectId" +
133                " AND tx_id = :txId" +
134                " AND start_time <= :mementoTime" +
135                " AND end_time > :mementoTime" +
136                " AND operation = :addOp" +
137            " ORDER BY property, object_id" +
138            " LIMIT :limit OFFSET :offSet";
139
140    private static final String DIRECT_SELECT_MEMBERSHIP_MEMENTO =
141            "SELECT property, object_id" +
142            " FROM membership" +
143            " WHERE subject_id = :subjectId" +
144                " AND start_time <= :mementoTime" +
145                " AND end_time > :mementoTime" +
146            " ORDER BY property, object_id" +
147            " LIMIT :limit OFFSET :offSet";
148
149    private static final String SELECT_LAST_UPDATED =
150            "SELECT max(last_updated) as last_updated" +
151            " FROM membership" +
152            " WHERE subject_id = :subjectId";
153
154    // For mementos, use the start_time instead of last_updated as the
155    // end_time reflects when the next version starts
156    private static final String SELECT_LAST_UPDATED_MEMENTO =
157            "SELECT max(start_time)" +
158            " FROM membership" +
159            " WHERE subject_id = :subjectId" +
160                " AND start_time <= :mementoTime" +
161                " AND end_time > :mementoTime";
162
163    private static final String SELECT_LAST_UPDATED_IN_TX =
164            "SELECT max(combined.updated) as last_updated" +
165            " FROM (" +
166                " SELECT max(last_updated) as updated" +
167                " FROM membership m" +
168                " WHERE subject_id = :subjectId" +
169                    " AND NOT EXISTS (" +
170                        " SELECT 1" +
171                        " FROM membership_tx_operations mto" +
172                        " WHERE mto.subject_id = :subjectId" +
173                            " AND mto.source_id = m.source_id" +
174                            " AND mto.object_id = m.object_id" +
175                            " AND mto.tx_id = :txId" +
176                            " AND mto.operation = :deleteOp)" +
177                " UNION" +
178                " SELECT max(last_updated) as updated" +
179                " FROM membership_tx_operations" +
180                " WHERE subject_id = :subjectId" +
181                    " AND tx_id = :txId" +
182            ") combined";
183
184    private static final String INSERT_MEMBERSHIP_IN_TX =
185            "INSERT INTO membership_tx_operations (subject_id, property, object_id, source_id," +
186                    " proxy_id, start_time, end_time, last_updated, tx_id, operation)" +
187            " VALUES (:subjectId, :property, :targetId, :sourceId," +
188                    " :proxyId, :startTime, :endTime, :lastUpdated, :txId, :operation)";
189
190    private static final String DIRECT_INSERT_MEMBERSHIP =
191            "INSERT INTO membership (subject_id, property, object_id, source_id," +
192                    " proxy_id, start_time, end_time, last_updated)" +
193            " VALUES (:subjectId, :property, :targetId, :sourceId," +
194                    " :proxyId, :startTime, :endTime, :lastUpdated)";
195
196    private static final String END_EXISTING_MEMBERSHIP =
197            "INSERT INTO membership_tx_operations (subject_id, property, object_id, source_id," +
198                    " proxy_id, start_time, end_time, last_updated, tx_id, operation)" +
199            " SELECT m.subject_id, m.property, m.object_id, m.source_id, m.proxy_id, m.start_time," +
200                    " :endTime, :endTime, :txId, :deleteOp" +
201            " FROM membership m" +
202            " WHERE m.source_id = :sourceId" +
203                " AND m.proxy_id = :proxyId" +
204                " AND m.end_time = :noEndTime";
205
206    private static final String DIRECT_END_EXISTING_MEMBERSHIP =
207            "UPDATE membership SET end_time = :endTime, last_updated = :endTime" +
208            " WHERE source_id = :sourceId" +
209                " AND proxy_id = :proxyId" +
210                " AND end_time = :noEndTime";
211
212    private static final String CLEAR_FOR_PROXY_IN_TX =
213            "DELETE FROM membership_tx_operations" +
214            " WHERE source_id = :sourceId" +
215                " AND tx_id = :txId" +
216                " AND proxy_id = :proxyId" +
217                " AND force_flag IS NULL";
218
219    private static final String CLEAR_ALL_ADDED_FOR_SOURCE_IN_TX =
220            "DELETE FROM membership_tx_operations" +
221            " WHERE source_id = :sourceId" +
222                " AND tx_id = :txId" +
223                " AND operation = :addOp";
224
225    // Add "delete" entries for all existing membership from the given source, if not already deleted
226    private static final String END_EXISTING_FOR_SOURCE =
227            "INSERT INTO membership_tx_operations (subject_id, property, object_id, source_id," +
228                    " proxy_id, start_time, end_time, last_updated, tx_id, operation)" +
229            " SELECT subject_id, property, object_id, source_id," +
230                    " proxy_id, start_time, :endTime, :endTime, :txId, :deleteOp" +
231            " FROM membership m" +
232            " WHERE source_id = :sourceId" +
233                " AND end_time = :noEndTime" +
234                " AND NOT EXISTS (" +
235                    " SELECT TRUE" +
236                    " FROM membership_tx_operations mtx" +
237                    " WHERE mtx.subject_id = m.subject_id" +
238                        " AND mtx.property = m.property" +
239                        " AND mtx.object_id = m.object_id" +
240                        " AND mtx.source_id = m.source_id" +
241                        " AND mtx.proxy_id = m.proxy_id" +
242                        " AND mtx.operation = :deleteOp" +
243                    ")";
244
245    private static final String DIRECT_END_EXISTING_FOR_SOURCE =
246            "UPDATE membership SET end_time = :endTime, last_updated = :endTime" +
247                    " WHERE source_id = :sourceId" +
248                    " AND end_time = :noEndTime";
249
250    private static final String DELETE_EXISTING_FOR_SOURCE_AFTER =
251            "INSERT INTO membership_tx_operations(subject_id, property, object_id, source_id," +
252                    " proxy_id, start_time, end_time, last_updated, tx_id, operation, force_flag)" +
253            " SELECT subject_id, property, object_id, source_id, proxy_id, start_time, end_time," +
254                    " last_updated, :txId, :deleteOp, :forceFlag" +
255            " FROM membership m" +
256            " WHERE m.source_id = :sourceId" +
257                " AND (m.start_time >= :startTime" +
258                " OR m.end_time >= :startTime)";
259
260    private static final String DIRECT_DELETE_EXISTING_FOR_SOURCE_AFTER =
261            "DELETE FROM membership" +
262                    " WHERE source_id = :sourceId" +
263                    " AND (start_time >= :startTime" +
264                    " OR end_time >= :startTime)";
265
266    private static final String DELETE_EXISTING_FOR_PROXY_AFTER =
267            "INSERT INTO membership_tx_operations(subject_id, property, object_id, source_id," +
268                    " proxy_id, start_time, end_time, last_updated, tx_id, operation, force_flag)" +
269            " SELECT subject_id, property, object_id, source_id, proxy_id, start_time, end_time," +
270                    " last_updated, :txId, :deleteOp, :forceFlag" +
271            " FROM membership m" +
272            " WHERE m.proxy_id = :proxyId" +
273                " AND (m.start_time >= :startTime" +
274                " OR m.end_time >= :startTime)";
275
276    private static final String DIRECT_DELETE_EXISTING_FOR_PROXY_AFTER =
277            "UPDATE membership SET end_time = :endTime, last_updated = :endTime" +
278            " WHERE proxy_id = :proxyId" +
279                " AND (start_time >= :endTime" +
280                " OR end_time >= :endTime)";
281
282    private static final String PURGE_ALL_REFERENCES_MEMBERSHIP =
283            "DELETE from membership" +
284            " where source_id = :targetId" +
285                " OR subject_id = :targetId" +
286                " OR object_id = :targetId";
287
288    private static final String PURGE_ALL_REFERENCES_TRANSACTION =
289            "DELETE from membership_tx_operations" +
290            " WHERE tx_id = :txId" +
291                " AND (source_id = :targetId" +
292                " OR subject_id = :targetId" +
293                " OR object_id = :targetId)";
294
295    private static final String COMMIT_DELETES =
296            "DELETE from membership" +
297            " WHERE EXISTS (" +
298                " SELECT TRUE" +
299                " FROM membership_tx_operations mto" +
300                " WHERE mto.tx_id = :txId" +
301                    " AND mto.operation = :deleteOp" +
302                    " AND mto.force_flag = :forceFlag" +
303                    " AND membership.source_id = mto.source_id" +
304                    " AND membership.proxy_id = mto.proxy_id" +
305                    " AND membership.subject_id = mto.subject_id" +
306                    " AND membership.property = mto.property" +
307                    " AND membership.object_id = mto.object_id" +
308                " )";
309
310    private static final String COMMIT_ENDS_H2 =
311            "UPDATE membership m" +
312            " SET end_time = (" +
313                " SELECT mto.end_time" +
314                " FROM membership_tx_operations mto" +
315                " WHERE mto.tx_id = :txId" +
316                    " AND m.source_id = mto.source_id" +
317                    " AND m.proxy_id = mto.proxy_id" +
318                    " AND m.subject_id = mto.subject_id" +
319                    " AND m.property = mto.property" +
320                    " AND m.object_id = mto.object_id" +
321                    " AND mto.operation = :deleteOp" +
322                " )," +
323                " last_updated = (" +
324                    " SELECT mto.end_time" +
325                    " FROM membership_tx_operations mto" +
326                    " WHERE mto.tx_id = :txId" +
327                        " AND m.source_id = mto.source_id" +
328                        " AND m.proxy_id = mto.proxy_id" +
329                        " AND m.subject_id = mto.subject_id" +
330                        " AND m.property = mto.property" +
331                        " AND m.object_id = mto.object_id" +
332                        " AND mto.operation = :deleteOp" +
333                    " )" +
334            " WHERE EXISTS (" +
335                "SELECT TRUE" +
336                " FROM membership_tx_operations mto" +
337                " WHERE mto.tx_id = :txId" +
338                    " AND mto.operation = :deleteOp" +
339                    " AND m.source_id = mto.source_id" +
340                    " AND m.proxy_id = mto.proxy_id" +
341                    " AND m.subject_id = mto.subject_id" +
342                    " AND m.property = mto.property" +
343                    " AND m.object_id = mto.object_id" +
344                " )";
345
346    private static final String COMMIT_ENDS_POSTGRES =
347            "UPDATE membership" +
348            " SET end_time = mto.end_time, last_updated = mto.end_time" +
349            " FROM membership_tx_operations mto" +
350            " WHERE mto.tx_id = :txId" +
351                " AND mto.operation = :deleteOp" +
352                " AND membership.source_id = mto.source_id" +
353                " AND membership.proxy_id = mto.proxy_id" +
354                " AND membership.subject_id = mto.subject_id" +
355                " AND membership.property = mto.property" +
356                " AND membership.object_id = mto.object_id";
357
358    private static final String COMMIT_ENDS_MYSQL =
359            "UPDATE membership m" +
360            " INNER JOIN membership_tx_operations mto ON" +
361                " m.source_id = mto.source_id" +
362                " AND m.proxy_id = mto.proxy_id" +
363                " AND m.subject_id = mto.subject_id" +
364                " AND m.property = mto.property" +
365                " AND m.object_id = mto.object_id" +
366            " SET m.end_time = mto.end_time, m.last_updated = mto.end_time" +
367            " WHERE mto.tx_id = :txId" +
368                " AND mto.operation = :deleteOp";
369
370    private static final Map<DbPlatform, String> COMMIT_ENDS_MAP = Map.of(
371            DbPlatform.MYSQL, COMMIT_ENDS_MYSQL,
372            DbPlatform.MARIADB, COMMIT_ENDS_MYSQL,
373            DbPlatform.POSTGRESQL, COMMIT_ENDS_POSTGRES,
374            DbPlatform.H2, COMMIT_ENDS_H2
375    );
376
377    // Transfer all "add" operations from tx to committed membership, unless the entry already exists
378    private static final String COMMIT_ADDS =
379            "INSERT INTO membership" +
380            " (subject_id, property, object_id, source_id, proxy_id, start_time, end_time, last_updated)" +
381            " SELECT subject_id, property, object_id, source_id, proxy_id, start_time, end_time, last_updated" +
382            " FROM membership_tx_operations mto" +
383            " WHERE mto.tx_id = :txId" +
384                " AND mto.operation = :addOp" +
385                " AND NOT EXISTS (" +
386                    " SELECT TRUE" +
387                    " FROM membership m" +
388                    " WHERE m.source_id = mto.source_id" +
389                        " AND m.proxy_id = mto.proxy_id" +
390                        " AND m.subject_id = mto.subject_id" +
391                        " AND m.property = mto.property" +
392                        " AND m.object_id = mto.object_id" +
393                        " AND m.start_time = mto.start_time" +
394                        " AND m.end_time = mto.end_time" +
395                " )";
396
397    private static final String DELETE_TRANSACTION =
398            "DELETE FROM membership_tx_operations" +
399            " WHERE tx_id = :txId";
400
401    private static final String TRUNCATE_MEMBERSHIP = "TRUNCATE TABLE membership";
402
403    private static final String TRUNCATE_MEMBERSHIP_TX = "TRUNCATE TABLE membership_tx_operations";
404
405    @Inject
406    private DataSource dataSource;
407
408    private NamedParameterJdbcTemplate jdbcTemplate;
409
410    private DbPlatform dbPlatform;
411
412    private static final int MEMBERSHIP_LIMIT = 50000;
413
414    @PostConstruct
415    public void setUp() {
416        jdbcTemplate = new NamedParameterJdbcTemplate(getDataSource());
417        dbPlatform = DbPlatform.fromDataSource(dataSource);
418    }
419
420    /**
421     * End a membership from the child of a Direct/IndirectContainer, setting an end time if committed,
422     * or clearing from the current tx if it was newly added.
423     *
424     * @param tx transaction
425     * @param sourceId ID of the direct/indirect container whose membership should be ended
426     * @param proxyId ID of the proxy producing this membership, when applicable
427     * @param endTime the time the resource was deleted, generally its last modified
428     */
429    public void endMembershipFromChild(final Transaction tx, final FedoraId sourceId, final FedoraId proxyId,
430            final Instant endTime) {
431        tx.doInTx(() -> {
432            if (!tx.isShortLived()) {
433                final MapSqlParameterSource parameterSource = new MapSqlParameterSource();
434                parameterSource.addValue(TX_ID_PARAM, tx.getId());
435                parameterSource.addValue(SOURCE_ID_PARAM, sourceId.getFullId());
436                parameterSource.addValue(PROXY_ID_PARAM, proxyId.getFullId());
437
438                final int affected = jdbcTemplate.update(CLEAR_FOR_PROXY_IN_TX, parameterSource);
439
440                // If no rows were deleted, then assume we need to delete permanent entry
441                if (affected == 0) {
442                    final MapSqlParameterSource parameterSource2 = new MapSqlParameterSource();
443                    parameterSource2.addValue(TX_ID_PARAM, tx.getId());
444                    parameterSource2.addValue(SOURCE_ID_PARAM, sourceId.getFullId());
445                    parameterSource2.addValue(PROXY_ID_PARAM, proxyId.getFullId());
446                    parameterSource2.addValue(END_TIME_PARAM, formatInstant(endTime));
447                    parameterSource2.addValue(NO_END_TIME_PARAM, NO_END_TIMESTAMP);
448                    parameterSource2.addValue(DELETE_OP_PARAM, DELETE_OPERATION);
449                    jdbcTemplate.update(END_EXISTING_MEMBERSHIP, parameterSource2);
450                }
451            } else {
452                final MapSqlParameterSource parameterSource = new MapSqlParameterSource();
453                parameterSource.addValue(SOURCE_ID_PARAM, sourceId.getFullId());
454                parameterSource.addValue(PROXY_ID_PARAM, proxyId.getFullId());
455                parameterSource.addValue(END_TIME_PARAM, formatInstant(endTime));
456                parameterSource.addValue(NO_END_TIME_PARAM, NO_END_TIMESTAMP);
457                jdbcTemplate.update(DIRECT_END_EXISTING_MEMBERSHIP, parameterSource);
458            }
459        });
460    }
461
462    public void deleteMembershipForProxyAfter(final Transaction tx,
463                                              final FedoraId sourceId,
464                                              final FedoraId proxyId,
465                                              final Instant afterTime) {
466        tx.doInTx(() -> {
467            final var afterTimestamp = afterTime == null ? NO_START_TIMESTAMP : formatInstant(afterTime);
468
469            if (!tx.isShortLived()) {
470                // Clear all membership added in this transaction
471                final var parameterSource = Map.of(
472                        TX_ID_PARAM, tx.getId(),
473                        SOURCE_ID_PARAM, sourceId.getFullId(),
474                        PROXY_ID_PARAM, proxyId.getFullId(),
475                        OPERATION_PARAM, ADD_OPERATION);
476
477                jdbcTemplate.update(CLEAR_FOR_PROXY_IN_TX, parameterSource);
478
479                // Delete all existing membership entries that start after or end after the given timestamp
480                final Map<String, Object> parameterSource2 = Map.of(
481                        TX_ID_PARAM, tx.getId(),
482                        PROXY_ID_PARAM, proxyId.getFullId(),
483                        START_TIME_PARAM, afterTimestamp,
484                        FORCE_PARAM, FORCE_FLAG,
485                        DELETE_OP_PARAM, DELETE_OPERATION);
486                jdbcTemplate.update(DELETE_EXISTING_FOR_PROXY_AFTER, parameterSource2);
487            } else {
488                final Map<String, Object> parameterSource = Map.of(
489                        PROXY_ID_PARAM, proxyId.getFullId(),
490                        END_TIME_PARAM, afterTimestamp);
491                jdbcTemplate.update(DIRECT_DELETE_EXISTING_FOR_PROXY_AFTER, parameterSource);
492            }
493        });
494    }
495
496    /**
497     * End all membership properties resulting from the specified source container
498     * @param tx transaction
499     * @param sourceId ID of the direct/indirect container whose membership should be ended
500     * @param endTime the time the resource was deleted, generally its last modified
501     */
502    public void endMembershipForSource(final Transaction tx, final FedoraId sourceId, final Instant endTime) {
503        tx.doInTx(() -> {
504            if (!tx.isShortLived()) {
505                final Map<String, Object> parameterSource = Map.of(
506                        TX_ID_PARAM, tx.getId(),
507                        SOURCE_ID_PARAM, sourceId.getFullId(),
508                        ADD_OP_PARAM, ADD_OPERATION);
509
510                jdbcTemplate.update(CLEAR_ALL_ADDED_FOR_SOURCE_IN_TX, parameterSource);
511
512                final Map<String, Object> parameterSource2 = Map.of(
513                        TX_ID_PARAM, tx.getId(),
514                        SOURCE_ID_PARAM, sourceId.getFullId(),
515                        END_TIME_PARAM, formatInstant(endTime),
516                        NO_END_TIME_PARAM, NO_END_TIMESTAMP,
517                        DELETE_OP_PARAM, DELETE_OPERATION);
518                jdbcTemplate.update(END_EXISTING_FOR_SOURCE, parameterSource2);
519            } else {
520                final Map<String, Object> parameterSource = Map.of(
521                        SOURCE_ID_PARAM, sourceId.getFullId(),
522                        END_TIME_PARAM, formatInstant(endTime),
523                        NO_END_TIME_PARAM, NO_END_TIMESTAMP);
524                jdbcTemplate.update(DIRECT_END_EXISTING_FOR_SOURCE, parameterSource);
525            }
526        });
527    }
528
529    /**
530     * Delete membership entries that are active at or after the given timestamp for the specified source
531     * @param tx transaction
532     * @param sourceId ID of the direct/indirect container
533     * @param afterTime time at or after which membership should be removed
534     */
535    public void deleteMembershipForSourceAfter(final Transaction tx, final FedoraId sourceId, final Instant afterTime) {
536        tx.doInTx(() -> {
537            final var afterTimestamp = afterTime == null ? NO_START_TIMESTAMP : formatInstant(afterTime);
538
539            if (!tx.isShortLived()) {
540                // Clear all membership added in this transaction
541                final Map<String, Object> parameterSource = Map.of(
542                        TX_ID_PARAM, tx.getId(),
543                        SOURCE_ID_PARAM, sourceId.getFullId(),
544                        ADD_OP_PARAM, ADD_OPERATION);
545
546                jdbcTemplate.update(CLEAR_ALL_ADDED_FOR_SOURCE_IN_TX, parameterSource);
547
548                // Delete all existing membership entries that start after or end after the given timestamp
549                final Map<String, Object> parameterSource2 = Map.of(
550                        TX_ID_PARAM, tx.getId(),
551                        SOURCE_ID_PARAM, sourceId.getFullId(),
552                        START_TIME_PARAM, afterTimestamp,
553                        FORCE_PARAM, FORCE_FLAG,
554                        DELETE_OP_PARAM, DELETE_OPERATION);
555                jdbcTemplate.update(DELETE_EXISTING_FOR_SOURCE_AFTER, parameterSource2);
556            } else {
557                final Map<String, Object> parameterSource = Map.of(
558                        SOURCE_ID_PARAM, sourceId.getFullId(),
559                        START_TIME_PARAM, afterTimestamp);
560                jdbcTemplate.update(DIRECT_DELETE_EXISTING_FOR_SOURCE_AFTER, parameterSource);
561            }
562        });
563    }
564
565    /**
566     * Clean up any references to the target id, in transactions and outside
567     * @param txId transaction id
568     * @param targetId identifier of the resource to cleanup membership references for
569     */
570    public void deleteMembershipReferences(final String txId, final FedoraId targetId) {
571        final Map<String, Object> parameterSource = Map.of(
572                TARGET_ID_PARAM, targetId.getFullId(),
573                TX_ID_PARAM, txId);
574
575        jdbcTemplate.update(PURGE_ALL_REFERENCES_TRANSACTION, parameterSource);
576        jdbcTemplate.update(PURGE_ALL_REFERENCES_MEMBERSHIP, parameterSource);
577    }
578
579    /**
580     * Add new membership property to the index, clearing any delete
581     * operations for the property if necessary.
582     * @param tx transaction
583     * @param sourceId ID of the direct/indirect container which produced the membership
584     * @param proxyId ID of the proxy producing this membership, when applicable
585     * @param membership membership triple
586     * @param startTime time the membership triple was added
587     */
588    public void addMembership(final Transaction tx, final FedoraId sourceId, final FedoraId proxyId,
589            final Triple membership, final Instant startTime) {
590        if (membership == null) {
591            return;
592        }
593        addMembership(tx, sourceId, proxyId, membership, startTime, null);
594    }
595
596    /**
597     * Add new membership property to the index
598     * @param tx transaction
599     * @param sourceId ID of the direct/indirect container which produced the membership
600     * @param proxyId ID of the proxy producing this membership, when applicable
601     * @param membership membership triple
602     * @param startTime time the membership triple was added
603     * @param endTime time the membership triple ends, or never if not provided
604     */
605    public void addMembership(final Transaction tx, final FedoraId sourceId, final FedoraId proxyId,
606            final Triple membership, final Instant startTime, final Instant endTime) {
607        tx.doInTx(() -> {
608            final Timestamp endTimestamp;
609            final Timestamp lastUpdated;
610            final Timestamp startTimestamp = formatInstant(startTime);
611            if (endTime == null) {
612                endTimestamp = NO_END_TIMESTAMP;
613                lastUpdated = startTimestamp;
614            } else {
615                endTimestamp = formatInstant(endTime);
616                lastUpdated = endTimestamp;
617            }
618            // Add the new membership operation
619            final MapSqlParameterSource parameterSource = new MapSqlParameterSource();
620            parameterSource.addValue(SUBJECT_ID_PARAM, membership.getSubject().getURI());
621            parameterSource.addValue(PROPERTY_PARAM, membership.getPredicate().getURI());
622            parameterSource.addValue(TARGET_ID_PARAM, membership.getObject().getURI());
623            parameterSource.addValue(SOURCE_ID_PARAM, sourceId.getFullId());
624            parameterSource.addValue(PROXY_ID_PARAM, proxyId.getFullId());
625            parameterSource.addValue(START_TIME_PARAM, startTimestamp);
626            parameterSource.addValue(END_TIME_PARAM, endTimestamp);
627            parameterSource.addValue(LAST_UPDATED_PARAM, lastUpdated);
628
629            if (!tx.isShortLived()) {
630                parameterSource.addValue(TX_ID_PARAM, tx.getId());
631                parameterSource.addValue(OPERATION_PARAM, ADD_OPERATION);
632                jdbcTemplate.update(INSERT_MEMBERSHIP_IN_TX, parameterSource);
633            } else {
634                jdbcTemplate.update(DIRECT_INSERT_MEMBERSHIP, parameterSource);
635            }
636        });
637    }
638
639    /**
640     * Get a stream of membership triples with
641     * @param tx transaction from which membership will be retrieved, or null for no transaction
642     * @param subjectId ID of the subject
643     * @return Stream of membership triples
644     */
645    public Stream<Triple> getMembership(final Transaction tx, final FedoraId subjectId) {
646        final Node subjectNode = NodeFactory.createURI(subjectId.getBaseId());
647
648        final RowMapper<Triple> membershipMapper = (rs, rowNum) ->
649                Triple.create(subjectNode,
650                        NodeFactory.createURI(rs.getString("property")),
651                        NodeFactory.createURI(rs.getString("object_id")));
652
653        final MapSqlParameterSource parameterSource = new MapSqlParameterSource();
654        final String query;
655
656        if (subjectId.isMemento()) {
657            parameterSource.addValue(SUBJECT_ID_PARAM, subjectId.getBaseId());
658            parameterSource.addValue(MEMENTO_TIME_PARAM, formatInstant(subjectId.getMementoInstant()));
659        } else {
660            parameterSource.addValue(SUBJECT_ID_PARAM, subjectId.getFullId());
661            parameterSource.addValue(NO_END_TIME_PARAM, NO_END_TIMESTAMP);
662        }
663
664        if (tx.isOpenLongRunning()) {
665            parameterSource.addValue(TX_ID_PARAM, tx.getId());
666
667            if (subjectId.isMemento()) {
668                query = SELECT_MEMBERSHIP_MEMENTO_IN_TX;
669            } else {
670                query = SELECT_MEMBERSHIP_IN_TX;
671            }
672        } else {
673            if (subjectId.isMemento()) {
674                query = DIRECT_SELECT_MEMBERSHIP_MEMENTO;
675            } else {
676                query = DIRECT_SELECT_MEMBERSHIP;
677            }
678        }
679
680        return StreamSupport.stream(new MembershipIterator(query, parameterSource, membershipMapper), false);
681    }
682
683    public Instant getLastUpdated(final Transaction transaction, final FedoraId subjectId) {
684        final MapSqlParameterSource parameterSource = new MapSqlParameterSource();
685
686        parameterSource.addValue(NO_END_TIME_PARAM, NO_END_TIMESTAMP);
687        final String lastUpdatedQuery;
688        if (subjectId.isMemento()) {
689            lastUpdatedQuery = SELECT_LAST_UPDATED_MEMENTO;
690            parameterSource.addValue(SUBJECT_ID_PARAM, subjectId.getBaseId());
691            parameterSource.addValue(MEMENTO_TIME_PARAM, formatInstant(subjectId.getMementoInstant()));
692        } else if (transaction.isOpenLongRunning()) {
693            lastUpdatedQuery = SELECT_LAST_UPDATED_IN_TX;
694            parameterSource.addValue(SUBJECT_ID_PARAM, subjectId.getFullId());
695            parameterSource.addValue(TX_ID_PARAM, transaction.getId());
696            parameterSource.addValue(DELETE_OP_PARAM, DELETE_OPERATION);
697        } else {
698            lastUpdatedQuery = SELECT_LAST_UPDATED;
699            parameterSource.addValue(SUBJECT_ID_PARAM, subjectId.getFullId());
700        }
701
702        final var updated = jdbcTemplate.queryForObject(lastUpdatedQuery, parameterSource, Timestamp.class);
703        if (updated != null) {
704            return updated.toInstant();
705        }
706        return null;
707    }
708
709    /**
710     * Perform a commit of operations stored in the specified transaction
711     * @param tx transaction
712     */
713    public void commitTransaction(final Transaction tx) {
714        if (!tx.isShortLived()) {
715            tx.ensureCommitting();
716            final Map<String, String> parameterSource = Map.of(
717                    TX_ID_PARAM, tx.getId(),
718                    ADD_OP_PARAM, ADD_OPERATION,
719                    DELETE_OP_PARAM, DELETE_OPERATION,
720                    FORCE_PARAM, FORCE_FLAG);
721
722            jdbcTemplate.update(COMMIT_DELETES, parameterSource);
723            final int ends = jdbcTemplate.update(COMMIT_ENDS_MAP.get(this.dbPlatform), parameterSource);
724            final int adds = jdbcTemplate.update(COMMIT_ADDS, parameterSource);
725            final int cleaned = jdbcTemplate.update(DELETE_TRANSACTION, parameterSource);
726
727            log.debug("Completed commit, {} ended, {} adds, {} operations", ends, adds, cleaned);
728        }
729    }
730
731    /**
732     * Delete all entries related to a transaction
733     * @param tx transaction
734     */
735    @Transactional(propagation = Propagation.NOT_SUPPORTED)
736    public void deleteTransaction(final Transaction tx) {
737        if (!tx.isShortLived()) {
738            final Map<String, String> parameterSource = Map.of(TX_ID_PARAM, tx.getId());
739            jdbcTemplate.update(DELETE_TRANSACTION, parameterSource);
740        }
741    }
742
743    /**
744     * Format an instant to a timestamp without milliseconds, due to precision
745     * issues with memento datetimes.
746     * @param instant the instant
747     * @return a Timestamp
748     */
749    private Timestamp formatInstant(final Instant instant) {
750        final var timestamp = Timestamp.from(instant);
751        timestamp.setNanos(0);
752        return timestamp;
753    }
754
755    /**
756     * Clear all entries from the index
757     */
758    public void clearIndex() {
759        jdbcTemplate.update(TRUNCATE_MEMBERSHIP, Map.of());
760        jdbcTemplate.update(TRUNCATE_MEMBERSHIP_TX, Map.of());
761    }
762
763    /**
764     * Log all membership entries, for debugging usage only
765     */
766    protected void logMembership() {
767        log.info("source_id, proxy_id, subject_id, property, object_id, start_time, end_time, last_updated");
768        jdbcTemplate.query(SELECT_ALL_MEMBERSHIP, new RowCallbackHandler() {
769            @Override
770            public void processRow(final ResultSet rs) throws SQLException {
771                log.info("{}, {}, {}, {}, {}, {}, {}, {}",
772                        rs.getString("source_id"), rs.getString("proxy_id"), rs.getString("subject_id"),
773                        rs.getString("property"), rs.getString("object_id"), rs.getTimestamp("start_time"),
774                        rs.getTimestamp("end_time"), rs.getTimestamp("last_updated"));
775            }
776        });
777    }
778
779    /**
780     * Log all membership operations, for debugging usage only
781     */
782    protected void logOperations() {
783        log.info("source_id, proxy_id, subject_id, property, object_id, start_time, end_time,"
784                + " last_updated, tx_id, operation, force_flag");
785        jdbcTemplate.query(SELECT_ALL_OPERATIONS, new RowCallbackHandler() {
786            @Override
787            public void processRow(final ResultSet rs) throws SQLException {
788                log.info("{}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}",
789                        rs.getString("source_id"), rs.getString("proxy_id"), rs.getString("subject_id"),
790                        rs.getString("property"), rs.getString("object_id"), rs.getTimestamp("start_time"),
791                        rs.getTimestamp("end_time"), rs.getTimestamp("last_updated"), rs.getString("tx_id"),
792                        rs.getString("operation"), rs.getString("force_flag"));
793            }
794        });
795    }
796
797    /**
798     * Set the JDBC datastore.
799     * @param dataSource the dataStore.
800     */
801    public void setDataSource(final DataSource dataSource) {
802        this.dataSource = dataSource;
803    }
804
805    /**
806     * Get the JDBC datastore.
807     * @return the dataStore.
808     */
809    public DataSource getDataSource() {
810        return dataSource;
811    }
812
813    /**
814     * Private class to back a stream with a paged DB query.
815     *
816     * If this needs to be run in parallel we will have to override trySplit() and determine a good method to split on.
817     */
818    private class MembershipIterator extends Spliterators.AbstractSpliterator<Triple> {
819        final Queue<Triple> children = new ConcurrentLinkedQueue<>();
820        int numOffsets = 0;
821        final String queryToUse;
822        final MapSqlParameterSource parameterSource;
823        final RowMapper<Triple> rowMapper;
824
825        public MembershipIterator(final String query, final MapSqlParameterSource parameters,
826                                  final RowMapper<Triple> mapper) {
827            super(Long.MAX_VALUE, Spliterator.ORDERED);
828            queryToUse = query;
829            parameterSource = parameters;
830            rowMapper = mapper;
831            parameterSource.addValue(ADD_OP_PARAM, ADD_OPERATION);
832            parameterSource.addValue(DELETE_OP_PARAM, DELETE_OPERATION);
833            parameterSource.addValue(LIMIT_PARAM, MEMBERSHIP_LIMIT);
834        }
835
836        @Override
837        public boolean tryAdvance(final Consumer<? super Triple> action) {
838            try {
839                action.accept(children.remove());
840            } catch (final NoSuchElementException e) {
841                parameterSource.addValue(OFFSET_PARAM, numOffsets * MEMBERSHIP_LIMIT);
842                numOffsets += 1;
843                children.addAll(jdbcTemplate.query(queryToUse, parameterSource, rowMapper));
844                if (children.size() == 0) {
845                    // no more elements.
846                    return false;
847                }
848                action.accept(children.remove());
849            }
850            return true;
851        }
852    }
853}