diff --git a/bench-jmh/src/main/scala/org/apache/pekko/stream/ZipWithIndexBenchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/stream/ZipWithIndexBenchmark.scala
index 9c1dde4c6e8..45d546f402d 100644
--- a/bench-jmh/src/main/scala/org/apache/pekko/stream/ZipWithIndexBenchmark.scala
+++ b/bench-jmh/src/main/scala/org/apache/pekko/stream/ZipWithIndexBenchmark.scala
@@ -76,6 +76,103 @@ class ZipWithIndexBenchmark {
}
.toMat(Sink.ignore)(Keep.right)
+ private val statefulMapZipWithIndex = Source
+ .repeat(1)
+ .take(OperationsPerInvocation)
+ .statefulMap(() => 0L)((index, elem) => (index + 1, (elem, index)), _ => None)
+ .toMat(Sink.ignore)(Keep.right)
+
+ private val gatherPublicZipWithIndex = Source
+ .repeat(1)
+ .take(OperationsPerInvocation)
+ .gather(() =>
+ new Gatherer[Int, (Int, Long)] {
+ private var index = 0L
+
+ override def apply(elem: Int, collector: GatherCollector[(Int, Long)]): Unit = {
+ val zipped = (elem, index)
+ index += 1
+ collector.push(zipped)
+ }
+ })
+ .toMat(Sink.ignore)(Keep.right)
+
+ private val gatherInternalOneToOneZipWithIndex = Source
+ .repeat(1)
+ .take(OperationsPerInvocation)
+ .gather(() =>
+ new OneToOneGatherer[Int, (Int, Long)] {
+ private var index = 0L
+
+ override def applyOne(elem: Int): (Int, Long) = {
+ val zipped = (elem, index)
+ index += 1
+ zipped
+ }
+ })
+ .toMat(Sink.ignore)(Keep.right)
+
+ private val statefulMapIncrement = Source
+ .repeat(1)
+ .take(OperationsPerInvocation)
+ .statefulMap(() => ())((state, elem) => (state, elem + 1), _ => None)
+ .toMat(Sink.ignore)(Keep.right)
+
+ private val gatherPublicIncrement = Source
+ .repeat(1)
+ .take(OperationsPerInvocation)
+ .gather(() =>
+ new Gatherer[Int, Int] {
+ override def apply(elem: Int, collector: GatherCollector[Int]): Unit =
+ collector.push(elem + 1)
+ })
+ .toMat(Sink.ignore)(Keep.right)
+
+ private val gatherInternalOneToOneIncrement = Source
+ .repeat(1)
+ .take(OperationsPerInvocation)
+ .gather(() =>
+ new OneToOneGatherer[Int, Int] {
+ override def applyOne(elem: Int): Int = elem + 1
+ })
+ .toMat(Sink.ignore)(Keep.right)
+
+ private val statefulMapCountedIncrement = Source
+ .repeat(1)
+ .take(OperationsPerInvocation)
+ .statefulMap(() => 0L)((index, elem) => (index + 1, elem + index.toInt), _ => None)
+ .toMat(Sink.ignore)(Keep.right)
+
+ private val gatherPublicCountedIncrement = Source
+ .repeat(1)
+ .take(OperationsPerInvocation)
+ .gather(() =>
+ new Gatherer[Int, Int] {
+ private var index = 0L
+
+ override def apply(elem: Int, collector: GatherCollector[Int]): Unit = {
+ val incremented = elem + index.toInt
+ index += 1
+ collector.push(incremented)
+ }
+ })
+ .toMat(Sink.ignore)(Keep.right)
+
+ private val gatherInternalOneToOneCountedIncrement = Source
+ .repeat(1)
+ .take(OperationsPerInvocation)
+ .gather(() =>
+ new OneToOneGatherer[Int, Int] {
+ private var index = 0L
+
+ override def applyOne(elem: Int): Int = {
+ val incremented = elem + index.toInt
+ index += 1
+ incremented
+ }
+ })
+ .toMat(Sink.ignore)(Keep.right)
+
@Benchmark
@OperationsPerInvocation(OperationsPerInvocation)
def benchOldZipWithIndex(): Unit =
@@ -86,4 +183,49 @@ class ZipWithIndexBenchmark {
def benchNewZipWithIndex(): Unit =
Await.result(newZipWithIndex.run(), Duration.Inf)
+ @Benchmark
+ @OperationsPerInvocation(OperationsPerInvocation)
+ def benchStatefulMapZipWithIndex(): Unit =
+ Await.result(statefulMapZipWithIndex.run(), Duration.Inf)
+
+ @Benchmark
+ @OperationsPerInvocation(OperationsPerInvocation)
+ def benchGatherPublicZipWithIndex(): Unit =
+ Await.result(gatherPublicZipWithIndex.run(), Duration.Inf)
+
+ @Benchmark
+ @OperationsPerInvocation(OperationsPerInvocation)
+ def benchGatherInternalOneToOneZipWithIndex(): Unit =
+ Await.result(gatherInternalOneToOneZipWithIndex.run(), Duration.Inf)
+
+ @Benchmark
+ @OperationsPerInvocation(OperationsPerInvocation)
+ def benchStatefulMapIncrement(): Unit =
+ Await.result(statefulMapIncrement.run(), Duration.Inf)
+
+ @Benchmark
+ @OperationsPerInvocation(OperationsPerInvocation)
+ def benchGatherPublicIncrement(): Unit =
+ Await.result(gatherPublicIncrement.run(), Duration.Inf)
+
+ @Benchmark
+ @OperationsPerInvocation(OperationsPerInvocation)
+ def benchGatherInternalOneToOneIncrement(): Unit =
+ Await.result(gatherInternalOneToOneIncrement.run(), Duration.Inf)
+
+ @Benchmark
+ @OperationsPerInvocation(OperationsPerInvocation)
+ def benchStatefulMapCountedIncrement(): Unit =
+ Await.result(statefulMapCountedIncrement.run(), Duration.Inf)
+
+ @Benchmark
+ @OperationsPerInvocation(OperationsPerInvocation)
+ def benchGatherPublicCountedIncrement(): Unit =
+ Await.result(gatherPublicCountedIncrement.run(), Duration.Inf)
+
+ @Benchmark
+ @OperationsPerInvocation(OperationsPerInvocation)
+ def benchGatherInternalOneToOneCountedIncrement(): Unit =
+ Await.result(gatherInternalOneToOneCountedIncrement.run(), Duration.Inf)
+
}
diff --git a/docs/gather_zero_allocation_evaluation.md b/docs/gather_zero_allocation_evaluation.md
new file mode 100644
index 00000000000..f36109323d5
--- /dev/null
+++ b/docs/gather_zero_allocation_evaluation.md
@@ -0,0 +1,38 @@
+# Pekko Gather/Zero-Allocation Operator Evaluation Report
+
+## Background and Goals
+- Goal: analyze the allocation behavior of the existing statefulMap/statefulMapConcat operators, evaluate whether a zero-allocation operator similar to the JDK 24 Gatherer is worth introducing, and make clear that the existing API must not be broken.
+- Scope: only the current Pekko Stream implementation, compared against the JDK 24 Gatherer, SmallRye Mutiny, and similar designs.
+
+## Current State Analysis
+### statefulMap
+- API: `(S, In) => (S, Out)`
+- Allocates one Tuple2 (`(S, Out)`) per element, which is inherent to the API design.
+- Per-element allocation cannot be avoided.
+
+### statefulMapConcat
+- API: `(S, In) => (S, Iterable[Out])`
+- Allocates an Iterable and an Iterator per element.
+- Also dictated by the API design and unavoidable.
+
+## Comparison with the JDK 24 Gatherer
+- The JDK 24 Gatherer combines mutable state with direct downstream push, so zero allocation is achievable in theory.
+- The reference implementation still performs some allocations (lambdas/closures, etc.).
+- SmallRye Mutiny has a similar design but also incurs small allocations.
+
+## Conclusions and Recommendations
+- Achieving zero allocation requires a brand-new API; the existing statefulMap semantics must not be broken.
+- Recommendation: for maximum performance, add an opt-in gather-like operator while keeping the existing API intact.
+- The allocations in statefulMap/statefulMapConcat are inherent to the API design, not implementation defects.
+
+## Reference Files
+- stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
+- Official JDK 24 Gatherer documentation
+- SmallRye Mutiny source code
+
+## Evaluators
+- Completed by the Pekko migration working group (gpt-4.1)
+- Evaluation date: 2026-03-28
+
+---
+For a detailed design or prototype implementation, please provide further requirements.
\ No newline at end of file
diff --git a/docs/src/main/paradox/stream/operators/Source-or-Flow/gather.md b/docs/src/main/paradox/stream/operators/Source-or-Flow/gather.md
new file mode 100644
index 00000000000..3c2c5ef7f60
--- /dev/null
+++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/gather.md
@@ -0,0 +1,69 @@
+# gather
+
+Transform each input element into zero or more output elements with a stateful gatherer.
+
+@ref[Simple operators](../index.md#simple-operators)
+
+## Signature
+
+@apidoc[Flow.gather](Flow)
+
+## Description
+
+`gather` creates a new gatherer for each materialization. The gatherer can keep mutable state and emit zero or more
+elements for each incoming element by calling `push` on the provided collector.
+
+Unlike `statefulMap` and `statefulMapConcat`, the operator API itself does not require returning tuples or collections
+for each input element. This makes `gather` a good fit for allocation-sensitive stateful transformations.
+
+Patterns such as `zipWithIndex`, `bufferUntilChanged`, and `distinctUntilChanged` can be expressed by keeping mutable
+state inside the gatherer and pushing outputs directly, instead of returning a new state/output wrapper for every
+element.
+
+Compared with `statefulMap`, `gather` covers the same common stateful streaming patterns:
+happy-path stateful mapping, delayed completion output, restart/stop supervision behavior, and backpressure-sensitive
+one-output transformations. The main difference is that `statefulMap` exposes state as an explicit return value,
+including `null` state transitions, while `gather` keeps state inside the gatherer instance itself. Because of that,
+`statefulMap` usages relying on `null` state do not translate one-to-one; with `gather` the equivalent logic is
+expressed through the gatherer's internal state and the observable stream behavior instead.
+
+When the stage terminates or restarts, the gatherer's `onComplete` callback is invoked. Elements pushed from
+`onComplete` are emitted before upstream-failure propagation, normal completion, or supervision restart, and are
+ignored on downstream cancellation or abrupt termination.
+
+The `gather` operator adheres to the ActorAttributes.SupervisionStrategy attribute.
+
+For one-to-one stateful mapping see @ref:[statefulMap](statefulMap.md). For iterable-based fan-out see
+@ref:[statefulMapConcat](statefulMapConcat.md).
+
+## Examples
+
+In the first example, we implement a `zipWithIndex`-like transformation.
+
+Scala
+: @@snip [Gather.scala](/docs/src/test/scala/docs/stream/operators/flow/Gather.scala) { #zipWithIndex }
+
+Java
+: @@snip [Gather.java](/docs/src/test/java/jdocs/stream/operators/flow/Gather.java) { #zipWithIndex }
+
+In the second example, we group incoming elements in batches of three and emit the trailing batch from `onComplete`.
+
+Scala
+: @@snip [Gather.scala](/docs/src/test/scala/docs/stream/operators/flow/Gather.scala) { #grouped }
+
+Java
+: @@snip [Gather.java](/docs/src/test/java/jdocs/stream/operators/flow/Gather.java) { #grouped }
+
+## Reactive Streams semantics
+
+@@@div { .callout }
+
+**emits** when the gatherer emits an element and downstream is ready to consume it
+
+**backpressures** when downstream backpressures
+
+**completes** when upstream completes and all gathered elements have been emitted
+
+**cancels** when downstream cancels
+
+@@@
diff --git a/docs/src/main/paradox/stream/operators/index.md b/docs/src/main/paradox/stream/operators/index.md
index f39c251bb8f..268ba92bfc7 100644
--- a/docs/src/main/paradox/stream/operators/index.md
+++ b/docs/src/main/paradox/stream/operators/index.md
@@ -165,6 +165,7 @@ depending on being backpressured by downstream or not.
|Source/Flow|@ref[foldWhile](Source-or-Flow/foldWhile.md)|Start with current value `zero` and then apply the current and next value to the given function. When upstream completes or the predicate `p` returns `false`, the current value is emitted downstream.|
|Source/Flow|@ref[fromMaterializer](Source-or-Flow/fromMaterializer.md)|Defer the creation of a `Source/Flow` until materialization and access `Materializer` and `Attributes`|
|Flow|@ref[futureFlow](Flow/futureFlow.md)|Streams the elements through the given future flow once it successfully completes.|
+|Source/Flow|@ref[gather](Source-or-Flow/gather.md)|Transform each input element into zero or more output elements with a stateful gatherer.|
|Source/Flow|@ref[grouped](Source-or-Flow/grouped.md)|Accumulate incoming events until the specified number of elements have been accumulated and then pass the collection of elements downstream.|
|Source/Flow|@ref[groupedAdjacentBy](Source-or-Flow/groupedAdjacentBy.md)|Partitions this stream into chunks by a delimiter function.|
|Source/Flow|@ref[groupedAdjacentByWeighted](Source-or-Flow/groupedAdjacentByWeighted.md)|Partitions this stream into chunks by a delimiter function and a weight limit.|
@@ -498,6 +499,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
* [futureFlow](Flow/futureFlow.md)
* [futureSink](Sink/futureSink.md)
* [futureSource](Source/futureSource.md)
+* [gather](Source-or-Flow/gather.md)
* [groupBy](Source-or-Flow/groupBy.md)
* [grouped](Source-or-Flow/grouped.md)
* [groupedAdjacentBy](Source-or-Flow/groupedAdjacentBy.md)
diff --git a/docs/src/test/java/jdocs/stream/operators/flow/Gather.java b/docs/src/test/java/jdocs/stream/operators/flow/Gather.java
new file mode 100644
index 00000000000..bf3f3323dcc
--- /dev/null
+++ b/docs/src/test/java/jdocs/stream/operators/flow/Gather.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 Lightbend Inc.
+ */
+
+package jdocs.stream.operators.flow;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import org.apache.pekko.actor.ActorSystem;
+import org.apache.pekko.stream.javadsl.GatherCollector;
+import org.apache.pekko.stream.javadsl.Gatherer;
+import org.apache.pekko.stream.javadsl.Source;
+
+public class Gather {
+ static final ActorSystem system = null;
+
+ public void zipWithIndex() {
+ // #zipWithIndex
+ Source.from(Arrays.asList("A", "B", "C", "D"))
+ .gather(
+ () ->
+                new Gatherer<String, String>() {
+                  private long index = 0L;
+
+                  @Override
+                  public void apply(String elem, GatherCollector<String> collector) {
+                    collector.push("(" + elem + "," + index + ")");
+                    index += 1;
+                  }
+                })
+ .runForeach(System.out::println, system);
+ // prints
+ // (A,0)
+ // (B,1)
+ // (C,2)
+ // (D,3)
+ // #zipWithIndex
+ }
+
+ public void grouped() {
+ // #grouped
+ Source.from(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
+ .gather(
+ () ->
+                new Gatherer<Integer, String>() {
+                  private final ArrayList<Integer> buffer = new ArrayList<>(3);
+
+                  @Override
+                  public void apply(Integer elem, GatherCollector<String> collector) {
+                    buffer.add(elem);
+                    if (buffer.size() == 3) {
+                      collector.push(buffer.toString());
+                      buffer.clear();
+                    }
+                  }
+
+                  @Override
+                  public void onComplete(GatherCollector<String> collector) {
+                    if (!buffer.isEmpty()) {
+                      collector.push(buffer.toString());
+                    }
+                  }
+                })
+ .runForeach(System.out::println, system);
+ // prints
+ // [1, 2, 3]
+ // [4, 5, 6]
+ // [7, 8, 9]
+ // [10]
+ // #grouped
+ }
+}
diff --git a/docs/src/test/scala/docs/stream/operators/flow/Gather.scala b/docs/src/test/scala/docs/stream/operators/flow/Gather.scala
new file mode 100644
index 00000000000..41d05fb89d6
--- /dev/null
+++ b/docs/src/test/scala/docs/stream/operators/flow/Gather.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 Lightbend Inc.
+ */
+
+package docs.stream.operators.flow
+
+import org.apache.pekko.actor.ActorSystem
+import org.apache.pekko.stream.scaladsl.{ GatherCollector, Gatherer, Source }
+
+object Gather {
+
+ implicit val actorSystem: ActorSystem = ???
+
+ def zipWithIndex(): Unit = {
+ // #zipWithIndex
+ Source(List("A", "B", "C", "D"))
+ .gather(() => {
+ var index = 0L
+ (elem: String, collector: GatherCollector[(String, Long)]) => {
+ collector.push((elem, index))
+ index += 1
+ }
+ })
+ .runForeach(println)
+ // prints
+ // (A,0)
+ // (B,1)
+ // (C,2)
+ // (D,3)
+ // #zipWithIndex
+ }
+
+ def grouped(): Unit = {
+ // #grouped
+ Source(1 to 10)
+ .gather(() =>
+ new Gatherer[Int, List[Int]] {
+ private var buffer = List.empty[Int]
+
+ override def apply(elem: Int, collector: GatherCollector[List[Int]]): Unit = {
+ buffer = elem :: buffer
+ if (buffer.size == 3) {
+ collector.push(buffer.reverse)
+ buffer = Nil
+ }
+ }
+
+ override def onComplete(collector: GatherCollector[List[Int]]): Unit =
+ if (buffer.nonEmpty)
+ collector.push(buffer.reverse)
+ })
+ .runForeach(println)
+ // prints
+ // List(1, 2, 3)
+ // List(4, 5, 6)
+ // List(7, 8, 9)
+ // List(10)
+ // #grouped
+ }
+}
diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java
index f3058445066..90be09062a0 100644
--- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java
+++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java
@@ -328,6 +328,64 @@ public void mustBeAbleToUseMapWithAutoCloseableResource() throws Exception {
Assertions.assertEquals(1, closed.get());
}
+ @Test
+ public void mustBeAbleToUseGather() throws Exception {
+    final java.lang.Iterable<Integer> input = Arrays.asList(1, 2, 3, 4, 5);
+    final CompletionStage<String> grouped =
+        Source.from(input)
+            .via(
+                Flow.of(Integer.class)
+                    .gather(
+                        () ->
+                            new Gatherer<Integer, String>() {
+                              private final ArrayList<Integer> buffer = new ArrayList<>(2);
+
+                              @Override
+                              public void apply(Integer elem, GatherCollector<String> collector) {
+                                buffer.add(elem);
+                                if (buffer.size() == 2) {
+                                  collector.push(buffer.toString());
+                                  buffer.clear();
+                                }
+                              }
+
+                              @Override
+                              public void onComplete(GatherCollector<String> collector) {
+                                if (!buffer.isEmpty()) {
+                                  collector.push(buffer.toString());
+                                }
+                              }
+                            }))
+ .runFold("", (acc, elem) -> acc + elem, system);
+
+ Assertions.assertEquals(
+ "[1, 2][3, 4][5]", grouped.toCompletableFuture().get(3, TimeUnit.SECONDS));
+ }
+
+ @Test
+ public void mustBeAbleToUseGatherAsDistinctUntilChanged() throws Exception {
+    final CompletionStage<String> result =
+        Source.from(Arrays.asList("A", "B", "B", "C", "C", "D"))
+            .via(
+                Flow.of(String.class)
+                    .gather(
+                        () ->
+                            new Gatherer<String, String>() {
+                              private String lastSeen = null;
+
+                              @Override
+                              public void apply(String elem, GatherCollector<String> collector) {
+                                if (!elem.equals(lastSeen)) {
+                                  collector.push(elem);
+                                  lastSeen = elem;
+                                }
+                              }
+                            }))
+ .runFold("", (acc, elem) -> acc + elem, system);
+
+ Assertions.assertEquals("ABCD", result.toCompletableFuture().get(3, TimeUnit.SECONDS));
+ }
+
@Test
public void mustBeAbleToUseFoldWhile() throws Exception {
final int result =
diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
index 3aec7545b09..4b6d5cc986d 100644
--- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
+++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
@@ -936,6 +936,46 @@ public void mustBeAbleToUseMapWithResource() {
Assertions.assertFalse(gate.get());
}
+ @Test
+ public void mustBeAbleToUseGather() throws Exception {
+    final CompletionStage<String> result =
+        Source.from(Arrays.asList(1, 2, 3, 4, 5))
+            .gather(
+                () ->
+                    new Gatherer<Integer, Integer>() {
+                      private int sum = 0;
+
+                      @Override
+                      public void apply(Integer elem, GatherCollector<Integer> collector) {
+                        sum += elem;
+                        collector.push(sum);
+                      }
+                    })
+ .runFold("", (acc, elem) -> acc + elem, system);
+
+ Assertions.assertEquals("1361015", result.toCompletableFuture().get(3, TimeUnit.SECONDS));
+ }
+
+ @Test
+ public void mustBeAbleToUseGatherAsZipWithIndex() throws Exception {
+    final CompletionStage<String> result =
+        Source.from(Arrays.asList("A", "B", "C", "D"))
+            .gather(
+                () ->
+                    new Gatherer<String, String>() {
+                      private long index = 0L;
+
+                      @Override
+                      public void apply(String elem, GatherCollector<String> collector) {
+                        collector.push(elem + ":" + index);
+                        index += 1;
+                      }
+                    })
+ .runFold("", (acc, elem) -> acc + elem, system);
+
+ Assertions.assertEquals("A:0B:1C:2D:3", result.toCompletableFuture().get(3, TimeUnit.SECONDS));
+ }
+
@Test
public void mustBeAbleToUseMapWithAutoCloseableResource() throws Exception {
final TestKit probe = new TestKit(system);
diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowGatherSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowGatherSpec.scala
new file mode 100644
index 00000000000..23e552436ab
--- /dev/null
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowGatherSpec.scala
@@ -0,0 +1,764 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.scaladsl
+
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.annotation.nowarn
+import scala.concurrent.{ Await, Promise }
+import scala.concurrent.duration.DurationInt
+import scala.util.Success
+import scala.util.control.NoStackTrace
+
+import org.apache.pekko.Done
+import org.apache.pekko.stream.{ AbruptStageTerminationException, ActorAttributes, ActorMaterializer, ClosedShape, Supervision }
+import org.apache.pekko.stream.testkit.{ StreamSpec, TestSubscriber }
+import org.apache.pekko.stream.testkit.Utils.TE
+import org.apache.pekko.stream.testkit.scaladsl.{ TestSink, TestSource }
+import org.apache.pekko.testkit.EventFilter
+
+class FlowGatherSpec extends StreamSpec {
+
+ private val ex = new Exception("TEST") with NoStackTrace
+
+ object BeenCalledTimesGate {
+ def apply(): BeenCalledTimesGate = new BeenCalledTimesGate(1)
+ def apply(nTimes: Int): BeenCalledTimesGate = new BeenCalledTimesGate(nTimes)
+ }
+
+ class BeenCalledTimesGate(nTimes: Int) {
+ private val beenCalled = new AtomicInteger(0)
+
+ def mark(): Unit = beenCalled.updateAndGet { current =>
+ if (current == nTimes)
+ throw new IllegalStateException(s"Has been called:[$nTimes] times, should not be called anymore.")
+ else current + 1
+ }
+
+ def ensure(): Unit =
+ if (beenCalled.get() != nTimes)
+ throw new IllegalStateException(s"Expected to be called:[$nTimes], but only be called:[$beenCalled]")
+ }
+
+ "A Gather" must {
+ "work in the happy case" in {
+ val gate = BeenCalledTimesGate()
+ Source(List(1, 2, 3, 4, 5))
+ .gather(() =>
+ new Gatherer[Int, (Int, Int)] {
+ private var agg = 0
+
+ override def apply(elem: Int, collector: GatherCollector[(Int, Int)]): Unit = {
+ collector.push((agg, elem))
+ agg += elem
+ }
+
+ override def onComplete(collector: GatherCollector[(Int, Int)]): Unit =
+ gate.mark()
+ })
+ .runWith(TestSink[(Int, Int)]())
+ .request(6)
+ .expectNext((0, 1))
+ .expectNext((1, 2))
+ .expectNext((3, 3))
+ .expectNext((6, 4))
+ .expectNext((10, 5))
+ .expectComplete()
+ gate.ensure()
+ }
+
+ "remember state when complete" in {
+ val gate = BeenCalledTimesGate()
+ Source(1 to 10)
+ .gather(() =>
+ new Gatherer[Int, List[Int]] {
+ private var state = List.empty[Int]
+
+ override def apply(elem: Int, collector: GatherCollector[List[Int]]): Unit = {
+ val newState = elem :: state
+ if (newState.size == 3) {
+ state = Nil
+ collector.push(newState.reverse)
+ } else
+ state = newState
+ }
+
+ override def onComplete(collector: GatherCollector[List[Int]]): Unit = {
+ gate.mark()
+ collector.push(state.reverse)
+ }
+ })
+ .mapConcat(identity)
+ .runWith(TestSink[Int]())
+ .request(10)
+ .expectNextN(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
+ .expectComplete()
+ gate.ensure()
+ }
+
+ "emit zero or more elements and drain on completion" in {
+ Source(1 to 5)
+ .gather(() =>
+ new Gatherer[Int, List[Int]] {
+ private var buffer = List.empty[Int]
+
+ override def apply(elem: Int, collector: GatherCollector[List[Int]]): Unit = {
+ buffer = elem :: buffer
+ if (buffer.size == 2) {
+ collector.push(buffer.reverse)
+ buffer = Nil
+ }
+ }
+
+ override def onComplete(collector: GatherCollector[List[Int]]): Unit =
+ if (buffer.nonEmpty)
+ collector.push(buffer.reverse)
+ })
+ .runWith(TestSink[List[Int]]())
+ .request(3)
+ .expectNext(List(1, 2))
+ .expectNext(List(3, 4))
+ .expectNext(List(5))
+ .expectComplete()
+ }
+
+ "emit all outputs when a callback deopts from single to multi mode" in {
+ Source.single(1)
+ .gather(() =>
+ new Gatherer[Int, Int] {
+ override def apply(elem: Int, collector: GatherCollector[Int]): Unit = {
+ collector.push(elem)
+ collector.push(elem + 1)
+ }
+ })
+ .runWith(TestSink[Int]())
+ .request(2)
+ .expectNext(1, 2)
+ .expectComplete()
+ }
+
+ "drop a single buffered output if apply throws after pushing it" in {
+ Source.single(1)
+ .gather(() =>
+ new Gatherer[Int, Int] {
+ override def apply(elem: Int, collector: GatherCollector[Int]): Unit = {
+ collector.push(elem)
+ throw TE("boom")
+ }
+ })
+ .runWith(TestSink[Int]())
+ .request(1)
+ .expectError(TE("boom"))
+ }
+
+ "resume without leaking a single buffered output if apply throws after pushing it" in {
+ Source(List(1, 2))
+ .gather(() =>
+ new Gatherer[Int, Int] {
+ override def apply(elem: Int, collector: GatherCollector[Int]): Unit = {
+ collector.push(elem)
+ if (elem == 1)
+ throw ex
+ }
+ })
+ .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
+ .runWith(TestSink[Int]())
+ .request(1)
+ .expectNext(2)
+ .expectComplete()
+ }
+
+ "be usable as zipWithIndex" in {
+ val gate = BeenCalledTimesGate()
+ Source(List("A", "B", "C", "D"))
+ .gather(() =>
+ new Gatherer[String, (String, Long)] {
+ private var index = 0L
+
+ override def apply(elem: String, collector: GatherCollector[(String, Long)]): Unit = {
+ collector.push((elem, index))
+ index += 1
+ }
+
+ override def onComplete(collector: GatherCollector[(String, Long)]): Unit =
+ gate.mark()
+ })
+ .runWith(TestSink[(String, Long)]())
+ .request(4)
+ .expectNext(("A", 0L))
+ .expectNext(("B", 1L))
+ .expectNext(("C", 2L))
+ .expectNext(("D", 3L))
+ .expectComplete()
+ gate.ensure()
+ }
+
+ "respect backpressure for public single-output gatherers" in {
+ Source(List("A", "B", "C"))
+ .gather(() =>
+ new Gatherer[String, String] {
+ override def apply(elem: String, collector: GatherCollector[String]): Unit =
+ collector.push(elem)
+ })
+ .runWith(TestSink[String]())
+ .request(1)
+ .expectNext("A")
+ .expectNoMessage(200.millis)
+ .request(1)
+ .expectNext("B")
+ .request(1)
+ .expectNext("C")
+ .expectComplete()
+ }
+
+ "respect backpressure for one-to-one gatherers" in {
+ Source(List("A", "B", "C"))
+ .gather(() =>
+ new OneToOneGatherer[String, String] {
+ override def applyOne(elem: String): String = elem
+ })
+ .runWith(TestSink[String]())
+ .request(1)
+ .expectNext("A")
+ .expectNoMessage(200.millis)
+ .request(1)
+ .expectNext("B")
+ .request(1)
+ .expectNext("C")
+ .expectComplete()
+ }
+
+ "be usable as bufferUntilChanged" in {
+ val gate = BeenCalledTimesGate()
+ Source("A" :: "B" :: "B" :: "C" :: "C" :: "C" :: "D" :: Nil)
+ .gather(() =>
+ new Gatherer[String, List[String]] {
+ private var buffer = List.empty[String]
+
+ override def apply(elem: String, collector: GatherCollector[List[String]]): Unit =
+ buffer match {
+ case head :: _ if head != elem =>
+ collector.push(buffer.reverse)
+ buffer = elem :: Nil
+ case _ =>
+ buffer = elem :: buffer
+ }
+
+ override def onComplete(collector: GatherCollector[List[String]]): Unit = {
+ gate.mark()
+ if (buffer.nonEmpty)
+ collector.push(buffer.reverse)
+ }
+ })
+ .runWith(TestSink[List[String]]())
+ .request(4)
+ .expectNext(List("A"))
+ .expectNext(List("B", "B"))
+ .expectNext(List("C", "C", "C"))
+ .expectNext(List("D"))
+ .expectComplete()
+ gate.ensure()
+ }
+
+ "be usable as distinctUntilChanged" in {
+ val gate = BeenCalledTimesGate()
+ Source("A" :: "B" :: "B" :: "C" :: "C" :: "C" :: "D" :: Nil)
+ .gather(() =>
+ new Gatherer[String, String] {
+ private var lastElement: Option[String] = None
+
+ override def apply(elem: String, collector: GatherCollector[String]): Unit =
+ lastElement match {
+ case Some(last) if last == elem =>
+ lastElement = Some(elem)
+ case _ =>
+ lastElement = Some(elem)
+ collector.push(elem)
+ }
+
+ override def onComplete(collector: GatherCollector[String]): Unit =
+ gate.mark()
+ })
+ .runWith(TestSink[String]())
+ .request(4)
+ .expectNext("A")
+ .expectNext("B")
+ .expectNext("C")
+ .expectNext("D")
+ .expectComplete()
+ gate.ensure()
+ }
+
+ "resume when supervision says Resume" in {
+ Source(List(1, 2, 3, 4, 5))
+ .gather(() =>
+ new Gatherer[Int, Int] {
+ private var sum = 0
+
+ override def apply(elem: Int, collector: GatherCollector[Int]): Unit =
+ if (elem % 2 == 0)
+ throw ex
+ else {
+ sum += elem
+ collector.push(sum)
+ }
+ })
+ .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
+ .runWith(TestSink[Int]())
+ .request(3)
+ .expectNext(1, 4, 9)
+ .expectComplete()
+ }
+
+ "emit onComplete elements before restarting" in {
+ val generation = new AtomicInteger(0)
+ val (source, sink) = TestSource[String]()
+ .viaMat(Flow[String].gather(() => {
+ val currentGeneration = generation.incrementAndGet()
+ new Gatherer[String, String] {
+ override def apply(elem: String, collector: GatherCollector[String]): Unit =
+ if (elem == "boom") throw TE("boom")
+ else collector.push(s"$elem$currentGeneration")
+
+ override def onComplete(collector: GatherCollector[String]): Unit =
+ collector.push(s"onClose$currentGeneration")
+ }
+ }))(Keep.left)
+ .withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider))
+ .toMat(TestSink())(Keep.both)
+ .run()
+
+ sink.request(1)
+ source.sendNext("one")
+ sink.expectNext("one1")
+ sink.request(1)
+ source.sendNext("boom")
+ sink.expectNext("onClose1")
+ sink.request(1)
+ source.sendNext("two")
+ sink.expectNext("two2")
+ sink.cancel()
+ source.expectCancellation()
+ }
+
+ "restart and recreate gatherer state when supervision says Restart" in {
+ val generation = new AtomicInteger(0)
+ Source(List(1, 2, 3, 4, 5))
+ .gather(() => {
+ generation.incrementAndGet()
+ new Gatherer[Int, (Int, Int)] {
+ private var agg = 0
+
+ override def apply(elem: Int, collector: GatherCollector[(Int, Int)]): Unit =
+ if (elem % 3 == 0)
+ throw ex
+ else {
+ collector.push((agg, elem))
+ agg += elem
+ }
+ }
+ })
+ .withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider))
+ .runWith(TestSink[(Int, Int)]())
+ .request(5)
+ .expectNext((0, 1))
+ .expectNext((1, 2))
+ .expectNext((0, 4))
+ .expectNext((4, 5))
+ .expectComplete()
+ generation.get() shouldBe 2
+ }
+
+ "stop when supervision says Stop" in {
+ val gate = BeenCalledTimesGate()
+ Source(List(1, 2, 3, 4, 5))
+ .gather(() =>
+ new Gatherer[Int, (Int, Int)] {
+ private var agg = 0
+
+ override def apply(elem: Int, collector: GatherCollector[(Int, Int)]): Unit =
+ if (elem % 3 == 0)
+ throw ex
+ else {
+ collector.push((agg, elem))
+ agg += elem
+ }
+
+ override def onComplete(collector: GatherCollector[(Int, Int)]): Unit =
+ gate.mark()
+ })
+ .withAttributes(ActorAttributes.supervisionStrategy(Supervision.stoppingDecider))
+ .runWith(TestSink[(Int, Int)]())
+ .request(5)
+ .expectNext((0, 1))
+ .expectNext((1, 2))
+ .expectError(ex)
+ gate.ensure()
+ }
+
+ "fail on upstream failure when onComplete emits nothing" in {
+ val gate = BeenCalledTimesGate()
+ val (source, sink) = TestSource[Int]()
+ .gather(() =>
+ new Gatherer[Int, Int] {
+ override def apply(elem: Int, collector: GatherCollector[Int]): Unit =
+ collector.push(elem)
+
+ override def onComplete(collector: GatherCollector[Int]): Unit =
+ gate.mark()
+ })
+ .toMat(TestSink[Int]())(Keep.both)
+ .run()
+
+ sink.request(3)
+ source.sendNext(1)
+ sink.expectNext(1)
+ source.sendNext(2)
+ sink.expectNext(2)
+ source.sendError(ex)
+ sink.expectError(ex)
+ gate.ensure()
+ }
+
+ "defer upstream failure until onComplete elements are emitted" in {
+ val (source, sink) = TestSource[Int]()
+ .gather(() =>
+ new Gatherer[Int, Int] {
+ private var sum = 0
+
+ override def apply(elem: Int, collector: GatherCollector[Int]): Unit = {
+ sum += elem
+ collector.push(sum)
+ }
+
+ override def onComplete(collector: GatherCollector[Int]): Unit =
+ collector.push(-1)
+ })
+ .toMat(TestSink[Int]())(Keep.both)
+ .run()
+
+ sink.request(3)
+ source.sendNext(1)
+ sink.expectNext(1)
+ source.sendNext(2)
+ sink.expectNext(3)
+ source.sendError(ex)
+ sink.expectNext(-1)
+ sink.expectError(ex)
+ }
+
+ "emit buffered elements before failing when supervision stops the stage" in {
+ Source(List(1, 2, 3))
+ .gather(() =>
+ new Gatherer[Int, List[Int]] {
+ private var buffer = List.empty[Int]
+
+ override def apply(elem: Int, collector: GatherCollector[List[Int]]): Unit =
+ if (elem == 3)
+ throw ex
+ else
+ buffer = elem :: buffer
+
+ override def onComplete(collector: GatherCollector[List[Int]]): Unit =
+ if (buffer.nonEmpty)
+ collector.push(buffer.reverse)
+ })
+ .runWith(TestSink[List[Int]]())
+ .request(2)
+ .expectNext(List(1, 2))
+ .expectError(ex)
+ }
+
+ "call onComplete when supervision stops the stage" in {
+ val gate = BeenCalledTimesGate()
+ val promise = Promise[Done]()
+ val done = Source
+ .single(1)
+ .gather(() =>
+ new Gatherer[Int, Int] {
+ override def apply(elem: Int, collector: GatherCollector[Int]): Unit =
+ throw ex
+
+ override def onComplete(collector: GatherCollector[Int]): Unit = {
+ gate.mark()
+ promise.complete(Success(Done))
+ }
+ })
+ .withAttributes(ActorAttributes.supervisionStrategy(Supervision.stoppingDecider))
+ .runWith(Sink.ignore)
+
+ done.failed.futureValue shouldBe ex
+ Await.result(promise.future, 3.seconds) shouldBe Done
+ gate.ensure()
+ }
+
+ "cancel upstream when downstream cancels" in {
+ val gate = BeenCalledTimesGate()
+ val promise = Promise[Done]()
+ val source = TestSource[Int]()
+ .via(Flow[Int].gather(() =>
+ new Gatherer[Int, Int] {
+ override def apply(elem: Int, collector: GatherCollector[Int]): Unit =
+ collector.push(elem)
+
+ override def onComplete(collector: GatherCollector[Int]): Unit = {
+ gate.mark()
+ promise.complete(Success(Done))
+ }
+ }))
+ .toMat(Sink.cancelled)(Keep.left)
+ .run()
+
+ source.expectCancellation()
+ Await.result(promise.future, 3.seconds) shouldBe Done
+ gate.ensure()
+ }
+
+ "cancel upstream when downstream fails" in {
+ val gate = BeenCalledTimesGate()
+ val promise = Promise[Done]()
+ val testProbe = TestSubscriber.probe[Int]()
+ val source = TestSource[Int]()
+ .via(Flow[Int].gather(() =>
+ new Gatherer[Int, Int] {
+ override def apply(elem: Int, collector: GatherCollector[Int]): Unit =
+ collector.push(elem)
+
+ override def onComplete(collector: GatherCollector[Int]): Unit = {
+ gate.mark()
+ promise.complete(Success(Done))
+ }
+ }))
+ .toMat(Sink.fromSubscriber(testProbe))(Keep.left)
+ .run()
+
+ testProbe.cancel(ex)
+ source.expectCancellationWithCause(ex)
+ Await.result(promise.future, 3.seconds) shouldBe Done
+ gate.ensure()
+ }
+
+ "invoke onComplete exactly once when downstream cancels while draining final elements" in {
+ val closedCounter = new AtomicInteger(0)
+ val (source, sink) = TestSource[Int]()
+ .gather(() =>
+ new Gatherer[Int, Int] {
+ override def apply(elem: Int, collector: GatherCollector[Int]): Unit =
+ collector.push(elem)
+
+ override def onComplete(collector: GatherCollector[Int]): Unit = {
+ closedCounter.incrementAndGet()
+ collector.push(100)
+ collector.push(200)
+ }
+ })
+ .toMat(TestSink[Int]())(Keep.both)
+ .run()
+
+ sink.request(2)
+ source.sendNext(1)
+ sink.expectNext(1)
+ source.sendComplete()
+ sink.expectNext(100)
+ sink.cancel()
+ closedCounter.get() shouldBe 1
+ }
+
+ "not restart gatherer when downstream cancels while draining restart elements" in {
+ val generation = new AtomicInteger(0)
+ val (source, sink) = TestSource[Int]()
+ .gather(() => {
+ val currentGeneration = generation.incrementAndGet()
+ new Gatherer[Int, Int] {
+ override def apply(elem: Int, collector: GatherCollector[Int]): Unit =
+ throw TE(s"boom-$currentGeneration")
+
+ override def onComplete(collector: GatherCollector[Int]): Unit = {
+ collector.push(100 + currentGeneration)
+ collector.push(200 + currentGeneration)
+ }
+ }
+ })
+ .withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider))
+ .toMat(TestSink[Int]())(Keep.both)
+ .run()
+
+ sink.request(1)
+ source.sendNext(1)
+ sink.expectNext(101)
+ sink.cancel()
+ generation.get() shouldBe 1
+ }
+
+ "fail when emitting null" in {
+ Source.single(1)
+ .gather(() =>
+ new Gatherer[Int, String] {
+ override def apply(elem: Int, collector: GatherCollector[String]): Unit =
+ collector.push(null.asInstanceOf[String])
+ })
+ .runWith(TestSink[String]())
+ .request(1)
+ .expectError() shouldBe a[NullPointerException]
+ }
+
+ "call onComplete on abrupt materializer termination" in {
+ val gate = BeenCalledTimesGate()
+ val promise = Promise[Done]()
+ @nowarn("msg=deprecated")
+ val mat = ActorMaterializer()
+
+ val matVal = Source
+ .single(1)
+ .gather(() =>
+ new Gatherer[Int, Int] {
+ override def apply(elem: Int, collector: GatherCollector[Int]): Unit =
+ collector.push(elem)
+
+ override def onComplete(collector: GatherCollector[Int]): Unit = {
+ gate.mark()
+ promise.complete(Success(Done))
+ }
+ })
+ .runWith(Sink.never)(mat)
+
+ mat.shutdown()
+ matVal.failed.futureValue shouldBe a[AbruptStageTerminationException]
+ Await.result(promise.future, 3.seconds) shouldBe Done
+ gate.ensure()
+ }
+
+ "will not call onComplete twice if apply fails" in {
+ val closedCounter = new AtomicInteger(0)
+ val probe = Source
+ .repeat(1)
+ .gather(() =>
+ new Gatherer[Int, String] {
+ override def apply(elem: Int, collector: GatherCollector[String]): Unit =
+ throw TE("failing read")
+
+ override def onComplete(collector: GatherCollector[String]): Unit =
+ closedCounter.incrementAndGet()
+ })
+ .runWith(TestSink[String]())
+
+ probe.request(1)
+ probe.expectError(TE("failing read"))
+ closedCounter.get() shouldBe 1
+ }
+
+ "will not call onComplete twice if both apply and onComplete fail" in {
+ val closedCounter = new AtomicInteger(0)
+ val probe = Source
+ .repeat(1)
+ .gather(() =>
+ new Gatherer[Int, Int] {
+ override def apply(elem: Int, collector: GatherCollector[Int]): Unit =
+ throw TE("failing read")
+
+ override def onComplete(collector: GatherCollector[Int]): Unit =
+ if (closedCounter.incrementAndGet() == 1)
+ throw TE("boom")
+ })
+ .runWith(TestSink[Int]())
+
+ EventFilter[TE](occurrences = 1).intercept {
+ probe.request(1)
+ probe.expectError(TE("boom"))
+ }
+ closedCounter.get() shouldBe 1
+ }
+
+ "will not call onComplete twice on cancel when onComplete fails" in {
+ val closedCounter = new AtomicInteger(0)
+ val (source, sink) = TestSource[Int]()
+ .viaMat(Flow[Int].gather(() =>
+ new Gatherer[Int, Int] {
+ override def apply(elem: Int, collector: GatherCollector[Int]): Unit =
+ collector.push(elem)
+
+ override def onComplete(collector: GatherCollector[Int]): Unit = {
+ closedCounter.incrementAndGet()
+ throw TE("boom")
+ }
+ }))(Keep.left)
+ .toMat(TestSink[Int]())(Keep.both)
+ .run()
+
+ EventFilter[TE](occurrences = 1).intercept {
+ sink.request(1)
+ source.sendNext(1)
+ sink.expectNext(1)
+ sink.cancel()
+ source.expectCancellation()
+ }
+ closedCounter.get() shouldBe 1
+ }
+
+ "will not call onComplete twice if onComplete fails on upstream complete" in {
+ val closedCounter = new AtomicInteger(0)
+ val (source, sink) = TestSource[Int]()
+ .gather(() =>
+ new Gatherer[Int, Int] {
+ override def apply(elem: Int, collector: GatherCollector[Int]): Unit =
+ collector.push(elem)
+
+ override def onComplete(collector: GatherCollector[Int]): Unit = {
+ closedCounter.incrementAndGet()
+ throw TE("boom")
+ }
+ })
+ .toMat(TestSink[Int]())(Keep.both)
+ .run()
+
+ EventFilter[TE](occurrences = 1).intercept {
+ sink.request(1)
+ source.sendNext(1)
+ sink.expectNext(1)
+ sink.request(1)
+ source.sendComplete()
+ sink.expectError(TE("boom"))
+ }
+ closedCounter.get() shouldBe 1
+ }
+ }
+
+ "support junction output ports" in {
+ val source = Source(List((1, 1), (2, 2)))
+ val graph = RunnableGraph.fromGraph(GraphDSL.createGraph(TestSink[(Int, Int)]()) { implicit b => sink =>
+ import GraphDSL.Implicits._
+ val unzip = b.add(Unzip[Int, Int]())
+ val zip = b.add(Zip[Int, Int]())
+ val gather = b.add(Flow[(Int, Int)].gather(() => (elem: (Int, Int), collector: GatherCollector[(Int, Int)]) => collector.push(elem)))
+
+ source ~> unzip.in
+ unzip.out0 ~> zip.in0
+ unzip.out1 ~> zip.in1
+ zip.out ~> gather ~> sink.in
+
+ ClosedShape
+ })
+
+ graph
+ .run()
+ .request(2)
+ .expectNext((1, 1))
+ .expectNext((2, 2))
+ .expectComplete()
+ }
+}
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala
index 6de53600bd7..09459906c02 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala
@@ -74,6 +74,7 @@ import pekko.stream.Attributes._
val batchWeighted = name("batchWeighted")
val expand = name("expand")
val statefulMap = name("statefulMap")
+ val gather = name("gather")
val statefulMapConcat = name("statefulMapConcat")
val mapConcat = name("mapConcat")
val detacher = name("detacher")
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
index d214479d4ec..a9b39552003 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
@@ -48,6 +48,9 @@ import pekko.stream.impl.Stages.DefaultAttributes
import pekko.stream.impl.fusing.GraphStages.{ FutureSource, SimpleLinearGraphStage, SingleSource }
import pekko.stream.scaladsl.{
DelayStrategy,
+ GatherCollector,
+ Gatherer,
+ OneToOneGatherer,
Source,
StatefulMapConcatAccumulator,
StatefulMapConcatAccumulatorFactory
@@ -2328,6 +2331,293 @@ private[pekko] final class StatefulMap[S, In, Out](create: () => S, f: (S, In) =
override def toString = "StatefulMap"
}
+/**
+ * INTERNAL API
+ */
+@InternalApi
+private[pekko] final class Gather[In, Out](factory: () => Gatherer[In, Out]) extends GraphStage[FlowShape[In, Out]] {
+ require(factory != null, "gatherer factory should not be null")
+
+ private val in = Inlet[In]("Gather.in")
+ private val out = Outlet[Out]("Gather.out")
+ override val shape: FlowShape[In, Out] = FlowShape(in, out)
+
+ override def initialAttributes: Attributes = DefaultAttributes.gather and SourceLocation.forLambda(factory)
+
+ override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
+ new GraphStageLogic(shape) with InHandler with OutHandler {
+ private object FinalAction {
+ final val None = 0
+ final val Complete = 1
+ final val Restart = 2
+ final val Fail = 3
+ }
+
+ private lazy val decider: Decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
+ private val contextPropagation = ContextPropagation()
+ private val noopCollector = new GatherCollector[Out] {
+ override def push(elem: Out): Unit = ()
+ }
+ private val singleCollector = new GatherCollector[Out] {
+ override def push(elem: Out): Unit = pushSingleCallbackOutput(elem)
+ }
+ private val pendingCollector = new GatherCollector[Out] {
+ override def push(elem: Out): Unit = enqueuePendingOutput(elem)
+ }
+ private var callbackFirst: Out = _
+ private var hasCallbackFirst = false
+ private var pendingFirst: Out = _
+ private var pendingOverflow: java.util.ArrayDeque[Out] = _
+ private var hasPendingFirst = false
+ private var multiMode = false
+ private var gatherer: Gatherer[In, Out] = _
+ private var oneToOneGatherer: OneToOneGatherer[In, Out] = _
+ private var finalAction = FinalAction.None
+ private var finalFailure: Throwable = null
+ private var needInvokeOnCompleteCallback = false
+ private var downstreamFinished = false
+
+ override def preStart(): Unit = {
+ restartGatherer()
+ pull(in)
+ }
+
+ override def onPush(): Unit =
+ try {
+ if (oneToOneGatherer ne null) onPushOneToOne()
+ else if (multiMode) onPushMulti()
+ else onPushSingle()
+ } catch {
+ case NonFatal(ex) =>
+ clearPending()
+ if (!downstreamFinished)
+ decider(ex) match {
+ case Supervision.Stop => invokeOnCompleteAndThen(FinalAction.Fail, ex)
+ case Supervision.Resume => maybePull()
+ case Supervision.Restart => invokeOnCompleteAndThen(FinalAction.Restart)
+ }
+ }
+
+ override def onPull(): Unit =
+ if (hasPendingFirst) {
+ if (multiMode)
+ pushPendingMulti(shouldResumeContext = needInvokeOnCompleteCallback)
+ else
+ pushPendingSingle(shouldResumeContext = needInvokeOnCompleteCallback)
+ } else maybePull()
+
+ override def onUpstreamFinish(): Unit =
+ if (hasPending) {
+ if (finalAction != FinalAction.Fail)
+ finalAction = FinalAction.Complete
+ } else invokeOnCompleteAndThen(FinalAction.Complete)
+
+ override def onUpstreamFailure(ex: Throwable): Unit =
+ if (hasPending) {
+ finalFailure = ex
+ finalAction = FinalAction.Fail
+ } else invokeOnCompleteAndThen(FinalAction.Fail, ex)
+
+ override def onDownstreamFinish(cause: Throwable): Unit = {
+ downstreamFinished = true
+ if (needInvokeOnCompleteCallback) {
+ needInvokeOnCompleteCallback = false
+ gatherer.onComplete(noopCollector)
+ }
+ super.onDownstreamFinish(cause)
+ }
+
+ override def postStop(): Unit = {
+ if (needInvokeOnCompleteCallback)
+ gatherer.onComplete(noopCollector)
+ }
+
+ private def enqueuePendingOutput(elem: Out): Unit = {
+ ReactiveStreamsCompliance.requireNonNullElement(elem)
+ if (hasPendingFirst) {
+ multiMode = true
+ if (pendingOverflow eq null)
+ pendingOverflow = new java.util.ArrayDeque[Out]()
+ pendingOverflow.addLast(elem)
+ } else {
+ pendingFirst = elem
+ hasPendingFirst = true
+ }
+ }
+
+ private def pushSingleCallbackOutput(elem: Out): Unit = {
+ ReactiveStreamsCompliance.requireNonNullElement(elem)
+ if (hasCallbackFirst) {
+ pendingFirst = callbackFirst
+ hasPendingFirst = true
+ callbackFirst = null.asInstanceOf[Out]
+ hasCallbackFirst = false
+ multiMode = true
+ if (pendingOverflow eq null)
+ pendingOverflow = new java.util.ArrayDeque[Out]()
+ pendingOverflow.addLast(elem)
+ } else {
+ callbackFirst = elem
+ hasCallbackFirst = true
+ }
+ }
+
+ private def hasPending: Boolean = hasPendingFirst
+
+ private def onPushOneToOne(): Unit = {
+ val elem = oneToOneGatherer.applyOne(grab(in))
+ ReactiveStreamsCompliance.requireNonNullElement(elem)
+ if (isAvailable(out))
+ push(out, elem)
+ else {
+ pendingFirst = elem
+ hasPendingFirst = true
+ contextPropagation.suspendContext()
+ }
+ }
+
+ private def onPushSingle(): Unit = {
+ gatherer(grab(in), singleCollector)
+ if (hasCallbackFirst)
+ pushCallbackSingle()
+ else if (hasPendingFirst && isAvailable(out)) {
+ if (multiMode)
+ pushPendingMulti(shouldResumeContext = false)
+ else
+ pushPendingSingle(shouldResumeContext = false)
+ } else if (hasPendingFirst)
+ contextPropagation.suspendContext()
+ else
+ maybePull()
+ }
+
+ private def onPushMulti(): Unit = {
+ gatherer(grab(in), pendingCollector)
+ if (hasPendingFirst && isAvailable(out))
+ pushPendingMulti(shouldResumeContext = false)
+ else if (hasPendingFirst)
+ contextPropagation.suspendContext()
+ else
+ maybePull()
+ }
+
+ private def maybePull(): Unit =
+ if (!isClosed(in) && !hasBeenPulled(in))
+ pull(in)
+
+ private def restartGatherer(): Unit = {
+ gatherer = factory()
+ oneToOneGatherer = gatherer match {
+ case specialized: OneToOneGatherer[In, Out] @unchecked => specialized
+ case _ => null
+ }
+ multiMode = false
+ needInvokeOnCompleteCallback = true
+ }
+
+ private def clearPending(): Unit = {
+ callbackFirst = null.asInstanceOf[Out]
+ hasCallbackFirst = false
+ pendingFirst = null.asInstanceOf[Out]
+ hasPendingFirst = false
+ if (pendingOverflow ne null)
+ pendingOverflow.clear()
+ }
+
+ private def pushCallbackSingle(): Unit = {
+ val elem = callbackFirst
+ callbackFirst = null.asInstanceOf[Out]
+ hasCallbackFirst = false
+
+ if (isAvailable(out))
+ push(out, elem)
+ else {
+ pendingFirst = elem
+ hasPendingFirst = true
+ contextPropagation.suspendContext()
+ }
+ }
+
+ private def pushPendingSingle(shouldResumeContext: Boolean): Unit = {
+ val hadContext = needInvokeOnCompleteCallback
+ if (shouldResumeContext && hadContext)
+ contextPropagation.resumeContext()
+
+ val elem = pendingFirst
+ pendingFirst = null.asInstanceOf[Out]
+ hasPendingFirst = false
+
+ push(out, elem)
+ maybeRunFinalAction(hadContext)
+ }
+
+ private def pushPendingMulti(shouldResumeContext: Boolean): Unit = {
+ val hadContext = needInvokeOnCompleteCallback
+ if (shouldResumeContext && hadContext)
+ contextPropagation.resumeContext()
+
+ push(out, pendingFirst)
+
+ if ((pendingOverflow ne null) && !pendingOverflow.isEmpty) {
+ pendingFirst = pendingOverflow.removeFirst()
+ if (hadContext)
+ contextPropagation.suspendContext()
+ } else {
+ pendingFirst = null.asInstanceOf[Out]
+ hasPendingFirst = false
+ maybeRunFinalAction(hadContext)
+ }
+ }
+
+ private def maybeRunFinalAction(hadContext: Boolean): Unit = {
+ if (downstreamFinished || isClosed(out)) {
+ finalAction = FinalAction.None
+ finalFailure = null
+ } else if (finalAction == FinalAction.None)
+ maybePull()
+ else {
+ val action = finalAction
+ val failure = finalFailure
+ finalAction = FinalAction.None
+ finalFailure = null
+ if (hadContext)
+ invokeOnCompleteAndThen(action, failure)
+ else
+ execute(action, failure)
+ }
+ }
+
+ private def invokeOnCompleteAndThen(action: Int, failure: Throwable = null): Unit = {
+ needInvokeOnCompleteCallback = false
+ gatherer.onComplete(pendingCollector)
+ if (hasPending) {
+ finalAction = action
+ finalFailure = failure
+ if (isAvailable(out))
+ if (multiMode)
+ pushPendingMulti(shouldResumeContext = false)
+ else
+ pushPendingSingle(shouldResumeContext = false)
+ } else
+ execute(action, failure)
+ }
+
+ private def execute(action: Int, failure: Throwable): Unit =
+ action match {
+ case FinalAction.None => maybePull()
+ case FinalAction.Complete => completeStage()
+ case FinalAction.Fail => failStage(failure)
+ case FinalAction.Restart =>
+ restartGatherer()
+ maybePull()
+ }
+
+ setHandlers(in, out, this)
+ }
+
+ override def toString = "Gather"
+}
+
/**
* INTERNAL API
*/
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
index 1622dad4c6e..fdd2dae31ab 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
@@ -693,7 +693,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
*
* '''Cancels when''' downstream cancels
*
- * @since 1.3.0
+ * @since 2.0.0
*/
def mapOption[T](f: function.Function[Out, Optional[T]]): javadsl.Flow[In, T, Mat] =
new Flow(delegate.map(f(_)).collect {
@@ -868,6 +868,52 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
Optional.empty()
})
+ /**
+ * Transform each input element into zero or more output elements without requiring tuple or collection allocations
+ * imposed by the operator API itself.
+ *
+ * A new [[Gatherer]] is created for each materialization and can keep mutable state in fields.
+ * The provided [[GatherCollector]] can emit zero or more output elements for each input element.
+ *
+ * The collector is only valid while the callback is running. Emitted elements MUST NOT be `null`.
+ *
+ * The `onComplete` callback is invoked once whenever the stage terminates or restarts: on upstream completion,
+ * upstream failure, downstream cancellation, abrupt stage termination, or supervision restart.
+ * Elements emitted from `onComplete` are emitted before upstream-failure propagation, completion, or restart,
+ * and are ignored on downstream cancellation and abrupt termination.
+ *
+ * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
+ *
+ * '''Emits when''' the gatherer emits an element and downstream is ready to consume it
+ *
+ * '''Backpressures when''' downstream backpressures
+ *
+ * '''Completes when''' upstream completes and the gatherer has emitted all pending elements, including `onComplete`
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @since 2.0.0
+ */
+ def gather[T](create: function.Creator[javadsl.Gatherer[Out, T]]): javadsl.Flow[In, T, Mat] =
+ new Flow(delegate.gather(() =>
+ new scaladsl.Gatherer[Out, T] {
+ private val gatherer = create.create()
+ private var currentCollector: scaladsl.GatherCollector[T] = _
+ private val javaCollector = new javadsl.GatherCollector[T] {
+ override def push(elem: T): Unit = currentCollector.push(elem)
+ }
+
+ override def apply(in: Out, collector: scaladsl.GatherCollector[T]): Unit = {
+ currentCollector = collector
+ gatherer.apply(in, javaCollector)
+ }
+
+ override def onComplete(collector: scaladsl.GatherCollector[T]): Unit = {
+ currentCollector = collector
+ gatherer.onComplete(javaCollector)
+ }
+ }))
+
/**
* Transform each input element into an `Iterable` of output elements that is
* then flattened into the output stream. The transformation is meant to be stateful,
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Gather.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Gather.scala
new file mode 100644
index 00000000000..b56f24dd40a
--- /dev/null
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Gather.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.javadsl
+
+import org.apache.pekko.annotation.DoNotInherit
+import org.apache.pekko.japi.function
+
+/**
+ * Collector passed to [[Gatherer]] for emitting output elements.
+ *
+ * The collector is only valid while the current [[Gatherer]] callback is running.
+ *
+ * @since 2.0.0
+ */
+@DoNotInherit
+trait GatherCollector[-Out] extends function.Procedure[Out] {
+ def push(elem: Out): Unit
+
+ final override def apply(param: Out): Unit = push(param)
+}
+
+/**
+ * A stateful gatherer for the `gather` operator.
+ *
+ * A new gatherer instance is created for each materialization and on each supervision restart.
+ * It can keep mutable state in fields.
+ *
+ * @since 2.0.0
+ */
+@FunctionalInterface
+trait Gatherer[-In, Out] extends function.Procedure2[In, GatherCollector[Out]] {
+
+ /**
+ * Called once whenever the stage terminates or restarts: on upstream completion, upstream failure,
+ * downstream cancellation, abrupt stage termination, or when the stage is restarted due to supervision.
+ *
+ * Elements pushed to the collector are emitted only on upstream completion, upstream failure,
+ * or supervision restart. They are ignored on downstream cancellation and abrupt termination.
+ */
+ def onComplete(collector: GatherCollector[Out]): Unit = ()
+}
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
index fe5b1f5ef0f..cb4f2ae576d 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
@@ -209,7 +209,7 @@ object Source {
/**
* Create a `Source` from an `Optional` value, emitting the value if it is present.
*
- * @since 1.3.0
+ * @since 2.0.0
*/
def fromOption[T](optional: Optional[T]): Source[T, NotUsed] =
if (optional.isPresent) single(optional.get()) else empty()
@@ -2761,6 +2761,52 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
Optional.empty()
})
+ /**
+ * Transform each input element into zero or more output elements without requiring tuple or collection allocations
+ * imposed by the operator API itself.
+ *
+ * A new [[Gatherer]] is created for each materialization and can keep mutable state in fields.
+ * The provided [[GatherCollector]] can emit zero or more output elements for each input element.
+ *
+ * The collector is only valid while the callback is running. Emitted elements MUST NOT be `null`.
+ *
+ * The `onComplete` callback is invoked once whenever the stage terminates or restarts: on upstream completion,
+ * upstream failure, downstream cancellation, abrupt stage termination, or supervision restart.
+ * Elements emitted from `onComplete` are emitted before upstream-failure propagation, completion, or restart,
+ * and are ignored on downstream cancellation and abrupt termination.
+ *
+ * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
+ *
+ * '''Emits when''' the gatherer emits an element and downstream is ready to consume it
+ *
+ * '''Backpressures when''' downstream backpressures
+ *
+ * '''Completes when''' upstream completes and the gatherer has emitted all pending elements, including `onComplete`
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @since 2.0.0
+ */
+ def gather[T](create: function.Creator[javadsl.Gatherer[Out, T]]): javadsl.Source[T, Mat] =
+ new Source(delegate.gather(() =>
+ new scaladsl.Gatherer[Out, T] {
+ private val gatherer = create.create()
+ private var currentCollector: scaladsl.GatherCollector[T] = _
+ private val javaCollector = new javadsl.GatherCollector[T] {
+ override def push(elem: T): Unit = currentCollector.push(elem)
+ }
+
+ override def apply(in: Out, collector: scaladsl.GatherCollector[T]): Unit = {
+ currentCollector = collector
+ gatherer.apply(in, javaCollector)
+ }
+
+ override def onComplete(collector: scaladsl.GatherCollector[T]): Unit = {
+ currentCollector = collector
+ gatherer.onComplete(javaCollector)
+ }
+ }))
+
/**
* Transform each input element into an `Iterable` of output elements that is
* then flattened into the output stream. The transformation is meant to be stateful,
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
index 14f525bb70f..80fe136d506 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
@@ -168,7 +168,7 @@ final class SubFlow[In, Out, Mat](
*
* '''Cancels when''' downstream cancels
*
- * @since 1.3.0
+ * @since 2.0.0
*/
def mapOption[T](f: function.Function[Out, Optional[T]]): SubFlow[In, T, Mat] =
new SubFlow(delegate.map(f(_)).collect {
@@ -345,6 +345,36 @@ final class SubFlow[In, Out, Mat](
Optional.empty()
})
+ /**
+ * Transform each input element into zero or more output elements without requiring tuple or collection allocations
+ * imposed by the operator API itself.
+ *
+ * A new [[Gatherer]] is created for each materialization and can keep mutable state in fields.
+ * `onComplete` is invoked on upstream completion, upstream failure, downstream cancellation,
+ * abrupt stage termination, and supervision restart.
+ *
+ * @since 2.0.0
+ */
+ def gather[T](create: function.Creator[javadsl.Gatherer[Out, T]]): javadsl.SubFlow[In, T, Mat] =
+ new SubFlow(delegate.gather(() =>
+ new scaladsl.Gatherer[Out, T] {
+ private val gatherer = create.create()
+ private var currentCollector: scaladsl.GatherCollector[T] = _
+ private val javaCollector = new javadsl.GatherCollector[T] {
+ override def push(elem: T): Unit = currentCollector.push(elem)
+ }
+
+ override def apply(in: Out, collector: scaladsl.GatherCollector[T]): Unit = {
+ currentCollector = collector
+ gatherer.apply(in, javaCollector)
+ }
+
+ override def onComplete(collector: scaladsl.GatherCollector[T]): Unit = {
+ currentCollector = collector
+ gatherer.onComplete(javaCollector)
+ }
+ }))
+
/**
* Transform each input element into an `Iterable` of output elements that is
* then flattened into the output stream. The transformation is meant to be stateful,
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
index 7edb90afd92..e5d80fb7347 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
@@ -159,7 +159,7 @@ final class SubSource[Out, Mat](
*
* '''Cancels when''' downstream cancels
*
- * @since 1.3.0
+ * @since 2.0.0
*/
def mapOption[T](f: function.Function[Out, Optional[T]]): SubSource[T, Mat] =
new SubSource(delegate.map(f(_)).collect {
@@ -336,6 +336,36 @@ final class SubSource[Out, Mat](
Optional.empty()
})
+ /**
+ * Transform each input element into zero or more output elements without requiring tuple or collection allocations
+ * imposed by the operator API itself.
+ *
+ * A new [[Gatherer]] is created for each materialization and can keep mutable state in fields.
+ * `onComplete` is invoked on upstream completion, upstream failure, downstream cancellation,
+ * abrupt stage termination, and supervision restart.
+ *
+ * @since 2.0.0
+ */
+ def gather[T](create: function.Creator[javadsl.Gatherer[Out, T]]): javadsl.SubSource[T, Mat] =
+ new SubSource(delegate.gather(() =>
+ new scaladsl.Gatherer[Out, T] {
+ private val gatherer = create.create()
+ private var currentCollector: scaladsl.GatherCollector[T] = _
+ private val javaCollector = new javadsl.GatherCollector[T] {
+ override def push(elem: T): Unit = currentCollector.push(elem)
+ }
+
+ override def apply(in: Out, collector: scaladsl.GatherCollector[T]): Unit = {
+ currentCollector = collector
+ gatherer.apply(in, javaCollector)
+ }
+
+ override def onComplete(collector: scaladsl.GatherCollector[T]): Unit = {
+ currentCollector = collector
+ gatherer.onComplete(javaCollector)
+ }
+ }))
+
/**
* Transform each input element into an `Iterable` of output elements that is
* then flattened into the output stream. The transformation is meant to be stateful,
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
index 49d0c8382ef..9d46820b863 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
@@ -1011,7 +1011,7 @@ trait FlowOps[+Out, +Mat] {
* '''Cancels when''' downstream cancels
*
* @param errorConsumer function invoked when an error occurs
- * @since 1.3.0
+ * @since 2.0.0
*/
def onErrorContinue[T <: Throwable](errorConsumer: Throwable => Unit)(implicit tag: ClassTag[T]): Repr[Out] = {
this.withAttributes(ActorAttributes.supervisionStrategy {
@@ -1264,6 +1264,35 @@ trait FlowOps[+Out, +Mat] {
None
})
+ /**
+ * Transform each input element into zero or more output elements without requiring tuple or collection allocations
+ * imposed by the operator API itself.
+ *
+ * A new [[Gatherer]] is created for each materialization and can keep mutable state in fields or closures.
+ * The provided [[GatherCollector]] can emit zero or more output elements for each input element.
+ *
+ * The collector is only valid while the callback is running. Emitted elements MUST NOT be `null`.
+ *
+ * The `onComplete` callback is invoked once whenever the stage terminates or restarts: on upstream completion,
+ * upstream failure, downstream cancellation, abrupt stage termination, or supervision restart.
+ * Elements emitted from `onComplete` are emitted before upstream-failure propagation, completion, or restart,
+ * and are ignored on downstream cancellation and abrupt termination.
+ *
+ * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
+ *
+ * '''Emits when''' the gatherer emits an element and downstream is ready to consume it
+ *
+ * '''Backpressures when''' downstream backpressures
+ *
+ * '''Completes when''' upstream completes and the gatherer has emitted all pending elements, including `onComplete`
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @since 2.0.0
+ */
+ def gather[T](create: () => Gatherer[Out, T]): Repr[T] =
+ via(new Gather[Out, T](create))
+
/**
* Transform each input element into an `Iterable` of output elements that is
* then flattened into the output stream. The transformation is meant to be stateful,
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Gather.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Gather.scala
new file mode 100644
index 00000000000..12d8f027e63
--- /dev/null
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Gather.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.scaladsl
+
+import org.apache.pekko.annotation.DoNotInherit
+
+/**
+ * Collector passed to [[Gatherer]] for emitting output elements.
+ *
+ * The collector is only valid while the current [[Gatherer]] callback is running.
+ *
+ * @since 1.3.0
+ */
+@DoNotInherit
+trait GatherCollector[-Out] {
+ def push(elem: Out): Unit
+}
+
+/**
+ * A stateful gatherer for the `gather` operator.
+ *
+ * A new gatherer instance is created for each materialization and on each supervision restart.
+ * It can keep mutable state in fields or closures.
+ *
+ * @since 1.3.0
+ */
+@FunctionalInterface
+trait Gatherer[-In, +Out] {
+ def apply(in: In, collector: GatherCollector[Out]): Unit
+
+ /**
+ * Called once whenever the stage terminates or restarts: on upstream completion, upstream failure,
+ * downstream cancellation, abrupt stage termination, or when the stage is restarted due to supervision.
+ *
+ * Elements pushed to the collector are emitted only on upstream completion, upstream failure,
+ * or supervision restart. They are ignored on downstream cancellation and abrupt termination.
+ */
+ def onComplete(collector: GatherCollector[Out]): Unit = ()
+}
+
+/**
+ * INTERNAL API
+ */
+@DoNotInherit
+@FunctionalInterface
+private[stream] trait OneToOneGatherer[-In, +Out] extends Gatherer[In, Out] {
+ def applyOne(in: In): Out
+
+ final override def apply(in: In, collector: GatherCollector[Out]): Unit =
+ collector.push(applyOne(in))
+}