001/*
002 * Licensed to DuraSpace under one or more contributor license agreements.
003 * See the NOTICE file distributed with this work for additional information
004 * regarding copyright ownership.
005 *
006 * DuraSpace licenses this file to you under the Apache License,
007 * Version 2.0 (the "License"); you may not use this file except in
008 * compliance with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.fcrepo.kernel.modeshape.utils;
019
020import static java.util.Spliterator.IMMUTABLE;
021import static java.util.Spliterators.spliteratorUnknownSize;
022import static java.util.stream.StreamSupport.stream;
023import static org.slf4j.LoggerFactory.getLogger;
024
025import java.util.Iterator;
026import java.util.stream.Stream;
027
028import org.slf4j.Logger;
029
030/**
031 * @author acoburn
032 * @since February 10, 2016
033 */
034public class StreamUtils {
035
036    private static final Logger LOGGER = getLogger(StreamUtils.class);
037
038    private static Boolean enableParallel = false;
039
040    private static final String FCREPO_STEAMING_PARALLEL_KEY =
041            "fcrepo.streaming.parallel";
042    static {
043        final String enableParallelVal = System.getProperty(FCREPO_STEAMING_PARALLEL_KEY, "false")
044                .trim()
045                .toLowerCase();
046        if (!enableParallelVal.equals("true") && !enableParallelVal.equals("false")) {
047            LOGGER.warn(
048                    "The {} parameter contains an invalid value of {}:  " +
049                            "allowed values are 'true' and 'false'. " +
050                            "The default value of {} remain unchanged.",
051                    FCREPO_STEAMING_PARALLEL_KEY, enableParallelVal, enableParallel);
052        } else {
053            StreamUtils.enableParallel = Boolean.valueOf(enableParallelVal);
054        }
055    }
056
057    /**
058     * Convert an Iterator to a Stream
059     *
060     * @param iterator the iterator
061     * @param <T> the type of the Stream
062     * @return the stream
063     */
064    public static <T> Stream<T> iteratorToStream(final Iterator<T> iterator) {
065        return iteratorToStream(iterator, enableParallel);
066    }
067
068    /**
069     * Convert an Iterator to a Stream
070     *
071     * @param <T> the type of the Stream
072     * @param iterator the iterator
073     * @param parallel whether to parallelize the stream
074     * @return the stream
075     */
076    public static <T> Stream<T> iteratorToStream(final Iterator<T> iterator, final Boolean parallel) {
077        return stream(spliteratorUnknownSize(iterator, IMMUTABLE), parallel);
078    }
079    private StreamUtils() {
080        // prevent instantiation
081    }
082}
083