/*
 * Licensed to DuraSpace under one or more contributor license agreements.
 * See the NOTICE file distributed with this work for additional information
 * regarding copyright ownership.
 *
 * DuraSpace licenses this file to you under the Apache License,
 * Version 2.0 (the "License"); you may not use this file except in
 * compliance with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.fcrepo.persistence.ocfl.impl;

import static java.lang.String.format;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.jena.graph.NodeFactory.createURI;
import static org.apache.jena.rdf.model.ModelFactory.createDefaultModel;

import java.io.IOException;
import java.io.InputStream;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

import org.fcrepo.kernel.api.RdfStream;
import org.fcrepo.kernel.api.Transaction;
import org.fcrepo.kernel.api.identifiers.FedoraId;
import org.fcrepo.kernel.api.models.ResourceHeaders;
import org.fcrepo.kernel.api.operations.ResourceOperation;
import org.fcrepo.kernel.api.rdf.DefaultRdfStream;
import org.fcrepo.persistence.api.PersistentStorageSession;
import org.fcrepo.persistence.api.exceptions.PersistentItemNotFoundException;
import org.fcrepo.persistence.api.exceptions.PersistentSessionClosedException;
import org.fcrepo.persistence.api.exceptions.PersistentStorageException;
import org.fcrepo.persistence.ocfl.api.FedoraOcflMappingNotFoundException;
import org.fcrepo.persistence.ocfl.api.FedoraToOcflObjectIndex;
import org.fcrepo.persistence.ocfl.api.Persister;
import org.fcrepo.storage.ocfl.OcflObjectSession;
import org.fcrepo.storage.ocfl.OcflObjectSessionFactory;
import org.fcrepo.storage.ocfl.OcflVersionInfo;

import org.apache.jena.rdf.model.Model;
import org.apache.jena.riot.RDFDataMgr;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.github.benmanes.caffeine.cache.Caffeine;

/**
 * OCFL Persistent Storage class.
 *
 * A session is bound to one {@link Transaction}. Writes are dispatched to the first
 * {@link Persister} that handles the operation, each OCFL object touched gets its own
 * {@link OcflObjectSession}, and the whole set is prepared, committed or rolled back together.
 *
 * @author whikloj
 * @since 2019-09-20
 */
public class OcflPersistentStorageSession implements PersistentStorageSession {

    private static final Logger LOGGER = LoggerFactory.getLogger(OcflPersistentStorageSession.class);

    /**
     * Maximum time, in milliseconds, that rollback waits for in-flight persist operations.
     */
    private static final long AWAIT_TIMEOUT = 30000L;

    /**
     * Externally generated Transaction for the session.
     */
    private final Transaction transaction;

    private final FedoraToOcflObjectIndex fedoraOcflIndex;

    /**
     * Open OCFL object sessions keyed by OCFL object id.
     */
    private final Map<String, OcflObjectSession> sessionMap;

    private final ReindexService reindexService;

    /**
     * Object sessions that have been committed successfully and therefore need an explicit
     * rollback (rather than an abort) if the storage session is rolled back.
     */
    private Map<String, OcflObjectSession> sessionsToRollback;

    /**
     * Tracks in-flight {@link #persist} calls so that prepare and rollback can wait for them.
     * NOTE(review): a default {@link Phaser} terminates once its registered party count drops
     * to zero, after which register() has no effect; this looks like it limits the waiting to
     * persist calls that overlap the very first one -- confirm and consider overriding onAdvance.
     */
    private final Phaser phaser = new Phaser();

    private final List<Persister> persisterList = new ArrayList<>();

    private State state = State.COMMIT_NOT_STARTED;

    private final OcflObjectSessionFactory objectSessionFactory;

    /**
     * Lifecycle states of the storage session; each state records whether
     * {@link #rollback()} may be invoked while the session is in it.
     */
    private enum State {
        COMMIT_NOT_STARTED(true),
        PREPARE_STARTED(false),
        PREPARED(true),
        PREPARE_FAILED(true),
        COMMIT_STARTED(false),
        COMMITTED(true),
        COMMIT_FAILED(true),
        ROLLING_BACK(false),
        ROLLED_BACK(false),
        ROLLBACK_FAILED(false);

        final boolean rollbackAllowed;

        State(final boolean rollbackAllowed) {
            this.rollbackAllowed = rollbackAllowed;
        }

    }

    /**
     * Constructor
     *
     * @param tx the transaction.
     * @param fedoraOcflIndex the index
     * @param objectSessionFactory the session factory
     * @param reindexService the service handed to the reindex persister
     */
    protected OcflPersistentStorageSession(final Transaction tx,
                                           final FedoraToOcflObjectIndex fedoraOcflIndex,
                                           final OcflObjectSessionFactory objectSessionFactory,
                                           final ReindexService reindexService) {
        this.transaction = tx;
        this.fedoraOcflIndex = fedoraOcflIndex;
        this.objectSessionFactory = objectSessionFactory;
        this.reindexService = reindexService;
        this.sessionsToRollback = new HashMap<>();

        if (!tx.isReadOnly()) {
            this.sessionMap = new ConcurrentHashMap<>();
        } else {
            // The read-only session is never closed, so it needs to periodically expire object sessions
            this.sessionMap = Caffeine.newBuilder()
                    .maximumSize(512)
                    .expireAfterAccess(10, TimeUnit.MINUTES)
                    .<String, OcflObjectSession>build()
                    .asMap();
        }

        // Register the persisters; persist() uses the first one whose handle() accepts the operation.
        persisterList.add(new CreateRdfSourcePersister(this.fedoraOcflIndex));
        persisterList.add(new UpdateRdfSourcePersister(this.fedoraOcflIndex));
        persisterList.add(new CreateNonRdfSourcePersister(this.fedoraOcflIndex));
        persisterList.add(new UpdateNonRdfSourcePersister(this.fedoraOcflIndex));
        persisterList.add(new DeleteResourcePersister(this.fedoraOcflIndex));
        persisterList.add(new CreateVersionPersister(this.fedoraOcflIndex));
        persisterList.add(new PurgeResourcePersister(this.fedoraOcflIndex));
        persisterList.add(new ReindexResourcePersister(this.reindexService));

    }

    @Override
    public String getId() {
        return this.transaction.getId();
    }

    /**
     * Persist an operation using the first registered persister that handles it.
     *
     * @param operation the operation to persist
     * @throws PersistentStorageException if the session is read-only or the commit has already started
     * @throws UnsupportedOperationException if no persister handles the operation
     */
    @Override
    public void persist(final ResourceOperation operation) throws PersistentStorageException {
        actionNeedsWrite();
        ensureCommitNotStarted();

        // Register before entering the try block so the finally clause never
        // deregisters a party whose registration did not succeed.
        phaser.register();
        try {
            //resolve the persister based on the operation
            final var persister = persisterList.stream()
                    .filter(p -> p.handle(operation))
                    .findFirst()
                    .orElseThrow(() -> new UnsupportedOperationException(
                            format("The %s is not yet supported", operation.getClass())));

            //perform the operation
            persister.persist(this, operation);
        } finally {
            phaser.arriveAndDeregister();
        }

    }

    /**
     * Fail unless the session is still in its initial state.
     *
     * @throws PersistentSessionClosedException if prepare, commit or rollback has already begun
     */
    private void ensureCommitNotStarted() throws PersistentSessionClosedException {
        if (!state.equals(State.COMMIT_NOT_STARTED)) {
            throw new PersistentSessionClosedException(
                    String.format("Storage session %s is already closed", transaction));
        }
    }

    /**
     * Fail unless the session has been successfully prepared.
     *
     * @throws PersistentStorageException if the session is in any state other than PREPARED
     */
    private void ensurePrepared() throws PersistentStorageException {
        if (!state.equals(State.PREPARED)) {
            throw new PersistentStorageException(
                    String.format("Storage session %s cannot be committed because it is not in the correct state: %s",
                            transaction, state));
        }
    }

    /**
     * Return the object session for the given OCFL object, opening one if none is held yet.
     *
     * @param ocflId the OCFL object id
     * @return the existing or newly created object session
     */
    OcflObjectSession findOrCreateSession(final String ocflId) {
        return this.sessionMap.computeIfAbsent(ocflId,
                key -> new FcrepoOcflObjectSessionWrapper(this.objectSessionFactory.newSession(key)));
    }

    @Override
    public ResourceHeaders getHeaders(final FedoraId identifier, final Instant version)
            throws PersistentStorageException {
        ensureCommitNotStarted();

        final FedoraOcflMapping mapping = getFedoraOcflMapping(identifier);
        final OcflObjectSession objSession = findOrCreateSession(mapping.getOcflObjectId());

        final var versionId = resolveVersionNumber(objSession, identifier, version);
        final var headers = objSession.readHeaders(identifier.getResourceId(), versionId);

        return new ResourceHeadersAdapter(headers).asKernelHeaders();
    }

    /**
     * Look up the OCFL mapping for a resource within this session's transaction.
     *
     * @param identifier the resource id
     * @return the mapping
     * @throws PersistentItemNotFoundException if the index holds no mapping for the resource
     */
    private FedoraOcflMapping getFedoraOcflMapping(final FedoraId identifier)
            throws PersistentStorageException {
        try {
            return fedoraOcflIndex.getMapping(transaction, identifier);
        } catch (final FedoraOcflMappingNotFoundException e) {
            throw new PersistentItemNotFoundException(String.format("Resource %s not found",
                    identifier.getFullIdPath()), e);
        }
    }

    @Override
    public RdfStream getTriples(final FedoraId identifier, final Instant version)
            throws PersistentStorageException {
        ensureCommitNotStarted();

        try (final InputStream is = getBinaryContent(identifier, version)) {
            final Model model = createDefaultModel();
            RDFDataMgr.read(model, is, OcflPersistentStorageUtils.getRdfFormat().getLang());
            final FedoraId topic = resolveTopic(identifier);
            return DefaultRdfStream.fromModel(createURI(topic.getFullId()), model);
        } catch (final IOException ex) {
            throw new PersistentStorageException(format("unable to read %s ; version = %s", identifier, version), ex);
        }
    }

    @Override
    public List<Instant> listVersions(final FedoraId fedoraIdentifier)
            throws PersistentStorageException {
        final var mapping = getFedoraOcflMapping(fedoraIdentifier);
        final var objSession = findOrCreateSession(mapping.getOcflObjectId());

        return objSession.listVersions(fedoraIdentifier.getResourceId()).stream()
                .map(OcflVersionInfo::getCreated)
                .collect(Collectors.toList());
    }

    @Override
    public InputStream getBinaryContent(final FedoraId identifier, final Instant version)
            throws PersistentStorageException {
        ensureCommitNotStarted();

        final var mapping = getFedoraOcflMapping(identifier);
        final var objSession = findOrCreateSession(mapping.getOcflObjectId());

        final var versionNumber = resolveVersionNumber(objSession, identifier, version);

        return objSession.readContent(identifier.getResourceId(), versionNumber)
                .getContentStream()
                .orElseThrow(() -> new PersistentItemNotFoundException("No binary content found for resource "
                        + identifier.getFullId()));
    }

    /**
     * Wait for in-flight persist calls and commit the index changes for this transaction.
     * Read-only sessions return without changing state.
     *
     * @throws PersistentStorageException if the session is not in its initial state or the index
     *         commit fails
     */
    @Override
    public synchronized void prepare() {
        ensureCommitNotStarted();
        if (isReadOnly()) {
            // No changes to commit.
            return;
        }

        this.state = State.PREPARE_STARTED;
        LOGGER.debug("Starting storage session {} prepare for commit", transaction);

        if (this.phaser.getRegisteredParties() > 0) {
            this.phaser.awaitAdvance(0);
        }

        LOGGER.trace("All persisters are complete in session {}", transaction);

        try {
            fedoraOcflIndex.commit(transaction);
            state = State.PREPARED;
        } catch (final RuntimeException e) {
            state = State.PREPARE_FAILED;
            throw new PersistentStorageException(String.format("Failed to prepare storage session <%s> for commit",
                    transaction), e);
        }
    }

    /**
     * Commit every open object session. Read-only sessions have nothing to commit and return
     * immediately.
     *
     * @throws PersistentStorageException if the session has not been prepared or an object
     *         session fails to commit
     */
    @Override
    public synchronized void commit() throws PersistentStorageException {
        // Check read-only first: prepare() returns early for read-only sessions and never
        // moves them to PREPARED, so checking the state first would make this branch unreachable.
        if (isReadOnly()) {
            // No changes to commit.
            return;
        }
        ensurePrepared();

        this.state = State.COMMIT_STARTED;
        LOGGER.debug("Starting storage session {} commit", transaction);

        // order map for testing
        final var sessions = new TreeMap<>(sessionMap);
        commitObjectSessions(sessions);

        LOGGER.debug("Committed storage session {}", transaction);
    }

    /**
     * Commit the given object sessions in iteration order, recording each success so that it
     * can be rolled back later. Stops at the first failure.
     *
     * @param sessions object sessions keyed by OCFL object id
     * @throws PersistentStorageException if any object session fails to commit
     */
    private void commitObjectSessions(final Map<String, OcflObjectSession> sessions)
            throws PersistentStorageException {
        this.sessionsToRollback = new HashMap<>(sessionMap.size());

        for (final var entry : sessions.entrySet()) {
            final var id = entry.getKey();
            final var session = entry.getValue();
            try {
                session.commit();
                sessionsToRollback.put(id, session);
            } catch (final Exception e) {
                this.state = State.COMMIT_FAILED;
                throw new PersistentStorageException(String.format("Failed to commit object <%s> in session <%s>",
                        id, transaction), e);
            }
        }

        state = State.COMMITTED;
    }

    /**
     * Roll back the session: abort object sessions that were never committed and, if prepare or
     * commit had begun, roll back the committed object sessions and the index.
     * Read-only sessions return immediately.
     *
     * @throws PersistentStorageException if rollback is not allowed in the current state, waiting
     *         for in-flight operations fails, or any part of the rollback fails
     */
    @Override
    public void rollback() throws PersistentStorageException {
        if (isReadOnly()) {
            // No changes to rollback
            return;
        }

        if (!state.rollbackAllowed) {
            throw new PersistentStorageException("This session cannot be rolled back in this state: " + state);
        }

        final boolean commitWasStarted = this.state != State.COMMIT_NOT_STARTED;

        this.state = State.ROLLING_BACK;
        LOGGER.debug("Rolling back storage session {}", transaction);

        //if the commit had not been started at the time this method was invoked
        //we must ensure that all persist operations are complete before we close any
        //ocfl object sessions. If the commit had been started then this synchronization step
        //will have already occurred and is thus unnecessary.
        if (!commitWasStarted && this.phaser.getRegisteredParties() > 0) {
            try {
                this.phaser.awaitAdvanceInterruptibly(0, AWAIT_TIMEOUT, MILLISECONDS);
            } catch (final InterruptedException | TimeoutException e) {
                if (e instanceof InterruptedException) {
                    // restore the interrupt flag for callers further up the stack
                    Thread.currentThread().interrupt();
                }
                // do not leave the session stuck in ROLLING_BACK
                this.state = State.ROLLBACK_FAILED;
                throw new PersistentStorageException(
                        "Waiting for operations to complete took too long, rollback failed", e);
            }
        }

        closeUncommittedSessions();

        if (commitWasStarted) {
            rollbackCommittedSessions();
        }

        this.state = State.ROLLED_BACK;
        LOGGER.trace("Successfully rolled back storage session {}", transaction);
    }

    /**
     * Resolve an instant to a version
     *
     * @param objSession session
     * @param fedoraId the FedoraId of the resource
     * @param version version time, or null
     * @return name of version, or null when no version time was given
     * @throws PersistentStorageException thrown if version not found
     */
    private String resolveVersionNumber(final OcflObjectSession objSession,
                                        final FedoraId fedoraId,
                                        final Instant version)
            throws PersistentStorageException {
        if (version == null) {
            return null;
        }

        // Work on a private copy: the list returned by the object session is not ours to reorder.
        final var versions = new ArrayList<>(objSession.listVersions(fedoraId.getResourceId()));
        // reverse order so that the most recent version is matched first
        Collections.reverse(versions);
        return versions.stream()
                .filter(vd -> vd.getCreated().equals(version))
                .map(OcflVersionInfo::getVersionNumber)
                .findFirst()
                .orElseThrow(() -> new PersistentItemNotFoundException(format(
                        "There is no version in %s with a created date matching %s",
                        fedoraId, version)));
    }

    /**
     * Abort every open object session that is not recorded as committed.
     */
    private void closeUncommittedSessions() {
        this.sessionMap.entrySet().stream()
                .filter(entry -> !sessionsToRollback.containsKey(entry.getKey()))
                .map(Map.Entry::getValue)
                .forEach(OcflObjectSession::abort);
    }

    /**
     * Roll back every committed object session and then the index changes for the transaction.
     * All rollbacks are attempted even if some fail; the failures are reported together.
     *
     * @throws PersistentStorageException if any rollback failed; the underlying exceptions are
     *         attached as suppressed exceptions
     */
    private void rollbackCommittedSessions() throws PersistentStorageException {
        final List<String> rollbackFailures = new ArrayList<>(this.sessionsToRollback.size());
        final List<Exception> causes = new ArrayList<>();

        for (final var entry : this.sessionsToRollback.entrySet()) {
            final var id = entry.getKey();
            final var session = entry.getValue();

            try {
                session.rollback();
            } catch (final Exception e) {
                rollbackFailures.add(String.format("Failed to rollback object <%s> in session <%s>: %s",
                        id, session.sessionId(), e.getMessage()));
                causes.add(e);
            }
        }

        try {
            fedoraOcflIndex.rollback(transaction);
        } catch (final Exception e) {
            rollbackFailures.add(String.format("Failed to rollback OCFL index updates in transaction <%s>: %s",
                    transaction, e.getMessage()));
            causes.add(e);
        }

        //throw an exception if any sessions could not be rolled back.
        if (!rollbackFailures.isEmpty()) {
            state = State.ROLLBACK_FAILED;
            final StringBuilder builder = new StringBuilder()
                    .append("Unable to rollback storage session ")
                    .append(transaction)
                    .append(" completely due to the following errors: \n");

            for (final String failure : rollbackFailures) {
                builder.append("\t").append(failure).append("\n");
            }

            final var rollbackException = new PersistentStorageException(builder.toString());
            // keep the stack traces of the individual failures available to callers
            causes.forEach(rollbackException::addSuppressed);
            throw rollbackException;
        }
    }

    /**
     * Check if we are in a read-only session.
     *
     * @return whether we are read-only (ie. no transaction).
     */
    private boolean isReadOnly() {
        return this.transaction.isReadOnly();
    }

    /**
     * Utility to throw exception if trying to perform write operation on read-only session.
     */
    private void actionNeedsWrite() throws PersistentStorageException {
        if (isReadOnly()) {
            throw new PersistentStorageException("Session is read-only");
        }
    }

    /**
     * Returns the RDF topic to be returned for a given resource identifier
     * For example:  passing info:fedora/resource1/fcr:metadata would return
     * info:fedora/resource1 since info:fedora/resource1 would be the expected
     * topic.
     * @param fedoraIdentifier The fedora identifier
     * @return The resolved topic
     */
    private FedoraId resolveTopic(final FedoraId fedoraIdentifier) {
        if (fedoraIdentifier.isDescription()) {
            return fedoraIdentifier.asBaseId();
        } else {
            return fedoraIdentifier;
        }
    }

    @Override
    public String toString() {
        return "OcflPersistentStorageSession{" +
                "sessionId='" + transaction + '\'' +
                ", state=" + state +
                '}';
    }

}