001/*
002 * SPDX-License-Identifier: Apache-2.0
003 *
004 * Copyright 2024-2025 The Enola <https://enola.dev> Authors
005 *
006 * Licensed under the Apache License, Version 2.0 (the "License");
007 * you may not use this file except in compliance with the License.
008 * You may obtain a copy of the License at
009 *
010 *     https://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package dev.enola.common.function;
019
020import com.google.common.collect.Streams;
021
022import java.util.Iterator;
023import java.util.concurrent.atomic.AtomicBoolean;
024import java.util.stream.Stream;
025
026/** Static utility methods related to {@code Stream} instances. {@link Streams} has more. */
027public final class MoreStreams {
028
029    // TODO Use more unchecked instead of checked exceptions in Enola, to reduce the need for
030    // this...
031
032    // TODO Move to package dev.enola.common.collect (but must figure out Sneaker relationship)
033
034    // TODO Eventually adopting one of (but which?) real reactive frameworks in Enola overall
035    // and rm this may be better? (That would likely be better than doing something such as e.g.
036    // https://stackoverflow.com/questions/30117134/aggregate-runtime-exceptions-in-java-8-streams)
037
038    public static <T> CloseableIterable<T> toIterable(Stream<T> stream) {
039        // NOT return stream::iterator; because:
040        //   (a) This doesn't truly achieve "lazy" transformation in the sense that the Stream's
041        //       pipeline  elements (like map, filter) are only executed when the Iterator.next() is
042        //       called. This is as lazy as a Stream gets.
043        //   (b) The resulting Iterable is single-use. If you try to iterate over it a second time,
044        //       you'll get an IllegalStateException because the underlying Stream is closed after
045        //       its first consumption.
046        return new StreamSingleSupplierCloseableIterable<>(stream);
047    }
048
049    // See also dev.enola.common.collect.MoreIterators#SingleIterable
050    static final class StreamSingleSupplierCloseableIterable<T> implements CloseableIterable<T> {
051        private final AtomicBoolean supplied = new AtomicBoolean(false);
052        private final Stream<T> stream;
053
054        private StreamSingleSupplierCloseableIterable(Stream<T> stream) {
055            this.stream = stream;
056        }
057
058        @Override
059        public Iterator<T> iterator() {
060            if (!supplied.compareAndSet(false, true))
061                throw new IllegalStateException("Value already supplied");
062            return stream.iterator();
063        }
064
065        @Override
066        public void close() {
067            stream.close();
068        }
069    }
070
071    // While waiting for e.g. something like https://bugs.openjdk.org/browse/JDK-8148917
072    public static <T, E extends Exception> void forEach(
073            Stream<T> stream, CheckedConsumer<T, E> action) throws E {
074        if (stream.isParallel()) {
075            stream.forEach(Sneaker.sneakyConsumer(action));
076        } else {
077            forEachInSeq(stream, action);
078        }
079    }
080
081    /**
082     * Returns a stream consisting of the results of applying the given function to the elements of
083     * this stream, allowing the mapping function to throw a checked exception.
084     *
085     * <p>This is an <a
086     * href="https://docs.oracle.com/javase/8/docs/api/java/util/stream/package-summary.html#StreamOps">
087     * intermediate operation</a>. The {@code mapper} function is not executed, and no exception is
088     * thrown, until a <a
089     * href="https://docs.oracle.com/javase/8/docs/api/java/util/stream/package-summary.html#StreamOps">
090     * terminal operation</a> is invoked on the returned stream.
091     *
092     * @param stream the stream to map
093     * @param mapper a <a
094     *     href="https://docs.oracle.com/javase/8/docs/api/java/util/stream/package-summary.html#Statelessness">
095     *     non-interfering, stateless</a> function to apply to each element, which may throw a
096     *     checked exception
097     * @param <T> The type of the input elements of the stream
098     * @param <R> The type of the output elements of the stream
099     * @param <E> The type of the checked exception that can be thrown by the mapper
100     * @return the new stream
101     * @throws E when a terminal operation is executed on the returned stream and the mapper throws
102     *     a checked exception. Note that with parallel streams, the exception may (!) be wrapped in
103     *     a {@link RuntimeException}.
104     */
105    public static <T, R, E extends Exception> Stream<R> map(
106            Stream<T> stream, CheckedFunction<T, R, E> mapper) throws E {
107        return stream.map(Sneaker.sneakyFunction(mapper));
108    }
109
110    // This (probably, not verified) loose parallelism, if the stream even has it?
111    private static <T, E extends Exception> void forEachInSeq(
112            Stream<T> stream, CheckedConsumer<T, E> action) throws E {
113        // https://stackoverflow.com/questions/20129762/why-does-streamt-not-implement-iterablet/20130475#20130475
114        var iterator = stream.iterator();
115        while (iterator.hasNext()) {
116            T element = iterator.next();
117            action.accept(element);
118        }
119    }
120
121    private MoreStreams() {}
122}