001/**
002 * Copyright 2015 DuraSpace, Inc.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.fcrepo.kernel.impl.utils.impl;
017
018import static org.apache.commons.io.IOUtils.copy;
019import static org.apache.commons.io.output.NullOutputStream.NULL_OUTPUT_STREAM;
020
021import java.io.InputStream;
022import java.io.Serializable;
023import java.net.URI;
024import java.security.MessageDigest;
025import java.util.Collection;
026import java.util.Set;
027
028import org.fcrepo.kernel.impl.utils.FixityInputStream;
029import org.fcrepo.kernel.impl.utils.FixityResultImpl;
030import org.fcrepo.kernel.impl.utils.infinispan.CacheLoaderChunkInputStream;
031import org.fcrepo.kernel.utils.ContentDigest;
032import org.fcrepo.kernel.utils.FixityResult;
033import org.infinispan.Cache;
034import org.infinispan.CacheImpl;
035import org.infinispan.distexec.DistributedCallable;
036import org.infinispan.persistence.manager.PersistenceManager;
037import org.infinispan.persistence.spi.CacheLoader;
038
039import com.google.common.collect.ImmutableSet;
040
041/**
042 * Infinispan DistributedCallable for checking the fixity of a binary key in every cache loader
043 *
044 * @author cabeer
045 */
046public class DistributedFixityCheck implements DistributedCallable<String, byte[], Collection<FixityResult>>,
047                                                   Serializable {
048    private static final long serialVersionUID = 1L;
049
050    private final String dataKey;
051    private final String digest;
052    private final int chunkSize;
053    private final long length;
054    private Cache<String, byte[]> cache;
055
056    /**
057     *
058     * @param dataKey the data key
059     * @param digest the digest
060     * @param chunkSize the chunk size
061     * @param length the given length
062     */
063    public DistributedFixityCheck(final String dataKey, final String digest, final int chunkSize, final long length) {
064        this.dataKey = dataKey;
065        this.digest = digest;
066        this.chunkSize = chunkSize;
067        this.length = length;
068    }
069
070    @Override
071    public Collection<FixityResult> call() throws Exception {
072        final ImmutableSet.Builder<FixityResult> fixityResults = new ImmutableSet.Builder<>();
073
074        for (final CacheLoader<String, byte[]> store : stores()) {
075
076            try (final InputStream cacheLoaderChunkInputStream = new CacheLoaderChunkInputStream(
077                    store, dataKey, chunkSize, length);
078
079                    final FixityInputStream fixityInputStream = new FixityInputStream(
080                            cacheLoaderChunkInputStream, MessageDigest.getInstance(digest))) {
081
082                copy(fixityInputStream, NULL_OUTPUT_STREAM);
083
084                final URI calculatedChecksum =
085                        ContentDigest.asURI(digest, fixityInputStream.getMessageDigest().digest());
086                fixityResults.add(
087                        new FixityResultImpl(getExternalIdentifier(store), fixityInputStream.getByteCount(),
088                                calculatedChecksum));
089            }
090        }
091
092        return fixityResults.build();
093    }
094
095    private String getExternalIdentifier(final CacheLoader<String, byte[]> store) {
096        final String address;
097
098        if (cache.getCacheManager().getAddress() != null) {
099            address = cache.getCacheManager().getAddress().toString();
100        } else {
101            address = "localhost";
102        }
103
104        return "infinispan-cache-loader:" + address + "/" + store.toString() + "#" +  dataKey;
105    }
106
107    @Override
108    public void setEnvironment(final Cache<String, byte[]> cache, final Set<String> inputKeys) {
109        this.cache = cache;
110    }
111
112    @SuppressWarnings("rawtypes")
113    private Set<CacheLoader> stores() {
114        return ((CacheImpl<String, byte[]>)cache).getComponentRegistry().getLocalComponent(PersistenceManager.class)
115                .getStores(CacheLoader.class);
116    }
117}