001/* 002 * Licensed to DuraSpace under one or more contributor license agreements. 003 * See the NOTICE file distributed with this work for additional information 004 * regarding copyright ownership. 005 * 006 * DuraSpace licenses this file to you under the Apache License, 007 * Version 2.0 (the "License"); you may not use this file except in 008 * compliance with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.fcrepo.persistence.ocfl.impl; 020 021import io.micrometer.core.instrument.Metrics; 022import io.micrometer.core.instrument.Timer; 023import org.fcrepo.common.lang.CheckedRunnable; 024import org.fcrepo.common.metrics.MetricsHelper; 025import org.fcrepo.persistence.api.exceptions.PersistentItemNotFoundException; 026import org.fcrepo.persistence.api.exceptions.PersistentStorageException; 027import org.fcrepo.storage.ocfl.CommitType; 028import org.fcrepo.storage.ocfl.OcflObjectSession; 029import org.fcrepo.storage.ocfl.OcflVersionInfo; 030import org.fcrepo.storage.ocfl.ResourceContent; 031import org.fcrepo.storage.ocfl.ResourceHeaders; 032import org.fcrepo.storage.ocfl.exception.NotFoundException; 033 034import java.io.InputStream; 035import java.time.OffsetDateTime; 036import java.util.List; 037import java.util.concurrent.Callable; 038import java.util.stream.Stream; 039 040/** 041 * Wrapper around an OcflObjectSession to convert exceptions into fcrepo exceptions and time operations 042 * 043 * @author pwinckles 044 */ 045public class FcrepoOcflObjectSessionWrapper implements OcflObjectSession { 046 047 private final OcflObjectSession inner; 048 049 private static final String METRIC_NAME = "fcrepo.storage.ocfl.object"; 050 private static final String OPERATION = "operation"; 051 private static final Timer writeTimer = Metrics.timer(METRIC_NAME, OPERATION, "write"); 052 private static final Timer writeHeadersTimer = Metrics.timer(METRIC_NAME, OPERATION, "writeHeaders"); 053 private static final Timer deleteContentTimer = Metrics.timer(METRIC_NAME, OPERATION, "deleteContent"); 054 private static final Timer deleteResourceTimer = Metrics.timer(METRIC_NAME, OPERATION, "deleteResource"); 055 private static final Timer containsResourceTimer = Metrics.timer(METRIC_NAME, OPERATION, "containsResource"); 056 private static final Timer readHeadersTimer = Metrics.timer(METRIC_NAME, OPERATION, "readHeaders"); 057 private static final Timer readContentTimer = Metrics.timer(METRIC_NAME, OPERATION, "readContent"); 058 private static final Timer listVersionsTimer = Metrics.timer(METRIC_NAME, OPERATION, "listVersions"); 059 private static final Timer commitTimer = Metrics.timer(METRIC_NAME, OPERATION, "commit"); 060 061 /** 062 * @param inner the session to wrap 063 */ 064 public FcrepoOcflObjectSessionWrapper(final OcflObjectSession inner) { 065 this.inner = inner; 066 } 067 068 @Override 069 public String sessionId() { 070 return inner.sessionId(); 071 } 072 073 @Override 074 public String ocflObjectId() { 075 return inner.ocflObjectId(); 076 } 077 078 @Override 079 public ResourceHeaders writeResource(final ResourceHeaders headers, final InputStream content) { 080 return MetricsHelper.time(writeTimer, () -> { 081 return exec(() -> inner.writeResource(headers, content)); 082 }); 083 } 084 085 @Override 086 public void writeHeaders(final ResourceHeaders headers) { 087 writeHeadersTimer.record(() -> { 088 exec(() -> inner.writeHeaders(headers)); 089 }); 090 } 091 092 @Override 093 public void deleteContentFile(final ResourceHeaders headers) { 094 deleteContentTimer.record(() -> { 095 exec(() -> inner.deleteContentFile(headers)); 096 }); 097 } 098 099 @Override 100 public void deleteResource(final String resourceId) { 101 deleteResourceTimer.record(() -> { 102 exec(() -> inner.deleteResource(resourceId)); 103 }); 104 } 105 106 @Override 107 public boolean containsResource(final String resourceId) { 108 return MetricsHelper.time(containsResourceTimer, () -> { 109 return exec(() -> inner.containsResource(resourceId)); 110 }); 111 } 112 113 @Override 114 public ResourceHeaders readHeaders(final String resourceId) { 115 return MetricsHelper.time(readHeadersTimer, () -> { 116 return exec(() -> inner.readHeaders(resourceId)); 117 }); 118 } 119 120 @Override 121 public ResourceHeaders readHeaders(final String resourceId, final String versionNumber) { 122 return MetricsHelper.time(readHeadersTimer, () -> { 123 return exec(() -> inner.readHeaders(resourceId, versionNumber)); 124 }); 125 } 126 127 @Override 128 public ResourceContent readContent(final String resourceId) { 129 return MetricsHelper.time(readContentTimer, () -> { 130 return exec(() -> inner.readContent(resourceId)); 131 }); 132 } 133 134 @Override 135 public ResourceContent readContent(final String resourceId, final String versionNumber) { 136 return MetricsHelper.time(readContentTimer, () -> { 137 return exec(() -> inner.readContent(resourceId, versionNumber)); 138 }); 139 } 140 141 @Override 142 public List<OcflVersionInfo> listVersions(final String resourceId) { 143 return MetricsHelper.time(listVersionsTimer, () -> { 144 return exec(() -> inner.listVersions(resourceId)); 145 }); 146 } 147 148 @Override 149 public Stream<ResourceHeaders> streamResourceHeaders() { 150 return exec(inner::streamResourceHeaders); 151 } 152 153 @Override 154 public void versionCreationTimestamp(final OffsetDateTime timestamp) { 155 inner.versionCreationTimestamp(timestamp); 156 } 157 158 @Override 159 public void versionAuthor(final String name, final String address) { 160 inner.versionAuthor(name, address); 161 } 162 163 @Override 164 public void versionMessage(final String message) { 165 inner.versionMessage(message); 166 } 167 168 @Override 169 public void commitType(final CommitType commitType) { 170 inner.commitType(commitType); 171 } 172 173 @Override 174 public void commit() { 175 commitTimer.record(() -> { 176 exec(inner::commit); 177 }); 178 } 179 180 @Override 181 public void rollback() { 182 exec(inner::rollback); 183 } 184 185 @Override 186 public void abort() { 187 exec(inner::abort); 188 } 189 190 @Override 191 public boolean isOpen() { 192 return inner.isOpen(); 193 } 194 195 @Override 196 public void close() { 197 exec(inner::close); 198 } 199 200 private <T> T exec(final Callable<T> callable) throws PersistentStorageException { 201 try { 202 return callable.call(); 203 } catch (final NotFoundException e) { 204 throw new PersistentItemNotFoundException(e.getMessage(), e); 205 } catch (final Exception e) { 206 throw new PersistentStorageException(e.getMessage(), e); 207 } 208 } 209 210 private void exec(final CheckedRunnable runnable) throws PersistentStorageException { 211 try { 212 runnable.run(); 213 } catch (final NotFoundException e) { 214 throw new PersistentItemNotFoundException(e.getMessage(), e); 215 } catch (final Exception e) { 216 throw new PersistentStorageException(e.getMessage(), e); 217 } 218 } 219 220}