Conversation

@PhongChuong
Collaborator

Support message ordering in Publisher by creating a new BatchWorker which handles the batching and message ordering logic.

When the Worker receives a Publish operation, it demultiplexes it into the corresponding BatchWorker, which ensures that there is at most one inflight batch at any given time.
When the Worker receives a Flush operation, it flushes all BatchWorkers' pending messages and waits until the operation is complete.

As of now, the set of BatchWorkers grows indefinitely. This will be addressed in the next PR.
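The demultiplexing described above can be sketched as follows. This is a minimal, self-contained illustration, not the crate's actual API: names are hypothetical, and a `Vec` stands in for the mpsc channel that would feed a spawned BatchWorker task in the real Publisher.

```rust
use std::collections::HashMap;

/// Route a message to the queue for its ordering key, creating the
/// queue lazily on first use. In the real Publisher each queue would be
/// a channel consumed by a dedicated BatchWorker task; here a Vec
/// stands in for the channel to keep the sketch self-contained.
fn demultiplex(
    queues: &mut HashMap<String, Vec<String>>,
    ordering_key: &str,
    data: String,
) {
    queues.entry(ordering_key.to_string()).or_default().push(data);
}
```

Because every message with the same ordering key lands in the same queue, a single consumer per queue is enough to preserve publish order for that key.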

@product-auto-label product-auto-label bot added the api: pubsub Issues related to the Pub/Sub API. label Dec 23, 2025
@codecov

codecov bot commented Dec 23, 2025

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 95.24%. Comparing base (317b8d2) to head (39e1124).
⚠️ Report is 48 commits behind head on main.

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #4124      +/-   ##
==========================================
- Coverage   95.25%   95.24%   -0.02%     
==========================================
  Files         178      178              
  Lines        6811     6808       -3     
==========================================
- Hits         6488     6484       -4     
- Misses        323      324       +1     


@PhongChuong PhongChuong marked this pull request as ready for review December 23, 2025 22:27
@PhongChuong PhongChuong requested a review from a team as a code owner December 23, 2025 22:27
@PhongChuong PhongChuong requested a review from suzmue December 23, 2025 22:27
Collaborator

@suzmue suzmue left a comment

As far as ordering goes, this looks pretty good. However, I think this removed support for non-ordered messages, where we can have multiple inflight batches.

Please let me know if I'm mistaken.

let (tx, rx) = oneshot::channel();
batch_worker
.send(ToBatchWorker::Flush(tx))
.expect("Batch worker should not close the channel");
Collaborator


nit: Consider using a helper message or helper function to avoid duplicating this error message all over.

Collaborator Author


Moved to a helper message for now. It's likely to change when we implement the batch worker cleanup PR.
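For illustration, one possible shape of such a helper is sketched below. The names are hypothetical and `std::sync::mpsc` stands in for the tokio channel; the author's actual change was a shared message, not necessarily a function.

```rust
use std::sync::mpsc::Sender;

/// Hypothetical helper centralizing the repeated expect message used
/// when handing an operation to a batch worker's channel.
fn send_to_batch_worker<T>(tx: &Sender<T>, op: T) {
    tx.send(op)
        .expect("Batch worker should not close the channel");
}
```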

/// `Worker` drops the Sender.
pub(crate) async fn run(mut self) {
// While it is possible to use Some(JoinHandle) here, as there is at most
// a single inflight task at any given time, the use of JoinSet
Collaborator


This isn't true when the ordering key is "".

Collaborator Author


I added a TODO to handle the empty ordering key special case.
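A simplified sketch of the at-most-one-inflight invariant under discussion, using a plain `Option<JoinHandle>` and OS threads rather than the tokio `JoinSet` the PR actually uses (all names are illustrative; the `done` sender stands in for the RPC to the Pub/Sub backend):

```rust
use std::sync::mpsc::Sender;
use std::thread;

/// Start publishing a batch, but only after the previous inflight batch
/// (if any) has completed, so batches for one ordering key are never
/// concurrently in flight.
fn publish_batch(
    inflight: &mut Option<thread::JoinHandle<()>>,
    done: Sender<usize>,
    batch_id: usize,
) {
    // Join the previous batch before starting the next, preserving order.
    if let Some(handle) = inflight.take() {
        handle.join().expect("inflight batch task panicked");
    }
    *inflight = Some(thread::spawn(move || {
        done.send(batch_id).expect("receiver dropped");
    }));
}
```

As the review points out, this invariant is only wanted for non-empty ordering keys; messages with key "" could run multiple such tasks concurrently.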

break;
}
self.move_to_batch();
self.pending_batch.flush(self.client.clone(), self.topic.clone(), &mut inflight);
Collaborator


Using flush for the timer seems slightly inefficient: if we have a max batch size of 1000, a batch already out, and 1001 messages in the queue, then we will send that single leftover message as a batch of 1 even if more come in. I guess that is a possibility with the timeout anyway, so maybe it's not a real problem and it doesn't necessarily need to be changed.

This also makes me wonder: if we have an outstanding batch, would it be better to just leave all messages in the channel, to avoid copying things around as much? I'm just curious about this. Something to maybe try out later.

Collaborator Author


Noted. Let's discuss during our meeting tomorrow. It largely depends on how strictly we want to respect the user-configured batch size.
Note: in the worst case, we still need to split into batches if the configured batch size is greater than the backend limit.
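That worst case, chunking pending messages down to a backend cap regardless of the user's configured size, could look like this minimal sketch (names and the idea of a fixed cap are illustrative, not the crate's actual logic):

```rust
/// Split pending messages into batches no larger than `max_batch`,
/// e.g. because the backend caps the number of messages per publish
/// request regardless of the user-configured batch size.
fn split_into_batches<T: Clone>(pending: &[T], max_batch: usize) -> Vec<Vec<T>> {
    pending
        .chunks(max_batch.max(1)) // guard against a zero limit
        .map(|chunk| chunk.to_vec())
        .collect()
}
```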

PhongChuong and others added 2 commits January 5, 2026 12:13
Co-authored-by: Suzy Mueller <suzmue@google.com>
@PhongChuong
Collaborator Author

@suzmue , thanks for the review.
In this implementation, we do not handle the special case of the "" ordering key. I've added a TODO and will handle that case in a later PR.
PTAL.

// While it is possible to use Some(JoinHandle) here, as there is at most
// a single inflight task at any given time, the use of JoinSet
// simplifies managing the inflight JoinHandle.
// TODO(#4012): There is no inflight restriction when the ordering key is "".
Collaborator


I am a little concerned about delaying this to a future PR, since it is a performance regression, but since we are still in preview I think it's alright for this to be submitted if it improves dev velocity.

@PhongChuong PhongChuong merged commit 659b9e3 into googleapis:main Jan 5, 2026
29 checks passed