001/* 002 * The contents of this file are subject to the license and copyright 003 * detailed in the LICENSE and NOTICE files at the root of the source 004 * tree. 005 */ 006package org.fcrepo.persistence.ocfl.impl; 007 008import static java.lang.String.format; 009import static java.util.concurrent.TimeUnit.MILLISECONDS; 010import static org.apache.jena.graph.NodeFactory.createURI; 011import static org.apache.jena.rdf.model.ModelFactory.createDefaultModel; 012 013import java.io.IOException; 014import java.io.InputStream; 015import java.time.Instant; 016import java.util.ArrayList; 017import java.util.Collections; 018import java.util.HashMap; 019import java.util.List; 020import java.util.Map; 021import java.util.TreeMap; 022import java.util.concurrent.ConcurrentHashMap; 023import java.util.concurrent.Phaser; 024import java.util.concurrent.TimeUnit; 025import java.util.concurrent.TimeoutException; 026import java.util.stream.Collectors; 027 028import org.fcrepo.kernel.api.RdfStream; 029import org.fcrepo.kernel.api.Transaction; 030import org.fcrepo.kernel.api.identifiers.FedoraId; 031import org.fcrepo.kernel.api.models.ResourceHeaders; 032import org.fcrepo.kernel.api.operations.ResourceOperation; 033import org.fcrepo.kernel.api.rdf.DefaultRdfStream; 034import org.fcrepo.persistence.api.PersistentStorageSession; 035import org.fcrepo.persistence.api.exceptions.PersistentItemNotFoundException; 036import org.fcrepo.persistence.api.exceptions.PersistentSessionClosedException; 037import org.fcrepo.persistence.api.exceptions.PersistentStorageException; 038import org.fcrepo.persistence.ocfl.api.FedoraOcflMappingNotFoundException; 039import org.fcrepo.persistence.ocfl.api.FedoraToOcflObjectIndex; 040import org.fcrepo.persistence.ocfl.api.Persister; 041import org.fcrepo.storage.ocfl.OcflObjectSession; 042import org.fcrepo.storage.ocfl.OcflObjectSessionFactory; 043import org.fcrepo.storage.ocfl.OcflVersionInfo; 044 045import org.apache.jena.rdf.model.Model; 046import 
org.apache.jena.riot.RDFDataMgr;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.github.benmanes.caffeine.cache.Caffeine;

/**
 * OCFL Persistent Storage class.
 *
 * <p>Persist operations are tracked with a {@link Phaser} so that {@link #prepare()} and {@link #rollback()} can
 * wait for operations that are still in flight before the underlying OCFL object sessions are committed or
 * closed.
 *
 * @author whikloj
 * @since 2019-09-20
 */
public class OcflPersistentStorageSession implements PersistentStorageSession {

    private static final Logger LOGGER = LoggerFactory.getLogger(OcflPersistentStorageSession.class);

    /**
     * Maximum time, in milliseconds, that {@link #rollback()} waits for in-flight persist operations.
     */
    private static final long AWAIT_TIMEOUT = 30000L;

    /**
     * Externally generated Transaction for the session.
     */
    private final Transaction transaction;

    private final FedoraToOcflObjectIndex fedoraOcflIndex;

    /**
     * OCFL object sessions opened by this storage session, keyed by OCFL object id.
     */
    private final Map<String, OcflObjectSession> sessionMap;

    private final ReindexService reindexService;

    /**
     * Object sessions that were committed successfully and therefore must be rolled back on rollback.
     */
    private Map<String, OcflObjectSession> sessionsToRollback;

    /**
     * Tracks persist operations that are currently in flight.
     */
    private final Phaser phaser = new InFlightOperationPhaser();

    private final List<Persister> persisterList = new ArrayList<>();

    // volatile: written by prepare/commit/rollback and read by unsynchronized persist and read calls
    private volatile State state = State.COMMIT_NOT_STARTED;

    private final OcflObjectSessionFactory objectSessionFactory;

    /**
     * Lifecycle states of the storage session.
     */
    private enum State {
        COMMIT_NOT_STARTED(true),
        PREPARE_STARTED(false),
        PREPARED(true),
        PREPARE_FAILED(true),
        COMMIT_STARTED(false),
        COMMITTED(true),
        COMMIT_FAILED(true),
        ROLLING_BACK(false),
        ROLLED_BACK(false),
        ROLLBACK_FAILED(false);

        /**
         * Whether {@link OcflPersistentStorageSession#rollback()} may be invoked while in this state.
         */
        final boolean rollbackAllowed;

        State(final boolean rollbackAllowed) {
            this.rollbackAllowed = rollbackAllowed;
        }

    }

    /**
     * A {@link Phaser} that never terminates.
     *
     * <p>The default {@link Phaser#onAdvance(int, int)} terminates the phaser once a deregistration drops the
     * number of registered parties to zero. Here that would happen as soon as the first persist operation
     * finished, after which registrations are ignored and no later operation could be awaited.
     */
    private static final class InFlightOperationPhaser extends Phaser {
        @Override
        protected boolean onAdvance(final int phase, final int registeredParties) {
            return false;
        }
    }

    /**
     * Constructor
     *
     * @param tx the transaction.
     * @param fedoraOcflIndex the index
     * @param objectSessionFactory the session factory
     * @param reindexService the reindex service used by the reindex persister
     */
    protected OcflPersistentStorageSession(final Transaction tx,
                                           final FedoraToOcflObjectIndex fedoraOcflIndex,
                                           final OcflObjectSessionFactory objectSessionFactory,
                                           final ReindexService reindexService) {
        this.transaction = tx;
        this.fedoraOcflIndex = fedoraOcflIndex;
        this.objectSessionFactory = objectSessionFactory;
        this.reindexService = reindexService;
        this.sessionsToRollback = new HashMap<>();

        if (!tx.isReadOnly()) {
            this.sessionMap = new ConcurrentHashMap<>();
        } else {
            // The read-only session is never closed, so it needs to periodically expire object sessions
            this.sessionMap = Caffeine.newBuilder()
                    .maximumSize(512)
                    .expireAfterAccess(10, TimeUnit.MINUTES)
                    .<String, OcflObjectSession>build()
                    .asMap();
        }

        // Register the persisters; persist() uses the first one whose handle() accepts the operation.
        persisterList.add(new CreateRdfSourcePersister(this.fedoraOcflIndex));
        persisterList.add(new UpdateRdfSourcePersister(this.fedoraOcflIndex));
        persisterList.add(new UpdateNonRdfSourceHeadersPersister(this.fedoraOcflIndex));
        persisterList.add(new CreateNonRdfSourcePersister(this.fedoraOcflIndex));
        persisterList.add(new UpdateNonRdfSourcePersister(this.fedoraOcflIndex));
        persisterList.add(new DeleteResourcePersister(this.fedoraOcflIndex));
        persisterList.add(new CreateVersionPersister(this.fedoraOcflIndex));
        persisterList.add(new PurgeResourcePersister(this.fedoraOcflIndex));
        persisterList.add(new ReindexResourcePersister(this.reindexService));

    }

    @Override
    public String getId() {
        return this.transaction.getId();
    }

    /**
     * Persists an operation using the first registered persister that handles it.
     *
     * @param operation the operation to persist
     * @throws PersistentStorageException if the session is read-only, the commit has already started, or the
     *         persister fails
     * @throws UnsupportedOperationException if no persister handles the operation
     */
    @Override
    public void persist(final ResourceOperation operation) throws PersistentStorageException {
        actionNeedsWrite();

        // Register BEFORE checking the state. prepare() and rollback() change the state before they inspect
        // the phaser, so an operation either observes the new state and fails, or is registered in time to be
        // awaited by them.
        phaser.register();
        try {
            ensureCommitNotStarted();

            //resolve the persister based on the operation
            final var persister = persisterList.stream()
                    .filter(p -> p.handle(operation))
                    .findFirst()
                    .orElseThrow(() -> new UnsupportedOperationException(
                            format("The %s is not yet supported", operation.getClass())));

            //perform the operation
            persister.persist(this, operation);

        } finally {
            phaser.arriveAndDeregister();
        }

    }

    /**
     * Throws if {@link #prepare()}, {@link #commit()} or {@link #rollback()} has moved the session out of
     * {@code COMMIT_NOT_STARTED}.
     */
    private void ensureCommitNotStarted() throws PersistentSessionClosedException {
        if (state != State.COMMIT_NOT_STARTED) {
            throw new PersistentSessionClosedException(
                    String.format("Storage session %s is already closed", transaction));
        }
    }

    /**
     * Throws if the session has not been successfully prepared.
     */
    private void ensurePrepared() throws PersistentStorageException {
        if (state != State.PREPARED) {
            throw new PersistentStorageException(
                    String.format("Storage session %s cannot be committed because it is not in the correct state: %s",
                            transaction, state));
        }
    }

    /**
     * Returns the OCFL object session for the given OCFL object id, creating and caching a new wrapped session
     * if none exists yet.
     *
     * @param ocflId the OCFL object id
     * @return the object session
     */
    OcflObjectSession findOrCreateSession(final String ocflId) {
        return this.sessionMap.computeIfAbsent(ocflId,
                key -> new FcrepoOcflObjectSessionWrapper(this.objectSessionFactory.newSession(key)));
    }

    /**
     * Reads the headers of a resource.
     *
     * @param identifier the resource id
     * @param version the version's created timestamp, or {@code null} for the current state
     * @return the headers converted to kernel headers
     * @throws PersistentStorageException if the session is closed, the resource is unmapped or the version is
     *         not found
     */
    @Override
    public ResourceHeaders getHeaders(final FedoraId identifier, final Instant version)
            throws PersistentStorageException {
        ensureCommitNotStarted();

        final FedoraOcflMapping mapping = getFedoraOcflMapping(identifier);
        final OcflObjectSession objSession = findOrCreateSession(mapping.getOcflObjectId());

        final var versionId = resolveVersionNumber(objSession, identifier, version);
        final var headers = objSession.readHeaders(identifier.getResourceId(), versionId);

        return new ResourceHeadersAdapter(headers).asKernelHeaders();
    }

    /**
     * Looks up the Fedora-to-OCFL mapping for a resource within this session's transaction.
     *
     * @throws PersistentItemNotFoundException if no mapping exists
     */
    private FedoraOcflMapping getFedoraOcflMapping(final FedoraId identifier)
            throws PersistentStorageException {
        try {
            return fedoraOcflIndex.getMapping(transaction, identifier);
        } catch (final FedoraOcflMappingNotFoundException e) {
            throw new PersistentItemNotFoundException(String.format("Resource %s not found",
                    identifier.getFullIdPath()), e);
        }
    }

    /**
     * Reads the stored content of a resource as RDF and returns it as a stream whose topic is the resource
     * itself (or its base resource, for a description).
     *
     * @param identifier the resource id
     * @param version the version's created timestamp, or {@code null} for the current state
     * @return the triples
     * @throws PersistentStorageException if the content cannot be read
     */
    @Override
    public RdfStream getTriples(final FedoraId identifier, final Instant version)
            throws PersistentStorageException {
        ensureCommitNotStarted();

        LOGGER.debug("Getting triples for {} at {}", identifier, version);

        try (final InputStream is = getBinaryContent(identifier, version)) {
            final Model model = createDefaultModel();
            RDFDataMgr.read(model, is, OcflPersistentStorageUtils.getRdfFormat().getLang());
            final FedoraId topic = resolveTopic(identifier);
            return DefaultRdfStream.fromModel(createURI(topic.getFullId()), model);
        } catch (final IOException ex) {
            throw new PersistentStorageException(format("unable to read %s ; version = %s", identifier, version), ex);
        }
    }

    /**
     * Lists the created timestamps of all versions of a resource.
     *
     * <p>Unlike the other read methods, this does not check the session state.
     *
     * @param fedoraIdentifier the resource id
     * @return the version timestamps, in the order returned by the object session
     * @throws PersistentStorageException if the resource is unmapped
     */
    @Override
    public List<Instant> listVersions(final FedoraId fedoraIdentifier)
            throws PersistentStorageException {
        final var mapping = getFedoraOcflMapping(fedoraIdentifier);
        final var objSession = findOrCreateSession(mapping.getOcflObjectId());

        return objSession.listVersions(fedoraIdentifier.getResourceId()).stream()
                .map(OcflVersionInfo::getCreated)
                .collect(Collectors.toList());
    }

    /**
     * Opens the stored content of a resource.
     *
     * @param identifier the resource id
     * @param version the version's created timestamp, or {@code null} for the current state
     * @return the content stream; the caller is responsible for closing it
     * @throws PersistentItemNotFoundException if the resource, version or content does not exist
     */
    @Override
    public InputStream getBinaryContent(final FedoraId identifier, final Instant version)
            throws PersistentStorageException {
        ensureCommitNotStarted();

        final var mapping = getFedoraOcflMapping(identifier);
        final var objSession = findOrCreateSession(mapping.getOcflObjectId());

        final var versionNumber = resolveVersionNumber(objSession, identifier, version);

        return objSession.readContent(identifier.getResourceId(), versionNumber)
                .getContentStream()
                .orElseThrow(() -> new PersistentItemNotFoundException("No binary content found for resource "
                        + identifier.getFullId()));
    }

    /**
     * Waits for in-flight persist operations and then commits the OCFL index for this transaction.
     * Does nothing for a read-only session.
     *
     * @throws PersistentStorageException if the session is not in {@code COMMIT_NOT_STARTED} or the index commit
     *         fails
     */
    @Override
    public synchronized void prepare() {
        ensureCommitNotStarted();
        if (isReadOnly()) {
            // No changes to commit.
            return;
        }

        this.state = State.PREPARE_STARTED;
        LOGGER.debug("Starting storage session {} prepare for commit", transaction);

        // Capture the phase before checking for parties. If the outstanding operations complete in between,
        // the phase has already advanced and awaitAdvance returns immediately instead of blocking on a later
        // phase.
        final int phase = this.phaser.getPhase();
        if (this.phaser.getRegisteredParties() > 0) {
            this.phaser.awaitAdvance(phase);
        }

        LOGGER.trace("All persisters are complete in session {}", transaction);

        try {
            fedoraOcflIndex.commit(transaction);
            state = State.PREPARED;
        } catch (final RuntimeException e) {
            state = State.PREPARE_FAILED;
            throw new PersistentStorageException(String.format("Failed to prepare storage session <%s> for commit",
                    transaction), e);
        }
    }

    /**
     * Commits every OCFL object session opened by this storage session, in object-id order.
     * Does nothing for a read-only session.
     *
     * @throws PersistentStorageException if the session has not been prepared or an object session fails to
     *         commit
     */
    @Override
    public synchronized void commit() throws PersistentStorageException {
        // Checked before ensurePrepared(): prepare() returns early for read-only sessions without ever moving
        // them to PREPARED, so checking the state first would make committing a read-only session always fail.
        if (isReadOnly()) {
            // No changes to commit.
            return;
        }
        ensurePrepared();

        this.state = State.COMMIT_STARTED;
        LOGGER.debug("Starting storage session {} commit", transaction);

        // order map for testing
        final var sessions = new TreeMap<>(sessionMap);
        commitObjectSessions(sessions);

        LOGGER.debug("Committed storage session {}", transaction);
    }

    /**
     * Commits the given object sessions one at a time, remembering each successful one so it can be rolled
     * back. Stops at the first failure.
     */
    private void commitObjectSessions(final Map<String, OcflObjectSession> sessions)
            throws PersistentStorageException {
        this.sessionsToRollback = new HashMap<>(sessions.size());

        for (final var entry : sessions.entrySet()) {
            final var id = entry.getKey();
            final var session = entry.getValue();
            try {
                session.commit();
                sessionsToRollback.put(id, session);
            } catch (final Exception e) {
                this.state = State.COMMIT_FAILED;
                throw new PersistentStorageException(String.format("Failed to commit object <%s> in session <%s>",
                        id, transaction), e);
            }
        }

        state = State.COMMITTED;
    }

    /**
     * Rolls back the session. Uncommitted object sessions are aborted. If prepare or commit had already started,
     * committed object sessions and the OCFL index are also rolled back. Does nothing for a read-only session.
     *
     * @throws PersistentStorageException if the current state does not allow rollback, in-flight operations do
     *         not finish within {@link #AWAIT_TIMEOUT} ms, or some part of the rollback fails
     */
    @Override
    public void rollback() throws PersistentStorageException {
        if (isReadOnly()) {
            // No changes to rollback
            return;
        }

        if (!state.rollbackAllowed) {
            throw new PersistentStorageException("This session cannot be rolled back in this state: " + state);
        }

        final boolean commitWasStarted = this.state != State.COMMIT_NOT_STARTED;

        this.state = State.ROLLING_BACK;
        LOGGER.debug("Rolling back storage session {}", transaction);

        if (!commitWasStarted) {
            //if the commit had not been started at the time this method was invoked
            //we must ensure that all persist operations are complete before we close any
            //ocfl object sessions. If the commit had been started then this synchronization step
            //will have already occurred and is thus unnecessary.
            final int phase = this.phaser.getPhase();
            if (this.phaser.getRegisteredParties() > 0) {
                try {
                    this.phaser.awaitAdvanceInterruptibly(phase, AWAIT_TIMEOUT, MILLISECONDS);
                } catch (final InterruptedException | TimeoutException e) {
                    if (e instanceof InterruptedException) {
                        // restore the interrupt flag so callers can still observe the interruption
                        Thread.currentThread().interrupt();
                    }
                    this.state = State.ROLLBACK_FAILED;
                    throw new PersistentStorageException(
                            "Waiting for operations to complete took too long, rollback failed", e);
                }
            }
        }

        closeUncommittedSessions();

        if (commitWasStarted) {
            rollbackCommittedSessions();
        }

        this.state = State.ROLLED_BACK;
        LOGGER.trace("Successfully rolled back storage session {}", transaction);
    }

    /**
     * Resolve an instant to a version
     *
     * @param objSession session
     * @param fedoraId the FedoraId of the resource
     * @param version version time, or {@code null} for the current state
     * @return name of version, or {@code null} when {@code version} is {@code null}
     * @throws PersistentStorageException thrown if version not found
     */
    private String resolveVersionNumber(final OcflObjectSession objSession,
                                        final FedoraId fedoraId,
                                        final Instant version)
            throws PersistentStorageException {
        if (version == null) {
            return null;
        }

        // Copy before reversing: the list returned by the object session may be unmodifiable or shared, and
        // Collections.reverse works in place.
        final var versions = new ArrayList<>(objSession.listVersions(fedoraId.getResourceId()));
        // reverse order so that the most recent version is matched first
        Collections.reverse(versions);
        return versions.stream()
                .filter(vd -> vd.getCreated().equals(version))
                .map(OcflVersionInfo::getVersionNumber)
                .findFirst()
                .orElseThrow(() -> new PersistentItemNotFoundException(format(
                        "There is no version in %s with a created date matching %s",
                        fedoraId, version)));
    }

    /**
     * Aborts every object session that is not in {@code sessionsToRollback}, i.e. was not successfully
     * committed.
     */
    private void closeUncommittedSessions() {
        this.sessionMap.entrySet().stream()
                .filter(entry -> !sessionsToRollback.containsKey(entry.getKey()))
                .map(Map.Entry::getValue)
                .forEach(OcflObjectSession::abort);
    }

    /**
     * Rolls back every committed object session and the OCFL index. Failures are collected, and one exception
     * describing all of them is thrown at the end.
     */
    private void rollbackCommittedSessions() throws PersistentStorageException {
        final List<String> rollbackFailures = new ArrayList<>(this.sessionsToRollback.size());

        for (final var entry : this.sessionsToRollback.entrySet()) {
            final var id = entry.getKey();
            final var session = entry.getValue();

            try {
                session.rollback();
            } catch (final Exception e) {
                rollbackFailures.add(String.format("Failed to rollback object <%s> in session <%s>: %s",
                        id, session.sessionId(), e.getMessage()));
            }
        }

        try {
            fedoraOcflIndex.rollback(transaction);
        } catch (final Exception e) {
            rollbackFailures.add(String.format("Failed to rollback OCFL index updates in transaction <%s>: %s",
                    transaction, e.getMessage()));
        }

        //throw an exception if any sessions could not be rolled back.
        if (!rollbackFailures.isEmpty()) {
            state = State.ROLLBACK_FAILED;
            final StringBuilder builder = new StringBuilder()
                    .append("Unable to rollback storage session ")
                    .append(transaction)
                    .append(" completely due to the following errors: \n");

            for (final String failures : rollbackFailures) {
                builder.append("\t").append(failures).append("\n");
            }

            throw new PersistentStorageException(builder.toString());
        }
    }

    /**
     * Check if we are in a read-only session.
     *
     * @return whether we are read-only (ie. no transaction).
     */
    private boolean isReadOnly() {
        return this.transaction.isReadOnly();
    }

    /**
     * Utility to throw exception if trying to perform write operation on read-only session.
     */
    private void actionNeedsWrite() throws PersistentStorageException {
        if (isReadOnly()) {
            throw new PersistentStorageException("Session is read-only");
        }
    }

    /**
     * Returns the RDF topic to be returned for a given resource identifier
     * For example: passing info:fedora/resource1/fcr:metadata would return
     * info:fedora/resource1 since info:fedora/resource1 would be the expected
     * topic.
     * @param fedoraIdentifier The fedora identifier
     * @return The resolved topic
     */
    private FedoraId resolveTopic(final FedoraId fedoraIdentifier) {
        if (fedoraIdentifier.isDescription()) {
            return fedoraIdentifier.asBaseId();
        } else {
            return fedoraIdentifier;
        }
    }

    @Override
    public String toString() {
        return "OcflPersistentStorageSession{" +
                "sessionId='" + transaction + '\'' +
                ", state=" + state +
                '}';
    }

}