001/**
002 * Copyright 2015 DuraSpace, Inc.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.fcrepo.kernel.impl.utils.impl;
017
018import static com.google.common.base.Throwables.propagate;
019import static java.util.concurrent.TimeUnit.MILLISECONDS;
020import static org.slf4j.LoggerFactory.getLogger;
021
022import java.util.Collection;
023import java.util.Iterator;
024import java.util.List;
025import java.util.concurrent.ExecutionException;
026import java.util.concurrent.Future;
027import java.util.concurrent.TimeoutException;
028
029import javax.jcr.Property;
030
031import org.fcrepo.kernel.impl.services.functions.GetClusterExecutor;
032import org.fcrepo.kernel.utils.FixityResult;
033import org.infinispan.distexec.DistributedExecutorService;
034import org.modeshape.jcr.value.BinaryKey;
035import org.modeshape.jcr.value.binary.infinispan.ChunkBinaryMetadata;
036import org.modeshape.jcr.value.binary.infinispan.InfinispanBinaryStore;
037import org.modeshape.jcr.value.binary.infinispan.InfinispanUtils;
038import org.slf4j.Logger;
039
040import com.google.common.collect.ImmutableSet;
041
042/**
043 * @author cabeer
044 */
045public class InfinispanCacheStoreEntry extends LocalBinaryStoreEntry {
046    private static final Logger LOGGER = getLogger(InfinispanCacheStoreEntry.class);
047
048    private static final GetClusterExecutor EXECUTOR_FACTORY = new GetClusterExecutor();
049    /**
050     *
051     * @param store the store
052     * @param property the property
053     */
054    public InfinispanCacheStoreEntry(final InfinispanBinaryStore store, final Property property) {
055        super(store, property);
056    }
057
058    @Override
059    public Collection<FixityResult> checkFixity(final String algorithm) {
060        final BinaryKey key = binaryKey();
061        final ImmutableSet.Builder<FixityResult> fixityResults = new ImmutableSet.Builder<>();
062
063        if (store().hasBinary(key)) {
064            final String dataKey = InfinispanUtils.dataKeyFrom((InfinispanBinaryStore)store(), key);
065            final ChunkBinaryMetadata metadata = InfinispanUtils.getMetadata((InfinispanBinaryStore)store(), key);
066
067            final DistributedFixityCheck task = new DistributedFixityCheck(dataKey, algorithm, metadata.getChunkSize(),
068                    metadata.getLength());
069
070            final List<Future<Collection<FixityResult>>> futures
071                = clusterExecutor().submitEverywhere(task, dataKey + "-0");
072
073            while (!futures.isEmpty()) {
074                final Iterator<Future<Collection<FixityResult>>> iterator = futures.iterator();
075                while (iterator.hasNext()) {
076                    final Future<Collection<FixityResult>> future = iterator.next();
077                    try {
078                        final Collection<FixityResult> result = future.get(100, MILLISECONDS);
079                        iterator.remove();
080
081                        fixityResults.addAll(result);
082                    } catch (final TimeoutException e) {
083                        LOGGER.trace("Going to retry cluster transform after timeout", e);
084                    } catch (InterruptedException | ExecutionException e) {
085                        throw propagate(e);
086                    }
087                }
088            }
089        }
090        return fixityResults.build();
091    }
092
093    private DistributedExecutorService clusterExecutor() {
094        return EXECUTOR_FACTORY.apply((InfinispanBinaryStore)store());
095    }
096}