sink: add metrics for blackhole sink #5042
Conversation
Skipping CI for Draft Pull Request.
📝 Walkthrough
The blackhole sink is refactored to integrate changefeed-aware metrics tracking and adopt a new unlimited channel.
Changes: Blackhole Sink Metrics and Channel Refactoring
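For orientation, this is roughly what the refactored constructor looks like, reconstructed from the review excerpts further down in this thread (the surrounding imports and struct definition are not shown here and are assumed):

func New(changefeedID common.ChangeFeedID) (*sink, error) {
	return &sink{
		// unbounded queue that replaces the previous fixed-size Go channel
		eventCh: chann.NewUnlimitedChannelDefault[*commonEvent.DMLEvent](),
		// changefeed-aware statistics used for DML batch and DDL metrics
		statistics: metrics.NewStatistics(changefeedID, "sink"),
	}, nil
}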
Estimated code review effort: 🎯 2 (Simple) | ⏱️ ~12 minutes
🚥 Pre-merge checks: ✅ 3 passed | ❌ 2 failed (2 warnings)
Code Review
This pull request replaces the standard Go channel in the blackhole sink with an UnlimitedChannel and updates the BatchCount method to return the actual channel length. A critical issue was identified in the Run loop where the use of a blocking Get() call within a default block could prevent the sink from responding to context cancellation, hindering graceful shutdowns. It is recommended to use GetWithContext(ctx) instead.
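The summary above notes that BatchCount now reports the live queue length. A minimal sketch of what that pairing could look like, assuming the unlimited channel exposes a Push method for enqueueing (the enqueue method name is not shown anywhere in this review and is an assumption):

// AddDMLEvent queues a DML event; the Run loop drains it and records metrics.
func (s *sink) AddDMLEvent(event *commonEvent.DMLEvent) {
	s.eventCh.Push(event) // Push is assumed; only Get/GetWithContext/Len/Close appear in the review
}

// BatchCount returns the number of events currently queued, replacing the
// old hard-coded constant (4096) that the tests previously asserted.
func (s *sink) BatchCount() int {
	return s.eventCh.Len()
}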
 select {
 case <-ctx.Done():
 	return nil
-case event := <-s.eventCh:
+default:
+	event, ok := s.eventCh.Get()
+	if !ok {
+		log.Info("blackhole sink event channel closed")
+		return nil
+	}
+	event.PostFlush()
 }
The current implementation of the Run loop has a cancellation issue. The select statement with a default block will immediately call s.eventCh.Get(), which is a blocking call. If the context is cancelled while Get() is waiting for an event, the loop will not exit until an event is received or the channel is closed. This prevents the sink from shutting down gracefully when idle. Using GetWithContext(ctx) allows the loop to respond to context cancellation while waiting for events.
event, ok, err := s.eventCh.GetWithContext(ctx)
if err != nil {
return nil
}
if !ok {
log.Info("blackhole sink event channel closed")
return nil
}
event.PostFlush()
Signed-off-by: wk989898 <nhsmwk@gmail.com>
Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
downstreamadapter/sink/blackhole/sink.go (1)
64-81: ⚠️ Potential issue | 🔴 Critical | ⚡ Quick win
DDL events must invoke their PostFlush callback.
`WriteBlockEvent` records metrics but never calls `event.PostFlush()` for DDL events. According to the `Sink` interface contract (downstreamadapter/sink/sink.go:40-42), implementations are expected to call `event.PostFlush()` on success. This omission causes `TestBlacHoleSinkBasicFunctionality` to fail because the DDL event's `PostTxnFlushed` callbacks are never executed (expected count: 3, actual: 2).
🐛 Proposed fix to invoke PostFlush
 func (s *sink) WriteBlockEvent(event commonEvent.BlockEvent) error {
 	switch event.GetType() {
 	case commonEvent.TypeDDLEvent:
 		e := event.(*commonEvent.DDLEvent)
 		// NOTE: don't change the log, integration test `lossy_ddl` depends on it.
 		// ref: https://github.com/pingcap/ticdc/blob/da834db76e0662ff15ef12645d1f37bfa6506d83/tests/integration_tests/lossy_ddl/run.sh#L17
 		log.Debug("BlackHoleSink: DDL Event", zap.Any("ddl", e))
 		ddlType := e.GetDDLType().String()
 		if err := s.statistics.RecordDDLExecution(func() (string, error) {
 			return ddlType, nil
 		}); err != nil {
 			log.Warn("failed to record DDL execution", zap.Error(err))
 		}
+		event.PostFlush()
 	case commonEvent.TypeSyncPointEvent:
+		event.PostFlush()
 	default:
 		log.Error("unknown event type", zap.Any("event", event))
 	}
 	return nil
 }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@downstreamadapter/sink/blackhole/sink.go` around lines 64 - 81, WriteBlockEvent currently handles DDL events but never invokes the required PostFlush callback; update sink.WriteBlockEvent (function sink.WriteBlockEvent) so that after recording DDL metrics (the s.statistics.RecordDDLExecution(...) block) you call event.PostFlush() (or e.PostFlush() if using the asserted DDLEvent) on successful handling; ensure the PostFlush call is executed only on the success path for commonEvent.TypeDDLEvent and does not change the existing log or metric behavior.
🧹 Nitpick comments (1)
downstreamadapter/sink/blackhole/sink.go (1)
34-34: 💤 Low value
Unexported return type from exported function.
The linter flags that `New` returns the unexported type `*sink`. While this is a common Go pattern when the type implements a public interface, consider whether the return type should be the exported `Sink` interface defined in downstreamadapter/sink/sink.go. This would make the function signature more flexible and align with Go best practices for factory functions.
♻️ Optional signature improvement
If the `Sink` interface is meant to be the public contract, you can return it instead:
-func New(changefeedID common.ChangeFeedID) (*sink, error) {
+func New(changefeedID common.ChangeFeedID) (Sink, error) {
 	return &sink{
 		eventCh:    chann.NewUnlimitedChannelDefault[*commonEvent.DMLEvent](),
 		statistics: metrics.NewStatistics(changefeedID, "sink"),
 	}, nil
 }
However, verify that `Sink` is defined in a way that doesn't create import cycles.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@downstreamadapter/sink/blackhole/sink.go` at line 34, Change the exported constructor New to return the exported Sink interface instead of the unexported *sink: update the signature from func New(changefeedID common.ChangeFeedID) (*sink, error) to func New(changefeedID common.ChangeFeedID) (Sink, error), and ensure the implementation returns the concrete &sink{} value as a Sink; also update any callers/variable types if needed and verify this does not introduce import cycles with the Sink interface definition.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@downstreamadapter/sink/blackhole/sink_test.go`:
- Line 96: The test assertion for sink.BatchCount() is stale: BatchCount now
returns the current channel length (eventCh.Len()) so update the test to either
assert the initial value is 0 (require.Equal(t, 0, sink.BatchCount())) or, if
you want the old behavior, push the expected number of events into the
sink/channel first (e.g., use the sink's enqueue/publish method to add 4096
events) and then assert require.Equal(t, 4096, sink.BatchCount()); reference the
BatchCount() method and eventCh.Len() semantics when making the change.
In `@downstreamadapter/sink/blackhole/sink.go`:
- Around line 103-105: The call to s.statistics.RecordBatchExecution currently
ignores its error return; update the Run loop (the block calling
s.statistics.RecordBatchExecution) to capture the returned error and handle it
according to repo guidelines: check the error value, wrap or map it to the
appropriate predefined repository error from docs/agents/error-handling.md (do
not create ad-hoc errors), and either log and continue or return/propagate it up
from the containing function consistent with surrounding error flow. Ensure you
reference the s.statistics.RecordBatchExecution call site and follow existing
patterns in this file for logging/propagation so behavior remains consistent
with other error handling.
- Around line 72-74: The call to s.statistics.RecordDDLExecution(…) ignores its
error; update the sink (in sink.go around s.statistics.RecordDDLExecution and
ddlType) to capture the returned error and handle it—either log the error using
the sink's logger (e.g., s.logger.Error/Warning) or return it up the call chain
from the enclosing method; when creating or wrapping any error use the
repository's predefined error types/patterns per docs/agents/error-handling.md
instead of ad-hoc strings so the error follows project conventions.
- Around line 97-106: Replace the blocking s.eventCh.Get() call with
s.eventCh.GetWithContext(ctx) in the default case so the wait respects context
cancellation: call GetWithContext(ctx) on s.eventCh (instead of Get()), check
its return values (handle the closed-channel/false case and any context-related
error), log and return on those cases the same way you already do for Get, then
proceed to call s.statistics.RecordBatchExecution(...) and event.PostFlush() as
before; ensure the surrounding function has a context variable named ctx passed
in or available to use.
---
Outside diff comments:
In `@downstreamadapter/sink/blackhole/sink.go`:
- Around line 64-81: WriteBlockEvent currently handles DDL events but never
invokes the required PostFlush callback; update sink.WriteBlockEvent (function
sink.WriteBlockEvent) so that after recording DDL metrics (the
s.statistics.RecordDDLExecution(...) block) you call event.PostFlush() (or
e.PostFlush() if using the asserted DDLEvent) on successful handling; ensure the
PostFlush call is executed only on the success path for commonEvent.TypeDDLEvent
and does not change the existing log or metric behavior.
---
Nitpick comments:
In `@downstreamadapter/sink/blackhole/sink.go`:
- Line 34: Change the exported constructor New to return the exported Sink
interface instead of the unexported *sink: update the signature from func
New(changefeedID common.ChangeFeedID) (*sink, error) to func New(changefeedID
common.ChangeFeedID) (Sink, error), and ensure the implementation returns the
concrete &sink{} value as a Sink; also update any callers/variable types if
needed and verify this does not introduce import cycles with the Sink interface
definition.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: aa8d045c-7d63-4ac5-b4d4-5d59121a1d26
📒 Files selected for processing (3)
downstreamadapter/sink/blackhole/sink.go
downstreamadapter/sink/blackhole/sink_test.go
downstreamadapter/sink/sink.go
-	sink, err := New()
+	sink, err := New(common.NewChangefeedID(common.DefaultKeyspaceName))
 	require.NoError(t, err)
 	require.Equal(t, 4096, sink.BatchCount())
Update test expectation to match new BatchCount semantics.
The test expects BatchCount() to return 4096, which was the old constant value. The new implementation returns eventCh.Len(), the actual number of events in the channel (0 at initialization). Update the test to reflect the new behavior or add events before asserting.
🧪 Proposed fix to match new semantics
Option 1: Assert the correct initial value
func TestBlackHoleSinkBatchConfig(t *testing.T) {
sink, err := New(common.NewChangefeedID(common.DefaultKeyspaceName))
require.NoError(t, err)
- require.Equal(t, 4096, sink.BatchCount())
+ require.Equal(t, 0, sink.BatchCount())
require.Zero(t, sink.BatchBytes())
}
Option 2: Test after adding events
func TestBlackHoleSinkBatchConfig(t *testing.T) {
sink, err := New(common.NewChangefeedID(common.DefaultKeyspaceName))
require.NoError(t, err)
+
+ // Add some test events
+ helper := commonEvent.NewEventTestHelper(t)
+ defer helper.Close()
+ helper.Tk().MustExec("use test")
+ dmlEvent := helper.DML2Event("test", "t", "insert into t values (1, 'test')")
+ sink.AddDMLEvent(dmlEvent)
+
- require.Equal(t, 4096, sink.BatchCount())
+ require.Equal(t, 1, sink.BatchCount())
require.Zero(t, sink.BatchBytes())
}
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
-	require.Equal(t, 4096, sink.BatchCount())
+	require.Equal(t, 0, sink.BatchCount())
🧰 Tools
🪛 GitHub Actions: PR Build and Unit Test / 0_Next Gen Unit Tests.txt
[error] 96-96: downstreamadapter/sink/blackhole TestBlackHoleSinkBatchConfig failed: Not equal (expected: 4096, actual: 0).
🪛 GitHub Actions: PR Build and Unit Test / 4_Classic Unit Tests.txt
[error] 96-96: TestBlackHoleSinkBatchConfig failed: Not equal (expected: 4096, actual: 0).
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@downstreamadapter/sink/blackhole/sink_test.go` at line 96, The test assertion
for sink.BatchCount() is stale: BatchCount now returns the current channel
length (eventCh.Len()) so update the test to either assert the initial value is
0 (require.Equal(t, 0, sink.BatchCount())) or, if you want the old behavior,
push the expected number of events into the sink/channel first (e.g., use the
sink's enqueue/publish method to add 4096 events) and then assert
require.Equal(t, 4096, sink.BatchCount()); reference the BatchCount() method and
eventCh.Len() semantics when making the change.
		default:
			event, ok := s.eventCh.Get()
			if !ok {
				log.Info("blackhole sink event channel closed")
				return nil
			}
			s.statistics.RecordBatchExecution(func() (int, int64, error) {
				return int(event.Len()), event.GetSize(), nil
			})
			event.PostFlush()
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Description: Check UnlimitedChannel API for context-aware methods
# Search for UnlimitedChannel method signatures
ast-grep --pattern $'type UnlimitedChannel[$_, $_] struct {
$$$
}'
ast-grep --pattern $'func ($_ *UnlimitedChannel[$_, $_]) GetWithContext($$$) $$$'
Repository: pingcap/ticdc
Length of output: 435
🏁 Script executed:
# Check the UnlimitedChannel implementation to understand Get() behavior
cat -n utils/chann/unlimited_chann.go | head -120 | tail -80
Repository: pingcap/ticdc
Length of output: 2758
🏁 Script executed:
# Look at the actual sink.go file to see the full Run() method context
cat -n downstreamadapter/sink/blackhole/sink.go | sed -n '80,120p'
Repository: pingcap/ticdc
Length of output: 1013
🏁 Script executed:
# Check for other patterns of UnlimitedChannel usage in the codebase
rg -A 3 "\.Get\(\)" --type go | grep -A 3 "select"
Repository: pingcap/ticdc
Length of output: 39
🏁 Script executed:
# View the complete getWithContext implementation
cat -n utils/chann/unlimited_chann.go | sed -n '106,150p'
Repository: pingcap/ticdc
Length of output: 1217
🏁 Script executed:
# Search for similar patterns in other sink implementations
fd -e go -path "*sink*.go" | xargs grep -l "\.Get()" | head -5
Repository: pingcap/ticdc
Length of output: 226
🏁 Script executed:
# Find other implementations using the same sink pattern
rg "\.Run\(ctx.*context\.Context\)" --type go -A 15 | grep -A 15 "select"Repository: pingcap/ticdc
Length of output: 39
🏁 Script executed:
# Look for other uses of GetWithContext to understand idiomatic pattern
rg "GetWithContext" --type go -B 2 -A 5Repository: pingcap/ticdc
Length of output: 3359
🏁 Script executed:
# Get the full context of the cloudstorage pattern
cat -n downstreamadapter/sink/cloudstorage/dml_writers.go | sed -n '80,120p'
Repository: pingcap/ticdc
Length of output: 1194
🏁 Script executed:
# Check if there are other sinks that use GetWithContext properly
rg "GetWithContext" --type go -B 5 | grep -A 5 "for {" | head -30
Repository: pingcap/ticdc
Length of output: 39
Use GetWithContext to enable proper context cancellation handling.
The current pattern blocks on Get() in the default case, which cannot be interrupted by context cancellation. When the context is cancelled, the goroutine remains blocked in Get() waiting for an event, delaying graceful shutdown. Replace with GetWithContext(ctx) to make the blocking channel operation respect the context:
Proposed fix
func (s *sink) Run(ctx context.Context) error {
for {
- select {
- case <-ctx.Done():
- return nil
- default:
- event, ok := s.eventCh.Get()
- if !ok {
- log.Info("blackhole sink event channel closed")
- return nil
- }
- s.statistics.RecordBatchExecution(func() (int, int64, error) {
- return int(event.Len()), event.GetSize(), nil
- })
- event.PostFlush()
- }
+ event, ok, err := s.eventCh.GetWithContext(ctx)
+ if err != nil {
+ return nil
+ }
+ if !ok {
+ log.Info("blackhole sink event channel closed")
+ return nil
+ }
+ s.statistics.RecordBatchExecution(func() (int, int64, error) {
+ return int(event.Len()), event.GetSize(), nil
+ })
+ event.PostFlush()
}
}
🧰 Tools
🪛 GitHub Check: Check
[failure] 103-103:
Error return value of s.statistics.RecordBatchExecution is not checked (errcheck)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@downstreamadapter/sink/blackhole/sink.go` around lines 97 - 106, Replace the
blocking s.eventCh.Get() call with s.eventCh.GetWithContext(ctx) in the default
case so the wait respects context cancellation: call GetWithContext(ctx) on
s.eventCh (instead of Get()), check its return values (handle the
closed-channel/false case and any context-related error), log and return on
those cases the same way you already do for Get, then proceed to call
s.statistics.RecordBatchExecution(...) and event.PostFlush() as before; ensure
the surrounding function has a context variable named ctx passed in or available
to use.
Actionable comments posted: 1
♻️ Duplicate comments (3)
downstreamadapter/sink/blackhole/sink.go (3)
103-105: ⚠️ Potential issue | 🔴 Critical | ⚡ Quick win
Unchecked error from `RecordBatchExecution` (still flagged by errcheck).
Errcheck continues to flag the dropped error on line 103. Capture and at minimum log it (combined fix shown in the `Run` refactor above). As per coding guidelines, use predefined repository errors; see docs/agents/error-handling.md before changing error creation, wrapping, or propagation.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@downstreamadapter/sink/blackhole/sink.go` around lines 103 - 105, The call to s.statistics.RecordBatchExecution in sink.go ignores the returned error; capture its error return and handle it (at minimum log it) instead of dropping it. Update the invocation in the block that currently calls s.statistics.RecordBatchExecution(func() (int, int64, error) { ... }) to capture the error value and pass it to the repository's logging facility (e.g., processLogger or the sink's logger) using the repository error patterns described in docs/agents/error-handling.md; do not create ad-hoc error types—use the predefined error helpers/wrappers and include contextual text indicating the batch execution metrics and any upstream event info when logging.
97-102: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win
Blocking `Get()` inside `default` prevents context-driven shutdown.
With `ctx.Done()` only checked between iterations, once execution falls into `default` and blocks in `s.eventCh.Get()`, a context cancellation cannot unblock the loop until a new event arrives or `eventCh` is closed. Use `GetWithContext(ctx)` (the idiomatic pattern already used in cloudstorage/dml_writers.go) so cancellation is honored promptly.
♻️ Proposed fix
 func (s *sink) Run(ctx context.Context) error {
 	for {
-		select {
-		case <-ctx.Done():
-			return nil
-		default:
-			event, ok := s.eventCh.Get()
-			if !ok {
-				log.Info("blackhole sink event channel closed")
-				return nil
-			}
-			s.statistics.RecordBatchExecution(func() (int, int64, error) {
-				return int(event.Len()), event.GetSize(), nil
-			})
-			event.PostFlush()
-		}
+		event, ok, err := s.eventCh.GetWithContext(ctx)
+		if err != nil {
+			return nil
+		}
+		if !ok {
+			log.Info("blackhole sink event channel closed")
+			return nil
+		}
+		if err := s.statistics.RecordBatchExecution(func() (int, int64, error) {
+			return int(event.Len()), event.GetSize(), nil
+		}); err != nil {
+			log.Warn("failed to record batch execution", zap.Error(err))
+		}
+		event.PostFlush()
 	}
 }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@downstreamadapter/sink/blackhole/sink.go` around lines 97 - 102, The loop currently calls s.eventCh.Get() inside the default case which blocks and ignores ctx cancellation; replace that blocking call with the context-aware API used elsewhere by calling s.eventCh.GetWithContext(ctx) (matching the pattern in cloudstorage/dml_writers.go), check the returned (event, ok, err) semantics as appropriate, handle ok==false by logging and returning nil, and ensure any error from GetWithContext is handled or propagated so ctx.Done() can promptly stop the loop.
71-74: ⚠️ Potential issue | 🔴 Critical | ⚡ Quick win
Unchecked error from `RecordDDLExecution` (still flagged by errcheck).
The error returned by `s.statistics.RecordDDLExecution` is silently discarded; pipeline errcheck reported this on line 72. Either capture and log it via the repository logger, or propagate it out of `WriteBlockEvent`. As per coding guidelines, use predefined repository errors; see docs/agents/error-handling.md before changing error creation, wrapping, or propagation.
🛡️ Proposed fix
 	ddlType := e.GetDDLType().String()
-	s.statistics.RecordDDLExecution(func() (string, error) {
+	if err := s.statistics.RecordDDLExecution(func() (string, error) {
 		return ddlType, nil
-	})
+	}); err != nil {
+		log.Warn("failed to record DDL execution", zap.Error(err))
+	}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@downstreamadapter/sink/blackhole/sink.go` around lines 71 - 74, Record the error returned by s.statistics.RecordDDLExecution instead of discarding it: in WriteBlockEvent check the error from s.statistics.RecordDDLExecution(…) and either (a) log it via the repository logger instance used in this package (e.g., s.logger or s.repoLogger) with a clear message, or (b) wrap/translate it into the appropriate predefined repository error and return it from WriteBlockEvent so callers can handle it; follow the project error-handling conventions when creating/wrapping the error.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 21e5551d-dbd7-49cb-b778-90ad5bc9def3
📒 Files selected for processing (1)
downstreamadapter/sink/blackhole/sink.go
func (s *sink) Close() {
	s.eventCh.Close()
	s.statistics.Close()
}
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Inspect Statistics.Close semantics and check how the factory/lifecycle calls blackhole.Close() vs Run()
ast-grep --pattern $'func ($_ *Statistics) Close() {
$$$
}'
rg -nP --type=go -C5 '\bblackhole\.New\b|\bblackhole\.Sink\b|\bblackhole\b.*\.Close\(\)'
rg -nP --type=go -C5 'RecordBatchExecution\b'
Repository: pingcap/ticdc
Length of output: 8844
🏁 Script executed:
# Get full RecordBatchExecution implementation to check for locks
sed -n '81,120p' pkg/metrics/statistics.go
# Check blackhole sink full Run and Close methods
sed -n '85,110p' downstreamadapter/sink/blackhole/sink.go
# Check blackhole sink struct definition and initialization
sed -n '1,50p' downstreamadapter/sink/blackhole/sink.go
# Search for how blackhole sink is used in the application
rg -n 'blackhole\.' --type=go -B3 -A3 | head -100
Repository: pingcap/ticdc
Length of output: 6253
🏁 Script executed:
# Check if there's synchronization in statistics struct
rg -n 'type Statistics struct' -A20 pkg/metrics/statistics.go
# Check callers of sink.Close() to see if they coordinate with Run exit
rg -n '\.Close\(\)' --type=go downstreamadapter/sink/blackhole/ -B5 -A2
Repository: pingcap/ticdc
Length of output: 2209
🏁 Script executed:
# Check the eventCh type and Get() behavior
rg -n 'eventCh.*Close\|eventCh.*Get' downstreamadapter/sink/blackhole/sink.go -B2 -A2
Repository: pingcap/ticdc
Length of output: 39
🏁 Script executed:
# Find calls to sink.Run() and sink.Close() to understand usage pattern
rg -n '\.Run\(|\.Close\(\)' --type=go downstreamadapter/sink/ -B3 -A3 | grep -E "(Run|Close|sink|defer)" | head -80
Repository: pingcap/ticdc
Length of output: 7231
🏁 Script executed:
# Check if there's any WaitGroup or context cancellation coordination
rg -n 'WaitGroup|ctx.*Done|Done\(\)' --type=go downstreamadapter/sink/blackhole/ -B2 -A2
Repository: pingcap/ticdc
Length of output: 341
🏁 Script executed:
# Check UnlimitedChannel implementation to understand Get() behavior after Close()
fd -name "*.go" -path "*/chann/*" --type f
Repository: pingcap/ticdc
Length of output: 288
🏁 Script executed:
# Look for actual sink lifecycle management in manager/runner
rg -n 'blackhole\|sink\.' --type=go pkg/ | grep -E 'Run\(|Close\(|defer' | head -40
Repository: pingcap/ticdc
Length of output: 39
🏁 Script executed:
# Find where sinks are created and used, especially looking for Run() and Close() patterns
rg -n 'sink\.Run|sink\.Close' --type=go -B5 -A5 | head -150
Repository: pingcap/ticdc
Length of output: 10899
🏁 Script executed:
# Check if there's a wrapper or manager that coordinates Run and Close
rg -n 'type.*Manager|type.*Runner' --type=go pkg/ downstreamadapter/ -A10 | head -100
Repository: pingcap/ticdc
Length of output: 6793
🏁 Script executed:
# Look for context cancellation and goroutine coordination patterns in the codebase
rg -n 'errgroup\.Group|WaitGroup.*Run|defer.*Close' --type=go pkg/ | grep -i sink | head -30
Repository: pingcap/ticdc
Length of output: 1928
🏁 Script executed:
# Check the UnlimitedChannel interface to understand Get() behavior after Close()
rg -n 'type UnlimitedChannel|func.*Get\(\)|func.*Close\(\)' --type=go utils/chann/ -A3
Repository: pingcap/ticdc
Length of output: 1065
🏁 Script executed:
# Get full UnlimitedChannel implementation, especially Get/getWithContext
sed -n '61,100p' utils/chann/unlimited_chann.go
Repository: pingcap/ticdc
Length of output: 1207
🏁 Script executed:
# Check the getWithContext method
rg -n 'getWithContext' utils/chann/unlimited_chann.go -A20
Repository: pingcap/ticdc
Length of output: 1259
🏁 Script executed:
# Look at the dispatcher_manager shutdown sequence in detail
sed -n '920,960p' downstreamadapter/dispatchermanager/dispatcher_manager.go
Repository: pingcap/ticdc
Length of output: 1235
Coordination between Close and concurrent Run is underspecified.
While the dispatcher manager does cancel the context before calling sink.Close(), it does not wait for the Run goroutine to exit before closing resources. This creates a window where Run could still be mid-RecordBatchExecution (executing Prometheus metric operations like Inc(), Observe(), Add()) when Close() invokes statistics.Close() and calls DeleteLabelValues() on those same metrics.
This works in practice because the Prometheus Go client library provides thread-safe metric operations, but the synchronization is implicit and fragile. To be explicit: callers should either (1) wait for Run to exit via context cancellation before invoking Close(), or (2) ensure Close() is only called after Run has returned. Confirm that the calling code (dispatcher manager) maintains this invariant, and consider documenting the expected shutdown sequence.
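A minimal sketch of the explicit synchronization suggested above, using a done channel so Close waits for Run to finish before tearing down the metrics. The runDone field and its initialization in New are hypothetical additions layered on the excerpts in this review, not part of the PR as submitted:

type sink struct {
	// ... existing fields (eventCh, statistics) ...
	runDone chan struct{} // hypothetical: closed by Run when it returns; must be allocated in New
}

func (s *sink) Run(ctx context.Context) error {
	defer close(s.runDone) // signal Close that no more metric operations will happen
	for {
		event, ok, err := s.eventCh.GetWithContext(ctx)
		if err != nil { // context cancelled
			return nil
		}
		if !ok { // channel closed
			log.Info("blackhole sink event channel closed")
			return nil
		}
		if err := s.statistics.RecordBatchExecution(func() (int, int64, error) {
			return int(event.Len()), event.GetSize(), nil
		}); err != nil {
			log.Warn("failed to record batch execution", zap.Error(err))
		}
		event.PostFlush()
	}
}

func (s *sink) Close() {
	s.eventCh.Close()    // unblocks GetWithContext in Run
	<-s.runDone          // wait until Run has stopped touching s.statistics
	s.statistics.Close() // safe to release the Prometheus label values now
}

Alternatively, the dispatcher manager could simply wait for Run to return before calling Close, which keeps the sink itself unchanged but makes the shutdown invariant live in the caller.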
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@downstreamadapter/sink/blackhole/sink.go` around lines 87 - 90, The Close
method on sink races with a concurrently running Run goroutine that may still be
touching Prometheus metrics (e.g., RecordBatchExecution calling Inc/Observe/Add)
before Close calls statistics.Close/DeleteLabelValues; fix by adding explicit
synchronization: add a done channel or sync.WaitGroup field to the sink struct,
have Run signal completion (close(done) or wg.Done()) when it returns, and make
Close wait for that signal (select with ctx if needed) before closing eventCh
and calling statistics.Close/DeleteLabelValues; alternatively, ensure the
dispatcher manager always waits for Run to exit before calling sink.Close and
document this shutdown invariant in the relevant dispatcher code and sink
comments, referencing sink.Run, sink.Close, sink.eventCh and sink.statistics.
[APPROVALNOTIFIER] This PR is APPROVED
This pull-request has been approved by: lidezhu
The full list of commands accepted by this bot can be found here. The pull request process is described here.
Details
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing /approve in a comment.
[LGTM Timeline notifier]
Timeline:
What problem does this PR solve?
Issue Number: ref #2594
What is changed and how it works?
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
Release note
Summary by CodeRabbit
New Features
Chores
Tests