Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,103 @@ class ZipWithIndexBenchmark {
}
.toMat(Sink.ignore)(Keep.right)

// Baseline: zipWithIndex expressed with statefulMap. The API forces allocation of a
// (state, out) tuple for every element, which is what the gather variants avoid.
private val statefulMapZipWithIndex = Source
  .repeat(1)
  .take(OperationsPerInvocation)
  .statefulMap(() => 0L)((idx, e) => (idx + 1, (e, idx)), _ => None)
  .toMat(Sink.ignore)(Keep.right)

// zipWithIndex via the public Gatherer API: the running index lives in a mutable field
// of the gatherer instance and each result is pushed straight to the collector, so no
// per-element state wrapper is allocated for the state threading itself.
private val gatherPublicZipWithIndex = Source
  .repeat(1)
  .take(OperationsPerInvocation)
  .gather(() =>
    new Gatherer[Int, (Int, Long)] {
      private var counter = 0L

      override def apply(elem: Int, collector: GatherCollector[(Int, Long)]): Unit = {
        collector.push((elem, counter))
        counter += 1
      }
    })
  .toMat(Sink.ignore)(Keep.right)

// zipWithIndex via the internal one-to-one Gatherer specialization: exactly one output
// per input, returned directly instead of pushed through a collector.
private val gatherInternalOneToOneZipWithIndex = Source
  .repeat(1)
  .take(OperationsPerInvocation)
  .gather(() =>
    new OneToOneGatherer[Int, (Int, Long)] {
      private var counter = 0L

      override def applyOne(elem: Int): (Int, Long) = {
        val out = (elem, counter)
        counter += 1
        out
      }
    })
  .toMat(Sink.ignore)(Keep.right)

// Stateless increment forced through statefulMap with a Unit state, to measure the cost
// of the mandatory (state, out) tuple when no real state is needed.
private val statefulMapIncrement = Source
  .repeat(1)
  .take(OperationsPerInvocation)
  .statefulMap(() => ())((unit, e) => (unit, e + 1), _ => None)
  .toMat(Sink.ignore)(Keep.right)

// Stateless increment via the public Gatherer API: one push per input element,
// no state object at all.
private val gatherPublicIncrement = Source
  .repeat(1)
  .take(OperationsPerInvocation)
  .gather(() =>
    new Gatherer[Int, Int] {
      override def apply(in: Int, out: GatherCollector[Int]): Unit =
        out.push(in + 1)
    })
  .toMat(Sink.ignore)(Keep.right)

// Stateless increment via the internal one-to-one Gatherer specialization.
private val gatherInternalOneToOneIncrement = Source
  .repeat(1)
  .take(OperationsPerInvocation)
  .gather(() =>
    new OneToOneGatherer[Int, Int] {
      override def applyOne(in: Int): Int = in + 1
    })
  .toMat(Sink.ignore)(Keep.right)

// Counted increment via statefulMap: adds a running Long index (truncated to Int) to
// each element, threading the index through the per-element state tuple.
private val statefulMapCountedIncrement = Source
  .repeat(1)
  .take(OperationsPerInvocation)
  .statefulMap(() => 0L)((count, e) => (count + 1, e + count.toInt), _ => None)
  .toMat(Sink.ignore)(Keep.right)

// Counted increment via the public Gatherer API: element + running index, with the
// index kept as a mutable field instead of threaded-through state.
private val gatherPublicCountedIncrement = Source
  .repeat(1)
  .take(OperationsPerInvocation)
  .gather(() =>
    new Gatherer[Int, Int] {
      private var counter = 0L

      override def apply(elem: Int, collector: GatherCollector[Int]): Unit = {
        collector.push(elem + counter.toInt)
        counter += 1
      }
    })
  .toMat(Sink.ignore)(Keep.right)

// Counted increment via the internal one-to-one Gatherer specialization.
private val gatherInternalOneToOneCountedIncrement = Source
  .repeat(1)
  .take(OperationsPerInvocation)
  .gather(() =>
    new OneToOneGatherer[Int, Int] {
      private var counter = 0L

      override def applyOne(elem: Int): Int = {
        val out = elem + counter.toInt
        counter += 1
        out
      }
    })
  .toMat(Sink.ignore)(Keep.right)

@Benchmark
@OperationsPerInvocation(OperationsPerInvocation)
def benchOldZipWithIndex(): Unit =
Expand All @@ -86,4 +183,49 @@ class ZipWithIndexBenchmark {
def benchNewZipWithIndex(): Unit =
Await.result(newZipWithIndex.run(), Duration.Inf)

@Benchmark
@OperationsPerInvocation(OperationsPerInvocation)
def benchStatefulMapZipWithIndex(): Unit =
  // Runs the statefulMap-based zipWithIndex stream to completion; blocking is
  // acceptable here since JMH measures the whole materialized run.
  Await.result(statefulMapZipWithIndex.run(), Duration.Inf)

@Benchmark
@OperationsPerInvocation(OperationsPerInvocation)
def benchGatherPublicZipWithIndex(): Unit =
  // Runs the public-Gatherer zipWithIndex stream to completion.
  Await.result(gatherPublicZipWithIndex.run(), Duration.Inf)

@Benchmark
@OperationsPerInvocation(OperationsPerInvocation)
def benchGatherInternalOneToOneZipWithIndex(): Unit =
  // Runs the one-to-one-Gatherer zipWithIndex stream to completion.
  Await.result(gatherInternalOneToOneZipWithIndex.run(), Duration.Inf)

@Benchmark
@OperationsPerInvocation(OperationsPerInvocation)
def benchStatefulMapIncrement(): Unit =
  // Runs the statefulMap-based stateless increment stream to completion.
  Await.result(statefulMapIncrement.run(), Duration.Inf)

@Benchmark
@OperationsPerInvocation(OperationsPerInvocation)
def benchGatherPublicIncrement(): Unit =
  // Runs the public-Gatherer stateless increment stream to completion.
  Await.result(gatherPublicIncrement.run(), Duration.Inf)

@Benchmark
@OperationsPerInvocation(OperationsPerInvocation)
def benchGatherInternalOneToOneIncrement(): Unit =
  // Runs the one-to-one-Gatherer stateless increment stream to completion.
  Await.result(gatherInternalOneToOneIncrement.run(), Duration.Inf)

@Benchmark
@OperationsPerInvocation(OperationsPerInvocation)
def benchStatefulMapCountedIncrement(): Unit =
  // Runs the statefulMap-based counted increment stream to completion.
  Await.result(statefulMapCountedIncrement.run(), Duration.Inf)

@Benchmark
@OperationsPerInvocation(OperationsPerInvocation)
def benchGatherPublicCountedIncrement(): Unit =
  // Runs the public-Gatherer counted increment stream to completion.
  Await.result(gatherPublicCountedIncrement.run(), Duration.Inf)

@Benchmark
@OperationsPerInvocation(OperationsPerInvocation)
def benchGatherInternalOneToOneCountedIncrement(): Unit =
  // Runs the one-to-one-Gatherer counted increment stream to completion.
  Await.result(gatherInternalOneToOneCountedIncrement.run(), Duration.Inf)

}
38 changes: 38 additions & 0 deletions docs/gather_zero_allocation_evaluation.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Pekko Gather/Zero-Allocation Operator 评估报告

## 背景与目标
- 目标:分析现有 statefulMap/statefulMapConcat 的分配情况,评估是否有必要引入类似 JDK 24 Gatherer 的 zero-allocation 操作符,并明确不能破坏现有 API。
- 范围:仅分析 Pekko Stream 当前实现与 JDK 24 Gatherer、SmallRye Mutiny 等对比。

## 现状分析
### statefulMap
- API:`(S, In) => (S, Out)`
- 每个元素分配一个 Tuple2((S, Out)),为 API 设计所致。
- 不能避免 per-element 分配。

### statefulMapConcat
- API:`(S, In) => (S, Iterable[Out])`
- 每个元素分配 Iterable 和 Iterator。
- 也是 API 设计决定,无法避免。

## JDK 24 Gatherer 对比
- JDK 24 Gatherer 通过 mutable state + 直接下游 push,理论上可实现零分配。
- 参考实现仍有部分分配(如 lambda/闭包等)。
- SmallRye Mutiny 也有类似设计,但仍有微小分配。

## 结论与建议
- 若要实现 zero-allocation,需设计全新 API,不能破坏 statefulMap 现有语义。
- 建议:如需极致性能,可新增 opt-in gather-like 操作符,保留现有 API。
- 现有 statefulMap/statefulMapConcat 的分配为 API 设计本质,非实现缺陷。

## 参考文件
- stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
- JDK 24 Gatherer 官方文档
- SmallRye Mutiny 源码

## 评估人
- 由 Pekko 迁移小组(gpt-4.1)完成
- 评估时间:2026-03-28

---
如需详细设计/原型实现,请补充需求。
69 changes: 69 additions & 0 deletions docs/src/main/paradox/stream/operators/Source-or-Flow/gather.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
# gather

Transform each input element into zero or more output elements with a stateful gatherer.

@ref[Simple operators](../index.md#simple-operators)

## Signature

@apidoc[Flow.gather](Flow)

## Description

`gather` creates a new gatherer for each materialization. The gatherer can keep mutable state and emit zero or more
elements for each incoming element by calling `push` on the provided collector.

Unlike `statefulMap` and `statefulMapConcat`, the operator API itself does not require returning tuples or collections
for each input element. This makes `gather` a good fit for allocation-sensitive stateful transformations.

Patterns such as `zipWithIndex`, `bufferUntilChanged`, and `distinctUntilChanged` can be expressed by keeping mutable
state inside the gatherer and pushing outputs directly, instead of returning a new state/output wrapper for every
element.

Compared with `statefulMap`, `gather` covers the same common stateful streaming patterns exercised by its test suite:
happy-path stateful mapping, delayed completion output, restart/stop supervision behavior, and backpressure-sensitive
one-output transformations. The main difference is that `statefulMap` exposes state as an explicit return value,
including `null` state transitions, while `gather` keeps state inside the gatherer instance itself. Because of that,
`statefulMap` tests about `null` state do not translate one-to-one; the equivalent `gather` coverage focuses on the
observable stream behavior instead.

When the stage terminates or restarts, the gatherer's `onComplete` callback is invoked. Elements pushed from
`onComplete` are emitted before upstream-failure propagation, normal completion, or supervision restart, and are
ignored on downstream cancellation or abrupt termination.

The `gather` operator adheres to the `ActorAttributes.SupervisionStrategy` attribute.

For one-to-one stateful mapping see @ref:[statefulMap](statefulMap.md). For iterable-based fan-out see
@ref:[statefulMapConcat](statefulMapConcat.md).

## Examples

In the first example, we implement a `zipWithIndex`-like transformation.

Scala
: @@snip [Gather.scala](/docs/src/test/scala/docs/stream/operators/flow/Gather.scala) { #zipWithIndex }

Java
: @@snip [Gather.java](/docs/src/test/java/jdocs/stream/operators/flow/Gather.java) { #zipWithIndex }

In the second example, we group incoming elements in batches of three and emit the trailing batch from `onComplete`.

Scala
: @@snip [Gather.scala](/docs/src/test/scala/docs/stream/operators/flow/Gather.scala) { #grouped }

Java
: @@snip [Gather.java](/docs/src/test/java/jdocs/stream/operators/flow/Gather.java) { #grouped }

## Reactive Streams semantics

@@@div { .callout }

**emits** the gatherer emits an element and downstream is ready to consume it

**backpressures** downstream backpressures

**completes** upstream completes and all gathered elements have been emitted

**cancels** downstream cancels

@@@
2 changes: 2 additions & 0 deletions docs/src/main/paradox/stream/operators/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ depending on being backpressured by downstream or not.
|Source/Flow|<a name="foldwhile"></a>@ref[foldWhile](Source-or-Flow/foldWhile.md)|Start with current value `zero` and then apply the current and next value to the given function. When upstream completes or the predicate `p` returns `false`, the current value is emitted downstream.|
|Source/Flow|<a name="frommaterializer"></a>@ref[fromMaterializer](Source-or-Flow/fromMaterializer.md)|Defer the creation of a `Source/Flow` until materialization and access `Materializer` and `Attributes`|
|Flow|<a name="futureflow"></a>@ref[futureFlow](Flow/futureFlow.md)|Streams the elements through the given future flow once it successfully completes.|
|Source/Flow|<a name="gather"></a>@ref[gather](Source-or-Flow/gather.md)|Transform each input element into zero or more output elements with a stateful gatherer.|
|Source/Flow|<a name="grouped"></a>@ref[grouped](Source-or-Flow/grouped.md)|Accumulate incoming events until the specified number of elements have been accumulated and then pass the collection of elements downstream.|
|Source/Flow|<a name="groupedadjacentby"></a>@ref[groupedAdjacentBy](Source-or-Flow/groupedAdjacentBy.md)|Partitions this stream into chunks by a delimiter function.|
|Source/Flow|<a name="groupedadjacentbyweighted"></a>@ref[groupedAdjacentByWeighted](Source-or-Flow/groupedAdjacentByWeighted.md)|Partitions this stream into chunks by a delimiter function and a weight limit.|
Expand Down Expand Up @@ -498,6 +499,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
* [futureFlow](Flow/futureFlow.md)
* [futureSink](Sink/futureSink.md)
* [futureSource](Source/futureSource.md)
* [gather](Source-or-Flow/gather.md)
* [groupBy](Source-or-Flow/groupBy.md)
* [grouped](Source-or-Flow/grouped.md)
* [groupedAdjacentBy](Source-or-Flow/groupedAdjacentBy.md)
Expand Down
81 changes: 81 additions & 0 deletions docs/src/test/java/jdocs/stream/operators/flow/Gather.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* license agreements; and to You under the Apache License, version 2.0:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* This file is part of the Apache Pekko project, which was derived from Akka.
*/

/*
* Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
*/

package jdocs.stream.operators.flow;

import java.util.ArrayList;
import java.util.Arrays;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.stream.javadsl.GatherCollector;
import org.apache.pekko.stream.javadsl.Gatherer;
import org.apache.pekko.stream.javadsl.Source;

/**
 * Documentation snippets for the {@code gather} operator.
 *
 * <p>The {@code // #...} marker comments delimit regions that are included verbatim in the paradox
 * documentation, so the code between them must stay exactly as published.
 *
 * <p>NOTE(review): {@code system} is {@code null}, so these methods appear intended to be compiled
 * as doc samples rather than executed directly — confirm against the docs build setup.
 */
public class Gather {
  static final ActorSystem system = null;

  /** zipWithIndex-style transformation: a mutable index field, one push per input element. */
  public void zipWithIndex() {
    // #zipWithIndex
    Source.from(Arrays.asList("A", "B", "C", "D"))
        .gather(
            () ->
                new Gatherer<String, String>() {
                  private long index = 0L;

                  @Override
                  public void apply(String elem, GatherCollector<String> collector) {
                    collector.push("(" + elem + "," + index + ")");
                    index += 1;
                  }
                })
        .runForeach(System.out::println, system);
    // prints
    // (A,0)
    // (B,1)
    // (C,2)
    // (D,3)
    // #zipWithIndex
  }

  /** Batches of three with the trailing partial batch emitted from {@code onComplete}. */
  public void grouped() {
    // #grouped
    Source.from(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
        .gather(
            () ->
                new Gatherer<Integer, String>() {
                  private final ArrayList<Integer> buffer = new ArrayList<>(3);

                  @Override
                  public void apply(Integer elem, GatherCollector<String> collector) {
                    buffer.add(elem);
                    if (buffer.size() == 3) {
                      collector.push(buffer.toString());
                      buffer.clear();
                    }
                  }

                  @Override
                  public void onComplete(GatherCollector<String> collector) {
                    if (!buffer.isEmpty()) {
                      collector.push(buffer.toString());
                    }
                  }
                })
        .runForeach(System.out::println, system);
    // prints
    // [1, 2, 3]
    // [4, 5, 6]
    // [7, 8, 9]
    // [10]
    // #grouped
  }
}
Loading
Loading