001/*
002 * Licensed to DuraSpace under one or more contributor license agreements.
003 * See the NOTICE file distributed with this work for additional information
004 * regarding copyright ownership.
005 *
006 * DuraSpace licenses this file to you under the Apache License,
007 * Version 2.0 (the "License"); you may not use this file except in
008 * compliance with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.fcrepo.persistence.ocfl.impl;
019
020import static java.lang.String.format;
021import static java.util.concurrent.TimeUnit.MILLISECONDS;
022import static org.apache.jena.graph.NodeFactory.createURI;
023import static org.apache.jena.rdf.model.ModelFactory.createDefaultModel;
024
025import java.io.IOException;
026import java.io.InputStream;
027import java.time.Instant;
028import java.util.ArrayList;
029import java.util.Collections;
030import java.util.HashMap;
031import java.util.List;
032import java.util.Map;
033import java.util.TreeMap;
034import java.util.concurrent.ConcurrentHashMap;
035import java.util.concurrent.Phaser;
036import java.util.concurrent.TimeUnit;
037import java.util.concurrent.TimeoutException;
038import java.util.stream.Collectors;
039
040import org.fcrepo.kernel.api.RdfStream;
041import org.fcrepo.kernel.api.Transaction;
042import org.fcrepo.kernel.api.identifiers.FedoraId;
043import org.fcrepo.kernel.api.models.ResourceHeaders;
044import org.fcrepo.kernel.api.operations.ResourceOperation;
045import org.fcrepo.kernel.api.rdf.DefaultRdfStream;
046import org.fcrepo.persistence.api.PersistentStorageSession;
047import org.fcrepo.persistence.api.exceptions.PersistentItemNotFoundException;
048import org.fcrepo.persistence.api.exceptions.PersistentSessionClosedException;
049import org.fcrepo.persistence.api.exceptions.PersistentStorageException;
050import org.fcrepo.persistence.ocfl.api.FedoraOcflMappingNotFoundException;
051import org.fcrepo.persistence.ocfl.api.FedoraToOcflObjectIndex;
052import org.fcrepo.persistence.ocfl.api.Persister;
053import org.fcrepo.storage.ocfl.OcflObjectSession;
054import org.fcrepo.storage.ocfl.OcflObjectSessionFactory;
055import org.fcrepo.storage.ocfl.OcflVersionInfo;
056
057import org.apache.jena.rdf.model.Model;
058import org.apache.jena.riot.RDFDataMgr;
059import org.slf4j.Logger;
060import org.slf4j.LoggerFactory;
061
062import com.github.benmanes.caffeine.cache.Caffeine;
063
064/**
065 * OCFL Persistent Storage class.
066 *
067 * @author whikloj
068 * @since 2019-09-20
069 */
070public class OcflPersistentStorageSession implements PersistentStorageSession {
071
072    private static final Logger LOGGER = LoggerFactory.getLogger(OcflPersistentStorageSession.class);
073
074    private static final long AWAIT_TIMEOUT = 30000L;
075
076    /**
077     * Externally generated Transaction for the session.
078     */
079    private final Transaction transaction;
080
081    private final FedoraToOcflObjectIndex fedoraOcflIndex;
082
083    private final Map<String, OcflObjectSession> sessionMap;
084
085    private final ReindexService reindexSerivce;
086
087    private Map<String, OcflObjectSession> sessionsToRollback;
088
089    private final Phaser phaser = new Phaser();
090
091    private final List<Persister> persisterList = new ArrayList<>();
092
093    private State state = State.COMMIT_NOT_STARTED;
094
095    private final OcflObjectSessionFactory objectSessionFactory;
096
097    private enum State {
098        COMMIT_NOT_STARTED(true),
099        PREPARE_STARTED(false),
100        PREPARED(true),
101        PREPARE_FAILED(true),
102        COMMIT_STARTED(false),
103        COMMITTED(true),
104        COMMIT_FAILED(true),
105        ROLLING_BACK(false),
106        ROLLED_BACK(false),
107        ROLLBACK_FAILED(false);
108
109        final boolean rollbackAllowed;
110
111        State(final boolean rollbackAllowed) {
112            this.rollbackAllowed = rollbackAllowed;
113        }
114
115    }
116
117    /**
118     * Constructor
119     *
120     * @param tx                   the transaction.
121     * @param fedoraOcflIndex      the index
122     * @param objectSessionFactory the session factory
123     */
124    protected OcflPersistentStorageSession(final Transaction tx,
125                                           final FedoraToOcflObjectIndex fedoraOcflIndex,
126                                           final OcflObjectSessionFactory objectSessionFactory,
127                                           final ReindexService reindexService) {
128        this.transaction = tx;
129        this.fedoraOcflIndex = fedoraOcflIndex;
130        this.objectSessionFactory = objectSessionFactory;
131        this.reindexSerivce = reindexService;
132        this.sessionsToRollback = new HashMap<>();
133
134        if (!tx.isReadOnly()) {
135            this.sessionMap = new ConcurrentHashMap<>();
136        } else {
137            // The read-only session is never closed, so it needs to periodically expire object sessions
138            this.sessionMap = Caffeine.newBuilder()
139                    .maximumSize(512)
140                    .expireAfterAccess(10, TimeUnit.MINUTES)
141                    .<String, OcflObjectSession>build()
142                    .asMap();
143        }
144
145        //load the persister list if empty
146        persisterList.add(new CreateRdfSourcePersister(this.fedoraOcflIndex));
147        persisterList.add(new UpdateRdfSourcePersister(this.fedoraOcflIndex));
148        persisterList.add(new CreateNonRdfSourcePersister(this.fedoraOcflIndex));
149        persisterList.add(new UpdateNonRdfSourcePersister(this.fedoraOcflIndex));
150        persisterList.add(new DeleteResourcePersister(this.fedoraOcflIndex));
151        persisterList.add(new CreateVersionPersister(this.fedoraOcflIndex));
152        persisterList.add(new PurgeResourcePersister(this.fedoraOcflIndex));
153        persisterList.add(new ReindexResourcePersister(this.reindexSerivce));
154
155    }
156
157    @Override
158    public String getId() {
159        return this.transaction.getId();
160    }
161
162    @Override
163    public void persist(final ResourceOperation operation) throws PersistentStorageException {
164        actionNeedsWrite();
165        ensureCommitNotStarted();
166
167        try {
168            phaser.register();
169
170            //resolve the persister based on the operation
171            final var persister = persisterList.stream().filter(p -> p.handle(operation)).findFirst().orElse(null);
172
173            if (persister == null) {
174                throw new UnsupportedOperationException(format("The %s is not yet supported", operation.getClass()));
175            }
176
177            //perform the operation
178            persister.persist(this, operation);
179
180        } finally {
181            phaser.arriveAndDeregister();
182        }
183
184    }
185
186    private void ensureCommitNotStarted() throws PersistentSessionClosedException {
187        if (!state.equals(State.COMMIT_NOT_STARTED)) {
188            throw new PersistentSessionClosedException(
189                    String.format("Storage session %s is already closed", transaction));
190        }
191    }
192
193    private void ensurePrepared() throws PersistentSessionClosedException {
194        if (!state.equals(State.PREPARED)) {
195            throw new PersistentStorageException(
196                    String.format("Storage session %s cannot be committed because it is not in the correct state: %s",
197                            transaction, state));
198        }
199    }
200
201    OcflObjectSession findOrCreateSession(final String ocflId) {
202        return this.sessionMap.computeIfAbsent(ocflId, key -> {
203            return new FcrepoOcflObjectSessionWrapper(this.objectSessionFactory.newSession(key));
204        });
205    }
206
207    @Override
208    public ResourceHeaders getHeaders(final FedoraId identifier, final Instant version)
209            throws PersistentStorageException {
210        ensureCommitNotStarted();
211
212        final FedoraOcflMapping mapping = getFedoraOcflMapping(identifier);
213        final OcflObjectSession objSession = findOrCreateSession(mapping.getOcflObjectId());
214
215        final var versionId = resolveVersionNumber(objSession, identifier, version);
216        final var headers = objSession.readHeaders(identifier.getResourceId(), versionId);
217
218        return new ResourceHeadersAdapter(headers).asKernelHeaders();
219    }
220
221    private FedoraOcflMapping getFedoraOcflMapping(final FedoraId identifier)
222            throws PersistentStorageException {
223        try {
224            return fedoraOcflIndex.getMapping(transaction, identifier);
225        } catch (final FedoraOcflMappingNotFoundException e) {
226            throw new PersistentItemNotFoundException(String.format("Resource %s not found",
227                    identifier.getFullIdPath()), e);
228        }
229    }
230
231    @Override
232    public RdfStream getTriples(final FedoraId identifier, final Instant version)
233            throws PersistentStorageException {
234        ensureCommitNotStarted();
235
236        try (final InputStream is = getBinaryContent(identifier, version)) {
237            final Model model = createDefaultModel();
238            RDFDataMgr.read(model, is, OcflPersistentStorageUtils.getRdfFormat().getLang());
239            final FedoraId topic = resolveTopic(identifier);
240            return DefaultRdfStream.fromModel(createURI(topic.getFullId()), model);
241        } catch (final IOException ex) {
242            throw new PersistentStorageException(format("unable to read %s ;  version = %s", identifier, version), ex);
243        }
244    }
245
246    @Override
247    public List<Instant> listVersions(final FedoraId fedoraIdentifier)
248            throws PersistentStorageException {
249        final var mapping = getFedoraOcflMapping(fedoraIdentifier);
250        final var objSession = findOrCreateSession(mapping.getOcflObjectId());
251
252        return objSession.listVersions(fedoraIdentifier.getResourceId()).stream()
253                .map(OcflVersionInfo::getCreated)
254                .collect(Collectors.toList());
255    }
256
257    @Override
258    public InputStream getBinaryContent(final FedoraId identifier, final Instant version)
259            throws PersistentStorageException {
260        ensureCommitNotStarted();
261
262        final var mapping = getFedoraOcflMapping(identifier);
263        final var objSession = findOrCreateSession(mapping.getOcflObjectId());
264
265        final var versionNumber = resolveVersionNumber(objSession, identifier, version);
266
267        return objSession.readContent(identifier.getResourceId(), versionNumber)
268                .getContentStream()
269                .orElseThrow(() -> new PersistentItemNotFoundException("No binary content found for resource "
270                        + identifier.getFullId()));
271    }
272
273    @Override
274    public synchronized void prepare() {
275        ensureCommitNotStarted();
276        if (isReadOnly()) {
277            // No changes to commit.
278            return;
279        }
280
281        this.state = State.PREPARE_STARTED;
282        LOGGER.debug("Starting storage session {} prepare for commit", transaction);
283
284        if (this.phaser.getRegisteredParties() > 0) {
285            this.phaser.awaitAdvance(0);
286        }
287
288        LOGGER.trace("All persisters are complete in session {}", transaction);
289
290        try {
291            fedoraOcflIndex.commit(transaction);
292            state = State.PREPARED;
293        } catch (final RuntimeException e) {
294            state = State.PREPARE_FAILED;
295            throw new PersistentStorageException(String.format("Failed to prepare storage session <%s> for commit",
296                    transaction), e);
297        }
298    }
299
300    @Override
301    public synchronized void commit() throws PersistentStorageException {
302        ensurePrepared();
303        if (isReadOnly()) {
304            // No changes to commit.
305            return;
306        }
307
308        this.state = State.COMMIT_STARTED;
309        LOGGER.debug("Starting storage session {} commit", transaction);
310
311        // order map for testing
312        final var sessions = new TreeMap<>(sessionMap);
313        commitObjectSessions(sessions);
314
315        LOGGER.debug("Committed storage session {}", transaction);
316    }
317
318    private void commitObjectSessions(final Map<String, OcflObjectSession> sessions)
319            throws PersistentStorageException {
320        this.sessionsToRollback = new HashMap<>(sessionMap.size());
321
322        for (final var entry : sessions.entrySet()) {
323            final var id = entry.getKey();
324            final var session = entry.getValue();
325            try {
326                session.commit();
327                sessionsToRollback.put(id, session);
328            } catch (final Exception e) {
329                this.state = State.COMMIT_FAILED;
330                throw new PersistentStorageException(String.format("Failed to commit object <%s> in session <%s>",
331                        id, transaction), e);
332            }
333        }
334
335        state = State.COMMITTED;
336    }
337
338    @Override
339    public void rollback() throws PersistentStorageException {
340        if (isReadOnly()) {
341            // No changes to rollback
342            return;
343        }
344
345        if (!state.rollbackAllowed) {
346            throw new PersistentStorageException("This session cannot be rolled back in this state: " + state);
347        }
348
349        final boolean commitWasStarted = this.state != State.COMMIT_NOT_STARTED;
350
351        this.state = State.ROLLING_BACK;
352        LOGGER.debug("Rolling back storage session {}", transaction);
353
354        if (!commitWasStarted) {
355            //if the commit had not been started at the time this method was invoked
356            //we must ensure that all persist operations are complete before we close any
357            //ocfl object sessions. If the commit had been started then this synchronization step
358            //will have already occurred and is thus unnecessary.
359            if (this.phaser.getRegisteredParties() > 0) {
360                try {
361                    this.phaser.awaitAdvanceInterruptibly(0, AWAIT_TIMEOUT, MILLISECONDS);
362                } catch (final InterruptedException | TimeoutException e) {
363                    throw new PersistentStorageException(
364                            "Waiting for operations to complete took too long, rollback failed");
365                }
366            }
367        }
368
369        closeUncommittedSessions();
370
371        if (commitWasStarted) {
372            rollbackCommittedSessions();
373        }
374
375        this.state = State.ROLLED_BACK;
376        LOGGER.trace("Successfully rolled back storage session {}", transaction);
377    }
378
379    /**
380     * Resolve an instant to a version
381     *
382     * @param objSession session
383     * @param fedoraId the FedoraId of the resource
384     * @param version version time
385     * @return name of version
386     * @throws PersistentStorageException thrown if version not found
387     */
388    private String resolveVersionNumber(final OcflObjectSession objSession,
389                                       final FedoraId fedoraId,
390                                       final Instant version)
391            throws PersistentStorageException {
392        if (version != null) {
393            final var versions = objSession.listVersions(fedoraId.getResourceId());
394            // reverse order so that the most recent version is matched first
395            Collections.reverse(versions);
396            return versions.stream()
397                    .filter(vd -> vd.getCreated().equals(version))
398                    .map(OcflVersionInfo::getVersionNumber)
399                    .findFirst()
400                    .orElseThrow(() -> {
401                        return new PersistentItemNotFoundException(format(
402                                "There is no version in %s with a created date matching %s",
403                                fedoraId, version));
404                    });
405        }
406
407        return null;
408    }
409
410    private void closeUncommittedSessions() {
411        this.sessionMap.entrySet().stream()
412                .filter(entry -> !sessionsToRollback.containsKey(entry.getKey()))
413                .map(Map.Entry::getValue)
414                .forEach(OcflObjectSession::abort);
415    }
416
417    private void rollbackCommittedSessions() throws PersistentStorageException {
418        final List<String> rollbackFailures = new ArrayList<>(this.sessionsToRollback.size());
419
420        for (final var entry : this.sessionsToRollback.entrySet()) {
421            final var id = entry.getKey();
422            final var session = entry.getValue();
423
424            try {
425                session.rollback();
426            } catch (final Exception e) {
427                rollbackFailures.add(String.format("Failed to rollback object <%s> in session <%s>: %s",
428                        id, session.sessionId(), e.getMessage()));
429            }
430        }
431
432        try {
433            fedoraOcflIndex.rollback(transaction);
434        } catch (final Exception e) {
435            rollbackFailures.add(String.format("Failed to rollback OCFL index updates in transaction <%s>: %s",
436                    transaction, e.getMessage()));
437        }
438
439        //throw an exception if any sessions could not be rolled back.
440        if (rollbackFailures.size() > 0) {
441            state = State.ROLLBACK_FAILED;
442            final StringBuilder builder = new StringBuilder()
443                    .append("Unable to rollback storage session ")
444                    .append(transaction)
445                    .append(" completely due to the following errors: \n");
446
447            for (final String failures : rollbackFailures) {
448                builder.append("\t").append(failures).append("\n");
449            }
450
451            throw new PersistentStorageException(builder.toString());
452        }
453    }
454
455    /**
456     * Check if we are in a read-only session.
457     *
458     * @return whether we are read-only (ie. no transaction).
459     */
460    private boolean isReadOnly() {
461        return this.transaction.isReadOnly();
462    }
463
464    /**
465     * Utility to throw exception if trying to perform write operation on read-only session.
466     */
467    private void actionNeedsWrite() throws PersistentStorageException {
468        if (isReadOnly()) {
469            throw new PersistentStorageException("Session is read-only");
470        }
471    }
472
473    /**
474     * Returns the RDF topic to be returned for a given resource identifier
475     * For example:  passing info:fedora/resource1/fcr:metadata would return
476     *  info:fedora/resource1 since  info:fedora/resource1 would be the expected
477     *  topic.
478     * @param fedoraIdentifier The fedora identifier
479     * @return The resolved topic
480     */
481    private FedoraId resolveTopic(final FedoraId fedoraIdentifier) {
482        if (fedoraIdentifier.isDescription()) {
483            return fedoraIdentifier.asBaseId();
484        } else {
485            return fedoraIdentifier;
486        }
487    }
488
489    @Override
490    public String toString() {
491        return "OcflPersistentStorageSession{" +
492                "sessionId='" + transaction + '\'' +
493                ", state=" + state +
494                '}';
495    }
496
497}