Skip to content

Commit 33eaa0b

Browse files
committed
Add SendWithContext helper to avoid deadlocks
1 parent ddc2ec1 commit 33eaa0b

6 files changed

Lines changed: 74 additions & 91 deletions

File tree

.github/CONTRIBUTING.md

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,24 @@ Here are a few things you can do that will increase the likelihood of your pull
1919
- Keep your change as focused as possible. If there are multiple changes you would like to make that are not dependent upon each other, consider submitting them as separate pull requests.
2020
- Write a [good commit message](http://tbaggery.com/2008/04/19/a-note-about-git-commit-messages.html).
2121

22+
## Development Guidelines
23+
24+
### Channel Safety
25+
26+
When working with channels in goroutines, it is critical to avoid deadlocks: if a channel's receiver exits early due to an error, any goroutine still sending on that channel will block forever. Always use `base.SendWithContext` for channel sends so the sender can bail out when the migration is aborted:
27+
28+
```go
29+
// ✅ CORRECT - Uses helper to prevent deadlock
30+
if err := base.SendWithContext(ctx, ch, value); err != nil {
31+
return err // context was cancelled
32+
}
33+
34+
// ❌ WRONG - Can deadlock if receiver exits
35+
ch <- value
36+
```
37+
38+
Even if the destination channel is buffered, a deadlock can still occur once the buffer fills up and the receiver has exited, so use `SendWithContext` in those cases as well.
39+
2240
## Resources
2341

2442
- [Contributing to Open Source on GitHub](https://guides.github.com/activities/contributing-to-open-source/)

go/base/context.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1027,3 +1027,24 @@ func (this *MigrationContext) CancelContext() {
10271027
this.cancelFunc()
10281028
}
10291029
}
1030+
1031+
// SendWithContext attempts to send a value to a channel, but returns early
1032+
// if the context is cancelled. This prevents goroutine deadlocks when the
1033+
// channel receiver has exited due to an error.
1034+
//
1035+
// Use this instead of bare channel sends (ch <- val) in goroutines to ensure
1036+
// proper cleanup when the migration is aborted.
1037+
//
1038+
// Example:
1039+
//
1040+
// if err := base.SendWithContext(ctx, ch, value); err != nil {
1041+
// return err // context was cancelled
1042+
// }
1043+
func SendWithContext[T any](ctx context.Context, ch chan<- T, val T) error {
1044+
select {
1045+
case ch <- val:
1046+
return nil
1047+
case <-ctx.Done():
1048+
return ctx.Err()
1049+
}
1050+
}

go/logic/applier.go

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -716,12 +716,8 @@ func (this *Applier) InitiateHeartbeat() {
716716
continue
717717
}
718718
if err := injectHeartbeat(); err != nil {
719-
select {
720-
case this.migrationContext.PanicAbort <- fmt.Errorf("injectHeartbeat writing failed %d times, last error: %w", numSuccessiveFailures, err):
721-
// Error sent successfully
722-
case <-this.migrationContext.GetContext().Done():
723-
// Context cancelled, someone else already reported an error
724-
}
719+
// Use helper to prevent deadlock if listenOnPanicAbort already exited
720+
_ = base.SendWithContext(this.migrationContext.GetContext(), this.migrationContext.PanicAbort, fmt.Errorf("injectHeartbeat writing failed %d times, last error: %w", numSuccessiveFailures, err))
725721
return
726722
}
727723
}

go/logic/migrator.go

Lines changed: 25 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -163,12 +163,8 @@ func (this *Migrator) retryOperation(operation func() error, notFatalHint ...boo
163163
// there's an error. Let's try again.
164164
}
165165
if len(notFatalHint) == 0 {
166-
select {
167-
case this.migrationContext.PanicAbort <- err:
168-
// Error sent successfully
169-
case <-this.migrationContext.GetContext().Done():
170-
// Context cancelled, someone else already reported an error
171-
}
166+
// Use helper to prevent deadlock if listenOnPanicAbort already exited
167+
_ = base.SendWithContext(this.migrationContext.GetContext(), this.migrationContext.PanicAbort, err)
172168
}
173169
return err
174170
}
@@ -196,12 +192,8 @@ func (this *Migrator) retryOperationWithExponentialBackoff(operation func() erro
196192
}
197193
}
198194
if len(notFatalHint) == 0 {
199-
select {
200-
case this.migrationContext.PanicAbort <- err:
201-
// Error sent successfully
202-
case <-this.migrationContext.GetContext().Done():
203-
// Context cancelled, someone else already reported an error
204-
}
195+
// Use helper to prevent deadlock if listenOnPanicAbort already exited
196+
_ = base.SendWithContext(this.migrationContext.GetContext(), this.migrationContext.PanicAbort, err)
205197
}
206198
return err
207199
}
@@ -210,24 +202,16 @@ func (this *Migrator) retryOperationWithExponentialBackoff(operation func() erro
210202
// consumes and drops any further incoming events that may be left hanging.
211203
func (this *Migrator) consumeRowCopyComplete() {
212204
if err := <-this.rowCopyComplete; err != nil {
213-
select {
214-
case this.migrationContext.PanicAbort <- err:
215-
// Error sent successfully
216-
case <-this.migrationContext.GetContext().Done():
217-
// Context cancelled, someone else already reported an error
218-
}
205+
// Use helper to prevent deadlock if listenOnPanicAbort already exited
206+
_ = base.SendWithContext(this.migrationContext.GetContext(), this.migrationContext.PanicAbort, err)
219207
}
220208
atomic.StoreInt64(&this.rowCopyCompleteFlag, 1)
221209
this.migrationContext.MarkRowCopyEndTime()
222210
go func() {
223211
for err := range this.rowCopyComplete {
224212
if err != nil {
225-
select {
226-
case this.migrationContext.PanicAbort <- err:
227-
// Error sent successfully
228-
case <-this.migrationContext.GetContext().Done():
229-
// Context cancelled, someone else already reported an error
230-
}
213+
// Use helper to prevent deadlock if listenOnPanicAbort already exited
214+
_ = base.SendWithContext(this.migrationContext.GetContext(), this.migrationContext.PanicAbort, err)
231215
}
232216
}
233217
}()
@@ -258,32 +242,23 @@ func (this *Migrator) onChangelogStateEvent(dmlEntry *binlog.BinlogEntry) (err e
258242
case Migrated, ReadMigrationRangeValues:
259243
// no-op event
260244
case GhostTableMigrated:
261-
select {
262-
case this.ghostTableMigrated <- true:
263-
// Successfully sent
264-
case <-this.migrationContext.GetContext().Done():
265-
// Context cancelled, migration is aborting
266-
}
245+
// Use helper to prevent deadlock if migration aborts before receiver is ready
246+
_ = base.SendWithContext(this.migrationContext.GetContext(), this.ghostTableMigrated, true)
267247
case AllEventsUpToLockProcessed:
268248
var applyEventFunc tableWriteFunc = func() error {
269-
this.allEventsUpToLockProcessed <- &lockProcessedStruct{
249+
return base.SendWithContext(this.migrationContext.GetContext(), this.allEventsUpToLockProcessed, &lockProcessedStruct{
270250
state: changelogStateString,
271251
coords: dmlEntry.Coordinates.Clone(),
272-
}
273-
return nil
252+
})
274253
}
275254
// at this point we know all events up to lock have been read from the streamer,
276255
// because the streamer works sequentially. So those events are either already handled,
277256
// or have event functions in applyEventsQueue.
278257
// So as not to create a potential deadlock, we write this func to applyEventsQueue
279258
// asynchronously, understanding it doesn't really matter.
280259
go func() {
281-
select {
282-
case this.applyEventsQueue <- newApplyEventStructByFunc(&applyEventFunc):
283-
// Successfully enqueued
284-
case <-this.migrationContext.GetContext().Done():
285-
// Context cancelled, migration is aborting
286-
}
260+
// Use helper to prevent deadlock if buffer fills and executeWriteFuncs exits
261+
_ = base.SendWithContext(this.migrationContext.GetContext(), this.applyEventsQueue, newApplyEventStructByFunc(&applyEventFunc))
287262
}()
288263
default:
289264
return fmt.Errorf("Unknown changelog state: %+v", changelogState)
@@ -1387,12 +1362,8 @@ func (this *Migrator) initiateStreaming() error {
13871362
this.migrationContext.Log.Debugf("Beginning streaming")
13881363
err := this.eventsStreamer.StreamEvents(this.canStopStreaming)
13891364
if err != nil {
1390-
select {
1391-
case this.migrationContext.PanicAbort <- err:
1392-
// Error sent successfully
1393-
case <-this.migrationContext.GetContext().Done():
1394-
// Context cancelled, someone else already reported an error
1395-
}
1365+
// Use helper to prevent deadlock if listenOnPanicAbort already exited
1366+
_ = base.SendWithContext(this.migrationContext.GetContext(), this.migrationContext.PanicAbort, err)
13961367
}
13971368
this.migrationContext.Log.Debugf("Done streaming")
13981369
}()
@@ -1418,14 +1389,9 @@ func (this *Migrator) addDMLEventsListener() error {
14181389
this.migrationContext.DatabaseName,
14191390
this.migrationContext.OriginalTableName,
14201391
func(dmlEntry *binlog.BinlogEntry) error {
1421-
select {
1422-
case this.applyEventsQueue <- newApplyEventStructByDML(dmlEntry):
1423-
// Successfully enqueued
1424-
return nil
1425-
case <-this.migrationContext.GetContext().Done():
1426-
// Context cancelled, stop processing events
1427-
return this.migrationContext.GetContext().Err()
1428-
}
1392+
// Use helper to prevent deadlock if buffer fills and executeWriteFuncs exits
1393+
// This is critical because this callback blocks the event streamer
1394+
return base.SendWithContext(this.migrationContext.GetContext(), this.applyEventsQueue, newApplyEventStructByDML(dmlEntry))
14291395
},
14301396
)
14311397
return err
@@ -1503,7 +1469,7 @@ func (this *Migrator) initiateApplier() error {
15031469
// a chunk of rows onto the ghost table.
15041470
func (this *Migrator) iterateChunks() error {
15051471
terminateRowIteration := func(err error) error {
1506-
this.rowCopyComplete <- err
1472+
_ = base.SendWithContext(this.migrationContext.GetContext(), this.rowCopyComplete, err)
15071473
return this.migrationContext.Log.Errore(err)
15081474
}
15091475
if this.migrationContext.Noop {
@@ -1590,15 +1556,13 @@ func (this *Migrator) iterateChunks() error {
15901556
return nil
15911557
}
15921558
// Enqueue copy operation; to be executed by executeWriteFuncs()
1593-
select {
1594-
case this.copyRowsQueue <- copyRowsFunc:
1595-
// Successfully enqueued
1596-
case <-this.migrationContext.GetContext().Done():
1559+
// Use helper to prevent deadlock if executeWriteFuncs exits
1560+
if err := base.SendWithContext(this.migrationContext.GetContext(), this.copyRowsQueue, copyRowsFunc); err != nil {
15971561
// Context cancelled, check for abort and exit
1598-
if err := this.checkAbort(); err != nil {
1599-
return terminateRowIteration(err)
1562+
if abortErr := this.checkAbort(); abortErr != nil {
1563+
return terminateRowIteration(abortErr)
16001564
}
1601-
return terminateRowIteration(this.migrationContext.GetContext().Err())
1565+
return terminateRowIteration(err)
16021566
}
16031567
}
16041568
}

go/logic/server.go

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -450,12 +450,8 @@ help # This message
450450
return NoPrintStatusRule, err
451451
}
452452
err := fmt.Errorf("User commanded 'panic'. The migration will be aborted without cleanup. Please drop the gh-ost tables before trying again.")
453-
select {
454-
case this.migrationContext.PanicAbort <- err:
455-
// Error sent successfully
456-
case <-this.migrationContext.GetContext().Done():
457-
// Context cancelled, someone else already reported an error
458-
}
453+
// Use helper to prevent deadlock if listenOnPanicAbort already exited
454+
_ = base.SendWithContext(this.migrationContext.GetContext(), this.migrationContext.PanicAbort, err)
459455
return NoPrintStatusRule, err
460456
}
461457
default:

go/logic/throttler.go

Lines changed: 6 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -362,12 +362,8 @@ func (this *Throttler) collectGeneralThrottleMetrics() error {
362362
// Regardless of throttle, we take opportunity to check for panic-abort
363363
if this.migrationContext.PanicFlagFile != "" {
364364
if base.FileExists(this.migrationContext.PanicFlagFile) {
365-
select {
366-
case this.migrationContext.PanicAbort <- fmt.Errorf("Found panic-file %s. Aborting without cleanup", this.migrationContext.PanicFlagFile):
367-
// Error sent successfully
368-
case <-this.migrationContext.GetContext().Done():
369-
// Context cancelled, someone else already reported an error
370-
}
365+
// Use helper to prevent deadlock if listenOnPanicAbort already exited
366+
_ = base.SendWithContext(this.migrationContext.GetContext(), this.migrationContext.PanicAbort, fmt.Errorf("Found panic-file %s. Aborting without cleanup", this.migrationContext.PanicFlagFile))
371367
}
372368
}
373369

@@ -390,25 +386,17 @@ func (this *Throttler) collectGeneralThrottleMetrics() error {
390386
}
391387

392388
if criticalLoadMet && this.migrationContext.CriticalLoadIntervalMilliseconds == 0 {
393-
select {
394-
case this.migrationContext.PanicAbort <- fmt.Errorf("critical-load met: %s=%d, >=%d", variableName, value, threshold):
395-
// Error sent successfully
396-
case <-this.migrationContext.GetContext().Done():
397-
// Context cancelled, someone else already reported an error
398-
}
389+
// Use helper to prevent deadlock if listenOnPanicAbort already exited
390+
_ = base.SendWithContext(this.migrationContext.GetContext(), this.migrationContext.PanicAbort, fmt.Errorf("critical-load met: %s=%d, >=%d", variableName, value, threshold))
399391
}
400392
if criticalLoadMet && this.migrationContext.CriticalLoadIntervalMilliseconds > 0 {
401393
this.migrationContext.Log.Errorf("critical-load met once: %s=%d, >=%d. Will check again in %d millis", variableName, value, threshold, this.migrationContext.CriticalLoadIntervalMilliseconds)
402394
go func() {
403395
timer := time.NewTimer(time.Millisecond * time.Duration(this.migrationContext.CriticalLoadIntervalMilliseconds))
404396
<-timer.C
405397
if criticalLoadMetAgain, variableName, value, threshold, _ := this.criticalLoadIsMet(); criticalLoadMetAgain {
406-
select {
407-
case this.migrationContext.PanicAbort <- fmt.Errorf("critical-load met again after %d millis: %s=%d, >=%d", this.migrationContext.CriticalLoadIntervalMilliseconds, variableName, value, threshold):
408-
// Error sent successfully
409-
case <-this.migrationContext.GetContext().Done():
410-
// Context cancelled, someone else already reported an error
411-
}
398+
// Use helper to prevent deadlock if listenOnPanicAbort already exited
399+
_ = base.SendWithContext(this.migrationContext.GetContext(), this.migrationContext.PanicAbort, fmt.Errorf("critical-load met again after %d millis: %s=%d, >=%d", this.migrationContext.CriticalLoadIntervalMilliseconds, variableName, value, threshold))
412400
}
413401
}()
414402
}

0 commit comments

Comments
 (0)