fix unhandled error -172 on entry committable causing the server to crash #2731
SylvainSenechal wants to merge 1 commit into development/9.4
Conversation
Codecov Report: ✅ All modified and coverable lines are covered by tests.
... and 3 files with indirect coverage changes
```
@@            Coverage Diff            @@
##      development/9.4    #2731      +/-   ##
===================================================
- Coverage       74.75%   74.53%    -0.23%
===================================================
  Files             200      200
  Lines           13610    13613        +3
===================================================
- Hits            10174    10146       -28
- Misses           3426     3457       +31
  Partials           10       10
```
Flags with carried forward coverage won't be shown.
LGTM
Waiting for approval. The following approvals are needed before I can proceed with the merge:
```js
try {
    this._consumer.offsetsStore([{ topic, partition, offset: committableOffset }]);
} catch (e) {
    // ERR__STATE (-172) means the consumer is not in a valid state
```
So this seems to happen on a rebalance or when Kafka is in a weird state.
- Before: the pod would crash, the offset wouldn't be committed, and another pod would re-consume and re-process the entry.
- Now: the pod doesn't crash, but the offset is still not committed, and the entry is re-consumed later by another pod, or possibly by the same pod since it no longer crashes.
Do we know when this happens?
To properly handle a rebalance, we actually need to commit before the rebalance (hence the logic implemented here). If there are cases where that does not work, we need to understand why and how, not just mask the problem...
(There will always be corner cases where we cannot commit and will replay the message; that is fine. But we need to understand why and when it happens, to make sure this really is the unavoidable corner case and not a deeper issue.)
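For context, "commit before the rebalance" usually means committing stored offsets from the rebalance callback while the partitions are still assigned. A minimal sketch of that idea, with a deliberately simplified callback signature (the real node-rdkafka `rebalance_cb` receives an error object whose `code` is compared against `Kafka.CODES.ERRORS.ERR__ASSIGN_PARTITIONS` / `ERR__REVOKE_PARTITIONS`; the string constant and `onRebalance` helper here are hypothetical):

```javascript
// Hypothetical, simplified rebalance handler: commit stored offsets
// before giving up partitions, so in-flight work is not replayed.
const REBALANCE_REVOKE = 'ERR__REVOKE_PARTITIONS';

function onRebalance(consumer, err, assignment) {
    if (err === REBALANCE_REVOKE) {
        // Commit everything stored so far while the partitions
        // are still assigned to this consumer.
        consumer.commit();
        consumer.unassign();
    } else {
        // Assignment case: accept the new partition set.
        consumer.assign(assignment);
    }
}
```

The key property is the ordering: `commit()` runs before `unassign()`, so any offset stored by a finished task is flushed while the consumer still owns the partition.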
I don't really know when, and more importantly why, this happens :/
Maybe it would be interesting to wait for Thomas's PR to be merged and run in the Zenko CI to see whether this issue still exists.
Okay, I re-pulled the Zenko CI logs from a successful run and asked Claude to analyze them, paying attention to the timing of the logs.
What was found is that this happens during a rebalance: at first one pod was handling all the partitions, then the operator deployed another pod, and when that new pod joined the Kafka consumer group, a rebalance was triggered.
The old pod had 5 partitions revoked and was re-assigned only 2 of them. The problem is that some tasks that were already in flight finished processing after the revoke happened. When their callback tried to call offsetsStore() to mark the offset as ready to commit, librdkafka threw ERR__STATE because those partitions were no longer assigned to this pod, so it crashed.
```js
    // state transition) — treat as non-fatal and log at debug level.
    const logger = this._consumer.isConnected() &&
        e.code !== kafka.CODES.ERRORS.ERR__STATE
        ? this._log.error : this._log.debug;
```
This is never a debug log: we don't expect this to happen at all, so always outputting an error log may be appropriate.
```js
    this._consumer.offsetsStore([{ topic, partition,
        offset: committableOffset }]);
    if (committableOffset !== null && !this.isPaused() && this._consumer.isConnected()) {
        try {
```
Since you added the isConnected() guard, do we also need to catch the exception?
I think we could still have cases where isConnected() returns true, but then something goes wrong inside offsetsStore() 🤔
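The race can be sketched directly: the stub below reports connected, yet a rebalance between the `isConnected()` check and the store still makes `offsetsStore()` throw, which is why the guard alone is not sufficient and the try/catch remains necessary (stubbed consumer, not the real node-rdkafka object):

```javascript
// Stub: reports connected, but the partition has already been
// revoked, so the store call throws ERR__STATE (-172) anyway.
const consumer = {
    isConnected: () => true,
    offsetsStore: () => {
        const e = new Error('Local: Erroneous state');
        e.code = -172;
        throw e;
    },
};

let stored = false;
if (consumer.isConnected()) {
    try {
        consumer.offsetsStore([{ topic: 't', partition: 0, offset: 42 }]);
        stored = true;
    } catch (e) {
        if (e.code !== -172) throw e; // only swallow ERR__STATE
    }
}
// stored stays false: the guard passed but the store still failed
```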
Issue: BB-758
Seen in Zenko CI (crash log screenshot).