001/**
002 * Copyright 2015 DuraSpace, Inc.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.fcrepo.kernel.impl.utils.impl;
017
018import static org.apache.commons.io.IOUtils.copy;
019import static org.apache.commons.io.output.NullOutputStream.NULL_OUTPUT_STREAM;
020
021import java.io.InputStream;
022import java.io.Serializable;
023import java.net.URI;
024import java.security.MessageDigest;
025import java.util.Collection;
026import java.util.Set;
027
028import org.fcrepo.kernel.impl.utils.FixityInputStream;
029import org.fcrepo.kernel.impl.utils.FixityResultImpl;
030import org.fcrepo.kernel.impl.utils.infinispan.CacheLoaderChunkInputStream;
031import org.fcrepo.kernel.utils.ContentDigest;
032import org.fcrepo.kernel.utils.FixityResult;
033import org.infinispan.Cache;
034import org.infinispan.CacheImpl;
035import org.infinispan.distexec.DistributedCallable;
036import org.infinispan.persistence.manager.PersistenceManager;
037import org.infinispan.persistence.spi.CacheLoader;
038
039import com.google.common.collect.ImmutableSet;
040
041/**
042 * Infinispan DistributedCallable for checking the fixity of a binary key in every cache loader
043 *
044 * @author cabeer
045 */
046public class DistributedFixityCheck implements DistributedCallable<String, byte[], Collection<FixityResult>>,
047                                                   Serializable {
048    private static final long serialVersionUID = 1L;
049
050    private final String dataKey;
051    private final String digest;
052    private final int chunkSize;
053    private final long length;
054    private Cache<String, byte[]> cache;
055
056    /**
057     *
058     * @param dataKey
059     */
060    public DistributedFixityCheck(final String dataKey, final String digest, final int chunkSize, final long length) {
061        this.dataKey = dataKey;
062        this.digest = digest;
063        this.chunkSize = chunkSize;
064        this.length = length;
065    }
066
067    @Override
068    public Collection<FixityResult> call() throws Exception {
069        final ImmutableSet.Builder<FixityResult> fixityResults = new ImmutableSet.Builder<>();
070
071        for (final CacheLoader<String, byte[]> store : stores()) {
072
073            try (final InputStream cacheLoaderChunkInputStream = new CacheLoaderChunkInputStream(
074                    store, dataKey, chunkSize, length);
075
076                    final FixityInputStream fixityInputStream = new FixityInputStream(
077                            cacheLoaderChunkInputStream, MessageDigest.getInstance(digest))) {
078
079                copy(fixityInputStream, NULL_OUTPUT_STREAM);
080
081                final URI calculatedChecksum =
082                        ContentDigest.asURI(digest, fixityInputStream.getMessageDigest().digest());
083                fixityResults.add(
084                        new FixityResultImpl(getExternalIdentifier(store), fixityInputStream.getByteCount(),
085                                calculatedChecksum));
086            }
087        }
088
089        return fixityResults.build();
090    }
091
092    private String getExternalIdentifier(final CacheLoader<String, byte[]> store) {
093        final String address;
094
095        if (cache.getCacheManager().getAddress() != null) {
096            address = cache.getCacheManager().getAddress().toString();
097        } else {
098            address = "localhost";
099        }
100
101        return "infinispan-cache-loader:" + address + "/" + store.toString() + "#" +  dataKey;
102    }
103
104    @Override
105    public void setEnvironment(final Cache<String, byte[]> cache, final Set<String> inputKeys) {
106        this.cache = cache;
107    }
108
109    @SuppressWarnings("rawtypes")
110    private Set<CacheLoader> stores() {
111        return ((CacheImpl<String, byte[]>)cache).getComponentRegistry().getLocalComponent(PersistenceManager.class)
112                .getStores(CacheLoader.class);
113    }
114}