001/*
002 * Licensed to DuraSpace under one or more contributor license agreements.
003 * See the NOTICE file distributed with this work for additional information
004 * regarding copyright ownership.
005 *
006 * DuraSpace licenses this file to you under the Apache License,
007 * Version 2.0 (the "License"); you may not use this file except in
008 * compliance with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.fcrepo.persistence.ocfl.impl;
020
021import java.util.Collections;
022import java.util.List;
023import java.util.Map;
024import java.util.concurrent.TimeUnit;
025
026import javax.annotation.Nonnull;
027import javax.annotation.PostConstruct;
028import javax.inject.Inject;
029import javax.sql.DataSource;
030
031import org.fcrepo.common.db.DbPlatform;
032import org.fcrepo.kernel.api.Transaction;
033import org.fcrepo.config.OcflPropsConfig;
034import org.fcrepo.kernel.api.exception.InvalidResourceIdentifierException;
035import org.fcrepo.kernel.api.exception.RepositoryRuntimeException;
036import org.fcrepo.kernel.api.identifiers.FedoraId;
037import org.fcrepo.persistence.ocfl.api.FedoraOcflMappingNotFoundException;
038import org.fcrepo.persistence.ocfl.api.FedoraToOcflObjectIndex;
039import org.fcrepo.storage.ocfl.cache.Cache;
040import org.fcrepo.storage.ocfl.cache.CaffeineCache;
041
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
044import org.springframework.beans.factory.annotation.Autowired;
045import org.springframework.dao.DataIntegrityViolationException;
046import org.springframework.dao.EmptyResultDataAccessException;
047import org.springframework.jdbc.BadSqlGrammarException;
048import org.springframework.jdbc.core.RowMapper;
049import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
050import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
051import org.springframework.stereotype.Component;
052import org.springframework.transaction.annotation.Propagation;
053import org.springframework.transaction.annotation.Transactional;
054
055import com.github.benmanes.caffeine.cache.Caffeine;
056
057/**
058 * Maps Fedora IDs to the OCFL IDs of the OCFL objects the Fedora resource is stored in. This implementation is backed
059 * by a relational database.
060 *
061 * @author pwinckles
062 */
063@Component("ocflIndexImpl")
064public class DbFedoraToOcflObjectIndex implements FedoraToOcflObjectIndex {
065
066    private static final Logger LOGGER = LoggerFactory.getLogger(DbFedoraToOcflObjectIndex.class);
067
068    private static final String MAPPING_TABLE = "ocfl_id_map";
069
070    private static final String FEDORA_ID_COLUMN = "fedora_id";
071
072    private static final String FEDORA_ROOT_ID_COLUMN = "fedora_root_id";
073
074    private static final String OCFL_ID_COLUMN = "ocfl_id";
075
076    private static final String TRANSACTION_OPERATIONS_TABLE = "ocfl_id_map_session_operations";
077
078    private static final String TRANSACTION_ID_COLUMN = "session_id";
079
080    private static final String OPERATION_COLUMN = "operation";
081
082    /*
083     * Lookup all mappings for the resource id.
084     */
085    private static final String LOOKUP_MAPPING = "SELECT " + FEDORA_ROOT_ID_COLUMN + ", " + OCFL_ID_COLUMN + " FROM " +
086            MAPPING_TABLE + " WHERE " + FEDORA_ID_COLUMN + " = :fedoraId";
087
088    /*
089     * Lookup all mappings from the mapping table as well as any new 'add's and excluding any 'delete's in this
090     * transaction.
091     */
092    private static final String LOOKUP_MAPPING_IN_TRANSACTION = "SELECT x." + FEDORA_ROOT_ID_COLUMN + "," +
093            " x." + OCFL_ID_COLUMN + " FROM" +
094            " (SELECT " + FEDORA_ROOT_ID_COLUMN + ", " + OCFL_ID_COLUMN + " FROM " + MAPPING_TABLE + " WHERE " +
095            FEDORA_ID_COLUMN + " = :fedoraId" +
096            " UNION SELECT " + FEDORA_ROOT_ID_COLUMN + ", " + OCFL_ID_COLUMN + " FROM " + TRANSACTION_OPERATIONS_TABLE +
097            " WHERE " + FEDORA_ID_COLUMN + " = :fedoraId AND " + TRANSACTION_ID_COLUMN + " = :transactionId" +
098            " AND " + OPERATION_COLUMN + " = 'add') x";
099
100    /*
101     * Add an 'add' operation to the transaction table.
102     */
103    private static final String UPSERT_MAPPING_TX_POSTGRESQL = "INSERT INTO " + TRANSACTION_OPERATIONS_TABLE +
104            " ( " + FEDORA_ID_COLUMN + ", " + FEDORA_ROOT_ID_COLUMN + ", " + OCFL_ID_COLUMN + ", " +
105            TRANSACTION_ID_COLUMN + ", " + OPERATION_COLUMN + ") VALUES (:fedoraId, :fedoraRootId, :ocflId," +
106            " :transactionId, :operation) ON CONFLICT (" + FEDORA_ID_COLUMN + ", " + TRANSACTION_ID_COLUMN + ")" +
107            " DO UPDATE SET " + FEDORA_ROOT_ID_COLUMN + " = EXCLUDED." + FEDORA_ROOT_ID_COLUMN + ", " +
108            OCFL_ID_COLUMN + " = EXCLUDED." + OCFL_ID_COLUMN + ", " + OPERATION_COLUMN + " = EXCLUDED." +
109            OPERATION_COLUMN;
110
111    private static final String UPSERT_MAPPING_TX_MYSQL_MARIA = "INSERT INTO " + TRANSACTION_OPERATIONS_TABLE +
112            " (" + FEDORA_ID_COLUMN + ", " + FEDORA_ROOT_ID_COLUMN + ", " + OCFL_ID_COLUMN + ", " +
113            TRANSACTION_ID_COLUMN + ", " + OPERATION_COLUMN + ")" +
114            " VALUES (:fedoraId, :fedoraRootId, :ocflId, :transactionId, :operation) ON DUPLICATE KEY UPDATE " +
115            FEDORA_ROOT_ID_COLUMN + " = VALUES(" + FEDORA_ROOT_ID_COLUMN + "), " + OCFL_ID_COLUMN + " = VALUES(" +
116            OCFL_ID_COLUMN + "), " + OPERATION_COLUMN + " = VALUES(" + OPERATION_COLUMN + ")";
117
118    private static final String UPSERT_MAPPING_TX_H2 = "MERGE INTO " + TRANSACTION_OPERATIONS_TABLE +
119            " (" + FEDORA_ID_COLUMN + ", " + FEDORA_ROOT_ID_COLUMN + ", " + OCFL_ID_COLUMN + ", " +
120            TRANSACTION_ID_COLUMN + ", " + OPERATION_COLUMN + ")" +
121            " KEY (" + FEDORA_ID_COLUMN + ", " + TRANSACTION_ID_COLUMN + ")" +
122            " VALUES (:fedoraId, :fedoraRootId, :ocflId, :transactionId, :operation)";
123
124    private static final String DIRECT_INSERT_MAPPING = "INSERT INTO " + MAPPING_TABLE +
125            " (" + FEDORA_ID_COLUMN + ", " + FEDORA_ROOT_ID_COLUMN + ", " + OCFL_ID_COLUMN + ")" +
126            " VALUES (:fedoraId, :fedoraRootId, :ocflId)";
127
128    /**
129     * Map of database product to UPSERT into operations table SQL.
130     */
131    private static final Map<DbPlatform, String> UPSERT_MAPPING_TX_MAP = Map.of(
132            DbPlatform.MYSQL, UPSERT_MAPPING_TX_MYSQL_MARIA,
133            DbPlatform.H2, UPSERT_MAPPING_TX_H2,
134            DbPlatform.POSTGRESQL, UPSERT_MAPPING_TX_POSTGRESQL,
135            DbPlatform.MARIADB, UPSERT_MAPPING_TX_MYSQL_MARIA
136    );
137
138    private static final String DIRECT_DELETE_MAPPING = "DELETE FROM ocfl_id_map WHERE fedora_id = :fedoraId";
139
140    private static final String COMMIT_ADD_MAPPING_POSTGRESQL = "INSERT INTO " + MAPPING_TABLE +
141            " ( " + FEDORA_ID_COLUMN + ", " + FEDORA_ROOT_ID_COLUMN + ", " + OCFL_ID_COLUMN + ") SELECT " +
142            FEDORA_ID_COLUMN + ", " + FEDORA_ROOT_ID_COLUMN + ", " + OCFL_ID_COLUMN + " FROM " +
143            TRANSACTION_OPERATIONS_TABLE + " WHERE " + OPERATION_COLUMN + " = 'add' AND " + TRANSACTION_ID_COLUMN +
144            " = :transactionId ON CONFLICT ( " +  FEDORA_ID_COLUMN + " )" +
145            " DO UPDATE SET " + FEDORA_ROOT_ID_COLUMN + " = EXCLUDED." + FEDORA_ROOT_ID_COLUMN + ", " +
146            OCFL_ID_COLUMN + " = EXCLUDED." + OCFL_ID_COLUMN;
147
148    private static final String COMMIT_ADD_MAPPING_MYSQL_MARIA = "INSERT INTO " + MAPPING_TABLE +
149            " (" + FEDORA_ID_COLUMN + ", " + FEDORA_ROOT_ID_COLUMN + ", " + OCFL_ID_COLUMN + ") SELECT " +
150            FEDORA_ID_COLUMN + ", " + FEDORA_ROOT_ID_COLUMN + ", " + OCFL_ID_COLUMN + " FROM " +
151            TRANSACTION_OPERATIONS_TABLE + " WHERE " + OPERATION_COLUMN + " = 'add' AND " + TRANSACTION_ID_COLUMN +
152            " = :transactionId ON DUPLICATE KEY UPDATE " +
153            FEDORA_ROOT_ID_COLUMN + " = VALUES(" + FEDORA_ROOT_ID_COLUMN + "), " + OCFL_ID_COLUMN + " = VALUES(" +
154            OCFL_ID_COLUMN + ")";
155
156    private static final String COMMIT_ADD_MAPPING_H2 = "MERGE INTO " + MAPPING_TABLE +
157            " (" + FEDORA_ID_COLUMN + ", " + FEDORA_ROOT_ID_COLUMN + ", " + OCFL_ID_COLUMN + ")" +
158            " SELECT " + FEDORA_ID_COLUMN + ", " + FEDORA_ROOT_ID_COLUMN + ", " + OCFL_ID_COLUMN + " FROM " +
159            TRANSACTION_OPERATIONS_TABLE + " WHERE " + OPERATION_COLUMN + " = 'add'";
160
161    /**
162     * Map of database product name to COMMIT to mapping table from operations table
163     */
164    private static final Map<DbPlatform, String> COMMIT_ADD_MAPPING_MAP = Map.of(
165            DbPlatform.MYSQL, COMMIT_ADD_MAPPING_MYSQL_MARIA,
166            DbPlatform.H2, COMMIT_ADD_MAPPING_H2,
167            DbPlatform.POSTGRESQL, COMMIT_ADD_MAPPING_POSTGRESQL,
168            DbPlatform.MARIADB, COMMIT_ADD_MAPPING_MYSQL_MARIA
169    );
170
171    /*
172     * Delete records from the mapping table that are to be deleted in this transaction.
173     */
174    private static final String COMMIT_DELETE_RECORDS = "DELETE FROM " + MAPPING_TABLE + " WHERE " +
175            "EXISTS (SELECT * FROM " + TRANSACTION_OPERATIONS_TABLE + " WHERE " +
176            TRANSACTION_ID_COLUMN + " = :transactionId AND " +  OPERATION_COLUMN + " = 'delete' AND " +
177            MAPPING_TABLE + "." + FEDORA_ID_COLUMN + " = " + TRANSACTION_OPERATIONS_TABLE + "." + FEDORA_ID_COLUMN +
178            ")";
179
180    /*
181     * Collect IDs to invalidate on transaction commit.
182     */
183    private static final String GET_DELETE_IDS = "SELECT " + FEDORA_ID_COLUMN + " FROM " +
184            TRANSACTION_OPERATIONS_TABLE + " WHERE " + TRANSACTION_ID_COLUMN + " = :transactionId AND " +
185            OPERATION_COLUMN + " = 'delete'";
186
187    private static final String TRUNCATE_MAPPINGS = "TRUNCATE TABLE " + MAPPING_TABLE;
188
189    private static final String TRUNCATE_TRANSACTIONS = "TRUNCATE TABLE " + TRANSACTION_OPERATIONS_TABLE;
190
191    /*
192     * Delete all records from the transaction table for the specified transaction.
193     */
194    private static final String DELETE_ENTIRE_TRANSACTION = "DELETE FROM " + TRANSACTION_OPERATIONS_TABLE + " WHERE " +
195            TRANSACTION_ID_COLUMN + " = :transactionId";
196
197    /*
198     * Row mapper for the Lookup queries.
199     */
200    private static final RowMapper<FedoraOcflMapping> GET_MAPPING_ROW_MAPPER = (resultSet, i) -> new FedoraOcflMapping(
201            FedoraId.create(resultSet.getString(1)),
202            resultSet.getString(2)
203    );
204
205    private Cache<String, FedoraOcflMapping> mappingCache;
206
207    private final DataSource dataSource;
208
209    private final NamedParameterJdbcTemplate jdbcTemplate;
210
211    private DbPlatform dbPlatform;
212
213    @Inject
214    private OcflPropsConfig ocflPropsConfig;
215
216    public DbFedoraToOcflObjectIndex(@Autowired final DataSource dataSource) {
217        this.dataSource = dataSource;
218        this.jdbcTemplate = new NamedParameterJdbcTemplate(dataSource);
219    }
220
221    @PostConstruct
222    public void setup() {
223        dbPlatform = DbPlatform.fromDataSource(dataSource);
224        final var cache = Caffeine.newBuilder()
225                .maximumSize(ocflPropsConfig.getFedoraToOcflCacheSize())
226                .expireAfterAccess(ocflPropsConfig.getFedoraToOcflCacheTimeout(), TimeUnit.MINUTES)
227                .build();
228        this.mappingCache = new CaffeineCache<>(cache);
229    }
230
231    @Override
232    public FedoraOcflMapping getMapping(final Transaction transaction, final FedoraId fedoraId)
233            throws FedoraOcflMappingNotFoundException {
234        try {
235            if (transaction.isOpenLongRunning()) {
236                final MapSqlParameterSource parameterSource = new MapSqlParameterSource();
237                parameterSource.addValue("fedoraId", fedoraId.getResourceId());
238                parameterSource.addValue("transactionId", transaction.getId());
239                return jdbcTemplate.queryForObject(LOOKUP_MAPPING_IN_TRANSACTION, parameterSource,
240                        GET_MAPPING_ROW_MAPPER);
241            } else {
242                return this.mappingCache.get(fedoraId.getResourceId(), key ->
243                        jdbcTemplate.queryForObject(LOOKUP_MAPPING, Map.of("fedoraId", key), GET_MAPPING_ROW_MAPPER)
244                );
245            }
246        } catch (final EmptyResultDataAccessException e) {
247            throw new FedoraOcflMappingNotFoundException("No OCFL mapping found for " + fedoraId);
248        }
249    }
250
251    @Override
252    public FedoraOcflMapping addMapping(@Nonnull final Transaction transaction, final FedoraId fedoraId,
253                                        final FedoraId fedoraRootId, final String ocflId) {
254        transaction.doInTx(() -> {
255            if (!transaction.isShortLived()) {
256                upsert(transaction, fedoraId, "add", fedoraRootId, ocflId);
257            } else {
258                directInsert(fedoraId, fedoraRootId, ocflId);
259            }
260        });
261
262        return new FedoraOcflMapping(fedoraRootId, ocflId);
263    }
264
265    @Override
266    public void removeMapping(@Nonnull final Transaction transaction, final FedoraId fedoraId) {
267        transaction.doInTx(() -> {
268            if (!transaction.isShortLived()) {
269                upsert(transaction, fedoraId, "delete");
270            } else {
271                final MapSqlParameterSource parameterSource = new MapSqlParameterSource();
272                parameterSource.addValue("fedoraId", fedoraId.getResourceId());
273                jdbcTemplate.update(DIRECT_DELETE_MAPPING, parameterSource);
274                this.mappingCache.invalidate(fedoraId.getResourceId());
275            }
276        });
277    }
278
279    private void upsert(final Transaction transaction, final FedoraId fedoraId, final String operation) {
280        upsert(transaction, fedoraId, operation, null, null);
281    }
282
283    /**
284     * Perform the upsert to the operations table.
285     *
286     * @param transaction the transaction/session id.
287     * @param fedoraId the resource id.
288     * @param operation the operation we are performing (add or delete)
289     * @param fedoraRootId the fedora root id (for add only)
290     * @param ocflId the ocfl id (for add only).
291     */
292    private void upsert(final Transaction transaction, final FedoraId fedoraId, final String operation,
293                        final FedoraId fedoraRootId, final String ocflId) {
294        final MapSqlParameterSource parameterSource = new MapSqlParameterSource();
295        parameterSource.addValue("fedoraId", fedoraId.getResourceId());
296        parameterSource.addValue("fedoraRootId", fedoraRootId == null ? null : fedoraRootId.getResourceId());
297        parameterSource.addValue("ocflId", ocflId);
298        parameterSource.addValue("transactionId", transaction.getId());
299        parameterSource.addValue("operation", operation);
300        try {
301            jdbcTemplate.update(UPSERT_MAPPING_TX_MAP.get(dbPlatform), parameterSource);
302        } catch (final DataIntegrityViolationException | BadSqlGrammarException e) {
303            handleInsertException(e);
304        }
305    }
306
307    private void directInsert(final FedoraId fedoraId, final FedoraId fedoraRootId, final String ocflId) {
308        final MapSqlParameterSource parameterSource = new MapSqlParameterSource();
309        parameterSource.addValue("fedoraId", fedoraId.getResourceId());
310        parameterSource.addValue("fedoraRootId", fedoraRootId == null ? null : fedoraRootId.getResourceId());
311        parameterSource.addValue("ocflId", ocflId);
312        try {
313            jdbcTemplate.update(DIRECT_INSERT_MAPPING, parameterSource);
314        } catch (final DataIntegrityViolationException | BadSqlGrammarException e) {
315            handleInsertException(e);
316        }
317    }
318
319    @Override
320    public void reset() {
321        try {
322            jdbcTemplate.update(TRUNCATE_MAPPINGS, Collections.emptyMap());
323            jdbcTemplate.update(TRUNCATE_TRANSACTIONS, Collections.emptyMap());
324            this.mappingCache.invalidateAll();
325        } catch (final Exception e) {
326            throw new RepositoryRuntimeException("Failed to truncate FedoraToOcfl index tables", e);
327        }
328    }
329
330    @Override
331    public void commit(@Nonnull final Transaction transaction) {
332        if (!transaction.isShortLived()) {
333            transaction.ensureCommitting();
334
335            LOGGER.debug("Committing FedoraToOcfl index changes from transaction {}", transaction.getId());
336            final Map<String, String> map = Map.of("transactionId", transaction.getId());
337            try {
338                final List<String> deleteIds = jdbcTemplate.queryForList(GET_DELETE_IDS, map, String.class);
339                jdbcTemplate.update(COMMIT_DELETE_RECORDS, map);
340                jdbcTemplate.update(COMMIT_ADD_MAPPING_MAP.get(dbPlatform), map);
341                jdbcTemplate.update(DELETE_ENTIRE_TRANSACTION, map);
342                this.mappingCache.invalidateAll(deleteIds);
343            } catch (final Exception e) {
344                LOGGER.warn("Unable to commit FedoraToOcfl index transaction {}: {}", transaction, e.getMessage());
345                throw new RepositoryRuntimeException("Unable to commit FedoraToOcfl index transaction", e);
346            }
347        }
348    }
349
350    @Transactional(propagation = Propagation.NOT_SUPPORTED)
351    @Override
352    public void rollback(@Nonnull final Transaction transaction) {
353        if (!transaction.isShortLived()) {
354            jdbcTemplate.update(DELETE_ENTIRE_TRANSACTION, Map.of("transactionId", transaction.getId()));
355        }
356    }
357
358    private void handleInsertException(final Exception e) {
359        if (e.getMessage().contains("too long for")) {
360            throw new InvalidResourceIdentifierException("Database error - Fedora ID path too long",e);
361        } else {
362            throw new RepositoryRuntimeException("Database error - error during upsert",e);
363        }
364    }
365
366}