-
Notifications
You must be signed in to change notification settings - Fork 2
Don't cancel run context on stdin EOF #296
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -100,6 +100,7 @@ func (app *application) runRPC(device, command string, cmdArgs []string) error { | |||||||||||||||||||||
| defer cancel() | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| errChan := make(chan error, numWorkers) | ||||||||||||||||||||||
| recvDone := make(chan struct{}) | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| stream := app.rpcClient.Run(runCtx) | ||||||||||||||||||||||
| req := &pb.RunRequest{ | ||||||||||||||||||||||
|
|
@@ -127,6 +128,7 @@ func (app *application) runRPC(device, command string, cmdArgs []string) error { | |||||||||||||||||||||
|
|
||||||||||||||||||||||
| // Receive routine | ||||||||||||||||||||||
| go func() { | ||||||||||||||||||||||
| defer close(recvDone) | ||||||||||||||||||||||
| defer cancel() | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| for { | ||||||||||||||||||||||
|
|
@@ -226,10 +228,9 @@ func (app *application) runRPC(device, command string, cmdArgs []string) error { | |||||||||||||||||||||
| } | ||||||||||||||||||||||
| }() | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| // Send routine | ||||||||||||||||||||||
| // Send routine — stdin EOF must not cancel the context; only the receive | ||||||||||||||||||||||
| // routine drives termination so that all server responses are processed. | ||||||||||||||||||||||
| go func() { | ||||||||||||||||||||||
| defer cancel() | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| reader := bufio.NewReader(app.stdin) | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| for { | ||||||||||||||||||||||
|
|
@@ -267,9 +268,10 @@ func (app *application) runRPC(device, command string, cmdArgs []string) error { | |||||||||||||||||||||
| } | ||||||||||||||||||||||
| }() | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| // Wait for completion or error | ||||||||||||||||||||||
| // Wait for the receive routine to finish (stream closed or cancelled) or for | ||||||||||||||||||||||
| // any worker to report an error. | ||||||||||||||||||||||
| select { | ||||||||||||||||||||||
| case <-runCtx.Done(): | ||||||||||||||||||||||
| case <-recvDone: | ||||||||||||||||||||||
|
||||||||||||||||||||||
| case <-recvDone: | |
| case <-recvDone: | |
| // Prefer any pending worker error over a silent success. | |
| select { | |
| case err := <-errChan: | |
| if err != nil { | |
| return err | |
| } | |
| default: | |
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now that stdin EOF no longer cancels the context (good), the send routine still exits on
io.EOFwithout half-closing the client->server request stream. Server-side code treats client stream closure (io.EOFon Receive) as the signal that no more requests are coming; consider explicitly closing the request side (e.g.,stream.CloseRequest()/equivalent forconnect.BidiStreamForClient) when stdin reaches EOF so the server can stop waiting for further client messages without relying on full context cancellation.