fix(core/txpool): coordinate reset lifecycle and shutdown signaling #28837 #2132
gzliudan wants to merge 1 commit into XinFinOrg:dev-upgrade from
Conversation
Pull request overview
This PR improves internal synchronization within the core/txpool/txpool.go event loop, specifically around background reset lifecycle management and shutdown signaling. It introduces a termination channel (term) to signal when the pool has stopped, a sync channel for simulator/testing use to force synchronous reset completion, and resetForced/resetWaiter state variables to track pending forced resets and notify waiters upon completion or pool termination.
Changes:

- Added a `term chan struct{}` field to signal pool termination (closed via `defer` in `loop()`)
- Added a `sync chan chan error` field plus `resetForced`/`resetWaiter` state to support forced-reset synchronization for simulator use
- Ensured active reset waiters are notified with an error upon pool shutdown
```go
term: make(chan struct{}),
sync: make(chan chan error),
```
The sync field (chan chan error) is added to the TxPool struct and initialized in New(), and the loop consumes from it via case syncc := <-p.sync. However, there is no exported method on TxPool (such as a Sync() error method) that sends to this channel. The channel is therefore inaccessible to any caller outside the package, making the entire sync/forced-reset mechanism completely unreachable from the intended simulator or testing use cases described in the PR. The companion public method is missing from this change.
```go
// Notify the live reset waiter to not block if the txpool is closed.
defer func() {
	if resetWaiter != nil {
		resetWaiter <- errors.New("pool already terminated")
```
The shutdown defer (lines 143–148) sends on resetWaiter using a blocking send (resetWaiter <- errors.New("pool already terminated")). If the caller that originally sent its channel via p.sync is no longer actively receiving (e.g., it has already been cancelled or timed out), this blocking send will deadlock the loop() goroutine's shutdown path indefinitely, preventing the pool from terminating cleanly. The channel should be sent to in a non-blocking manner or using a select with a default branch.
```diff
-// Notify the live reset waiter to not block if the txpool is closed.
-defer func() {
-	if resetWaiter != nil {
-		resetWaiter <- errors.New("pool already terminated")
+// Notify the live reset waiter without blocking if the txpool is closed.
+defer func() {
+	if resetWaiter != nil {
+		select {
+		case resetWaiter <- errors.New("pool already terminated"):
+		default:
+		}
```
```go
// the forced op is still pending. In that case, wait another round
// of resets.
if resetWaiter != nil && !resetForced {
	resetWaiter <- nil
```
The notification at resetWaiter <- nil (line 194) is a blocking send on an unbuffered chan error. If a Sync() caller has abandoned its wait (e.g., due to context cancellation or timeout), this send will permanently block the main loop goroutine, preventing it from processing any further chain head events or shutdown signals. The send should be non-blocking (using a select with default) to avoid stalling the event loop.
```diff
-resetWaiter <- nil
+select {
+case resetWaiter <- nil:
+	// notification delivered
+default:
+	// no active listener; avoid blocking the event loop
+}
```
```go
subs event.SubscriptionScope // Subscription scope to unsubscribe all on shutdown
quit chan chan error         // Quit channel to tear down the head updater
term chan struct{}           // Termination channel to detect a closed pool
```
The term channel is declared in the TxPool struct, initialized in New(), and closed via defer close(p.term) in loop(), but it is never read anywhere in the codebase. There are no callers selecting on p.term to detect pool termination. As written, this field has no observable effect and is dead code. Either the channel should be exposed (e.g., via a Terminated() method that returns the channel for callers to select on), or it should be removed to avoid confusion.
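If the channel is kept, one way to expose it is an accessor returning the channel for callers to select on. A minimal self-contained sketch of the pattern, where the `Terminated` method name and the stand-in `pool` type are assumptions rather than code from this PR:

```go
package main

import "fmt"

// pool is a minimal stand-in for TxPool, reduced to just the term channel.
type pool struct {
	term chan struct{}
}

func newPool() *pool {
	p := &pool{term: make(chan struct{})}
	go p.loop()
	return p
}

// loop mirrors the PR's "defer close(p.term)" shutdown signal; the event
// loop body is elided, so the goroutine exits immediately in this demo.
func (p *pool) loop() {
	defer close(p.term)
}

// Terminated returns the channel so callers can select on pool shutdown.
func (p *pool) Terminated() <-chan struct{} {
	return p.term
}

func main() {
	p := newPool()
	<-p.Terminated() // unblocks once loop() has exited
	fmt.Println("pool terminated")
}
```

Because a closed channel is always ready to receive, any number of callers can wait on `Terminated()` without coordination.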
```go
// that any running reset operation finishes to make block imports
// deterministic. On top of that, run a new reset operation to make
// transaction insertions deterministic instead of being stuck in a
// queue waiting for a reset.
```
When a new sync request arrives via case syncc := <-p.sync, the code unconditionally assigns resetWaiter = syncc. If a previous resetWaiter was already set (from an earlier sync request that has not yet been fulfilled), it is silently overwritten and the previous caller goroutine that is blocked waiting on its channel will never receive a notification, causing an indefinite goroutine leak/hang.
Before overwriting resetWaiter, the prior waiter should be notified (e.g., with an appropriate error) to prevent the leak.
```diff
 // queue waiting for a reset.
+if resetWaiter != nil {
+	// A previous sync waiter is still pending; notify it to avoid
+	// leaking a goroutine waiting on the old channel.
+	resetWaiter <- errors.New("sync request superseded by a new request")
+	resetWaiter = nil
+}
```
ethereum#28837

Improve txpool loop synchronization around background resets. This change:

- adds an explicit termination channel to signal pool shutdown
- tracks forced-reset intent and a waiter channel inside the reset loop
- ensures reset waiters are notified on completion or on pool termination
- allows an explicit sync request path to trigger an additional reset round when needed

Scope is limited to internal txpool concurrency control in core/txpool/txpool.go, with no protocol or RPC behavior change.
Force-pushed from 9204a8e to 3c60232
Pull request overview
Copilot reviewed 1 out of 1 changed files in this pull request and generated 3 comments.
```go
func (p *TxPool) Sync() error {
	sync := make(chan error)
	select {
	case p.sync <- sync:
		return <-sync
	case <-p.term:
		return errors.New("pool already terminated")
	}
}
```
The new Sync() method, resetWaiter handling, and term channel shutdown signaling introduce complex concurrency behavior that lacks any unit test coverage. There are no test files at the core/txpool package level. Given the complexity of the added synchronization logic (e.g., forced reset lifecycle, waiter notification on pool termination), adding test cases to verify correct behavior and prevent regressions would be valuable. For example, tests for: (1) Sync() unblocking after a reset completes, (2) Sync() returning an error when the pool is closed, (3) correct waiter notification on pool shutdown.
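A sketch of the kind of test the review asks for, written against a minimal stand-in that reproduces the sync/term handshake rather than the real TxPool (the stand-in `pool` type and its wiring are assumptions):

```go
package main

import (
	"errors"
	"fmt"
)

// pool is a minimal stand-in reproducing the PR's sync/term handshake;
// the real TxPool reset machinery is elided.
type pool struct {
	sync chan chan error
	term chan struct{}
	quit chan struct{}
}

func newPool() *pool {
	p := &pool{
		sync: make(chan chan error),
		term: make(chan struct{}),
		quit: make(chan struct{}),
	}
	go p.loop()
	return p
}

func (p *pool) loop() {
	defer close(p.term)
	for {
		select {
		case syncc := <-p.sync:
			// Pretend a forced reset just completed; notify the waiter.
			syncc <- nil
		case <-p.quit:
			return
		}
	}
}

func (p *pool) Sync() error {
	sync := make(chan error)
	select {
	case p.sync <- sync:
		return <-sync
	case <-p.term:
		return errors.New("pool already terminated")
	}
}

func (p *pool) Close() { close(p.quit) }

func main() {
	p := newPool()
	fmt.Println("sync while live:", p.Sync())
	p.Close()
	<-p.term // wait for the loop to exit
	fmt.Println("sync after close:", p.Sync())
}
```

The second `Sync()` call cannot hang: with the loop gone, the send on `p.sync` never proceeds, so the select falls through to the closed `term` channel and returns the termination error.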
```go
func (p *TxPool) Sync() error {
	sync := make(chan error)
	select {
	case p.sync <- sync:
		return <-sync
	case <-p.term:
		return errors.New("pool already terminated")
```
The string "pool already terminated" is used in two separate errors.New() calls (line 145 in the defer and line 432 in Sync()), resulting in two distinct error instances. The existing pattern in errors.go defines all package-level errors as exported sentinel variables (e.g., ErrAlreadyKnown, ErrTxPoolOverflow), which allows callers to compare with errors.Is(). A sentinel error such as ErrPoolTerminated would be consistent with this codebase convention and easier to compare programmatically.
```go
defer func() {
	if resetWaiter != nil {
		resetWaiter <- errors.New("pool already terminated")
```
This errors.New("pool already terminated") and the identical one in Sync() at line 432 are two separate error instances. Replacing both with a shared sentinel variable (e.g., ErrPoolTerminated defined in errors.go) would follow the existing pattern of package-level error variables in this file and make the error checkable via errors.Is().
Proposed changes
Improve txpool loop synchronization around background resets.
This change:

- adds an explicit termination channel to signal pool shutdown
- tracks forced-reset intent and a waiter channel inside the reset loop
- ensures reset waiters are notified on completion or on pool termination
- allows an explicit sync request path to trigger an additional reset round when needed

Scope is limited to internal txpool concurrency control in core/txpool/txpool.go, with no protocol or RPC behavior change.
Ref: ethereum#28837
Types of changes

What types of changes does your code introduce to XDC network? Put an ✅ in the boxes that apply.

Impacted Components

Which parts of the codebase does this PR touch? Put an ✅ in the boxes that apply.

Checklist

Put an ✅ in the boxes once you have confirmed the below actions (or provide reasons for not doing so).