001/** 002 * Copyright 2015 DuraSpace, Inc. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.fcrepo.kernel.impl.utils.infinispan; 017 018import org.infinispan.persistence.spi.CacheLoader; 019import org.modeshape.common.logging.Logger; 020 021import java.io.IOException; 022import java.io.InputStream; 023 024/** 025 * @author cabeer 026 */ 027public class CacheLoaderChunkInputStream extends InputStream { 028 029 private static final Logger LOGGER = Logger.getLogger(CacheLoaderChunkInputStream.class); 030 031 private final CacheLoader<String, byte[]> blobCache; 032 private final String key; 033 private final int chunkSize; 034 private final long totalSize; 035 private final int chunksCount; 036 037 protected int indexInBuffer; 038 protected byte[] buffer; 039 private int chunkNumber; 040 041 /** 042 * Constructor 043 * @param blobCache the blob cache 044 * @param key the key 045 * @param chunkSize the chunk size 046 * @param totalSize the total size 047 */ 048 public CacheLoaderChunkInputStream( final CacheLoader<String, byte[]> blobCache, 049 final String key, 050 final int chunkSize, 051 final long totalSize ) { 052 this.blobCache = blobCache; 053 this.key = key; 054 this.chunkSize = chunkSize; 055 this.totalSize = totalSize; 056 this.chunkNumber = 0; 057 this.indexInBuffer = 0; 058 final int remainderSize = (int) (totalSize % chunkSize); 059 final int numberOfChunks = (int) totalSize / chunkSize; 060 this.chunksCount = remainderSize > 0 ? numberOfChunks + 1 : numberOfChunks; 061 } 062 063 @Override 064 public int read() throws IOException { 065 if (indexInBuffer == -1) { 066 return -1; 067 } 068 if (buffer == null) { 069 fillBufferWithFirstChunk(); 070 return read(); 071 } else if (indexInBuffer >= buffer.length) { 072 fillBufferWithNextChunk(); 073 return read(); 074 } 075 return buffer[indexInBuffer++] & 0xff; 076 } 077 078 @Override 079 public int read( final byte[] b, 080 final int off, 081 final int len ) throws IOException { 082 if (indexInBuffer == -1) { 083 return -1; 084 } 085 if (buffer == null) { 086 fillBufferWithFirstChunk(); 087 return read(b, off, len); 088 } 089 if (indexInBuffer >= buffer.length) { 090 return -1; 091 } 092 final int newlen; 093 if (indexInBuffer + len > buffer.length) { 094 newlen = buffer.length - indexInBuffer; 095 } else { 096 newlen = len; 097 } 098 System.arraycopy(buffer, indexInBuffer, b, off, newlen); 099 indexInBuffer += newlen; 100 if (indexInBuffer >= buffer.length) { 101 fillBufferWithNextChunk(); 102 } 103 return newlen; 104 } 105 106 @Override 107 public int available() { 108 if (buffer == null) { 109 fillBufferWithFirstChunk(); 110 } 111 return buffer.length - indexInBuffer; 112 } 113 114 @Override 115 public final long skip( final long n ) { 116 if (n <= 0 || indexInBuffer == -1 || totalSize == 0) { 117 return 0; 118 } 119 return directSkip(n); 120 } 121 122 @Override 123 public void close() { 124 endOfStream(); 125 } 126 127 private long directSkip( final long n ) { 128 final long availableInBuffer = buffer != null ? (buffer.length - indexInBuffer) : chunkSize; 129 130 if (n < availableInBuffer) { 131 //we can skip "n" without requiring any additional chunks 132 if (buffer == null) { 133 //we haven't been initialized yet, so load the first chunk 134 fillBufferWithFirstChunk(); 135 } 136 indexInBuffer += n; 137 return n; 138 } 139 140 //we need to skip past the current chunk, so find the chunk which needs to be loaded 141 final long lastChunkSize = totalSize - (chunksCount - 1) * chunkSize; 142 final int chunksAvailableToSkip = chunksCount - chunkNumber - 1; 143 final long bytesAvailableToSkip = (chunksAvailableToSkip - 1) * chunkSize + lastChunkSize; 144 145 final long stillRequiredToSkip = n - availableInBuffer; 146 final int chunksToSkipOver = (int) (stillRequiredToSkip / chunkSize); 147 final int leftToReadAfterSkip = (int) (stillRequiredToSkip % chunkSize); 148 chunkNumber = chunkNumber + chunksToSkipOver + 1; //chunk# is 0 based 149 150 if (chunkNumber >= chunksCount) { 151 //we would need to skip more chunks than we have 152 endOfStream(); 153 return availableInBuffer + bytesAvailableToSkip; 154 } 155 //move directly to the required chunk 156 fillBuffer(chunkNumber); 157 if (buffer.length > leftToReadAfterSkip) { 158 //move the pointer in this chunk 159 indexInBuffer = leftToReadAfterSkip; 160 return n; 161 } 162 //we jumped to a valid chunk, but it doesn't have enough data 163 endOfStream(); 164 return availableInBuffer + bytesAvailableToSkip; 165 } 166 167 private void fillBufferWithNextChunk() { 168 this.chunkNumber++; 169 fillBuffer(this.chunkNumber); 170 } 171 172 private void fillBufferWithFirstChunk() { 173 fillBuffer(0); 174 } 175 176 private void fillBuffer(final int chunkNumber) { 177 buffer = readChunk(chunkNumber); 178 if (buffer == null) { 179 endOfStream(); 180 } else { 181 indexInBuffer = 0; 182 } 183 } 184 185 private void endOfStream() { 186 buffer = new byte[0]; 187 indexInBuffer = -1; 188 chunkNumber = -1; 189 } 190 191 private byte[] readChunk( final int chunkNumber ) { 192 final String chunkKey = key + "-" + chunkNumber; 193 LOGGER.debug("Read chunk {0}", chunkKey); 194 if (blobCache.contains(chunkKey)) { 195 return blobCache.load(chunkKey).getValue(); 196 } 197 return null; 198 } 199}