/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * Copyright 2024-2025 The Enola <https://enola.dev> Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package dev.enola.common.function;

import com.google.common.collect.Streams;

import java.util.Iterator;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;

/** Static utility methods related to {@code Stream} instances. {@link Streams} has more. */
public final class MoreStreams {

    // TODO Use more unchecked instead of checked exceptions in Enola, to reduce the need for
    // this...

    // TODO Move to package dev.enola.common.collect (but must figure out Sneaker relationship)

    // TODO Eventually adopting one of (but which?) real reactive frameworks in Enola overall
    // and rm this may be better? (That would likely be better than doing something such as e.g.
    // https://stackoverflow.com/questions/30117134/aggregate-runtime-exceptions-in-java-8-streams)

    /**
     * Adapts a {@link Stream} to a single-use {@link CloseableIterable}.
     *
     * <p>The returned iterable hands out the stream's iterator at most once; any further call to
     * {@code iterator()} throws an {@link IllegalStateException}. Closing the returned iterable
     * closes the underlying stream.
     *
     * @param stream the stream to adapt; must not be {@code null}
     * @param <T> the type of the stream's elements
     * @return a single-use iterable view over {@code stream}
     * @throws NullPointerException if {@code stream} is {@code null}
     */
    public static <T> CloseableIterable<T> toIterable(Stream<T> stream) {
        // NOT return stream::iterator; because:
        //   (a) This doesn't truly achieve "lazy" transformation in the sense that the Stream's
        //       pipeline elements (like map, filter) are only executed when the Iterator.next() is
        //       called. This is as lazy as a Stream gets.
        //   (b) The resulting Iterable is single-use. If you try to iterate over it a second time,
        //       you'll get an IllegalStateException because the underlying Stream is closed after
        //       its first consumption.
        return new StreamSingleSupplierCloseableIterable<>(stream);
    }

    // See also dev.enola.common.collect.MoreIterators#SingleIterable
    /**
     * {@link CloseableIterable} over a {@link Stream} which supplies the stream's iterator only
     * once, and which closes the stream when it is itself closed.
     */
    static final class StreamSingleSupplierCloseableIterable<T> implements CloseableIterable<T> {
        // Flipped to true by the first iterator() call; compareAndSet makes that hand-out atomic.
        private final AtomicBoolean supplied = new AtomicBoolean(false);
        private final Stream<T> stream;

        private StreamSingleSupplierCloseableIterable(Stream<T> stream) {
            // Fail fast here, instead of with a late NullPointerException in iterator() or close().
            this.stream = Objects.requireNonNull(stream, "stream");
        }

        /**
         * Returns the iterator of the wrapped stream.
         *
         * @throws IllegalStateException if this method has already been called on this instance
         */
        @Override
        public Iterator<T> iterator() {
            if (!supplied.compareAndSet(false, true)) {
                throw new IllegalStateException("Value already supplied");
            }
            return stream.iterator();
        }

        /** Closes the wrapped stream. */
        @Override
        public void close() {
            stream.close();
        }
    }

    // While waiting for e.g. something like https://bugs.openjdk.org/browse/JDK-8148917
    /**
     * Performs the given action for each element of the stream, allowing the action to throw a
     * checked exception.
     *
     * <p>A parallel stream is handed to {@link Stream#forEach} with the action wrapped by {@code
     * Sneaker.sneakyConsumer} (NOTE(review): presumably this rethrows the checked exception
     * without declaring it; confirm against Sneaker). Any other stream is walked through its
     * iterator on the calling thread, so an exception from the action propagates as-is.
     *
     * <p>This method does not call {@code close()} on the stream.
     *
     * @param stream the stream whose elements are passed to the action
     * @param action the action to invoke for each element
     * @param <T> the type of the stream's elements
     * @param <E> the type of the checked exception the action may throw
     * @throws E if the action throws it
     */
    public static <T, E extends Exception> void forEach(
            Stream<T> stream, CheckedConsumer<T, E> action) throws E {
        if (stream.isParallel()) {
            stream.forEach(Sneaker.sneakyConsumer(action));
        } else {
            forEachInSeq(stream, action);
        }
    }

    /**
     * Returns a stream consisting of the results of applying the given function to the elements of
     * this stream, allowing the mapping function to throw a checked exception.
     *
     * <p>This is an <a
     * href="https://docs.oracle.com/javase/8/docs/api/java/util/stream/package-summary.html#StreamOps">
     * intermediate operation</a>. The {@code mapper} function is not executed, and no exception is
     * thrown, until a <a
     * href="https://docs.oracle.com/javase/8/docs/api/java/util/stream/package-summary.html#StreamOps">
     * terminal operation</a> is invoked on the returned stream.
     *
     * @param stream the stream to map
     * @param mapper a <a
     *     href="https://docs.oracle.com/javase/8/docs/api/java/util/stream/package-summary.html#Statelessness">
     *     non-interfering, stateless</a> function to apply to each element, which may throw a
     *     checked exception
     * @param <T> The type of the input elements of the stream
     * @param <R> The type of the output elements of the stream
     * @param <E> The type of the checked exception that can be thrown by the mapper
     * @return the new stream
     * @throws E when a terminal operation is executed on the returned stream and the mapper throws
     *     a checked exception. Note that with parallel streams, the exception may (!) be wrapped in
     *     a {@link RuntimeException}.
     */
    public static <T, R, E extends Exception> Stream<R> map(
            Stream<T> stream, CheckedFunction<T, R, E> mapper) throws E {
        return stream.map(Sneaker.sneakyFunction(mapper));
    }

    /**
     * Passes each element of the stream's iterator to the action, in iteration order, on the
     * calling thread. Stops at the first exception thrown by the action, which propagates as-is.
     */
    // This (probably, not verified) loses parallelism, if the stream even has it? (The only
    // caller, forEach, passes non-parallel streams here.)
    private static <T, E extends Exception> void forEachInSeq(
            Stream<T> stream, CheckedConsumer<T, E> action) throws E {
        // https://stackoverflow.com/questions/20129762/why-does-streamt-not-implement-iterablet/20130475#20130475
        var iterator = stream.iterator();
        while (iterator.hasNext()) {
            T element = iterator.next();
            action.accept(element);
        }
    }

    private MoreStreams() {}
}