[pip] PIP-474: Key_Shared Hot Key Overflow Mechanism #25706
liangyepianzhou wants to merge 2 commits into apache:master from
Conversation
lhotari left a comment
I'd suggest analyzing 3 alternative designs before deciding on the solution.
Alternative 1:
I'd suggest looking into an alternative design that achieves the same outcome of allowing the subscription cursor to advance. Instead of making copies of the messages, an alternative design would be to create another subscription to track the slow or hot keys. Essentially, the design could be very similar to diverting to the overflow managed ledger, but there wouldn't be a need to duplicate the data or get into a situation where different failure modes cause unnecessary complications.
Alternative 2:
Simply optimize the replay queue solution together with improving the scalability of individualDeletedMessages so that it scales to 1,000,000 ack holes and beyond. This would result in the simplest solution, which would cover most use cases. There are multiple benefits in keeping the solution simple. For example, backlog management doesn't change.
Alternative 3:
The client-side code could simply route to a separate topic on its own when it detects a hot key and acknowledge the original message.
> Increasing `keySharedLookAheadMsgInReplayThresholdPerConsumer` and `keySharedLookAheadMsgInReplayThresholdPerSubscription` may seem like the most straightforward mitigation, but there are three structural issues:
> 1. **Amplified I/O waste**: A larger replay queue means more stuck-Key messages are read from BK each Replay Read cycle, yet all of them are returned unchanged because the target Consumer has no permits—pure I/O waste.
Thanks for pointing this out. I had plans to improve this area after PIP-379 and PIP-430, but it hasn't happened.
It would be possible to avoid unnecessary reads from cache / BK by skipping reads for positions whose consumers don't have permits available, and by taking permits into account when deciding how many entries are read for each hash. This optimization should be done regardless of any change to optimize for hot keys.
After checking this again, the existing implementation is already skipping reading from cache / BK.
This happens in `ReplayPositionFilter`.
This means that there's no "amplified I/O waste" in the way that has been described. There are most likely some corner cases that can be optimized further; however, PIP-430 (since 4.1.0) mitigates most of the issues of unnecessary re-reads. Please take a look at https://github.com/apache/pulsar/blob/master/pip/pip-430.md.
The PIP-430 broker cache should be sufficiently tuned for high scale use cases to avoid unnecessary BK reads when using Key_Shared subscriptions.
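To make the filtering idea concrete for readers who don't know the dispatcher internals, here is a simplified, hypothetical illustration of that step; it is not the actual `ReplayPositionFilter` source, and all type names and accessors below are stand-ins:

```java
import java.util.List;
import java.util.function.IntFunction;
import java.util.function.Predicate;

class ReplayFilterSketch {

    // Stand-in for a position sitting in the replay queue together with its sticky key hash.
    record ReplayPosition(long ledgerId, long entryId, int stickyKeyHash) {}

    // Stand-in for the broker-side view of a consumer.
    interface ConsumerView {
        int availablePermits();
    }

    // hashToConsumer stands in for the dispatcher's sticky-hash -> consumer selection.
    static List<ReplayPosition> positionsWorthReading(List<ReplayPosition> replayQueue,
                                                      IntFunction<ConsumerView> hashToConsumer) {
        Predicate<ReplayPosition> consumerHasPermits =
                p -> hashToConsumer.apply(p.stickyKeyHash()).availablePermits() > 0;
        // Positions whose target consumer has no permits stay in the replay queue for a later
        // cycle; only the remaining positions trigger an actual cache/BK read.
        return replayQueue.stream().filter(consumerHasPermits).toList();
    }
}
```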
> ```
> hash in replay → Normal Read messages for same hash are blocked → enter replay → replay grows
> → Normal Read batch shrinks → replay reaches limit faster → Normal Read stops → global blackout
> ```
"global blackout" is confusing. A better wording would be "no progress for any consumer"
> The original design intent of this check is entirely correct (M11 must not be dispatched before M10 is consumed). But under traffic skew or when some Consumers stall, this ordering mechanism becomes an avalanche accelerator. **A few Keys' problem escalates into a catastrophe for all Keys.**
"catastrophe" is confusing. It's better to describe the actual impact. "A single hot key exhausting a single consumer could result in blocking progress for all consumers."
> 2. **Normal Read batch compression**: `getMaxEntriesReadLimit()` returns `max(effectiveLimit - replaySize, 1)`—larger replay means smaller Normal Read, merely delaying the blackout rather than solving it.
> 3. **Mark-delete gap inflation**: Stuck-Key messages remain unacked for extended periods, preventing mark-delete from advancing. `individualDeletedMessages` grows continuously and may exceed `managedLedgerMaxUnackedRangesToPersist`, causing confirmed ranges to be discarded and messages to be redelivered; on broker restart, the system must rescan all gaps from mark-delete.
> **We need a mechanism to completely remove stuck-Key messages from the main dispatch path so that neither Normal Read nor mark-delete is affected by them.**
This sentence goes directly to a single solution ("remove stuck-key messages from the main dispatch path"). We should be open for multiple options and choose the one that solves the problem.
> Increasing `keySharedLookAheadMsgInReplayThresholdPerConsumer` and `keySharedLookAheadMsgInReplayThresholdPerSubscription` may seem like the most straightforward mitigation, but there are three structural issues:
> 1. **Amplified I/O waste**: A larger replay queue means more stuck-Key messages are read from BK each Replay Read cycle, yet all of them are returned unchanged because the target Consumer has no permits—pure I/O waste.
> 2. **Normal Read batch compression**: `getMaxEntriesReadLimit()` returns `max(effectiveLimit - replaySize, 1)`—larger replay means smaller Normal Read, merely delaying the blackout rather than solving it.
"blackout" is confusing here. "delaying the situation where all consumers get blocked"
> ## In Scope
> - Completely remove stuck-Key messages from the main dispatch path so that Normal Read is no longer blocked and other Consumers are unaffected
Thanks for the feedback. Let's first analyze the design approaches, and then we can refine the wording details. Here are my thoughts on the three alternatives you proposed:

**Alternative 1**

The auxiliary cursor would need to individually ack all non-hot-key messages (keeping only hot-key messages unacked). This means:

- The auxiliary cursor prevents ledger GC at the entire-ledger granularity — as long as the auxiliary cursor's mark-delete hasn't advanced past a ledger, the entire ledger is retained (containing messages for all keys). The Overflow ML, by contrast, only stores hot-key messages. If hot keys account for 5% of traffic, the Overflow ML duplicates only that 5%, whereas the auxiliary cursor keeps entire ledgers (all keys' data) from being garbage-collected.
- Every normal message must be acked on two cursors (original + auxiliary), requiring changes throughout the dispatch-ack chain.

Conclusion: This approach cannot claim "no data duplication" — it trades broader ledger retention for not copying, and is more invasive to implement than Overflow. That said, the starting point (avoiding data duplication) is reasonable — it's just that the overall cost turns out to be higher.

**Alternative 2**

- Scaling `individualDeletedMessages` only raises the ceiling; for sustained hot keys it delays the onset of the problem rather than solving it.
- The backlog would include a large volume of stuck-key messages that are known to be unconsumable, misleading operational judgment. The Overflow approach isolates them so the backlog reflects only genuinely consumable messages — which is actually more accurate semantics.

Conclusion: This approach is useful for mild scenarios (e.g., brief consumer restarts), but is ineffective for sustained hot keys.

**Alternative 3**

This approach has the most fundamental contradiction — a stuck consumer cannot rescue itself. Client transparency is violated: all Pulsar client libraries (Java, C++, Python, Go, Node.js…) would need to implement hot-key detection + routing logic. This is a massive cross-language engineering effort and is invasive to users.

Conclusion: This is the weakest alternative. It pushes a broker-side scheduling problem to the client, yet the client is precisely the victim of the problem (stuck consumer) and cannot rescue itself.

The core advantage of the Overflow ML approach is complete isolation: hot-key messages are entirely removed from the main dispatch path — not tagged, not deferred, not accommodated by enlarging capacity — so that the replay queue, Normal Read, and mark-delete all return to normal. The cost (additional BK writes) is precisely proportional — only hot-key messages are written, rather than retaining entire ledgers.
Thanks for the detailed response! I really appreciate you taking the time to think through the alternatives. Let me share some thoughts in return. I'd suggest validating Alternative 2 thoroughly before deciding to implement PIP-474. The reason is that Alternative 2 is the simplest approach, and by following this path, we avoid the new failure modes that the proposed PIP-474 implementation introduces. I'd also like to share some comments on your response. Rather than pushing back on the alternatives, I think it would be more useful to genuinely put effort into comparing them.
There seems to be a misconception here. The additional cursor would be handled on the broker side, and it would acknowledge all messages that don't contain hashes that are diverted. On the broker side, there would obviously need to be state about which hashes are diverted. This state could be stored in a similar way to how individual acks are stored in the managed ledger or the metadata store. When the additional cursor has caught up, it could be deleted. There would need to be a solution to address high-volume traffic, since the cursor would never catch up under continuous high-volume traffic.
Is this really a problem in practice? How much data could the slow consumers actually accumulate in your use case?
I'm not sure I see the problem here. The required changes are relatively minor.
I'd respectfully disagree with this conclusion. There's no data duplication in this case. The data duplication actually happens in the currently proposed solution. For ledgers with multiple subscriptions, the proposed solution would duplicate data for all hot/slow keys. In this alternative, the only tradeoff is retention for as long as there are slow keys, and that's most likely not a real problem in practice.
I'd see this differently. As long as the individual ack solution can scale to handle a peak, it's sufficient. I believe that the current limit for the compressed size of stored individual acks is about 5MB. A roaring bitmap (

If this isn't sufficient, the consumer side should be improved. One possible reason could be that the consumer is handling messages serially, and a slow key could result in head-of-line blocking of further keys. In that case, the individual consumer should be vertically scaled with more CPU and memory to handle its workload, alongside using asynchronous message processing or a MessageListener with Java 21 virtual threads to handle more keys in parallel for I/O-intensive workloads.

Here's an example of how to achieve high parallelism (concurrency) using Pulsar MessageListener and Java 21 virtual threads: https://github.com/lhotari/dss25-mastering-key-ordered-demo/blob/740dd8d1b350b8201266fb62d0456f44806299ba/pulsar-listener/src/main/java/com/github/lhotari/dss25/listener/PulsarListenerApp.java#L145-L188 (inspired by @pdolif's work and implementation). This is especially suitable for workloads where the consumer handles processing through external services, e.g. calling REST APIs; in that case, the processing itself isn't compute-intensive (explained in my DSS 25 presentation).
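For reference, here is a minimal sketch of that pattern; it is not the linked `PulsarListenerApp` code, and the topic, subscription name, and chaining strategy are assumptions. It keeps per-key ordering by chaining each message onto the previous message of the same key, while different keys run concurrently on virtual threads; error handling and pruning of completed per-key chains are omitted:

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;

public class VirtualThreadKeySharedConsumer {

    public static void main(String[] args) throws Exception {
        // One virtual thread per in-flight message; cheap for I/O-bound work such as REST calls.
        ExecutorService workers = Executors.newVirtualThreadPerTaskExecutor();
        // Tail of the processing chain per key; chaining keeps per-key ordering while
        // different keys are processed concurrently.
        Map<String, CompletableFuture<Void>> keyTails = new ConcurrentHashMap<>();

        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();

        Consumer<String> consumer = client.newConsumer(Schema.STRING)
                .topic("persistent://public/default/orders")
                .subscriptionName("order-processor")
                .subscriptionType(SubscriptionType.Key_Shared)
                .messageListener((c, msg) -> dispatch(c, msg, workers, keyTails))
                .subscribe();

        // Keep the application running; the listener threads do the work.
        Thread.currentThread().join();
    }

    private static void dispatch(Consumer<String> consumer, Message<String> msg,
                                 ExecutorService workers,
                                 Map<String, CompletableFuture<Void>> keyTails) {
        String key = msg.getKey() == null ? "" : msg.getKey();
        // Chain onto the previous message of the same key so its processing finishes first.
        keyTails.compute(key, (k, tail) -> {
            CompletableFuture<Void> previous =
                    tail != null ? tail : CompletableFuture.completedFuture(null);
            return previous.thenRunAsync(() -> process(msg), workers)
                    .thenRun(() -> consumer.acknowledgeAsync(msg));
        });
    }

    private static void process(Message<String> msg) {
        // Placeholder for the slow, I/O-bound work (e.g. calling an external REST API).
        System.out.printf("processed key=%s id=%s%n", msg.getKey(), msg.getMessageId());
    }
}
```

The per-key chaining is what preserves the Key_Shared ordering guarantee while still letting a slow key block only its own chain rather than the whole listener.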
It's true that this could happen. For use cases where this is a challenge, the load balancer should be configured in ways that avoid moving the topic to another node. Broker restarts aren't a common scenario and typically only happen during cluster upgrades. Broker nodes should be sufficiently overprovisioned so that they can handle the load, and backlogs can be caught up in such scenarios.
There are already stats for how many keys are backlogged as slow keys — it's currently the size of the replay queue. Since pending acks are kept in memory, we could also add a separate REST API to gain more details about the actual keys that are part of the replay queue, including the message keys and the cardinality for each consumer.
I agree that this isn't easy to implement on the application side without support in the Pulsar client and broker. There are similarities to the problem I described in my DSS25 presentation, Challenge #2: Error handling in key-ordered message processing (https://youtu.be/AkSGvYP4r88?si=E8YwCCHp0t2UQO7G&t=478). Sending failed messages to a dead letter queue results in out-of-order message processing since messages are skipped. A similar problem is already present with negative-acks and DLQ handling for Key_Shared. This isn't an easy problem to solve. The solution for negative-acks is slightly simpler.

One possible solution is to introduce client-side and broker-side changes to pause sending of hashes for a specific key, so that negative-ack usage wouldn't break ordering guarantees. The pause-sending-of-hashes solution could be extended to re-assign hashes of all other keys to another consumer with less load when a single consumer is occupied processing hot keys. The benefit of this would be that other keys in the consumer's hash range wouldn't cause ack holes when the hot key eventually causes head-of-line blocking once permits are exhausted.

Thanks again for engaging deeply with this. I really do hope we can validate Alternative 2 properly before committing to PIP-474, since simpler solutions tend to have fewer surprises down the road.
Thank you for your feedback. I'll keep my response brief to avoid making the thread too long for others to review and share their opinions.

Alt 2 does not solve any of the core problems — it only delays their onset. The argument "if it survives the peak, it's enough" does not hold for sustained hot key scenarios. In production, we cannot bet that "sustained slow-consumption hot keys will never occur," especially for critical business workloads. This would undermine users' confidence in Pulsar.

Alt 1 and Overflow ML both solve the core problems. Alt 1's cost is storage amplification (the auxiliary cursor's mark-delete cannot advance → entire ledgers are retained) + longer broker restart recovery time (the auxiliary cursor must replay from far behind). Overflow ML's cost is a secondary BK write for hot key data. Alt 3 is the weakest — the victim (stuck consumer) cannot self-rescue.

I still lean toward Overflow ML because it addresses all the core problems. Its cost — writing hot key data to a secondary BK ledger — can be managed through disk capacity planning and expansion. Hot keys may persist for a long time, but their data volume is typically a small fraction of total traffic. Meanwhile, its backlog and consumption progress metrics remain clean and straightforward for operators. Alt 1's storage amplification can also be addressed via disk expansion, but the longer broker restart recovery time is a harder trade-off. Alt 1 could also expose additional metrics for backlog and consumption progress, but that seems more complex and less user-friendly. Overall, Overflow ML provides cleaner operational visibility.

Looking forward to hearing more voices and feedback.
@lhotari Thank you all for the comments and feedback — I've gone through them carefully. Based on the discussion so far, I've restructured the Alternatives section of the PIP. The key change is separating the analysis into distinct layers: problem classification, per-problem resolution, and cost dimensions.

This structure also has a practical benefit for PR review: each point (problem classification, per-problem resolution, cost dimension) can be discussed independently in its own comment thread, rather than getting lost in a single long-form reply. The updated PIP is ready for review. I'd suggest we start by aligning on the problem classification (the P1–P5 table) — once we agree on what must be solved, the solution comparison follows naturally.
Responding to the previous comment here.
As a general thought, I think building a system that covers 100% of all possible use cases would add complexity, impact reliability, and increase the risk of incidents. For the minority of use cases that aren't covered by the current solution, I believe we can find ways to mitigate the possible issues. As I mentioned earlier, it's already possible to scale the individual ack holes solution to 1M entries, which would help mitigate the issues further.

On the client application side, for use cases with hot keys, I think there could be ways to identify which keys tend to be hot. In those cases, the application could route the hot keys to separate topics already at produce time. The same could also be done in a Pulsar Function (or another application) that splits hot keys to another topic, although that wouldn't be as efficient as splitting at produce time.

As I mentioned earlier, on the client side there are also multiple ways to scale consumers so they can handle hot/slow keys while still processing other keys. The virtual threads MessageListener example is one nice possibility.
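A minimal sketch of the produce-time routing idea mentioned above, assuming the application already knows its hot keys; the topic names and the hot-key set are illustrative, and nothing here relies on new Pulsar features:

```java
import java.util.Set;

import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;

class HotKeyRoutingProducer {

    private final Producer<byte[]> normalProducer;
    private final Producer<byte[]> hotKeyProducer;
    private final Set<String> knownHotKeys;

    HotKeyRoutingProducer(PulsarClient client, Set<String> knownHotKeys) throws Exception {
        // Main topic for regular traffic (illustrative name).
        this.normalProducer = client.newProducer(Schema.BYTES)
                .topic("persistent://public/default/orders")
                .create();
        // Dedicated topic for keys known to be hot, consumed by its own subscription.
        this.hotKeyProducer = client.newProducer(Schema.BYTES)
                .topic("persistent://public/default/orders-hot-keys")
                .create();
        this.knownHotKeys = knownHotKeys;
    }

    void send(String key, byte[] value) throws Exception {
        // Hot keys go to the separate topic, so a slow hot key no longer competes with
        // healthy keys on the main topic's Key_Shared subscription.
        Producer<byte[]> target = knownHotKeys.contains(key) ? hotKeyProducer : normalProducer;
        target.newMessage().key(key).value(value).send();
    }
}
```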
This is already optimized so that already acknowledged messages are skipped even when a cursor replays from far behind.
I agree that it feels cleaner as a plan at first thought. The main reason for my pushback is that adding complexity in a distributed system usually impacts reliability by introducing new failure modes. That's why I'm pushing back, so that we also look into other solutions that could improve the existing architecture and still meet the main goal of this PIP. First, we should agree on the problem statement and address the problems specifically.

By taking a look at each problem and listing the possible alternatives, we could make improvements that result in a completely different solution than the "Overflow ML" one. There could be better ways to address the root causes of the problem with minimal added complexity.

Besides solving the hot/slow key problems for Key_Shared, there could be broader benefits if we improve individual ack handling and address head-of-line blocking issues that possibly impact other subscription types (Shared, Exclusive, Failover) besides Key_Shared. Head-of-line blocking is also a real issue for Shared subscriptions. In extreme cases, users currently work around it for Shared subscriptions by setting the receiver queue size to 0 or 1. A very small receiver queue size kills performance and doesn't work efficiently with workloads that mix fast and slow consumers, or with workloads containing messages whose processing durations vary widely.
This is helpful. I guess this is also about drilling down deeper into the actual problem and causes.
Describing tradeoffs is useful. Expressing them in a table can be challenging, since explaining what each decision is trading off often requires more detail than a table can hold. That said, a table can still work as a high-level summary.
The impact on reliability and the risk of incidents are also important factors to weigh as part of "cost".
Deciding on the solution too early has always been a problem with the Pulsar PIP process. Ideally, discussions and interactions would happen in the early stages, while everyone is still open to different solutions. Otherwise, it easily turns into a situation where people feel they need to defend their point of view, and the synergy that a team can find together isn't leveraged. What can happen as a result is that a real problem doesn't get solved at all, due to lack of consensus and initially motivated contributors turning away.

Every decision and solution is a tradeoff. Making decisions reversible or solutions optional is extremely useful to reduce risk and avoid increased maintenance overhead. Instead of adding more conditionals to the existing PersistentDispatcherMultipleConsumers/PersistentStickyKeyDispatcherMultipleConsumers classes, it would be preferable to introduce method hooks so that another, more specialized implementation can plug in. That would avoid piling more maintenance overhead onto the existing classes when a special-case implementation such as the overflow ML is added to the codebase. Another possibility is to add configuration so that a third-party implementation can be used for Key_Shared or Shared subscription dispatchers.

One way for you to make progress regardless of what gets decided about PIP-474 is to add this possibility: refactor the dispatcher classes with hooks so that the implementation can be extended, and add configuration so that the broker can use a third-party dispatcher implementation. Such a change could be handled in a separate PIP, and it would also be useful for other use cases. This type of solution would also be a good fit for implementing PIP-474 as a separate dispatcher implementation initially and having it later merged into Pulsar when it's a proven solution.
Makes sense. I'd just suggest taking a look at the "pluggable dispatcher" idea and focusing on that first in a separate PIP. That would let you make progress quickly without getting stalled by the decisions in this PIP. The "pluggable dispatcher" could also be considered a prerequisite for this PIP.

Besides the dispatcher itself, there might be a need to extend the interface so that the dispatcher can expose implementation-specific statistics or metrics. It might also touch other areas, which could broaden the scope enough to support something like the PIP-474 implementation. Covering "overflow ML" could require several other plugin interfaces so that the lifecycle of the ML can be tied to the subscription and the topic. Adding such interfaces deserves more attention. Coupling the "overflow ML" directly to other internals wouldn't be a great idea.

Pulsar has multiple listener interfaces, but they likely don't cover every need. One way to plug into Pulsar today is by implementing a TopicFactory (configurable via topicFactoryClassName). Implementing a TopicFactory and creating a custom subclass of PersistentTopic is one way to hook into the required extension points from custom plugins. Many hooks might be missing for easily extending up to the broker-side Consumer level, if that happens to be required. Looking forward to your opinion about first focusing on the "pluggable dispatcher" part.
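As a purely hypothetical sketch of what such a "pluggable dispatcher" extension point could look like (none of these interfaces or setting names exist in Pulsar today; the real shape would be decided in the separate PIP):

```java
// Hypothetical only: a broker-side factory hook that a specialized implementation
// (such as an "overflow ML" dispatcher) could plug into.
public interface DispatcherFactory {

    // Minimal stand-in for the broker's dispatcher abstraction.
    interface Dispatcher {
        void readMoreEntries();

        void close();
    }

    // Minimal stand-in for the context the broker would hand to the factory.
    record DispatcherContext(String topicName, String subscriptionName) {}

    // The broker would call this when a Key_Shared (or Shared) subscription is created,
    // selecting the factory class via a broker.conf setting such as (illustrative name only):
    //   dispatcherFactoryClassName=com.example.OverflowMlDispatcherFactory
    Dispatcher create(DispatcherContext context);
}
```

The point of the sketch is only the shape of the decision: the existing dispatcher classes would expose hooks instead of growing new conditionals, and a configuration setting would select the implementation.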
Very constructive insights and suggestions! I agree that we should initiate a separate PIP first to implement a pluggable dispatcher.

Motivation
Key_Shared is the only subscription type in Pulsar that enables parallel consumption while preserving per-key message ordering — the sole built-in answer to the "slow consumer + ordering" problem (see also the related discussion).
However, Key_Shared currently has a critical production-readiness issue: when even a single consumer stalls (e.g., downstream timeout), the stuck keys' messages flood the Replay queue and — through the `containsStickyKeyHash` ordering check — starve all other healthy keys within minutes. In the scenario analyzed in this PIP, a single consumer failure turns 500K msg/s real-time consumption into near-total starvation across all 200 partitions in under 3 minutes.

This is especially pressing in the AI era. AI inference workloads increasingly rely on message queues as their async transport layer, yet inference is inherently slow (seconds per request) and demands strict per-key ordering. Key_Shared is the natural fit — but its current hot-key starvation problem makes it unsuitable for production AI pipelines.
This PIP proposes the Hot Key Overflow Mechanism: when a small number of keys dominate the replay queue, their messages are diverted to an independent Overflow ManagedLedger. This unblocks Normal Read, allows mark-delete to advance, and maintains all guarantees (at-least-once, per-key ordering, client transparency).
For full details — including the scenario analysis, mechanism design, entry format, write-then-ack protocol, hot-key detection strategy, and alternatives considered — please see the PIP document.
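As a rough illustration of the write-then-ack protocol summarized above, here is a hypothetical sketch; it is not the PIP's actual implementation, and every type and method name below is a stand-in:

```java
import java.util.concurrent.CompletableFuture;

class HotKeyOverflowSketch {

    // Stand-in for the per-subscription Overflow ManagedLedger described in the PIP.
    interface OverflowLedger {
        CompletableFuture<Void> append(byte[] entryPayload);
    }

    // Stand-in for the subscription's main cursor.
    interface Cursor {
        void individualAck(long ledgerId, long entryId);
    }

    private final OverflowLedger overflowLedger;
    private final Cursor mainCursor;

    HotKeyOverflowSketch(OverflowLedger overflowLedger, Cursor mainCursor) {
        this.overflowLedger = overflowLedger;
        this.mainCursor = mainCursor;
    }

    // Divert one stuck-key entry: persist it to the overflow ledger first, and acknowledge it
    // on the main cursor only after that write succeeds, so at-least-once delivery is preserved
    // if the broker fails between the two steps (the entry is simply replayed and diverted again).
    void divert(long ledgerId, long entryId, byte[] payload) {
        overflowLedger.append(payload)
                .thenRun(() -> mainCursor.individualAck(ledgerId, entryId));
    }
}
```

The ordering of the two steps is the important part: if the ack happened before the overflow write and the broker failed in between, the diverted message would be lost.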