001/** 002 * Copyright 2015 DuraSpace, Inc. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.fcrepo.kernel.impl.utils.impl; 017 018import static org.apache.commons.io.IOUtils.copy; 019import static org.apache.commons.io.output.NullOutputStream.NULL_OUTPUT_STREAM; 020 021import java.io.InputStream; 022import java.io.Serializable; 023import java.net.URI; 024import java.security.MessageDigest; 025import java.util.Collection; 026import java.util.Set; 027 028import org.fcrepo.kernel.impl.utils.FixityInputStream; 029import org.fcrepo.kernel.impl.utils.FixityResultImpl; 030import org.fcrepo.kernel.impl.utils.infinispan.CacheLoaderChunkInputStream; 031import org.fcrepo.kernel.utils.ContentDigest; 032import org.fcrepo.kernel.utils.FixityResult; 033import org.infinispan.Cache; 034import org.infinispan.CacheImpl; 035import org.infinispan.distexec.DistributedCallable; 036import org.infinispan.persistence.manager.PersistenceManager; 037import org.infinispan.persistence.spi.CacheLoader; 038 039import com.google.common.collect.ImmutableSet; 040 041/** 042 * Infinispan DistributedCallable for checking the fixity of a binary key in every cache loader 043 * 044 * @author cabeer 045 */ 046public class DistributedFixityCheck implements DistributedCallable<String, byte[], Collection<FixityResult>>, 047 Serializable { 048 private static final long serialVersionUID = 1L; 049 050 private final String dataKey; 051 private final String digest; 052 private final int chunkSize; 053 private final long length; 054 private Cache<String, byte[]> cache; 055 056 /** 057 * 058 * @param dataKey the data key 059 * @param digest the digest 060 * @param chunkSize the chunk size 061 * @param length the given length 062 */ 063 public DistributedFixityCheck(final String dataKey, final String digest, final int chunkSize, final long length) { 064 this.dataKey = dataKey; 065 this.digest = digest; 066 this.chunkSize = chunkSize; 067 this.length = length; 068 } 069 070 @Override 071 public Collection<FixityResult> call() throws Exception { 072 final ImmutableSet.Builder<FixityResult> fixityResults = new ImmutableSet.Builder<>(); 073 074 for (final CacheLoader<String, byte[]> store : stores()) { 075 076 try (final InputStream cacheLoaderChunkInputStream = new CacheLoaderChunkInputStream( 077 store, dataKey, chunkSize, length); 078 079 final FixityInputStream fixityInputStream = new FixityInputStream( 080 cacheLoaderChunkInputStream, MessageDigest.getInstance(digest))) { 081 082 copy(fixityInputStream, NULL_OUTPUT_STREAM); 083 084 final URI calculatedChecksum = 085 ContentDigest.asURI(digest, fixityInputStream.getMessageDigest().digest()); 086 fixityResults.add( 087 new FixityResultImpl(getExternalIdentifier(store), fixityInputStream.getByteCount(), 088 calculatedChecksum)); 089 } 090 } 091 092 return fixityResults.build(); 093 } 094 095 private String getExternalIdentifier(final CacheLoader<String, byte[]> store) { 096 final String address; 097 098 if (cache.getCacheManager().getAddress() != null) { 099 address = cache.getCacheManager().getAddress().toString(); 100 } else { 101 address = "localhost"; 102 } 103 104 return "infinispan-cache-loader:" + address + "/" + store.toString() + "#" + dataKey; 105 } 106 107 @Override 108 public void setEnvironment(final Cache<String, byte[]> cache, final Set<String> inputKeys) { 109 this.cache = cache; 110 } 111 112 @SuppressWarnings("rawtypes") 113 private Set<CacheLoader> stores() { 114 return ((CacheImpl<String, byte[]>)cache).getComponentRegistry().getLocalComponent(PersistenceManager.class) 115 .getStores(CacheLoader.class); 116 } 117}