001/*
002 * The contents of this file are subject to the license and copyright
003 * detailed in the LICENSE and NOTICE files at the root of the source
004 * tree.
005 */
006package org.fcrepo.persistence.ocfl.impl;
007
008import static java.lang.String.format;
009import static java.util.concurrent.TimeUnit.MILLISECONDS;
010import static org.apache.jena.graph.NodeFactory.createURI;
011import static org.apache.jena.rdf.model.ModelFactory.createDefaultModel;
012
013import java.io.IOException;
014import java.io.InputStream;
015import java.time.Instant;
016import java.util.ArrayList;
017import java.util.Collections;
018import java.util.HashMap;
019import java.util.List;
020import java.util.Map;
021import java.util.TreeMap;
022import java.util.concurrent.ConcurrentHashMap;
023import java.util.concurrent.Phaser;
024import java.util.concurrent.TimeUnit;
025import java.util.concurrent.TimeoutException;
026import java.util.stream.Collectors;
027
028import org.fcrepo.kernel.api.RdfStream;
029import org.fcrepo.kernel.api.Transaction;
030import org.fcrepo.kernel.api.identifiers.FedoraId;
031import org.fcrepo.kernel.api.models.ResourceHeaders;
032import org.fcrepo.kernel.api.operations.ResourceOperation;
033import org.fcrepo.kernel.api.rdf.DefaultRdfStream;
034import org.fcrepo.persistence.api.PersistentStorageSession;
035import org.fcrepo.persistence.api.exceptions.PersistentItemNotFoundException;
036import org.fcrepo.persistence.api.exceptions.PersistentSessionClosedException;
037import org.fcrepo.persistence.api.exceptions.PersistentStorageException;
038import org.fcrepo.persistence.ocfl.api.FedoraOcflMappingNotFoundException;
039import org.fcrepo.persistence.ocfl.api.FedoraToOcflObjectIndex;
040import org.fcrepo.persistence.ocfl.api.Persister;
041import org.fcrepo.storage.ocfl.OcflObjectSession;
042import org.fcrepo.storage.ocfl.OcflObjectSessionFactory;
043import org.fcrepo.storage.ocfl.OcflVersionInfo;
044
045import org.apache.jena.rdf.model.Model;
046import org.apache.jena.riot.RDFDataMgr;
047import org.slf4j.Logger;
048import org.slf4j.LoggerFactory;
049
050import com.github.benmanes.caffeine.cache.Caffeine;
051
052/**
053 * OCFL Persistent Storage class.
054 *
055 * @author whikloj
056 * @since 2019-09-20
057 */
058public class OcflPersistentStorageSession implements PersistentStorageSession {
059
060    private static final Logger LOGGER = LoggerFactory.getLogger(OcflPersistentStorageSession.class);
061
062    private static final long AWAIT_TIMEOUT = 30000L;
063
064    /**
065     * Externally generated Transaction for the session.
066     */
067    private final Transaction transaction;
068
069    private final FedoraToOcflObjectIndex fedoraOcflIndex;
070
071    private final Map<String, OcflObjectSession> sessionMap;
072
073    private final ReindexService reindexSerivce;
074
075    private Map<String, OcflObjectSession> sessionsToRollback;
076
077    private final Phaser phaser = new Phaser();
078
079    private final List<Persister> persisterList = new ArrayList<>();
080
081    private State state = State.COMMIT_NOT_STARTED;
082
083    private final OcflObjectSessionFactory objectSessionFactory;
084
085    private enum State {
086        COMMIT_NOT_STARTED(true),
087        PREPARE_STARTED(false),
088        PREPARED(true),
089        PREPARE_FAILED(true),
090        COMMIT_STARTED(false),
091        COMMITTED(true),
092        COMMIT_FAILED(true),
093        ROLLING_BACK(false),
094        ROLLED_BACK(false),
095        ROLLBACK_FAILED(false);
096
097        final boolean rollbackAllowed;
098
099        State(final boolean rollbackAllowed) {
100            this.rollbackAllowed = rollbackAllowed;
101        }
102
103    }
104
105    /**
106     * Constructor
107     *
108     * @param tx                   the transaction.
109     * @param fedoraOcflIndex      the index
110     * @param objectSessionFactory the session factory
111     */
112    protected OcflPersistentStorageSession(final Transaction tx,
113                                           final FedoraToOcflObjectIndex fedoraOcflIndex,
114                                           final OcflObjectSessionFactory objectSessionFactory,
115                                           final ReindexService reindexService) {
116        this.transaction = tx;
117        this.fedoraOcflIndex = fedoraOcflIndex;
118        this.objectSessionFactory = objectSessionFactory;
119        this.reindexSerivce = reindexService;
120        this.sessionsToRollback = new HashMap<>();
121
122        if (!tx.isReadOnly()) {
123            this.sessionMap = new ConcurrentHashMap<>();
124        } else {
125            // The read-only session is never closed, so it needs to periodically expire object sessions
126            this.sessionMap = Caffeine.newBuilder()
127                    .maximumSize(512)
128                    .expireAfterAccess(10, TimeUnit.MINUTES)
129                    .<String, OcflObjectSession>build()
130                    .asMap();
131        }
132
133        //load the persister list if empty
134        persisterList.add(new CreateRdfSourcePersister(this.fedoraOcflIndex));
135        persisterList.add(new UpdateRdfSourcePersister(this.fedoraOcflIndex));
136        persisterList.add(new UpdateNonRdfSourceHeadersPersister(this.fedoraOcflIndex));
137        persisterList.add(new CreateNonRdfSourcePersister(this.fedoraOcflIndex));
138        persisterList.add(new UpdateNonRdfSourcePersister(this.fedoraOcflIndex));
139        persisterList.add(new DeleteResourcePersister(this.fedoraOcflIndex));
140        persisterList.add(new CreateVersionPersister(this.fedoraOcflIndex));
141        persisterList.add(new PurgeResourcePersister(this.fedoraOcflIndex));
142        persisterList.add(new ReindexResourcePersister(this.reindexSerivce));
143
144    }
145
146    @Override
147    public String getId() {
148        return this.transaction.getId();
149    }
150
151    @Override
152    public void persist(final ResourceOperation operation) throws PersistentStorageException {
153        actionNeedsWrite();
154        ensureCommitNotStarted();
155
156        try {
157            phaser.register();
158
159            //resolve the persister based on the operation
160            final var persister = persisterList.stream().filter(p -> p.handle(operation)).findFirst().orElse(null);
161
162            if (persister == null) {
163                throw new UnsupportedOperationException(format("The %s is not yet supported", operation.getClass()));
164            }
165
166            //perform the operation
167            persister.persist(this, operation);
168
169        } finally {
170            phaser.arriveAndDeregister();
171        }
172
173    }
174
175    private void ensureCommitNotStarted() throws PersistentSessionClosedException {
176        if (!state.equals(State.COMMIT_NOT_STARTED)) {
177            throw new PersistentSessionClosedException(
178                    String.format("Storage session %s is already closed", transaction));
179        }
180    }
181
182    private void ensurePrepared() throws PersistentSessionClosedException {
183        if (!state.equals(State.PREPARED)) {
184            throw new PersistentStorageException(
185                    String.format("Storage session %s cannot be committed because it is not in the correct state: %s",
186                            transaction, state));
187        }
188    }
189
190    OcflObjectSession findOrCreateSession(final String ocflId) {
191        return this.sessionMap.computeIfAbsent(ocflId, key -> {
192            return new FcrepoOcflObjectSessionWrapper(this.objectSessionFactory.newSession(key));
193        });
194    }
195
196    @Override
197    public ResourceHeaders getHeaders(final FedoraId identifier, final Instant version)
198            throws PersistentStorageException {
199        ensureCommitNotStarted();
200
201        final FedoraOcflMapping mapping = getFedoraOcflMapping(identifier);
202        final OcflObjectSession objSession = findOrCreateSession(mapping.getOcflObjectId());
203
204        final var versionId = resolveVersionNumber(objSession, identifier, version);
205        final var headers = objSession.readHeaders(identifier.getResourceId(), versionId);
206
207        return new ResourceHeadersAdapter(headers).asKernelHeaders();
208    }
209
210    private FedoraOcflMapping getFedoraOcflMapping(final FedoraId identifier)
211            throws PersistentStorageException {
212        try {
213            return fedoraOcflIndex.getMapping(transaction, identifier);
214        } catch (final FedoraOcflMappingNotFoundException e) {
215            throw new PersistentItemNotFoundException(String.format("Resource %s not found",
216                    identifier.getFullIdPath()), e);
217        }
218    }
219
220    @Override
221    public RdfStream getTriples(final FedoraId identifier, final Instant version)
222            throws PersistentStorageException {
223        ensureCommitNotStarted();
224
225        LOGGER.debug("Getting triples for {} at {}", identifier, version);
226
227        try (final InputStream is = getBinaryContent(identifier, version)) {
228            final Model model = createDefaultModel();
229            RDFDataMgr.read(model, is, OcflPersistentStorageUtils.getRdfFormat().getLang());
230            final FedoraId topic = resolveTopic(identifier);
231            return DefaultRdfStream.fromModel(createURI(topic.getFullId()), model);
232        } catch (final IOException ex) {
233            throw new PersistentStorageException(format("unable to read %s ;  version = %s", identifier, version), ex);
234        }
235    }
236
237    @Override
238    public List<Instant> listVersions(final FedoraId fedoraIdentifier)
239            throws PersistentStorageException {
240        final var mapping = getFedoraOcflMapping(fedoraIdentifier);
241        final var objSession = findOrCreateSession(mapping.getOcflObjectId());
242
243        return objSession.listVersions(fedoraIdentifier.getResourceId()).stream()
244                .map(OcflVersionInfo::getCreated)
245                .collect(Collectors.toList());
246    }
247
248    @Override
249    public InputStream getBinaryContent(final FedoraId identifier, final Instant version)
250            throws PersistentStorageException {
251        ensureCommitNotStarted();
252
253        final var mapping = getFedoraOcflMapping(identifier);
254        final var objSession = findOrCreateSession(mapping.getOcflObjectId());
255
256        final var versionNumber = resolveVersionNumber(objSession, identifier, version);
257
258        return objSession.readContent(identifier.getResourceId(), versionNumber)
259                .getContentStream()
260                .orElseThrow(() -> new PersistentItemNotFoundException("No binary content found for resource "
261                        + identifier.getFullId()));
262    }
263
264    @Override
265    public synchronized void prepare() {
266        ensureCommitNotStarted();
267        if (isReadOnly()) {
268            // No changes to commit.
269            return;
270        }
271
272        this.state = State.PREPARE_STARTED;
273        LOGGER.debug("Starting storage session {} prepare for commit", transaction);
274
275        if (this.phaser.getRegisteredParties() > 0) {
276            this.phaser.awaitAdvance(0);
277        }
278
279        LOGGER.trace("All persisters are complete in session {}", transaction);
280
281        try {
282            fedoraOcflIndex.commit(transaction);
283            state = State.PREPARED;
284        } catch (final RuntimeException e) {
285            state = State.PREPARE_FAILED;
286            throw new PersistentStorageException(String.format("Failed to prepare storage session <%s> for commit",
287                    transaction), e);
288        }
289    }
290
291    @Override
292    public synchronized void commit() throws PersistentStorageException {
293        ensurePrepared();
294        if (isReadOnly()) {
295            // No changes to commit.
296            return;
297        }
298
299        this.state = State.COMMIT_STARTED;
300        LOGGER.debug("Starting storage session {} commit", transaction);
301
302        // order map for testing
303        final var sessions = new TreeMap<>(sessionMap);
304        commitObjectSessions(sessions);
305
306        LOGGER.debug("Committed storage session {}", transaction);
307    }
308
309    private void commitObjectSessions(final Map<String, OcflObjectSession> sessions)
310            throws PersistentStorageException {
311        this.sessionsToRollback = new HashMap<>(sessionMap.size());
312
313        for (final var entry : sessions.entrySet()) {
314            final var id = entry.getKey();
315            final var session = entry.getValue();
316            try {
317                session.commit();
318                sessionsToRollback.put(id, session);
319            } catch (final Exception e) {
320                this.state = State.COMMIT_FAILED;
321                throw new PersistentStorageException(String.format("Failed to commit object <%s> in session <%s>",
322                        id, transaction), e);
323            }
324        }
325
326        state = State.COMMITTED;
327    }
328
329    @Override
330    public void rollback() throws PersistentStorageException {
331        if (isReadOnly()) {
332            // No changes to rollback
333            return;
334        }
335
336        if (!state.rollbackAllowed) {
337            throw new PersistentStorageException("This session cannot be rolled back in this state: " + state);
338        }
339
340        final boolean commitWasStarted = this.state != State.COMMIT_NOT_STARTED;
341
342        this.state = State.ROLLING_BACK;
343        LOGGER.debug("Rolling back storage session {}", transaction);
344
345        if (!commitWasStarted) {
346            //if the commit had not been started at the time this method was invoked
347            //we must ensure that all persist operations are complete before we close any
348            //ocfl object sessions. If the commit had been started then this synchronization step
349            //will have already occurred and is thus unnecessary.
350            if (this.phaser.getRegisteredParties() > 0) {
351                try {
352                    this.phaser.awaitAdvanceInterruptibly(0, AWAIT_TIMEOUT, MILLISECONDS);
353                } catch (final InterruptedException | TimeoutException e) {
354                    throw new PersistentStorageException(
355                            "Waiting for operations to complete took too long, rollback failed");
356                }
357            }
358        }
359
360        closeUncommittedSessions();
361
362        if (commitWasStarted) {
363            rollbackCommittedSessions();
364        }
365
366        this.state = State.ROLLED_BACK;
367        LOGGER.trace("Successfully rolled back storage session {}", transaction);
368    }
369
370    /**
371     * Resolve an instant to a version
372     *
373     * @param objSession session
374     * @param fedoraId the FedoraId of the resource
375     * @param version version time
376     * @return name of version
377     * @throws PersistentStorageException thrown if version not found
378     */
379    private String resolveVersionNumber(final OcflObjectSession objSession,
380                                       final FedoraId fedoraId,
381                                       final Instant version)
382            throws PersistentStorageException {
383        if (version != null) {
384            final var versions = objSession.listVersions(fedoraId.getResourceId());
385            // reverse order so that the most recent version is matched first
386            Collections.reverse(versions);
387            return versions.stream()
388                    .filter(vd -> vd.getCreated().equals(version))
389                    .map(OcflVersionInfo::getVersionNumber)
390                    .findFirst()
391                    .orElseThrow(() -> {
392                        return new PersistentItemNotFoundException(format(
393                                "There is no version in %s with a created date matching %s",
394                                fedoraId, version));
395                    });
396        }
397
398        return null;
399    }
400
401    private void closeUncommittedSessions() {
402        this.sessionMap.entrySet().stream()
403                .filter(entry -> !sessionsToRollback.containsKey(entry.getKey()))
404                .map(Map.Entry::getValue)
405                .forEach(OcflObjectSession::abort);
406    }
407
408    private void rollbackCommittedSessions() throws PersistentStorageException {
409        final List<String> rollbackFailures = new ArrayList<>(this.sessionsToRollback.size());
410
411        for (final var entry : this.sessionsToRollback.entrySet()) {
412            final var id = entry.getKey();
413            final var session = entry.getValue();
414
415            try {
416                session.rollback();
417            } catch (final Exception e) {
418                rollbackFailures.add(String.format("Failed to rollback object <%s> in session <%s>: %s",
419                        id, session.sessionId(), e.getMessage()));
420            }
421        }
422
423        try {
424            fedoraOcflIndex.rollback(transaction);
425        } catch (final Exception e) {
426            rollbackFailures.add(String.format("Failed to rollback OCFL index updates in transaction <%s>: %s",
427                    transaction, e.getMessage()));
428        }
429
430        //throw an exception if any sessions could not be rolled back.
431        if (rollbackFailures.size() > 0) {
432            state = State.ROLLBACK_FAILED;
433            final StringBuilder builder = new StringBuilder()
434                    .append("Unable to rollback storage session ")
435                    .append(transaction)
436                    .append(" completely due to the following errors: \n");
437
438            for (final String failures : rollbackFailures) {
439                builder.append("\t").append(failures).append("\n");
440            }
441
442            throw new PersistentStorageException(builder.toString());
443        }
444    }
445
446    /**
447     * Check if we are in a read-only session.
448     *
449     * @return whether we are read-only (ie. no transaction).
450     */
451    private boolean isReadOnly() {
452        return this.transaction.isReadOnly();
453    }
454
455    /**
456     * Utility to throw exception if trying to perform write operation on read-only session.
457     */
458    private void actionNeedsWrite() throws PersistentStorageException {
459        if (isReadOnly()) {
460            throw new PersistentStorageException("Session is read-only");
461        }
462    }
463
464    /**
465     * Returns the RDF topic to be returned for a given resource identifier
466     * For example:  passing info:fedora/resource1/fcr:metadata would return
467     *  info:fedora/resource1 since  info:fedora/resource1 would be the expected
468     *  topic.
469     * @param fedoraIdentifier The fedora identifier
470     * @return The resolved topic
471     */
472    private FedoraId resolveTopic(final FedoraId fedoraIdentifier) {
473        if (fedoraIdentifier.isDescription()) {
474            return fedoraIdentifier.asBaseId();
475        } else {
476            return fedoraIdentifier;
477        }
478    }
479
480    @Override
481    public String toString() {
482        return "OcflPersistentStorageSession{" +
483                "sessionId='" + transaction + '\'' +
484                ", state=" + state +
485                '}';
486    }
487
488}