001/** 002 * Copyright 2015 DuraSpace, Inc. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.fcrepo.kernel.impl.utils.impl; 017 018import static org.apache.commons.io.IOUtils.copy; 019import static org.apache.commons.io.output.NullOutputStream.NULL_OUTPUT_STREAM; 020 021import java.io.InputStream; 022import java.io.Serializable; 023import java.net.URI; 024import java.security.MessageDigest; 025import java.util.Collection; 026import java.util.Set; 027 028import org.fcrepo.kernel.impl.utils.FixityInputStream; 029import org.fcrepo.kernel.impl.utils.FixityResultImpl; 030import org.fcrepo.kernel.impl.utils.infinispan.CacheLoaderChunkInputStream; 031import org.fcrepo.kernel.utils.ContentDigest; 032import org.fcrepo.kernel.utils.FixityResult; 033import org.infinispan.Cache; 034import org.infinispan.CacheImpl; 035import org.infinispan.distexec.DistributedCallable; 036import org.infinispan.persistence.manager.PersistenceManager; 037import org.infinispan.persistence.spi.CacheLoader; 038 039import com.google.common.collect.ImmutableSet; 040 041/** 042 * Infinispan DistributedCallable for checking the fixity of a binary key in every cache loader 043 * 044 * @author cabeer 045 */ 046public class DistributedFixityCheck implements DistributedCallable<String, byte[], Collection<FixityResult>>, 047 Serializable { 048 private static final long serialVersionUID = 1L; 049 050 private final String dataKey; 051 private final String digest; 052 private final int chunkSize; 053 private final long length; 054 private Cache<String, byte[]> cache; 055 056 /** 057 * 058 * @param dataKey 059 */ 060 public DistributedFixityCheck(final String dataKey, final String digest, final int chunkSize, final long length) { 061 this.dataKey = dataKey; 062 this.digest = digest; 063 this.chunkSize = chunkSize; 064 this.length = length; 065 } 066 067 @Override 068 public Collection<FixityResult> call() throws Exception { 069 final ImmutableSet.Builder<FixityResult> fixityResults = new ImmutableSet.Builder<>(); 070 071 for (final CacheLoader<String, byte[]> store : stores()) { 072 073 try (final InputStream cacheLoaderChunkInputStream = new CacheLoaderChunkInputStream( 074 store, dataKey, chunkSize, length); 075 076 final FixityInputStream fixityInputStream = new FixityInputStream( 077 cacheLoaderChunkInputStream, MessageDigest.getInstance(digest))) { 078 079 copy(fixityInputStream, NULL_OUTPUT_STREAM); 080 081 final URI calculatedChecksum = 082 ContentDigest.asURI(digest, fixityInputStream.getMessageDigest().digest()); 083 fixityResults.add( 084 new FixityResultImpl(getExternalIdentifier(store), fixityInputStream.getByteCount(), 085 calculatedChecksum)); 086 } 087 } 088 089 return fixityResults.build(); 090 } 091 092 private String getExternalIdentifier(final CacheLoader<String, byte[]> store) { 093 final String address; 094 095 if (cache.getCacheManager().getAddress() != null) { 096 address = cache.getCacheManager().getAddress().toString(); 097 } else { 098 address = "localhost"; 099 } 100 101 return "infinispan-cache-loader:" + address + "/" + store.toString() + "#" + dataKey; 102 } 103 104 @Override 105 public void setEnvironment(final Cache<String, byte[]> cache, final Set<String> inputKeys) { 106 this.cache = cache; 107 } 108 109 @SuppressWarnings("rawtypes") 110 private Set<CacheLoader> stores() { 111 return ((CacheImpl<String, byte[]>)cache).getComponentRegistry().getLocalComponent(PersistenceManager.class) 112 .getStores(CacheLoader.class); 113 } 114}