stream: add gather operator and tighten coverage#2824
Draft
He-Pin wants to merge 2 commits intoapache:mainfrom
Draft
stream: add gather operator and tighten coverage#2824He-Pin wants to merge 2 commits intoapache:mainfrom
He-Pin wants to merge 2 commits intoapache:mainfrom
Conversation
Motivation: add the gather operator across the Scala and Java DSLs, document it, tighten its execution semantics, and broaden its statefulMap-equivalent coverage. Modification: implement the gather stage and DSL wiring, add Scala/Java/docs examples and tests, add JMH coverage, optimize the public gather hot path, and fix the one-to-one backpressure bug found in review. Result: gather is now documented and verified end-to-end with stronger semantics, broader regression coverage, and near-parity zipWithIndex performance against statefulMap. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Motivation: follow-up review and documentation work for the new gather operator. Modification: correct the new gather API @SInCE annotations to 2.0.0 and document how gather coverage aligns with statefulMap semantics while differing on internal null-state handling. Result: the published PR now reflects the intended release version and explains the remaining semantic differences more clearly. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Motivation
Add the
gatheroperator across the Scala and Java DSLs, tighten its execution semantics, and broaden its statefulMap-equivalent regression coverage.This PR also includes the hot-path and backpressure fixes found during review, plus benchmark and documentation support for the new operator.
Modification
gatherAPI support in the Scala and Java DSLsGatherstage and supportingGatherer/GatherCollectorAPIsFlowGatherSpecplus Java parity coveragezipWithIndex-style workloadsOneToOneGathererbackpressure issue found during review@sincemarkers to2.0.0Result
gatheris documented and available end-to-end in Scala and Java DSLs2.0.0) for the new APIssbt --no-colors 'scalafmtAll' 'stream-tests/testOnly org.apache.pekko.stream.scaladsl.FlowGatherSpec' 'stream-tests/testOnly org.apache.pekko.stream.javadsl.FlowTest' 'stream-tests/testOnly org.apache.pekko.stream.javadsl.SourceTest' 'docs/test:compile' 'bench-jmh/Jmh/compile'gather-basedzipWithIndexis now near parity withstatefulMapReferences
stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scalastream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowGatherSpec.scalastream/src/main/scala/org/apache/pekko/stream/scaladsl/Gather.scalastream/src/main/scala/org/apache/pekko/stream/javadsl/Gather.scaladocs/src/main/paradox/stream/operators/Source-or-Flow/gather.md