001/* 002 * Licensed to DuraSpace under one or more contributor license agreements. 003 * See the NOTICE file distributed with this work for additional information 004 * regarding copyright ownership. 005 * 006 * DuraSpace licenses this file to you under the Apache License, 007 * Version 2.0 (the "License"); you may not use this file except in 008 * compliance with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.fcrepo.persistence.ocfl.impl; 020 021import java.util.Collections; 022import java.util.List; 023import java.util.Map; 024import java.util.concurrent.TimeUnit; 025 026import javax.annotation.Nonnull; 027import javax.annotation.PostConstruct; 028import javax.inject.Inject; 029import javax.sql.DataSource; 030 031import org.fcrepo.common.db.DbPlatform; 032import org.fcrepo.kernel.api.Transaction; 033import org.fcrepo.config.OcflPropsConfig; 034import org.fcrepo.kernel.api.exception.InvalidResourceIdentifierException; 035import org.fcrepo.kernel.api.exception.RepositoryRuntimeException; 036import org.fcrepo.kernel.api.identifiers.FedoraId; 037import org.fcrepo.persistence.ocfl.api.FedoraOcflMappingNotFoundException; 038import org.fcrepo.persistence.ocfl.api.FedoraToOcflObjectIndex; 039import org.fcrepo.storage.ocfl.cache.Cache; 040import org.fcrepo.storage.ocfl.cache.CaffeineCache; 041 042import org.slf4j.Logger; 043import org.slf4j.LoggerFactory; 044import org.springframework.beans.factory.annotation.Autowired; 045import org.springframework.dao.DataIntegrityViolationException; 046import org.springframework.dao.EmptyResultDataAccessException; 047import org.springframework.jdbc.BadSqlGrammarException; 048import org.springframework.jdbc.core.RowMapper; 049import org.springframework.jdbc.core.namedparam.MapSqlParameterSource; 050import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; 051import org.springframework.stereotype.Component; 052import org.springframework.transaction.annotation.Propagation; 053import org.springframework.transaction.annotation.Transactional; 054 055import com.github.benmanes.caffeine.cache.Caffeine; 056 057/** 058 * Maps Fedora IDs to the OCFL IDs of the OCFL objects the Fedora resource is stored in. This implementation is backed 059 * by a relational database. 060 * 061 * @author pwinckles 062 */ 063@Component("ocflIndexImpl") 064public class DbFedoraToOcflObjectIndex implements FedoraToOcflObjectIndex { 065 066 private static final Logger LOGGER = LoggerFactory.getLogger(DbFedoraToOcflObjectIndex.class); 067 068 private static final String MAPPING_TABLE = "ocfl_id_map"; 069 070 private static final String FEDORA_ID_COLUMN = "fedora_id"; 071 072 private static final String FEDORA_ROOT_ID_COLUMN = "fedora_root_id"; 073 074 private static final String OCFL_ID_COLUMN = "ocfl_id"; 075 076 private static final String TRANSACTION_OPERATIONS_TABLE = "ocfl_id_map_session_operations"; 077 078 private static final String TRANSACTION_ID_COLUMN = "session_id"; 079 080 private static final String OPERATION_COLUMN = "operation"; 081 082 /* 083 * Lookup all mappings for the resource id. 084 */ 085 private static final String LOOKUP_MAPPING = "SELECT " + FEDORA_ROOT_ID_COLUMN + ", " + OCFL_ID_COLUMN + " FROM " + 086 MAPPING_TABLE + " WHERE " + FEDORA_ID_COLUMN + " = :fedoraId"; 087 088 /* 089 * Lookup all mappings from the mapping table as well as any new 'add's and excluding any 'delete's in this 090 * transaction. 091 */ 092 private static final String LOOKUP_MAPPING_IN_TRANSACTION = "SELECT x." + FEDORA_ROOT_ID_COLUMN + "," + 093 " x." + OCFL_ID_COLUMN + " FROM" + 094 " (SELECT " + FEDORA_ROOT_ID_COLUMN + ", " + OCFL_ID_COLUMN + " FROM " + MAPPING_TABLE + " WHERE " + 095 FEDORA_ID_COLUMN + " = :fedoraId" + 096 " UNION SELECT " + FEDORA_ROOT_ID_COLUMN + ", " + OCFL_ID_COLUMN + " FROM " + TRANSACTION_OPERATIONS_TABLE + 097 " WHERE " + FEDORA_ID_COLUMN + " = :fedoraId AND " + TRANSACTION_ID_COLUMN + " = :transactionId" + 098 " AND " + OPERATION_COLUMN + " = 'add') x"; 099 100 /* 101 * Add an 'add' operation to the transaction table. 102 */ 103 private static final String UPSERT_MAPPING_TX_POSTGRESQL = "INSERT INTO " + TRANSACTION_OPERATIONS_TABLE + 104 " ( " + FEDORA_ID_COLUMN + ", " + FEDORA_ROOT_ID_COLUMN + ", " + OCFL_ID_COLUMN + ", " + 105 TRANSACTION_ID_COLUMN + ", " + OPERATION_COLUMN + ") VALUES (:fedoraId, :fedoraRootId, :ocflId," + 106 " :transactionId, :operation) ON CONFLICT (" + FEDORA_ID_COLUMN + ", " + TRANSACTION_ID_COLUMN + ")" + 107 " DO UPDATE SET " + FEDORA_ROOT_ID_COLUMN + " = EXCLUDED." + FEDORA_ROOT_ID_COLUMN + ", " + 108 OCFL_ID_COLUMN + " = EXCLUDED." + OCFL_ID_COLUMN + ", " + OPERATION_COLUMN + " = EXCLUDED." + 109 OPERATION_COLUMN; 110 111 private static final String UPSERT_MAPPING_TX_MYSQL_MARIA = "INSERT INTO " + TRANSACTION_OPERATIONS_TABLE + 112 " (" + FEDORA_ID_COLUMN + ", " + FEDORA_ROOT_ID_COLUMN + ", " + OCFL_ID_COLUMN + ", " + 113 TRANSACTION_ID_COLUMN + ", " + OPERATION_COLUMN + ")" + 114 " VALUES (:fedoraId, :fedoraRootId, :ocflId, :transactionId, :operation) ON DUPLICATE KEY UPDATE " + 115 FEDORA_ROOT_ID_COLUMN + " = VALUES(" + FEDORA_ROOT_ID_COLUMN + "), " + OCFL_ID_COLUMN + " = VALUES(" + 116 OCFL_ID_COLUMN + "), " + OPERATION_COLUMN + " = VALUES(" + OPERATION_COLUMN + ")"; 117 118 private static final String UPSERT_MAPPING_TX_H2 = "MERGE INTO " + TRANSACTION_OPERATIONS_TABLE + 119 " (" + FEDORA_ID_COLUMN + ", " + FEDORA_ROOT_ID_COLUMN + ", " + OCFL_ID_COLUMN + ", " + 120 TRANSACTION_ID_COLUMN + ", " + OPERATION_COLUMN + ")" + 121 " KEY (" + FEDORA_ID_COLUMN + ", " + TRANSACTION_ID_COLUMN + ")" + 122 " VALUES (:fedoraId, :fedoraRootId, :ocflId, :transactionId, :operation)"; 123 124 private static final String DIRECT_INSERT_MAPPING = "INSERT INTO " + MAPPING_TABLE + 125 " (" + FEDORA_ID_COLUMN + ", " + FEDORA_ROOT_ID_COLUMN + ", " + OCFL_ID_COLUMN + ")" + 126 " VALUES (:fedoraId, :fedoraRootId, :ocflId)"; 127 128 /** 129 * Map of database product to UPSERT into operations table SQL. 130 */ 131 private static final Map<DbPlatform, String> UPSERT_MAPPING_TX_MAP = Map.of( 132 DbPlatform.MYSQL, UPSERT_MAPPING_TX_MYSQL_MARIA, 133 DbPlatform.H2, UPSERT_MAPPING_TX_H2, 134 DbPlatform.POSTGRESQL, UPSERT_MAPPING_TX_POSTGRESQL, 135 DbPlatform.MARIADB, UPSERT_MAPPING_TX_MYSQL_MARIA 136 ); 137 138 private static final String DIRECT_DELETE_MAPPING = "DELETE FROM ocfl_id_map WHERE fedora_id = :fedoraId"; 139 140 private static final String COMMIT_ADD_MAPPING_POSTGRESQL = "INSERT INTO " + MAPPING_TABLE + 141 " ( " + FEDORA_ID_COLUMN + ", " + FEDORA_ROOT_ID_COLUMN + ", " + OCFL_ID_COLUMN + ") SELECT " + 142 FEDORA_ID_COLUMN + ", " + FEDORA_ROOT_ID_COLUMN + ", " + OCFL_ID_COLUMN + " FROM " + 143 TRANSACTION_OPERATIONS_TABLE + " WHERE " + OPERATION_COLUMN + " = 'add' AND " + TRANSACTION_ID_COLUMN + 144 " = :transactionId ON CONFLICT ( " + FEDORA_ID_COLUMN + " )" + 145 " DO UPDATE SET " + FEDORA_ROOT_ID_COLUMN + " = EXCLUDED." + FEDORA_ROOT_ID_COLUMN + ", " + 146 OCFL_ID_COLUMN + " = EXCLUDED." + OCFL_ID_COLUMN; 147 148 private static final String COMMIT_ADD_MAPPING_MYSQL_MARIA = "INSERT INTO " + MAPPING_TABLE + 149 " (" + FEDORA_ID_COLUMN + ", " + FEDORA_ROOT_ID_COLUMN + ", " + OCFL_ID_COLUMN + ") SELECT " + 150 FEDORA_ID_COLUMN + ", " + FEDORA_ROOT_ID_COLUMN + ", " + OCFL_ID_COLUMN + " FROM " + 151 TRANSACTION_OPERATIONS_TABLE + " WHERE " + OPERATION_COLUMN + " = 'add' AND " + TRANSACTION_ID_COLUMN + 152 " = :transactionId ON DUPLICATE KEY UPDATE " + 153 FEDORA_ROOT_ID_COLUMN + " = VALUES(" + FEDORA_ROOT_ID_COLUMN + "), " + OCFL_ID_COLUMN + " = VALUES(" + 154 OCFL_ID_COLUMN + ")"; 155 156 private static final String COMMIT_ADD_MAPPING_H2 = "MERGE INTO " + MAPPING_TABLE + 157 " (" + FEDORA_ID_COLUMN + ", " + FEDORA_ROOT_ID_COLUMN + ", " + OCFL_ID_COLUMN + ")" + 158 " SELECT " + FEDORA_ID_COLUMN + ", " + FEDORA_ROOT_ID_COLUMN + ", " + OCFL_ID_COLUMN + " FROM " + 159 TRANSACTION_OPERATIONS_TABLE + " WHERE " + OPERATION_COLUMN + " = 'add'"; 160 161 /** 162 * Map of database product name to COMMIT to mapping table from operations table 163 */ 164 private static final Map<DbPlatform, String> COMMIT_ADD_MAPPING_MAP = Map.of( 165 DbPlatform.MYSQL, COMMIT_ADD_MAPPING_MYSQL_MARIA, 166 DbPlatform.H2, COMMIT_ADD_MAPPING_H2, 167 DbPlatform.POSTGRESQL, COMMIT_ADD_MAPPING_POSTGRESQL, 168 DbPlatform.MARIADB, COMMIT_ADD_MAPPING_MYSQL_MARIA 169 ); 170 171 /* 172 * Delete records from the mapping table that are to be deleted in this transaction. 173 */ 174 private static final String COMMIT_DELETE_RECORDS = "DELETE FROM " + MAPPING_TABLE + " WHERE " + 175 "EXISTS (SELECT * FROM " + TRANSACTION_OPERATIONS_TABLE + " WHERE " + 176 TRANSACTION_ID_COLUMN + " = :transactionId AND " + OPERATION_COLUMN + " = 'delete' AND " + 177 MAPPING_TABLE + "." + FEDORA_ID_COLUMN + " = " + TRANSACTION_OPERATIONS_TABLE + "." + FEDORA_ID_COLUMN + 178 ")"; 179 180 /* 181 * Collect IDs to invalidate on transaction commit. 182 */ 183 private static final String GET_DELETE_IDS = "SELECT " + FEDORA_ID_COLUMN + " FROM " + 184 TRANSACTION_OPERATIONS_TABLE + " WHERE " + TRANSACTION_ID_COLUMN + " = :transactionId AND " + 185 OPERATION_COLUMN + " = 'delete'"; 186 187 private static final String TRUNCATE_MAPPINGS = "TRUNCATE TABLE " + MAPPING_TABLE; 188 189 private static final String TRUNCATE_TRANSACTIONS = "TRUNCATE TABLE " + TRANSACTION_OPERATIONS_TABLE; 190 191 /* 192 * Delete all records from the transaction table for the specified transaction. 193 */ 194 private static final String DELETE_ENTIRE_TRANSACTION = "DELETE FROM " + TRANSACTION_OPERATIONS_TABLE + " WHERE " + 195 TRANSACTION_ID_COLUMN + " = :transactionId"; 196 197 /* 198 * Row mapper for the Lookup queries. 199 */ 200 private static final RowMapper<FedoraOcflMapping> GET_MAPPING_ROW_MAPPER = (resultSet, i) -> new FedoraOcflMapping( 201 FedoraId.create(resultSet.getString(1)), 202 resultSet.getString(2) 203 ); 204 205 private Cache<String, FedoraOcflMapping> mappingCache; 206 207 private final DataSource dataSource; 208 209 private final NamedParameterJdbcTemplate jdbcTemplate; 210 211 private DbPlatform dbPlatform; 212 213 @Inject 214 private OcflPropsConfig ocflPropsConfig; 215 216 public DbFedoraToOcflObjectIndex(@Autowired final DataSource dataSource) { 217 this.dataSource = dataSource; 218 this.jdbcTemplate = new NamedParameterJdbcTemplate(dataSource); 219 } 220 221 @PostConstruct 222 public void setup() { 223 dbPlatform = DbPlatform.fromDataSource(dataSource); 224 final var cache = Caffeine.newBuilder() 225 .maximumSize(ocflPropsConfig.getFedoraToOcflCacheSize()) 226 .expireAfterAccess(ocflPropsConfig.getFedoraToOcflCacheTimeout(), TimeUnit.MINUTES) 227 .build(); 228 this.mappingCache = new CaffeineCache<>(cache); 229 } 230 231 @Override 232 public FedoraOcflMapping getMapping(final Transaction transaction, final FedoraId fedoraId) 233 throws FedoraOcflMappingNotFoundException { 234 try { 235 if (transaction.isOpenLongRunning()) { 236 final MapSqlParameterSource parameterSource = new MapSqlParameterSource(); 237 parameterSource.addValue("fedoraId", fedoraId.getResourceId()); 238 parameterSource.addValue("transactionId", transaction.getId()); 239 return jdbcTemplate.queryForObject(LOOKUP_MAPPING_IN_TRANSACTION, parameterSource, 240 GET_MAPPING_ROW_MAPPER); 241 } else { 242 return this.mappingCache.get(fedoraId.getResourceId(), key -> 243 jdbcTemplate.queryForObject(LOOKUP_MAPPING, Map.of("fedoraId", key), GET_MAPPING_ROW_MAPPER) 244 ); 245 } 246 } catch (final EmptyResultDataAccessException e) { 247 throw new FedoraOcflMappingNotFoundException("No OCFL mapping found for " + fedoraId); 248 } 249 } 250 251 @Override 252 public FedoraOcflMapping addMapping(@Nonnull final Transaction transaction, final FedoraId fedoraId, 253 final FedoraId fedoraRootId, final String ocflId) { 254 transaction.doInTx(() -> { 255 if (!transaction.isShortLived()) { 256 upsert(transaction, fedoraId, "add", fedoraRootId, ocflId); 257 } else { 258 directInsert(fedoraId, fedoraRootId, ocflId); 259 } 260 }); 261 262 return new FedoraOcflMapping(fedoraRootId, ocflId); 263 } 264 265 @Override 266 public void removeMapping(@Nonnull final Transaction transaction, final FedoraId fedoraId) { 267 transaction.doInTx(() -> { 268 if (!transaction.isShortLived()) { 269 upsert(transaction, fedoraId, "delete"); 270 } else { 271 final MapSqlParameterSource parameterSource = new MapSqlParameterSource(); 272 parameterSource.addValue("fedoraId", fedoraId.getResourceId()); 273 jdbcTemplate.update(DIRECT_DELETE_MAPPING, parameterSource); 274 this.mappingCache.invalidate(fedoraId.getResourceId()); 275 } 276 }); 277 } 278 279 private void upsert(final Transaction transaction, final FedoraId fedoraId, final String operation) { 280 upsert(transaction, fedoraId, operation, null, null); 281 } 282 283 /** 284 * Perform the upsert to the operations table. 285 * 286 * @param transaction the transaction/session id. 287 * @param fedoraId the resource id. 288 * @param operation the operation we are performing (add or delete) 289 * @param fedoraRootId the fedora root id (for add only) 290 * @param ocflId the ocfl id (for add only). 291 */ 292 private void upsert(final Transaction transaction, final FedoraId fedoraId, final String operation, 293 final FedoraId fedoraRootId, final String ocflId) { 294 final MapSqlParameterSource parameterSource = new MapSqlParameterSource(); 295 parameterSource.addValue("fedoraId", fedoraId.getResourceId()); 296 parameterSource.addValue("fedoraRootId", fedoraRootId == null ? null : fedoraRootId.getResourceId()); 297 parameterSource.addValue("ocflId", ocflId); 298 parameterSource.addValue("transactionId", transaction.getId()); 299 parameterSource.addValue("operation", operation); 300 try { 301 jdbcTemplate.update(UPSERT_MAPPING_TX_MAP.get(dbPlatform), parameterSource); 302 } catch (final DataIntegrityViolationException | BadSqlGrammarException e) { 303 handleInsertException(e); 304 } 305 } 306 307 private void directInsert(final FedoraId fedoraId, final FedoraId fedoraRootId, final String ocflId) { 308 final MapSqlParameterSource parameterSource = new MapSqlParameterSource(); 309 parameterSource.addValue("fedoraId", fedoraId.getResourceId()); 310 parameterSource.addValue("fedoraRootId", fedoraRootId == null ? null : fedoraRootId.getResourceId()); 311 parameterSource.addValue("ocflId", ocflId); 312 try { 313 jdbcTemplate.update(DIRECT_INSERT_MAPPING, parameterSource); 314 } catch (final DataIntegrityViolationException | BadSqlGrammarException e) { 315 handleInsertException(e); 316 } 317 } 318 319 @Override 320 public void reset() { 321 try { 322 jdbcTemplate.update(TRUNCATE_MAPPINGS, Collections.emptyMap()); 323 jdbcTemplate.update(TRUNCATE_TRANSACTIONS, Collections.emptyMap()); 324 this.mappingCache.invalidateAll(); 325 } catch (final Exception e) { 326 throw new RepositoryRuntimeException("Failed to truncate FedoraToOcfl index tables", e); 327 } 328 } 329 330 @Override 331 public void commit(@Nonnull final Transaction transaction) { 332 if (!transaction.isShortLived()) { 333 transaction.ensureCommitting(); 334 335 LOGGER.debug("Committing FedoraToOcfl index changes from transaction {}", transaction.getId()); 336 final Map<String, String> map = Map.of("transactionId", transaction.getId()); 337 try { 338 final List<String> deleteIds = jdbcTemplate.queryForList(GET_DELETE_IDS, map, String.class); 339 jdbcTemplate.update(COMMIT_DELETE_RECORDS, map); 340 jdbcTemplate.update(COMMIT_ADD_MAPPING_MAP.get(dbPlatform), map); 341 jdbcTemplate.update(DELETE_ENTIRE_TRANSACTION, map); 342 this.mappingCache.invalidateAll(deleteIds); 343 } catch (final Exception e) { 344 LOGGER.warn("Unable to commit FedoraToOcfl index transaction {}: {}", transaction, e.getMessage()); 345 throw new RepositoryRuntimeException("Unable to commit FedoraToOcfl index transaction", e); 346 } 347 } 348 } 349 350 @Transactional(propagation = Propagation.NOT_SUPPORTED) 351 @Override 352 public void rollback(@Nonnull final Transaction transaction) { 353 if (!transaction.isShortLived()) { 354 jdbcTemplate.update(DELETE_ENTIRE_TRANSACTION, Map.of("transactionId", transaction.getId())); 355 } 356 } 357 358 private void handleInsertException(final Exception e) { 359 if (e.getMessage().contains("too long for")) { 360 throw new InvalidResourceIdentifierException("Database error - Fedora ID path too long",e); 361 } else { 362 throw new RepositoryRuntimeException("Database error - error during upsert",e); 363 } 364 } 365 366}