From 9bcd913582b6822f5e6cd18ac8ac4341914e44c5 Mon Sep 17 00:00:00 2001 From: He-Pin Date: Sun, 29 Mar 2026 23:46:56 +0800 Subject: [PATCH 1/2] stream: add gather operator and tighten coverage Motivation: add the gather operator across the Scala and Java DSLs, document it, tighten its execution semantics, and broaden its statefulMap-equivalent coverage. Modification: implement the gather stage and DSL wiring, add Scala/Java/docs examples and tests, add JMH coverage, optimize the public gather hot path, and fix the one-to-one backpressure bug found in review. Result: gather is now documented and verified end-to-end with stronger semantics, broader regression coverage, and near-parity zipWithIndex performance against statefulMap. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../pekko/stream/ZipWithIndexBenchmark.scala | 142 ++++ docs/gather_zero_allocation_evaluation.md | 38 + .../stream/operators/Source-or-Flow/gather.md | 62 ++ .../main/paradox/stream/operators/index.md | 2 + .../jdocs/stream/operators/flow/Gather.java | 81 ++ .../docs/stream/operators/flow/Gather.scala | 69 ++ .../apache/pekko/stream/javadsl/FlowTest.java | 58 ++ .../pekko/stream/javadsl/SourceTest.java | 40 + .../stream/scaladsl/FlowGatherSpec.scala | 764 ++++++++++++++++++ .../org/apache/pekko/stream/impl/Stages.scala | 1 + .../apache/pekko/stream/impl/fusing/Ops.scala | 290 +++++++ .../apache/pekko/stream/javadsl/Flow.scala | 46 ++ .../apache/pekko/stream/javadsl/Gather.scala | 56 ++ .../apache/pekko/stream/javadsl/Source.scala | 46 ++ .../apache/pekko/stream/javadsl/SubFlow.scala | 30 + .../pekko/stream/javadsl/SubSource.scala | 30 + .../apache/pekko/stream/scaladsl/Flow.scala | 29 + .../apache/pekko/stream/scaladsl/Gather.scala | 66 ++ 18 files changed, 1850 insertions(+) create mode 100644 docs/gather_zero_allocation_evaluation.md create mode 100644 docs/src/main/paradox/stream/operators/Source-or-Flow/gather.md create mode 100644 docs/src/test/java/jdocs/stream/operators/flow/Gather.java create mode 100644 docs/src/test/scala/docs/stream/operators/flow/Gather.scala create mode 100644 stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowGatherSpec.scala create mode 100644 stream/src/main/scala/org/apache/pekko/stream/javadsl/Gather.scala create mode 100644 stream/src/main/scala/org/apache/pekko/stream/scaladsl/Gather.scala diff --git a/bench-jmh/src/main/scala/org/apache/pekko/stream/ZipWithIndexBenchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/stream/ZipWithIndexBenchmark.scala index 9c1dde4c6e8..45d546f402d 100644 --- a/bench-jmh/src/main/scala/org/apache/pekko/stream/ZipWithIndexBenchmark.scala +++ b/bench-jmh/src/main/scala/org/apache/pekko/stream/ZipWithIndexBenchmark.scala @@ -76,6 +76,103 @@ class ZipWithIndexBenchmark { } .toMat(Sink.ignore)(Keep.right) + private val statefulMapZipWithIndex = Source + .repeat(1) + .take(OperationsPerInvocation) + .statefulMap(() => 0L)((index, elem) => (index + 1, (elem, index)), _ => None) + .toMat(Sink.ignore)(Keep.right) + + private val gatherPublicZipWithIndex = Source + .repeat(1) + .take(OperationsPerInvocation) + .gather(() => + new Gatherer[Int, (Int, Long)] { + private var index = 0L + + override def apply(elem: Int, collector: GatherCollector[(Int, Long)]): Unit = { + val zipped = (elem, index) + index += 1 + collector.push(zipped) + } + }) + .toMat(Sink.ignore)(Keep.right) + + private val gatherInternalOneToOneZipWithIndex = Source + .repeat(1) + .take(OperationsPerInvocation) + .gather(() => + new OneToOneGatherer[Int, (Int, Long)] { + private var index = 0L + + override def applyOne(elem: Int): (Int, Long) = { + val zipped = (elem, index) + index += 1 + zipped + } + }) + .toMat(Sink.ignore)(Keep.right) + + private val statefulMapIncrement = Source + .repeat(1) + .take(OperationsPerInvocation) + .statefulMap(() => ())((state, elem) => (state, elem + 1), _ => None) + .toMat(Sink.ignore)(Keep.right) + + private val gatherPublicIncrement = Source + .repeat(1) + .take(OperationsPerInvocation) + .gather(() => + new Gatherer[Int, Int] { + override def apply(elem: Int, collector: GatherCollector[Int]): Unit = + collector.push(elem + 1) + }) + .toMat(Sink.ignore)(Keep.right) + + private val gatherInternalOneToOneIncrement = Source + .repeat(1) + .take(OperationsPerInvocation) + .gather(() => + new OneToOneGatherer[Int, Int] { + override def applyOne(elem: Int): Int = elem + 1 + }) + .toMat(Sink.ignore)(Keep.right) + + private val statefulMapCountedIncrement = Source + .repeat(1) + .take(OperationsPerInvocation) + .statefulMap(() => 0L)((index, elem) => (index + 1, elem + index.toInt), _ => None) + .toMat(Sink.ignore)(Keep.right) + + private val gatherPublicCountedIncrement = Source + .repeat(1) + .take(OperationsPerInvocation) + .gather(() => + new Gatherer[Int, Int] { + private var index = 0L + + override def apply(elem: Int, collector: GatherCollector[Int]): Unit = { + val incremented = elem + index.toInt + index += 1 + collector.push(incremented) + } + }) + .toMat(Sink.ignore)(Keep.right) + + private val gatherInternalOneToOneCountedIncrement = Source + .repeat(1) + .take(OperationsPerInvocation) + .gather(() => + new OneToOneGatherer[Int, Int] { + private var index = 0L + + override def applyOne(elem: Int): Int = { + val incremented = elem + index.toInt + index += 1 + incremented + } + }) + .toMat(Sink.ignore)(Keep.right) + @Benchmark @OperationsPerInvocation(OperationsPerInvocation) def benchOldZipWithIndex(): Unit = @@ -86,4 +183,49 @@ class ZipWithIndexBenchmark { def benchNewZipWithIndex(): Unit = Await.result(newZipWithIndex.run(), Duration.Inf) + @Benchmark + @OperationsPerInvocation(OperationsPerInvocation) + def benchStatefulMapZipWithIndex(): Unit = + Await.result(statefulMapZipWithIndex.run(), Duration.Inf) + + @Benchmark + @OperationsPerInvocation(OperationsPerInvocation) + def benchGatherPublicZipWithIndex(): Unit = + Await.result(gatherPublicZipWithIndex.run(), Duration.Inf) + + @Benchmark + @OperationsPerInvocation(OperationsPerInvocation) + def benchGatherInternalOneToOneZipWithIndex(): Unit = + Await.result(gatherInternalOneToOneZipWithIndex.run(), Duration.Inf) + + @Benchmark + @OperationsPerInvocation(OperationsPerInvocation) + def benchStatefulMapIncrement(): Unit = + Await.result(statefulMapIncrement.run(), Duration.Inf) + + @Benchmark + @OperationsPerInvocation(OperationsPerInvocation) + def benchGatherPublicIncrement(): Unit = + Await.result(gatherPublicIncrement.run(), Duration.Inf) + + @Benchmark + @OperationsPerInvocation(OperationsPerInvocation) + def benchGatherInternalOneToOneIncrement(): Unit = + Await.result(gatherInternalOneToOneIncrement.run(), Duration.Inf) + + @Benchmark + @OperationsPerInvocation(OperationsPerInvocation) + def benchStatefulMapCountedIncrement(): Unit = + Await.result(statefulMapCountedIncrement.run(), Duration.Inf) + + @Benchmark + @OperationsPerInvocation(OperationsPerInvocation) + def benchGatherPublicCountedIncrement(): Unit = + Await.result(gatherPublicCountedIncrement.run(), Duration.Inf) + + @Benchmark + @OperationsPerInvocation(OperationsPerInvocation) + def benchGatherInternalOneToOneCountedIncrement(): Unit = + Await.result(gatherInternalOneToOneCountedIncrement.run(), Duration.Inf) + } diff --git a/docs/gather_zero_allocation_evaluation.md b/docs/gather_zero_allocation_evaluation.md new file mode 100644 index 00000000000..f36109323d5 --- /dev/null +++ b/docs/gather_zero_allocation_evaluation.md @@ -0,0 +1,38 @@ +# Pekko Gather/Zero-Allocation Operator 评估报告 + +## 背景与目标 +- 目标:分析现有 statefulMap/statefulMapConcat 的分配情况,评估是否有必要引入类似 JDK 24 Gatherer 的 zero-allocation 操作符,并明确不能破坏现有 API。 +- 范围:仅分析 Pekko Stream 当前实现与 JDK 24 Gatherer、SmallRye Mutiny 等对比。 + +## 现状分析 +### statefulMap +- API:`(S, In) => (S, Out)` +- 每个元素分配一个 Tuple2((S, Out)),为 API 设计所致。 +- 不能避免 per-element 分配。 + +### statefulMapConcat +- API:`(S, In) => (S, Iterable[Out])` +- 每个元素分配 Iterable 和 Iterator。 +- 也是 API 设计决定,无法避免。 + +## JDK 24 Gatherer 对比 +- JDK 24 Gatherer 通过 mutable state + 直接下游 push,理论上可实现零分配。 +- 参考实现仍有部分分配(如 lambda/闭包等)。 +- SmallRye Mutiny 也有类似设计,但仍有微小分配。 + +## 结论与建议 +- 若要实现 zero-allocation,需设计全新 API,不能破坏 statefulMap 现有语义。 +- 建议:如需极致性能,可新增 opt-in gather-like 操作符,保留现有 API。 +- 现有 statefulMap/statefulMapConcat 的分配为 API 设计本质,非实现缺陷。 + +## 参考文件 +- stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala +- JDK 24 Gatherer 官方文档 +- SmallRye Mutiny 源码 + +## 评估人 +- 由 Pekko 迁移小组(gpt-4.1)完成 +- 评估时间:2026-03-28 + +--- +如需详细设计/原型实现,请补充需求。 \ No newline at end of file diff --git a/docs/src/main/paradox/stream/operators/Source-or-Flow/gather.md b/docs/src/main/paradox/stream/operators/Source-or-Flow/gather.md new file mode 100644 index 00000000000..6ee40af0a99 --- /dev/null +++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/gather.md @@ -0,0 +1,62 @@ +# gather + +Transform each input element into zero or more output elements with a stateful gatherer. + +@ref[Simple operators](../index.md#simple-operators) + +## Signature + +@apidoc[Flow.gather](Flow) + +## Description + +`gather` creates a new gatherer for each materialization. The gatherer can keep mutable state and emit zero or more +elements for each incoming element by calling `push` on the provided collector. + +Unlike `statefulMap` and `statefulMapConcat`, the operator API itself does not require returning tuples or collections +for each input element. This makes `gather` a good fit for allocation-sensitive stateful transformations. + +Patterns such as `zipWithIndex`, `bufferUntilChanged`, and `distinctUntilChanged` can be expressed by keeping mutable +state inside the gatherer and pushing outputs directly, instead of returning a new state/output wrapper for every +element. + +When the stage terminates or restarts, the gatherer's `onComplete` callback is invoked. Elements pushed from +`onComplete` are emitted before upstream-failure propagation, normal completion, or supervision restart, and are +ignored on downstream cancellation or abrupt termination. + +The `gather` operator adheres to the ActorAttributes.SupervisionStrategy attribute. + +For one-to-one stateful mapping see @ref:[statefulMap](statefulMap.md). For iterable-based fan-out see +@ref:[statefulMapConcat](statefulMapConcat.md). + +## Examples + +In the first example, we implement a `zipWithIndex`-like transformation. + +Scala +: @@snip [Gather.scala](/docs/src/test/scala/docs/stream/operators/flow/Gather.scala) { #zipWithIndex } + +Java +: @@snip [Gather.java](/docs/src/test/java/jdocs/stream/operators/flow/Gather.java) { #zipWithIndex } + +In the second example, we group incoming elements in batches of three and emit the trailing batch from `onComplete`. + +Scala +: @@snip [Gather.scala](/docs/src/test/scala/docs/stream/operators/flow/Gather.scala) { #grouped } + +Java +: @@snip [Gather.java](/docs/src/test/java/jdocs/stream/operators/flow/Gather.java) { #grouped } + +## Reactive Streams semantics + +@@@div { .callout } + +**emits** the gatherer emits an element and downstream is ready to consume it + +**backpressures** downstream backpressures + +**completes** upstream completes and all gathered elements have been emitted + +**cancels** downstream cancels + +@@@ diff --git a/docs/src/main/paradox/stream/operators/index.md b/docs/src/main/paradox/stream/operators/index.md index f39c251bb8f..268ba92bfc7 100644 --- a/docs/src/main/paradox/stream/operators/index.md +++ b/docs/src/main/paradox/stream/operators/index.md @@ -165,6 +165,7 @@ depending on being backpressured by downstream or not. |Source/Flow|@ref[foldWhile](Source-or-Flow/foldWhile.md)|Start with current value `zero` and then apply the current and next value to the given function. When upstream completes or the predicate `p` returns `false`, the current value is emitted downstream.| |Source/Flow|@ref[fromMaterializer](Source-or-Flow/fromMaterializer.md)|Defer the creation of a `Source/Flow` until materialization and access `Materializer` and `Attributes`| |Flow|@ref[futureFlow](Flow/futureFlow.md)|Streams the elements through the given future flow once it successfully completes.| +|Source/Flow|@ref[gather](Source-or-Flow/gather.md)|Transform each input element into zero or more output elements with a stateful gatherer.| |Source/Flow|@ref[grouped](Source-or-Flow/grouped.md)|Accumulate incoming events until the specified number of elements have been accumulated and then pass the collection of elements downstream.| |Source/Flow|@ref[groupedAdjacentBy](Source-or-Flow/groupedAdjacentBy.md)|Partitions this stream into chunks by a delimiter function.| |Source/Flow|@ref[groupedAdjacentByWeighted](Source-or-Flow/groupedAdjacentByWeighted.md)|Partitions this stream into chunks by a delimiter function and a weight limit.| @@ -498,6 +499,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md) * [futureFlow](Flow/futureFlow.md) * [futureSink](Sink/futureSink.md) * [futureSource](Source/futureSource.md) +* [gather](Source-or-Flow/gather.md) * [groupBy](Source-or-Flow/groupBy.md) * [grouped](Source-or-Flow/grouped.md) * [groupedAdjacentBy](Source-or-Flow/groupedAdjacentBy.md) diff --git a/docs/src/test/java/jdocs/stream/operators/flow/Gather.java b/docs/src/test/java/jdocs/stream/operators/flow/Gather.java new file mode 100644 index 00000000000..bf3f3323dcc --- /dev/null +++ b/docs/src/test/java/jdocs/stream/operators/flow/Gather.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * license agreements; and to You under the Apache License, version 2.0: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * This file is part of the Apache Pekko project, which was derived from Akka. + */ + +/* + * Copyright (C) 2022 Lightbend Inc. + */ + +package jdocs.stream.operators.flow; + +import java.util.ArrayList; +import java.util.Arrays; +import org.apache.pekko.actor.ActorSystem; +import org.apache.pekko.stream.javadsl.GatherCollector; +import org.apache.pekko.stream.javadsl.Gatherer; +import org.apache.pekko.stream.javadsl.Source; + +public class Gather { + static final ActorSystem system = null; + + public void zipWithIndex() { + // #zipWithIndex + Source.from(Arrays.asList("A", "B", "C", "D")) + .gather( + () -> + new Gatherer() { + private long index = 0L; + + @Override + public void apply(String elem, GatherCollector collector) { + collector.push("(" + elem + "," + index + ")"); + index += 1; + } + }) + .runForeach(System.out::println, system); + // prints + // (A,0) + // (B,1) + // (C,2) + // (D,3) + // #zipWithIndex + } + + public void grouped() { + // #grouped + Source.from(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) + .gather( + () -> + new Gatherer() { + private final ArrayList buffer = new ArrayList<>(3); + + @Override + public void apply(Integer elem, GatherCollector collector) { + buffer.add(elem); + if (buffer.size() == 3) { + collector.push(buffer.toString()); + buffer.clear(); + } + } + + @Override + public void onComplete(GatherCollector collector) { + if (!buffer.isEmpty()) { + collector.push(buffer.toString()); + } + } + }) + .runForeach(System.out::println, system); + // prints + // [1, 2, 3] + // [4, 5, 6] + // [7, 8, 9] + // [10] + // #grouped + } +} diff --git a/docs/src/test/scala/docs/stream/operators/flow/Gather.scala b/docs/src/test/scala/docs/stream/operators/flow/Gather.scala new file mode 100644 index 00000000000..41d05fb89d6 --- /dev/null +++ b/docs/src/test/scala/docs/stream/operators/flow/Gather.scala @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * license agreements; and to You under the Apache License, version 2.0: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * This file is part of the Apache Pekko project, which was derived from Akka. + */ + +/* + * Copyright (C) 2022 Lightbend Inc. + */ + +package docs.stream.operators.flow + +import org.apache.pekko.actor.ActorSystem +import org.apache.pekko.stream.scaladsl.{ GatherCollector, Gatherer, Source } + +object Gather { + + implicit val actorSystem: ActorSystem = ??? + + def zipWithIndex(): Unit = { + // #zipWithIndex + Source(List("A", "B", "C", "D")) + .gather(() => { + var index = 0L + (elem: String, collector: GatherCollector[(String, Long)]) => { + collector.push((elem, index)) + index += 1 + } + }) + .runForeach(println) + // prints + // (A,0) + // (B,1) + // (C,2) + // (D,3) + // #zipWithIndex + } + + def grouped(): Unit = { + // #grouped + Source(1 to 10) + .gather(() => + new Gatherer[Int, List[Int]] { + private var buffer = List.empty[Int] + + override def apply(elem: Int, collector: GatherCollector[List[Int]]): Unit = { + buffer = elem :: buffer + if (buffer.size == 3) { + collector.push(buffer.reverse) + buffer = Nil + } + } + + override def onComplete(collector: GatherCollector[List[Int]]): Unit = + if (buffer.nonEmpty) + collector.push(buffer.reverse) + }) + .runForeach(println) + // prints + // List(1, 2, 3) + // List(4, 5, 6) + // List(7, 8, 9) + // List(10) + // #grouped + } +} diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java index f3058445066..90be09062a0 100644 --- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java +++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java @@ -328,6 +328,64 @@ public void mustBeAbleToUseMapWithAutoCloseableResource() throws Exception { Assertions.assertEquals(1, closed.get()); } + @Test + public void mustBeAbleToUseGather() throws Exception { + final java.lang.Iterable input = Arrays.asList(1, 2, 3, 4, 5); + final CompletionStage grouped = + Source.from(input) + .via( + Flow.of(Integer.class) + .gather( + () -> + new Gatherer() { + private final ArrayList buffer = new ArrayList<>(2); + + @Override + public void apply(Integer elem, GatherCollector collector) { + buffer.add(elem); + if (buffer.size() == 2) { + collector.push(buffer.toString()); + buffer.clear(); + } + } + + @Override + public void onComplete(GatherCollector collector) { + if (!buffer.isEmpty()) { + collector.push(buffer.toString()); + } + } + })) + .runFold("", (acc, elem) -> acc + elem, system); + + Assertions.assertEquals( + "[1, 2][3, 4][5]", grouped.toCompletableFuture().get(3, TimeUnit.SECONDS)); + } + + @Test + public void mustBeAbleToUseGatherAsDistinctUntilChanged() throws Exception { + final CompletionStage result = + Source.from(Arrays.asList("A", "B", "B", "C", "C", "D")) + .via( + Flow.of(String.class) + .gather( + () -> + new Gatherer() { + private String lastSeen = null; + + @Override + public void apply(String elem, GatherCollector collector) { + if (!elem.equals(lastSeen)) { + collector.push(elem); + lastSeen = elem; + } + } + })) + .runFold("", (acc, elem) -> acc + elem, system); + + Assertions.assertEquals("ABCD", result.toCompletableFuture().get(3, TimeUnit.SECONDS)); + } + @Test public void mustBeAbleToUseFoldWhile() throws Exception { final int result = diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java index 3aec7545b09..4b6d5cc986d 100644 --- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java +++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java @@ -936,6 +936,46 @@ public void mustBeAbleToUseMapWithResource() { Assertions.assertFalse(gate.get()); } + @Test + public void mustBeAbleToUseGather() throws Exception { + final CompletionStage result = + Source.from(Arrays.asList(1, 2, 3, 4, 5)) + .gather( + () -> + new Gatherer() { + private int sum = 0; + + @Override + public void apply(Integer elem, GatherCollector collector) { + sum += elem; + collector.push(sum); + } + }) + .runFold("", (acc, elem) -> acc + elem, system); + + Assertions.assertEquals("1361015", result.toCompletableFuture().get(3, TimeUnit.SECONDS)); + } + + @Test + public void mustBeAbleToUseGatherAsZipWithIndex() throws Exception { + final CompletionStage result = + Source.from(Arrays.asList("A", "B", "C", "D")) + .gather( + () -> + new Gatherer() { + private long index = 0L; + + @Override + public void apply(String elem, GatherCollector collector) { + collector.push(elem + ":" + index); + index += 1; + } + }) + .runFold("", (acc, elem) -> acc + elem, system); + + Assertions.assertEquals("A:0B:1C:2D:3", result.toCompletableFuture().get(3, TimeUnit.SECONDS)); + } + @Test public void mustBeAbleToUseMapWithAutoCloseableResource() throws Exception { final TestKit probe = new TestKit(system); diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowGatherSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowGatherSpec.scala new file mode 100644 index 00000000000..23e552436ab --- /dev/null +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowGatherSpec.scala @@ -0,0 +1,764 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.stream.scaladsl + +import java.util.concurrent.atomic.AtomicInteger + +import scala.annotation.nowarn +import scala.concurrent.{ Await, Promise } +import scala.concurrent.duration.DurationInt +import scala.util.Success +import scala.util.control.NoStackTrace + +import org.apache.pekko.Done +import org.apache.pekko.stream.{ AbruptStageTerminationException, ActorAttributes, ActorMaterializer, ClosedShape, Supervision } +import org.apache.pekko.stream.testkit.{ StreamSpec, TestSubscriber } +import org.apache.pekko.stream.testkit.Utils.TE +import org.apache.pekko.stream.testkit.scaladsl.{ TestSink, TestSource } +import org.apache.pekko.testkit.EventFilter + +class FlowGatherSpec extends StreamSpec { + + private val ex = new Exception("TEST") with NoStackTrace + + object BeenCalledTimesGate { + def apply(): BeenCalledTimesGate = new BeenCalledTimesGate(1) + def apply(nTimes: Int): BeenCalledTimesGate = new BeenCalledTimesGate(nTimes) + } + + class BeenCalledTimesGate(nTimes: Int) { + private val beenCalled = new AtomicInteger(0) + + def mark(): Unit = beenCalled.updateAndGet { current => + if (current == nTimes) + throw new IllegalStateException(s"Has been called:[$nTimes] times, should not be called anymore.") + else current + 1 + } + + def ensure(): Unit = + if (beenCalled.get() != nTimes) + throw new IllegalStateException(s"Expected to be called:[$nTimes], but only be called:[$beenCalled]") + } + + "A Gather" must { + "work in the happy case" in { + val gate = BeenCalledTimesGate() + Source(List(1, 2, 3, 4, 5)) + .gather(() => + new Gatherer[Int, (Int, Int)] { + private var agg = 0 + + override def apply(elem: Int, collector: GatherCollector[(Int, Int)]): Unit = { + collector.push((agg, elem)) + agg += elem + } + + override def onComplete(collector: GatherCollector[(Int, Int)]): Unit = + gate.mark() + }) + .runWith(TestSink[(Int, Int)]()) + .request(6) + .expectNext((0, 1)) + .expectNext((1, 2)) + .expectNext((3, 3)) + .expectNext((6, 4)) + .expectNext((10, 5)) + .expectComplete() + gate.ensure() + } + + "remember state when complete" in { + val gate = BeenCalledTimesGate() + Source(1 to 10) + .gather(() => + new Gatherer[Int, List[Int]] { + private var state = List.empty[Int] + + override def apply(elem: Int, collector: GatherCollector[List[Int]]): Unit = { + val newState = elem :: state + if (newState.size == 3) { + state = Nil + collector.push(newState.reverse) + } else + state = newState + } + + override def onComplete(collector: GatherCollector[List[Int]]): Unit = { + gate.mark() + collector.push(state.reverse) + } + }) + .mapConcat(identity) + .runWith(TestSink[Int]()) + .request(10) + .expectNextN(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) + .expectComplete() + gate.ensure() + } + + "emit zero or more elements and drain on completion" in { + Source(1 to 5) + .gather(() => + new Gatherer[Int, List[Int]] { + private var buffer = List.empty[Int] + + override def apply(elem: Int, collector: GatherCollector[List[Int]]): Unit = { + buffer = elem :: buffer + if (buffer.size == 2) { + collector.push(buffer.reverse) + buffer = Nil + } + } + + override def onComplete(collector: GatherCollector[List[Int]]): Unit = + if (buffer.nonEmpty) + collector.push(buffer.reverse) + }) + .runWith(TestSink[List[Int]]()) + .request(3) + .expectNext(List(1, 2)) + .expectNext(List(3, 4)) + .expectNext(List(5)) + .expectComplete() + } + + "emit all outputs when a callback deopts from single to multi mode" in { + Source.single(1) + .gather(() => + new Gatherer[Int, Int] { + override def apply(elem: Int, collector: GatherCollector[Int]): Unit = { + collector.push(elem) + collector.push(elem + 1) + } + }) + .runWith(TestSink[Int]()) + .request(2) + .expectNext(1, 2) + .expectComplete() + } + + "drop a single buffered output if apply throws after pushing it" in { + Source.single(1) + .gather(() => + new Gatherer[Int, Int] { + override def apply(elem: Int, collector: GatherCollector[Int]): Unit = { + collector.push(elem) + throw TE("boom") + } + }) + .runWith(TestSink[Int]()) + .request(1) + .expectError(TE("boom")) + } + + "resume without leaking a single buffered output if apply throws after pushing it" in { + Source(List(1, 2)) + .gather(() => + new Gatherer[Int, Int] { + override def apply(elem: Int, collector: GatherCollector[Int]): Unit = { + collector.push(elem) + if (elem == 1) + throw ex + } + }) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider)) + .runWith(TestSink[Int]()) + .request(1) + .expectNext(2) + .expectComplete() + } + + "be usable as zipWithIndex" in { + val gate = BeenCalledTimesGate() + Source(List("A", "B", "C", "D")) + .gather(() => + new Gatherer[String, (String, Long)] { + private var index = 0L + + override def apply(elem: String, collector: GatherCollector[(String, Long)]): Unit = { + collector.push((elem, index)) + index += 1 + } + + override def onComplete(collector: GatherCollector[(String, Long)]): Unit = + gate.mark() + }) + .runWith(TestSink[(String, Long)]()) + .request(4) + .expectNext(("A", 0L)) + .expectNext(("B", 1L)) + .expectNext(("C", 2L)) + .expectNext(("D", 3L)) + .expectComplete() + gate.ensure() + } + + "respect backpressure for public single-output gatherers" in { + Source(List("A", "B", "C")) + .gather(() => + new Gatherer[String, String] { + override def apply(elem: String, collector: GatherCollector[String]): Unit = + collector.push(elem) + }) + .runWith(TestSink[String]()) + .request(1) + .expectNext("A") + .expectNoMessage(200.millis) + .request(1) + .expectNext("B") + .request(1) + .expectNext("C") + .expectComplete() + } + + "respect backpressure for one-to-one gatherers" in { + Source(List("A", "B", "C")) + .gather(() => + new OneToOneGatherer[String, String] { + override def applyOne(elem: String): String = elem + }) + .runWith(TestSink[String]()) + .request(1) + .expectNext("A") + .expectNoMessage(200.millis) + .request(1) + .expectNext("B") + .request(1) + .expectNext("C") + .expectComplete() + } + + "be usable as bufferUntilChanged" in { + val gate = BeenCalledTimesGate() + Source("A" :: "B" :: "B" :: "C" :: "C" :: "C" :: "D" :: Nil) + .gather(() => + new Gatherer[String, List[String]] { + private var buffer = List.empty[String] + + override def apply(elem: String, collector: GatherCollector[List[String]]): Unit = + buffer match { + case head :: _ if head != elem => + collector.push(buffer.reverse) + buffer = elem :: Nil + case _ => + buffer = elem :: buffer + } + + override def onComplete(collector: GatherCollector[List[String]]): Unit = { + gate.mark() + if (buffer.nonEmpty) + collector.push(buffer.reverse) + } + }) + .runWith(TestSink[List[String]]()) + .request(4) + .expectNext(List("A")) + .expectNext(List("B", "B")) + .expectNext(List("C", "C", "C")) + .expectNext(List("D")) + .expectComplete() + gate.ensure() + } + + "be usable as distinctUntilChanged" in { + val gate = BeenCalledTimesGate() + Source("A" :: "B" :: "B" :: "C" :: "C" :: "C" :: "D" :: Nil) + .gather(() => + new Gatherer[String, String] { + private var lastElement: Option[String] = None + + override def apply(elem: String, collector: GatherCollector[String]): Unit = + lastElement match { + case Some(last) if last == elem => + lastElement = Some(elem) + case _ => + lastElement = Some(elem) + collector.push(elem) + } + + override def onComplete(collector: GatherCollector[String]): Unit = + gate.mark() + }) + .runWith(TestSink[String]()) + .request(4) + .expectNext("A") + .expectNext("B") + .expectNext("C") + .expectNext("D") + .expectComplete() + gate.ensure() + } + + "resume when supervision says Resume" in { + Source(List(1, 2, 3, 4, 5)) + .gather(() => + new Gatherer[Int, Int] { + private var sum = 0 + + override def apply(elem: Int, collector: GatherCollector[Int]): Unit = + if (elem % 2 == 0) + throw ex + else { + sum += elem + collector.push(sum) + } + }) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider)) + .runWith(TestSink[Int]()) + .request(3) + .expectNext(1, 4, 9) + .expectComplete() + } + + "emit onComplete elements before restarting" in { + val generation = new AtomicInteger(0) + val (source, sink) = TestSource[String]() + .viaMat(Flow[String].gather(() => { + val currentGeneration = generation.incrementAndGet() + new Gatherer[String, String] { + override def apply(elem: String, collector: GatherCollector[String]): Unit = + if (elem == "boom") throw TE("boom") + else collector.push(s"$elem$currentGeneration") + + override def onComplete(collector: GatherCollector[String]): Unit = + collector.push(s"onClose$currentGeneration") + } + }))(Keep.left) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider)) + .toMat(TestSink())(Keep.both) + .run() + + sink.request(1) + source.sendNext("one") + sink.expectNext("one1") + sink.request(1) + source.sendNext("boom") + sink.expectNext("onClose1") + sink.request(1) + source.sendNext("two") + sink.expectNext("two2") + sink.cancel() + source.expectCancellation() + } + + "restart and recreate gatherer state when supervision says Restart" in { + val generation = new AtomicInteger(0) + Source(List(1, 2, 3, 4, 5)) + .gather(() => { + generation.incrementAndGet() + new Gatherer[Int, (Int, Int)] { + private var agg = 0 + + override def apply(elem: Int, collector: GatherCollector[(Int, Int)]): Unit = + if (elem % 3 == 0) + throw ex + else { + collector.push((agg, elem)) + agg += elem + } + } + }) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider)) + .runWith(TestSink[(Int, Int)]()) + .request(5) + .expectNext((0, 1)) + .expectNext((1, 2)) + .expectNext((0, 4)) + .expectNext((4, 5)) + .expectComplete() + generation.get() shouldBe 2 + } + + "stop when supervision says Stop" in { + val gate = BeenCalledTimesGate() + Source(List(1, 2, 3, 4, 5)) + .gather(() => + new Gatherer[Int, (Int, Int)] { + private var agg = 0 + + override def apply(elem: Int, collector: GatherCollector[(Int, Int)]): Unit = + if (elem % 3 == 0) + throw ex + else { + collector.push((agg, elem)) + agg += elem + } + + override def onComplete(collector: GatherCollector[(Int, Int)]): Unit = + gate.mark() + }) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.stoppingDecider)) + .runWith(TestSink[(Int, Int)]()) + .request(5) + .expectNext((0, 1)) + .expectNext((1, 2)) + .expectError(ex) + gate.ensure() + } + + "fail on upstream failure when onComplete emits nothing" in { + val gate = BeenCalledTimesGate() + val (source, sink) = TestSource[Int]() + .gather(() => + new Gatherer[Int, Int] { + override def apply(elem: Int, collector: GatherCollector[Int]): Unit = + collector.push(elem) + + override def onComplete(collector: GatherCollector[Int]): Unit = + gate.mark() + }) + .toMat(TestSink[Int]())(Keep.both) + .run() + + sink.request(3) + source.sendNext(1) + sink.expectNext(1) + source.sendNext(2) + sink.expectNext(2) + source.sendError(ex) + sink.expectError(ex) + gate.ensure() + } + + "defer upstream failure until onComplete elements are emitted" in { + val (source, sink) = TestSource[Int]() + .gather(() => + new Gatherer[Int, Int] { + private var sum = 0 + + override def apply(elem: Int, collector: GatherCollector[Int]): Unit = { + sum += elem + collector.push(sum) + } + + override def onComplete(collector: GatherCollector[Int]): Unit = + collector.push(-1) + }) + .toMat(TestSink[Int]())(Keep.both) + .run() + + sink.request(3) + source.sendNext(1) + sink.expectNext(1) + source.sendNext(2) + sink.expectNext(3) + source.sendError(ex) + sink.expectNext(-1) + sink.expectError(ex) + } + + "emit buffered elements before failing when supervision stops the stage" in { + Source(List(1, 2, 3)) + .gather(() => + new Gatherer[Int, List[Int]] { + private var buffer = List.empty[Int] + + override def apply(elem: Int, collector: GatherCollector[List[Int]]): Unit = + if (elem == 3) + throw ex + else + buffer = elem :: buffer + + override def onComplete(collector: GatherCollector[List[Int]]): Unit = + if (buffer.nonEmpty) + collector.push(buffer.reverse) + }) + .runWith(TestSink[List[Int]]()) + .request(2) + .expectNext(List(1, 2)) + .expectError(ex) + } + + "call onComplete when supervision stops the stage" in { + val gate = BeenCalledTimesGate() + val promise = Promise[Done]() + val done = Source + .single(1) + .gather(() => + new Gatherer[Int, Int] { + override def apply(elem: Int, collector: GatherCollector[Int]): Unit = + throw ex + + override def onComplete(collector: GatherCollector[Int]): Unit = { + gate.mark() + promise.complete(Success(Done)) + } + }) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.stoppingDecider)) + .runWith(Sink.ignore) + + done.failed.futureValue shouldBe ex + Await.result(promise.future, 3.seconds) shouldBe Done + gate.ensure() + } + + "cancel upstream when downstream cancels" in { + val gate = BeenCalledTimesGate() + val promise = Promise[Done]() + val source = TestSource[Int]() + .via(Flow[Int].gather(() => + new Gatherer[Int, Int] { + override def apply(elem: Int, collector: GatherCollector[Int]): Unit = + collector.push(elem) + + override def onComplete(collector: GatherCollector[Int]): Unit = { + gate.mark() + promise.complete(Success(Done)) + } + })) + .toMat(Sink.cancelled)(Keep.left) + .run() + + source.expectCancellation() + Await.result(promise.future, 3.seconds) shouldBe Done + gate.ensure() + } + + "cancel upstream when downstream fails" in { + val gate = BeenCalledTimesGate() + val promise = Promise[Done]() + val testProbe = TestSubscriber.probe[Int]() + val source = TestSource[Int]() + .via(Flow[Int].gather(() => + new Gatherer[Int, Int] { + override def apply(elem: Int, collector: GatherCollector[Int]): Unit = + collector.push(elem) + + override def onComplete(collector: GatherCollector[Int]): Unit = { + gate.mark() + promise.complete(Success(Done)) + } + })) + .toMat(Sink.fromSubscriber(testProbe))(Keep.left) + .run() + + testProbe.cancel(ex) + source.expectCancellationWithCause(ex) + Await.result(promise.future, 3.seconds) shouldBe Done + gate.ensure() + } + + "invoke onComplete exactly once when downstream cancels while draining final elements" in { + val closedCounter = new AtomicInteger(0) + val (source, sink) = TestSource[Int]() + .gather(() => + new Gatherer[Int, Int] { + override def apply(elem: Int, collector: GatherCollector[Int]): Unit = + collector.push(elem) + + override def onComplete(collector: GatherCollector[Int]): Unit = { + closedCounter.incrementAndGet() + collector.push(100) + collector.push(200) + } + }) + .toMat(TestSink[Int]())(Keep.both) + .run() + + sink.request(2) + source.sendNext(1) + sink.expectNext(1) + source.sendComplete() + sink.expectNext(100) + sink.cancel() + closedCounter.get() shouldBe 1 + } + + "not restart gatherer when downstream cancels while draining restart elements" in { + val generation = new AtomicInteger(0) + val (source, sink) = TestSource[Int]() + .gather(() => { + val currentGeneration = generation.incrementAndGet() + new Gatherer[Int, Int] { + override def apply(elem: Int, collector: GatherCollector[Int]): Unit = + throw TE(s"boom-$currentGeneration") + + override def onComplete(collector: GatherCollector[Int]): Unit = { + collector.push(100 + currentGeneration) + collector.push(200 + currentGeneration) + } + } + }) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider)) + .toMat(TestSink[Int]())(Keep.both) + .run() + + sink.request(1) + source.sendNext(1) + sink.expectNext(101) + sink.cancel() + generation.get() shouldBe 1 + } + + "fail when emitting null" in { + Source.single(1) + .gather(() => + new Gatherer[Int, String] { + override def apply(elem: Int, collector: GatherCollector[String]): Unit = + collector.push(null.asInstanceOf[String]) + }) + .runWith(TestSink[String]()) + .request(1) + .expectError() shouldBe a[NullPointerException] + } + + "call onComplete on abrupt materializer termination" in { + val gate = BeenCalledTimesGate() + val promise = Promise[Done]() + @nowarn("msg=deprecated") + val mat = ActorMaterializer() + + val matVal = Source + .single(1) + .gather(() => + new Gatherer[Int, Int] { + override def apply(elem: Int, collector: GatherCollector[Int]): Unit = + collector.push(elem) + + override def onComplete(collector: GatherCollector[Int]): Unit = { + gate.mark() + promise.complete(Success(Done)) + } + }) + .runWith(Sink.never)(mat) + + mat.shutdown() + matVal.failed.futureValue shouldBe a[AbruptStageTerminationException] + Await.result(promise.future, 3.seconds) shouldBe Done + gate.ensure() + } + + "will not call onComplete twice if apply fails" in { + val closedCounter = new AtomicInteger(0) + val probe = Source + .repeat(1) + .gather(() => + new Gatherer[Int, String] { + override def apply(elem: Int, collector: GatherCollector[String]): Unit = + throw TE("failing read") + + override def onComplete(collector: GatherCollector[String]): Unit = + closedCounter.incrementAndGet() + }) + .runWith(TestSink[String]()) + + probe.request(1) + probe.expectError(TE("failing read")) + closedCounter.get() shouldBe 1 + } + + "will not call onComplete twice if both apply and onComplete fail" in { + val closedCounter = new AtomicInteger(0) + val probe = Source + .repeat(1) + .gather(() => + new Gatherer[Int, Int] { + override def apply(elem: Int, collector: GatherCollector[Int]): Unit = + throw TE("failing read") + + override def onComplete(collector: GatherCollector[Int]): Unit = + if (closedCounter.incrementAndGet() == 1) + throw TE("boom") + }) + .runWith(TestSink[Int]()) + + EventFilter[TE](occurrences = 1).intercept { + probe.request(1) + probe.expectError(TE("boom")) + } + closedCounter.get() shouldBe 1 + } + + "will not call onComplete twice on cancel when onComplete fails" in { + val closedCounter = new AtomicInteger(0) + val (source, sink) = TestSource[Int]() + .viaMat(Flow[Int].gather(() => + new Gatherer[Int, Int] { + override def apply(elem: Int, collector: GatherCollector[Int]): Unit = + collector.push(elem) + + override def onComplete(collector: GatherCollector[Int]): Unit = { + closedCounter.incrementAndGet() + throw TE("boom") + } + }))(Keep.left) + .toMat(TestSink[Int]())(Keep.both) + .run() + + EventFilter[TE](occurrences = 1).intercept { + sink.request(1) + source.sendNext(1) + sink.expectNext(1) + sink.cancel() + source.expectCancellation() + } + closedCounter.get() shouldBe 1 + } + + "will not call onComplete twice if onComplete fails on upstream complete" in { + val closedCounter = new AtomicInteger(0) + val (source, sink) = TestSource[Int]() + .gather(() => + new Gatherer[Int, Int] { + override def apply(elem: Int, collector: GatherCollector[Int]): Unit = + collector.push(elem) + + override def onComplete(collector: GatherCollector[Int]): Unit = { + closedCounter.incrementAndGet() + throw TE("boom") + } + }) + .toMat(TestSink[Int]())(Keep.both) + .run() + + EventFilter[TE](occurrences = 1).intercept { + sink.request(1) + source.sendNext(1) + sink.expectNext(1) + sink.request(1) + source.sendComplete() + sink.expectError(TE("boom")) + } + closedCounter.get() shouldBe 1 + } + } + + "support junction output ports" in { + val source = Source(List((1, 1), (2, 2))) + val graph = RunnableGraph.fromGraph(GraphDSL.createGraph(TestSink[(Int, Int)]()) { implicit b => sink => + import GraphDSL.Implicits._ + val unzip = b.add(Unzip[Int, Int]()) + val zip = b.add(Zip[Int, Int]()) + val gather = b.add(Flow[(Int, Int)].gather(() => (elem: (Int, Int), collector: GatherCollector[(Int, Int)]) => collector.push(elem))) + + source ~> unzip.in + unzip.out0 ~> zip.in0 + unzip.out1 ~> zip.in1 + zip.out ~> gather ~> sink.in + + ClosedShape + }) + + graph + .run() + .request(2) + .expectNext((1, 1)) + .expectNext((2, 2)) + .expectComplete() + } +} diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala index 6de53600bd7..09459906c02 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala @@ -74,6 +74,7 @@ import pekko.stream.Attributes._ val batchWeighted = name("batchWeighted") val expand = name("expand") val statefulMap = name("statefulMap") + val gather = name("gather") val statefulMapConcat = name("statefulMapConcat") val mapConcat = name("mapConcat") val detacher = name("detacher") diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala index d214479d4ec..a9b39552003 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala @@ -48,6 +48,9 @@ import pekko.stream.impl.Stages.DefaultAttributes import pekko.stream.impl.fusing.GraphStages.{ FutureSource, SimpleLinearGraphStage, SingleSource } import pekko.stream.scaladsl.{ DelayStrategy, + GatherCollector, + Gatherer, + OneToOneGatherer, Source, StatefulMapConcatAccumulator, StatefulMapConcatAccumulatorFactory @@ -2328,6 +2331,293 @@ private[pekko] final class StatefulMap[S, In, Out](create: () => S, f: (S, In) = override def toString = "StatefulMap" } +/** + * INTERNAL API + */ +@InternalApi +private[pekko] final class Gather[In, Out](factory: () => Gatherer[In, Out]) extends GraphStage[FlowShape[In, Out]] { + require(factory != null, "gatherer factory should not be null") + + private val in = Inlet[In]("Gather.in") + private val out = Outlet[Out]("Gather.out") + override val shape: FlowShape[In, Out] = FlowShape(in, out) + + override def initialAttributes: Attributes = DefaultAttributes.gather and SourceLocation.forLambda(factory) + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new GraphStageLogic(shape) with InHandler with OutHandler { + private object FinalAction { + final val None = 0 + final val Complete = 1 + final val Restart = 2 + final val Fail = 3 + } + + private lazy val decider: Decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider + private val contextPropagation = ContextPropagation() + private val noopCollector = new GatherCollector[Out] { + override def push(elem: Out): Unit = () + } + private val singleCollector = new GatherCollector[Out] { + override def push(elem: Out): Unit = pushSingleCallbackOutput(elem) + } + private val pendingCollector = new GatherCollector[Out] { + override def push(elem: Out): Unit = enqueuePendingOutput(elem) + } + private var callbackFirst: Out = _ + private var hasCallbackFirst = false + private var pendingFirst: Out = _ + private var pendingOverflow: java.util.ArrayDeque[Out] = _ + private var hasPendingFirst = false + private var multiMode = false + private var gatherer: Gatherer[In, Out] = _ + private var oneToOneGatherer: OneToOneGatherer[In, Out] = _ + private var finalAction = FinalAction.None + private var finalFailure: Throwable = null + private var needInvokeOnCompleteCallback = false + private var downstreamFinished = false + + override def preStart(): Unit = { + restartGatherer() + pull(in) + } + + override def onPush(): Unit = + try { + if (oneToOneGatherer ne null) onPushOneToOne() + else if (multiMode) onPushMulti() + else onPushSingle() + } catch { + case NonFatal(ex) => + clearPending() + if (!downstreamFinished) + decider(ex) match { + case Supervision.Stop => invokeOnCompleteAndThen(FinalAction.Fail, ex) + case Supervision.Resume => maybePull() + case Supervision.Restart => invokeOnCompleteAndThen(FinalAction.Restart) + } + } + + override def onPull(): Unit = + if (hasPendingFirst) { + if (multiMode) + pushPendingMulti(shouldResumeContext = needInvokeOnCompleteCallback) + else + pushPendingSingle(shouldResumeContext = needInvokeOnCompleteCallback) + } else maybePull() + + override def onUpstreamFinish(): Unit = + if (hasPending) { + if (finalAction != FinalAction.Fail) + finalAction = FinalAction.Complete + } else invokeOnCompleteAndThen(FinalAction.Complete) + + override def onUpstreamFailure(ex: Throwable): Unit = + if (hasPending) { + finalFailure = ex + finalAction = FinalAction.Fail + } else invokeOnCompleteAndThen(FinalAction.Fail, ex) + + override def onDownstreamFinish(cause: Throwable): Unit = { + downstreamFinished = true + if (needInvokeOnCompleteCallback) { + needInvokeOnCompleteCallback = false + gatherer.onComplete(noopCollector) + } + super.onDownstreamFinish(cause) + } + + override def postStop(): Unit = { + if (needInvokeOnCompleteCallback) + gatherer.onComplete(noopCollector) + } + + private def enqueuePendingOutput(elem: Out): Unit = { + ReactiveStreamsCompliance.requireNonNullElement(elem) + if (hasPendingFirst) { + multiMode = true + if (pendingOverflow eq null) + pendingOverflow = new java.util.ArrayDeque[Out]() + pendingOverflow.addLast(elem) + } else { + pendingFirst = elem + hasPendingFirst = true + } + } + + private def pushSingleCallbackOutput(elem: Out): Unit = { + ReactiveStreamsCompliance.requireNonNullElement(elem) + if (hasCallbackFirst) { + pendingFirst = callbackFirst + hasPendingFirst = true + callbackFirst = null.asInstanceOf[Out] + hasCallbackFirst = false + multiMode = true + if (pendingOverflow eq null) + pendingOverflow = new java.util.ArrayDeque[Out]() + pendingOverflow.addLast(elem) + } else { + callbackFirst = elem + hasCallbackFirst = true + } + } + + private def hasPending: Boolean = hasPendingFirst + + private def onPushOneToOne(): Unit = { + val elem = oneToOneGatherer.applyOne(grab(in)) + ReactiveStreamsCompliance.requireNonNullElement(elem) + if (isAvailable(out)) + push(out, elem) + else { + pendingFirst = elem + hasPendingFirst = true + contextPropagation.suspendContext() + } + } + + private def onPushSingle(): Unit = { + gatherer(grab(in), singleCollector) + if (hasCallbackFirst) + pushCallbackSingle() + else if (hasPendingFirst && isAvailable(out)) { + if (multiMode) + pushPendingMulti(shouldResumeContext = false) + else + pushPendingSingle(shouldResumeContext = false) + } else if (hasPendingFirst) + contextPropagation.suspendContext() + else + maybePull() + } + + private def onPushMulti(): Unit = { + gatherer(grab(in), pendingCollector) + if (hasPendingFirst && isAvailable(out)) + pushPendingMulti(shouldResumeContext = false) + else if (hasPendingFirst) + contextPropagation.suspendContext() + else + maybePull() + } + + private def maybePull(): Unit = + if (!isClosed(in) && !hasBeenPulled(in)) + pull(in) + + private def restartGatherer(): Unit = { + gatherer = factory() + oneToOneGatherer = gatherer match { + case specialized: OneToOneGatherer[In, Out] @unchecked => specialized + case _ => null + } + multiMode = false + needInvokeOnCompleteCallback = true + } + + private def clearPending(): Unit = { + callbackFirst = null.asInstanceOf[Out] + hasCallbackFirst = false + pendingFirst = null.asInstanceOf[Out] + hasPendingFirst = false + if (pendingOverflow ne null) + pendingOverflow.clear() + } + + private def pushCallbackSingle(): Unit = { + val elem = callbackFirst + callbackFirst = null.asInstanceOf[Out] + hasCallbackFirst = false + + if (isAvailable(out)) + push(out, elem) + else { + pendingFirst = elem + hasPendingFirst = true + contextPropagation.suspendContext() + } + } + + private def pushPendingSingle(shouldResumeContext: Boolean): Unit = { + val hadContext = needInvokeOnCompleteCallback + if (shouldResumeContext && hadContext) + contextPropagation.resumeContext() + + val elem = pendingFirst + pendingFirst = null.asInstanceOf[Out] + hasPendingFirst = false + + push(out, elem) + maybeRunFinalAction(hadContext) + } + + private def pushPendingMulti(shouldResumeContext: Boolean): Unit = { + val hadContext = needInvokeOnCompleteCallback + if (shouldResumeContext && hadContext) + contextPropagation.resumeContext() + + push(out, pendingFirst) + + if ((pendingOverflow ne null) && !pendingOverflow.isEmpty) { + pendingFirst = pendingOverflow.removeFirst() + if (hadContext) + contextPropagation.suspendContext() + } else { + pendingFirst = null.asInstanceOf[Out] + hasPendingFirst = false + maybeRunFinalAction(hadContext) + } + } + + private def maybeRunFinalAction(hadContext: Boolean): Unit = { + if (downstreamFinished || isClosed(out)) { + finalAction = FinalAction.None + finalFailure = null + } else if (finalAction == FinalAction.None) + maybePull() + else { + val action = finalAction + val failure = finalFailure + finalAction = FinalAction.None + finalFailure = null + if (hadContext) + invokeOnCompleteAndThen(action, failure) + else + execute(action, failure) + } + } + + private def invokeOnCompleteAndThen(action: Int, failure: Throwable = null): Unit = { + needInvokeOnCompleteCallback = false + gatherer.onComplete(pendingCollector) + if (hasPending) { + finalAction = action + finalFailure = failure + if (isAvailable(out)) + if (multiMode) + pushPendingMulti(shouldResumeContext = false) + else + pushPendingSingle(shouldResumeContext = false) + } else + execute(action, failure) + } + + private def execute(action: Int, failure: Throwable): Unit = + action match { + case FinalAction.None => maybePull() + case FinalAction.Complete => completeStage() + case FinalAction.Fail => failStage(failure) + case FinalAction.Restart => + restartGatherer() + maybePull() + } + + setHandlers(in, out, this) + } + + override def toString = "Gather" +} + /** * INTERNAL API */ diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala index 1622dad4c6e..31cc926ecff 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala @@ -868,6 +868,52 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr Optional.empty() }) + /** + * Transform each input element into zero or more output elements without requiring tuple or collection allocations + * imposed by the operator API itself. + * + * A new [[Gatherer]] is created for each materialization and can keep mutable state in fields. + * The provided [[GatherCollector]] can emit zero or more output elements for each input element. + * + * The collector is only valid while the callback is running. Emitted elements MUST NOT be `null`. + * + * The `onComplete` callback is invoked once whenever the stage terminates or restarts: on upstream completion, + * upstream failure, downstream cancellation, abrupt stage termination, or supervision restart. + * Elements emitted from `onComplete` are emitted before upstream-failure propagation, completion, or restart, + * and are ignored on downstream cancellation and abrupt termination. + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * '''Emits when''' the gatherer emits an element and downstream is ready to consume it + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes and the gatherer has emitted all pending elements, including `onComplete` + * + * '''Cancels when''' downstream cancels + * + * @since 1.3.0 + */ + def gather[T](create: function.Creator[javadsl.Gatherer[Out, T]]): javadsl.Flow[In, T, Mat] = + new Flow(delegate.gather(() => + new scaladsl.Gatherer[Out, T] { + private val gatherer = create.create() + private var currentCollector: scaladsl.GatherCollector[T] = _ + private val javaCollector = new javadsl.GatherCollector[T] { + override def push(elem: T): Unit = currentCollector.push(elem) + } + + override def apply(in: Out, collector: scaladsl.GatherCollector[T]): Unit = { + currentCollector = collector + gatherer.apply(in, javaCollector) + } + + override def onComplete(collector: scaladsl.GatherCollector[T]): Unit = { + currentCollector = collector + gatherer.onComplete(javaCollector) + } + })) + /** * Transform each input element into an `Iterable` of output elements that is * then flattened into the output stream. The transformation is meant to be stateful, diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Gather.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Gather.scala new file mode 100644 index 00000000000..d5c3b1d0914 --- /dev/null +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Gather.scala @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.stream.javadsl + +import org.apache.pekko.annotation.DoNotInherit +import org.apache.pekko.japi.function + +/** + * Collector passed to [[Gatherer]] for emitting output elements. + * + * The collector is only valid while the current [[Gatherer]] callback is running. + * + * @since 1.3.0 + */ +@DoNotInherit +trait GatherCollector[-Out] extends function.Procedure[Out] { + def push(elem: Out): Unit + + final override def apply(param: Out): Unit = push(param) +} + +/** + * A stateful gatherer for the `gather` operator. + * + * A new gatherer instance is created for each materialization and on each supervision restart. + * It can keep mutable state in fields. + * + * @since 1.3.0 + */ +@FunctionalInterface +trait Gatherer[-In, Out] extends function.Procedure2[In, GatherCollector[Out]] { + + /** + * Called once whenever the stage terminates or restarts: on upstream completion, upstream failure, + * downstream cancellation, abrupt stage termination, or when the stage is restarted due to supervision. + * + * Elements pushed to the collector are emitted only on upstream completion, upstream failure, + * or supervision restart. They are ignored on downstream cancellation and abrupt termination. + */ + def onComplete(collector: GatherCollector[Out]): Unit = () +} diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala index fe5b1f5ef0f..513db5db6b7 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala @@ -2761,6 +2761,52 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ Optional.empty() }) + /** + * Transform each input element into zero or more output elements without requiring tuple or collection allocations + * imposed by the operator API itself. + * + * A new [[Gatherer]] is created for each materialization and can keep mutable state in fields. + * The provided [[GatherCollector]] can emit zero or more output elements for each input element. + * + * The collector is only valid while the callback is running. Emitted elements MUST NOT be `null`. + * + * The `onComplete` callback is invoked once whenever the stage terminates or restarts: on upstream completion, + * upstream failure, downstream cancellation, abrupt stage termination, or supervision restart. + * Elements emitted from `onComplete` are emitted before upstream-failure propagation, completion, or restart, + * and are ignored on downstream cancellation and abrupt termination. + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * '''Emits when''' the gatherer emits an element and downstream is ready to consume it + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes and the gatherer has emitted all pending elements, including `onComplete` + * + * '''Cancels when''' downstream cancels + * + * @since 1.3.0 + */ + def gather[T](create: function.Creator[javadsl.Gatherer[Out, T]]): javadsl.Source[T, Mat] = + new Source(delegate.gather(() => + new scaladsl.Gatherer[Out, T] { + private val gatherer = create.create() + private var currentCollector: scaladsl.GatherCollector[T] = _ + private val javaCollector = new javadsl.GatherCollector[T] { + override def push(elem: T): Unit = currentCollector.push(elem) + } + + override def apply(in: Out, collector: scaladsl.GatherCollector[T]): Unit = { + currentCollector = collector + gatherer.apply(in, javaCollector) + } + + override def onComplete(collector: scaladsl.GatherCollector[T]): Unit = { + currentCollector = collector + gatherer.onComplete(javaCollector) + } + })) + /** * Transform each input element into an `Iterable` of output elements that is * then flattened into the output stream. The transformation is meant to be stateful, diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala index 14f525bb70f..b8924956075 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala @@ -345,6 +345,36 @@ final class SubFlow[In, Out, Mat]( Optional.empty() }) + /** + * Transform each input element into zero or more output elements without requiring tuple or collection allocations + * imposed by the operator API itself. + * + * A new [[Gatherer]] is created for each materialization and can keep mutable state in fields. + * `onComplete` is invoked on upstream completion, upstream failure, downstream cancellation, + * abrupt stage termination, and supervision restart. + * + * @since 1.3.0 + */ + def gather[T](create: function.Creator[javadsl.Gatherer[Out, T]]): javadsl.SubFlow[In, T, Mat] = + new SubFlow(delegate.gather(() => + new scaladsl.Gatherer[Out, T] { + private val gatherer = create.create() + private var currentCollector: scaladsl.GatherCollector[T] = _ + private val javaCollector = new javadsl.GatherCollector[T] { + override def push(elem: T): Unit = currentCollector.push(elem) + } + + override def apply(in: Out, collector: scaladsl.GatherCollector[T]): Unit = { + currentCollector = collector + gatherer.apply(in, javaCollector) + } + + override def onComplete(collector: scaladsl.GatherCollector[T]): Unit = { + currentCollector = collector + gatherer.onComplete(javaCollector) + } + })) + /** * Transform each input element into an `Iterable` of output elements that is * then flattened into the output stream. The transformation is meant to be stateful, diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala index 7edb90afd92..b76dc2c76f8 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala @@ -336,6 +336,36 @@ final class SubSource[Out, Mat]( Optional.empty() }) + /** + * Transform each input element into zero or more output elements without requiring tuple or collection allocations + * imposed by the operator API itself. + * + * A new [[Gatherer]] is created for each materialization and can keep mutable state in fields. + * `onComplete` is invoked on upstream completion, upstream failure, downstream cancellation, + * abrupt stage termination, and supervision restart. + * + * @since 1.3.0 + */ + def gather[T](create: function.Creator[javadsl.Gatherer[Out, T]]): javadsl.SubSource[T, Mat] = + new SubSource(delegate.gather(() => + new scaladsl.Gatherer[Out, T] { + private val gatherer = create.create() + private var currentCollector: scaladsl.GatherCollector[T] = _ + private val javaCollector = new javadsl.GatherCollector[T] { + override def push(elem: T): Unit = currentCollector.push(elem) + } + + override def apply(in: Out, collector: scaladsl.GatherCollector[T]): Unit = { + currentCollector = collector + gatherer.apply(in, javaCollector) + } + + override def onComplete(collector: scaladsl.GatherCollector[T]): Unit = { + currentCollector = collector + gatherer.onComplete(javaCollector) + } + })) + /** * Transform each input element into an `Iterable` of output elements that is * then flattened into the output stream. The transformation is meant to be stateful, diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala index 49d0c8382ef..6581d8759b5 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala @@ -1264,6 +1264,35 @@ trait FlowOps[+Out, +Mat] { None }) + /** + * Transform each input element into zero or more output elements without requiring tuple or collection allocations + * imposed by the operator API itself. + * + * A new [[Gatherer]] is created for each materialization and can keep mutable state in fields or closures. + * The provided [[GatherCollector]] can emit zero or more output elements for each input element. + * + * The collector is only valid while the callback is running. Emitted elements MUST NOT be `null`. + * + * The `onComplete` callback is invoked once whenever the stage terminates or restarts: on upstream completion, + * upstream failure, downstream cancellation, abrupt stage termination, or supervision restart. + * Elements emitted from `onComplete` are emitted before upstream-failure propagation, completion, or restart, + * and are ignored on downstream cancellation and abrupt termination. + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * '''Emits when''' the gatherer emits an element and downstream is ready to consume it + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes and the gatherer has emitted all pending elements, including `onComplete` + * + * '''Cancels when''' downstream cancels + * + * @since 1.3.0 + */ + def gather[T](create: () => Gatherer[Out, T]): Repr[T] = + via(new Gather[Out, T](create)) + /** * Transform each input element into an `Iterable` of output elements that is * then flattened into the output stream. The transformation is meant to be stateful, diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Gather.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Gather.scala new file mode 100644 index 00000000000..88fa030657b --- /dev/null +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Gather.scala @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.stream.scaladsl + +import org.apache.pekko.annotation.DoNotInherit + +/** + * Collector passed to [[Gatherer]] for emitting output elements. + * + * The collector is only valid while the current [[Gatherer]] callback is running. + * + * @since 1.3.0 + */ +@DoNotInherit +trait GatherCollector[-Out] { + def push(elem: Out): Unit +} + +/** + * A stateful gatherer for the `gather` operator. + * + * A new gatherer instance is created for each materialization and on each supervision restart. + * It can keep mutable state in fields or closures. + * + * @since 1.3.0 + */ +@FunctionalInterface +trait Gatherer[-In, +Out] { + def apply(in: In, collector: GatherCollector[Out]): Unit + + /** + * Called once whenever the stage terminates or restarts: on upstream completion, upstream failure, + * downstream cancellation, abrupt stage termination, or when the stage is restarted due to supervision. + * + * Elements pushed to the collector are emitted only on upstream completion, upstream failure, + * or supervision restart. They are ignored on downstream cancellation and abrupt termination. + */ + def onComplete(collector: GatherCollector[Out]): Unit = () +} + +/** + * INTERNAL API + */ +@DoNotInherit +@FunctionalInterface +private[stream] trait OneToOneGatherer[-In, +Out] extends Gatherer[In, Out] { + def applyOne(in: In): Out + + final override def apply(in: In, collector: GatherCollector[Out]): Unit = + collector.push(applyOne(in)) +} From f26f4788b106ae14615a5c8e8f91bf533bb4116a Mon Sep 17 00:00:00 2001 From: He-Pin Date: Sun, 29 Mar 2026 23:54:35 +0800 Subject: [PATCH 2/2] stream: refine gather docs and @since markers Motivation: follow-up review and documentation work for the new gather operator. Modification: correct the new gather API @since annotations to 2.0.0 and document how gather coverage aligns with statefulMap semantics while differing on internal null-state handling. Result: the published PR now reflects the intended release version and explains the remaining semantic differences more clearly. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../main/paradox/stream/operators/Source-or-Flow/gather.md | 7 +++++++ .../main/scala/org/apache/pekko/stream/javadsl/Flow.scala | 2 +- .../scala/org/apache/pekko/stream/javadsl/Gather.scala | 4 ++-- .../scala/org/apache/pekko/stream/javadsl/Source.scala | 2 +- .../scala/org/apache/pekko/stream/javadsl/SubFlow.scala | 2 +- .../scala/org/apache/pekko/stream/javadsl/SubSource.scala | 2 +- .../main/scala/org/apache/pekko/stream/scaladsl/Flow.scala | 2 +- .../scala/org/apache/pekko/stream/scaladsl/Gather.scala | 4 ++-- 8 files changed, 16 insertions(+), 9 deletions(-) diff --git a/docs/src/main/paradox/stream/operators/Source-or-Flow/gather.md b/docs/src/main/paradox/stream/operators/Source-or-Flow/gather.md index 6ee40af0a99..3c2c5ef7f60 100644 --- a/docs/src/main/paradox/stream/operators/Source-or-Flow/gather.md +++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/gather.md @@ -20,6 +20,13 @@ Patterns such as `zipWithIndex`, `bufferUntilChanged`, and `distinctUntilChanged state inside the gatherer and pushing outputs directly, instead of returning a new state/output wrapper for every element. +Compared with `statefulMap`, `gather` covers the same common stateful streaming patterns used in this PR's test suite: +happy-path stateful mapping, delayed completion output, restart/stop supervision behavior, and backpressure-sensitive +one-output transformations. The main difference is that `statefulMap` exposes state as an explicit return value, +including `null` state transitions, while `gather` keeps state inside the gatherer instance itself. Because of that, +`statefulMap` tests about `null` state do not translate one-to-one; the equivalent `gather` coverage focuses on the +observable stream behavior instead. + When the stage terminates or restarts, the gatherer's `onComplete` callback is invoked. Elements pushed from `onComplete` are emitted before upstream-failure propagation, normal completion, or supervision restart, and are ignored on downstream cancellation or abrupt termination. diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala index 31cc926ecff..fdd2dae31ab 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala @@ -693,7 +693,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * * '''Cancels when''' downstream cancels * - * @since 1.3.0 + * @since 2.0.0 */ def mapOption[T](f: function.Function[Out, Optional[T]]): javadsl.Flow[In, T, Mat] = new Flow(delegate.map(f(_)).collect { diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Gather.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Gather.scala index d5c3b1d0914..b56f24dd40a 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Gather.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Gather.scala @@ -25,7 +25,7 @@ import org.apache.pekko.japi.function * * The collector is only valid while the current [[Gatherer]] callback is running. * - * @since 1.3.0 + * @since 2.0.0 */ @DoNotInherit trait GatherCollector[-Out] extends function.Procedure[Out] { @@ -40,7 +40,7 @@ trait GatherCollector[-Out] extends function.Procedure[Out] { * A new gatherer instance is created for each materialization and on each supervision restart. * It can keep mutable state in fields. * - * @since 1.3.0 + * @since 2.0.0 */ @FunctionalInterface trait Gatherer[-In, Out] extends function.Procedure2[In, GatherCollector[Out]] { diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala index 513db5db6b7..cb4f2ae576d 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala @@ -209,7 +209,7 @@ object Source { /** * Create a `Source` from an `Optional` value, emitting the value if it is present. * - * @since 1.3.0 + * @since 2.0.0 */ def fromOption[T](optional: Optional[T]): Source[T, NotUsed] = if (optional.isPresent) single(optional.get()) else empty() diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala index b8924956075..80fe136d506 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala @@ -168,7 +168,7 @@ final class SubFlow[In, Out, Mat]( * * '''Cancels when''' downstream cancels * - * @since 1.3.0 + * @since 2.0.0 */ def mapOption[T](f: function.Function[Out, Optional[T]]): SubFlow[In, T, Mat] = new SubFlow(delegate.map(f(_)).collect { diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala index b76dc2c76f8..e5d80fb7347 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala @@ -159,7 +159,7 @@ final class SubSource[Out, Mat]( * * '''Cancels when''' downstream cancels * - * @since 1.3.0 + * @since 2.0.0 */ def mapOption[T](f: function.Function[Out, Optional[T]]): SubSource[T, Mat] = new SubSource(delegate.map(f(_)).collect { diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala index 6581d8759b5..9d46820b863 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala @@ -1011,7 +1011,7 @@ trait FlowOps[+Out, +Mat] { * '''Cancels when''' downstream cancels * * @param errorConsumer function invoked when an error occurs - * @since 1.3.0 + * @since 2.0.0 */ def onErrorContinue[T <: Throwable](errorConsumer: Throwable => Unit)(implicit tag: ClassTag[T]): Repr[Out] = { this.withAttributes(ActorAttributes.supervisionStrategy { diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Gather.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Gather.scala index 88fa030657b..12d8f027e63 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Gather.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Gather.scala @@ -24,7 +24,7 @@ import org.apache.pekko.annotation.DoNotInherit * * The collector is only valid while the current [[Gatherer]] callback is running. * - * @since 1.3.0 + * @since 2.0.0 */ @DoNotInherit trait GatherCollector[-Out] { @@ -37,7 +37,7 @@ trait GatherCollector[-Out] { * A new gatherer instance is created for each materialization and on each supervision restart. * It can keep mutable state in fields or closures. * - * @since 1.3.0 + * @since 2.0.0 */ @FunctionalInterface trait Gatherer[-In, +Out] {