001/*
002 * Licensed to DuraSpace under one or more contributor license agreements.
003 * See the NOTICE file distributed with this work for additional information
004 * regarding copyright ownership.
005 *
006 * DuraSpace licenses this file to you under the Apache License,
007 * Version 2.0 (the "License"); you may not use this file except in
008 * compliance with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.fcrepo.persistence.ocfl.impl;
020
021import io.micrometer.core.instrument.Metrics;
022import io.micrometer.core.instrument.Timer;
023import org.fcrepo.common.lang.CheckedRunnable;
024import org.fcrepo.common.metrics.MetricsHelper;
025import org.fcrepo.persistence.api.exceptions.PersistentItemNotFoundException;
026import org.fcrepo.persistence.api.exceptions.PersistentStorageException;
027import org.fcrepo.storage.ocfl.CommitType;
028import org.fcrepo.storage.ocfl.OcflObjectSession;
029import org.fcrepo.storage.ocfl.OcflVersionInfo;
030import org.fcrepo.storage.ocfl.ResourceContent;
031import org.fcrepo.storage.ocfl.ResourceHeaders;
032import org.fcrepo.storage.ocfl.exception.NotFoundException;
033
034import java.io.InputStream;
035import java.time.OffsetDateTime;
036import java.util.List;
037import java.util.concurrent.Callable;
038import java.util.stream.Stream;
039
040/**
041 * Wrapper around an OcflObjectSession to convert exceptions into fcrepo exceptions and time operations
042 *
043 * @author pwinckles
044 */
045public class FcrepoOcflObjectSessionWrapper implements OcflObjectSession {
046
047    private final OcflObjectSession inner;
048
049    private static final String METRIC_NAME = "fcrepo.storage.ocfl.object";
050    private static final String OPERATION = "operation";
051    private static final Timer writeTimer = Metrics.timer(METRIC_NAME, OPERATION, "write");
052    private static final Timer writeHeadersTimer = Metrics.timer(METRIC_NAME, OPERATION, "writeHeaders");
053    private static final Timer deleteContentTimer = Metrics.timer(METRIC_NAME, OPERATION, "deleteContent");
054    private static final Timer deleteResourceTimer = Metrics.timer(METRIC_NAME, OPERATION, "deleteResource");
055    private static final Timer containsResourceTimer = Metrics.timer(METRIC_NAME, OPERATION, "containsResource");
056    private static final Timer readHeadersTimer = Metrics.timer(METRIC_NAME, OPERATION, "readHeaders");
057    private static final Timer readContentTimer = Metrics.timer(METRIC_NAME, OPERATION, "readContent");
058    private static final Timer listVersionsTimer = Metrics.timer(METRIC_NAME, OPERATION, "listVersions");
059    private static final Timer commitTimer = Metrics.timer(METRIC_NAME, OPERATION, "commit");
060
061    /**
062     * @param inner the session to wrap
063     */
064    public FcrepoOcflObjectSessionWrapper(final OcflObjectSession inner) {
065        this.inner = inner;
066    }
067
068    @Override
069    public String sessionId() {
070        return inner.sessionId();
071    }
072
073    @Override
074    public String ocflObjectId() {
075        return inner.ocflObjectId();
076    }
077
078    @Override
079    public ResourceHeaders writeResource(final ResourceHeaders headers, final InputStream content) {
080        return MetricsHelper.time(writeTimer, () -> {
081            return exec(() -> inner.writeResource(headers, content));
082        });
083    }
084
085    @Override
086    public void writeHeaders(final ResourceHeaders headers) {
087        writeHeadersTimer.record(() -> {
088            exec(() -> inner.writeHeaders(headers));
089        });
090    }
091
092    @Override
093    public void deleteContentFile(final ResourceHeaders headers) {
094        deleteContentTimer.record(() -> {
095            exec(() -> inner.deleteContentFile(headers));
096        });
097    }
098
099    @Override
100    public void deleteResource(final String resourceId) {
101        deleteResourceTimer.record(() -> {
102            exec(() -> inner.deleteResource(resourceId));
103        });
104    }
105
106    @Override
107    public boolean containsResource(final String resourceId) {
108        return MetricsHelper.time(containsResourceTimer, () -> {
109            return exec(() -> inner.containsResource(resourceId));
110        });
111    }
112
113    @Override
114    public ResourceHeaders readHeaders(final String resourceId) {
115        return MetricsHelper.time(readHeadersTimer, () -> {
116            return exec(() -> inner.readHeaders(resourceId));
117        });
118    }
119
120    @Override
121    public ResourceHeaders readHeaders(final String resourceId, final String versionNumber) {
122        return MetricsHelper.time(readHeadersTimer, () -> {
123            return exec(() -> inner.readHeaders(resourceId, versionNumber));
124        });
125    }
126
127    @Override
128    public ResourceContent readContent(final String resourceId) {
129        return MetricsHelper.time(readContentTimer, () -> {
130            return exec(() -> inner.readContent(resourceId));
131        });
132    }
133
134    @Override
135    public ResourceContent readContent(final String resourceId, final String versionNumber) {
136        return MetricsHelper.time(readContentTimer, () -> {
137            return exec(() -> inner.readContent(resourceId, versionNumber));
138        });
139    }
140
141    @Override
142    public List<OcflVersionInfo> listVersions(final String resourceId) {
143        return MetricsHelper.time(listVersionsTimer, () -> {
144            return exec(() -> inner.listVersions(resourceId));
145        });
146    }
147
148    @Override
149    public Stream<ResourceHeaders> streamResourceHeaders() {
150        return exec(inner::streamResourceHeaders);
151    }
152
153    @Override
154    public void versionCreationTimestamp(final OffsetDateTime timestamp) {
155        inner.versionCreationTimestamp(timestamp);
156    }
157
158    @Override
159    public void versionAuthor(final String name, final String address) {
160        inner.versionAuthor(name, address);
161    }
162
163    @Override
164    public void versionMessage(final String message) {
165        inner.versionMessage(message);
166    }
167
168    @Override
169    public void commitType(final CommitType commitType) {
170        inner.commitType(commitType);
171    }
172
173    @Override
174    public void commit() {
175        commitTimer.record(() -> {
176            exec(inner::commit);
177        });
178    }
179
180    @Override
181    public void rollback() {
182        exec(inner::rollback);
183    }
184
185    @Override
186    public void abort() {
187        exec(inner::abort);
188    }
189
190    @Override
191    public boolean isOpen() {
192        return inner.isOpen();
193    }
194
195    @Override
196    public void close() {
197        exec(inner::close);
198    }
199
200    private <T> T exec(final Callable<T> callable) throws PersistentStorageException {
201        try {
202            return callable.call();
203        } catch (final NotFoundException e) {
204            throw new PersistentItemNotFoundException(e.getMessage(), e);
205        } catch (final Exception e) {
206            throw new PersistentStorageException(e.getMessage(), e);
207        }
208    }
209
210    private void exec(final CheckedRunnable runnable) throws PersistentStorageException {
211        try {
212            runnable.run();
213        } catch (final NotFoundException e) {
214            throw new PersistentItemNotFoundException(e.getMessage(), e);
215        } catch (final Exception e) {
216            throw new PersistentStorageException(e.getMessage(), e);
217        }
218    }
219
220}