From 4a5f3bea27c3641665c24564c5f81cd28d211046 Mon Sep 17 00:00:00 2001 From: pulkitvats2007-crypto Date: Sat, 2 May 2026 23:53:53 +0530 Subject: [PATCH] fix: resolve zombie process leak on stage cancellation Signed-off-by: pulkitvats2007-crypto --- .../piped/executor/customsync/customsync.go | 10 ++- pkg/app/piped/executor/customsync/rollback.go | 7 +- pkg/app/piped/executor/kubernetes/rollback.go | 3 +- .../scriptrun/execute_command_test.go | 73 +++++++++++++++++++ pkg/app/piped/executor/scriptrun/scriptrun.go | 10 ++- 5 files changed, 93 insertions(+), 10 deletions(-) create mode 100644 pkg/app/piped/executor/scriptrun/execute_command_test.go diff --git a/pkg/app/piped/executor/customsync/customsync.go b/pkg/app/piped/executor/customsync/customsync.go index a3906e033a..348b15d5d1 100644 --- a/pkg/app/piped/executor/customsync/customsync.go +++ b/pkg/app/piped/executor/customsync/customsync.go @@ -15,6 +15,7 @@ package customsync import ( + "context" "os" "os/exec" "strings" @@ -64,10 +65,12 @@ func (e *deployExecutor) Execute(sig executor.StopSignal) model.StageStatus { e.appDir = ds.AppDir timeout := e.StageConfig.CustomSyncOptions.Timeout.Duration() + ctx, cancel := context.WithCancel(sig.Context()) + defer cancel() c := make(chan model.StageStatus, 1) go func() { - c <- e.executeCommand() + c <- e.executeCommand(ctx) }() timer := time.NewTimer(timeout) @@ -94,7 +97,7 @@ func (e *deployExecutor) Execute(sig executor.StopSignal) model.StageStatus { } } -func (e *deployExecutor) executeCommand() model.StageStatus { +func (e *deployExecutor) executeCommand(ctx context.Context) model.StageStatus { opts := e.StageConfig.CustomSyncOptions e.LogPersister.Infof("Runnnig commands...") @@ -109,7 +112,8 @@ func (e *deployExecutor) executeCommand() model.StageStatus { envs = append(envs, key+"="+value) } - cmd := exec.Command("/bin/sh", "-l", "-c", opts.Run) + cmd := exec.CommandContext(ctx, "/bin/sh", "-l", "-c", opts.Run) + cmd.WaitDelay = time.Second cmd.Dir = e.appDir cmd.Env = append(os.Environ(), envs...) cmd.Stdout = e.LogPersister diff --git a/pkg/app/piped/executor/customsync/rollback.go b/pkg/app/piped/executor/customsync/rollback.go index 4e27fc0940..cca33a426e 100644 --- a/pkg/app/piped/executor/customsync/rollback.go +++ b/pkg/app/piped/executor/customsync/rollback.go @@ -75,10 +75,10 @@ func (e *rollbackExecutor) ensureRollback(ctx context.Context) model.StageStatus } e.LogPersister.Infof("Start rollback for custom sync") - return e.executeCommand(runningDS.GenericApplicationConfig.Pipeline.Stages[0]) + return e.executeCommand(ctx, runningDS.GenericApplicationConfig.Pipeline.Stages[0]) } -func (e *rollbackExecutor) executeCommand(config config.PipelineStage) model.StageStatus { +func (e *rollbackExecutor) executeCommand(ctx context.Context, config config.PipelineStage) model.StageStatus { opts := config.CustomSyncOptions e.LogPersister.Infof("Runnnig commands...") @@ -93,7 +93,8 @@ func (e *rollbackExecutor) executeCommand(config config.PipelineStage) model.Sta envs = append(envs, key+"="+value) } - cmd := exec.Command("/bin/sh", "-l", "-c", opts.Run) + cmd := exec.CommandContext(ctx, "/bin/sh", "-l", "-c", opts.Run) + cmd.WaitDelay = time.Second cmd.Dir = e.appDir cmd.Env = append(os.Environ(), envs...) cmd.Stdout = e.LogPersister diff --git a/pkg/app/piped/executor/kubernetes/rollback.go b/pkg/app/piped/executor/kubernetes/rollback.go index 5317a4058a..5b925b1e97 100644 --- a/pkg/app/piped/executor/kubernetes/rollback.go +++ b/pkg/app/piped/executor/kubernetes/rollback.go @@ -230,7 +230,8 @@ func (e *rollbackExecutor) ensureScriptRunRollback(ctx context.Context) model.St envs = append(envs, key+"="+value) } - cmd := exec.Command("/bin/sh", "-l", "-c", onRollback) + cmd := exec.CommandContext(ctx, "/bin/sh", "-l", "-c", onRollback) + cmd.WaitDelay = time.Second cmd.Dir = e.appDir cmd.Env = append(os.Environ(), envs...) cmd.Stdout = e.LogPersister diff --git a/pkg/app/piped/executor/scriptrun/execute_command_test.go b/pkg/app/piped/executor/scriptrun/execute_command_test.go new file mode 100644 index 0000000000..df73b9607e --- /dev/null +++ b/pkg/app/piped/executor/scriptrun/execute_command_test.go @@ -0,0 +1,73 @@ +package scriptrun + +import ( + "context" + "fmt" + "os" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/pipe-cd/pipecd/pkg/app/piped/executor" + "github.com/pipe-cd/pipecd/pkg/config" + "github.com/pipe-cd/pipecd/pkg/model" +) + +type fakeLogPersister struct{} + +func (l *fakeLogPersister) Write(b []byte) (int, error) { fmt.Println(string(b)); return len(b), nil } +func (l *fakeLogPersister) Info(s string) { fmt.Println("INFO:", s) } +func (l *fakeLogPersister) Infof(s string, args ...interface{}) { fmt.Printf("INFO: "+s+"\n", args...) } +func (l *fakeLogPersister) Success(s string) { fmt.Println("SUCCESS:", s) } +func (l *fakeLogPersister) Successf(s string, args ...interface{}) { fmt.Printf("SUCCESS: "+s+"\n", args...) } +func (l *fakeLogPersister) Error(s string) { fmt.Println("ERROR:", s) } +func (l *fakeLogPersister) Errorf(s string, args ...interface{}) { fmt.Printf("ERROR: "+s+"\n", args...) } + +func TestExecuteCommandCancellation(t *testing.T) { + // Ensure that executeCommand correctly stops when its context is canceled. + e := &Executor{ + Input: executor.Input{ + Deployment: &model.Deployment{ + Id: "deploy-1", + Trigger: &model.DeploymentTrigger{ + Commit: &model.Commit{ + Hash: "hash-1", + }, + }, + GitPath: &model.ApplicationGitPath{ + Repo: &model.ApplicationGitRepository{ + Id: "repo-1", + Remote: "repo-url", + }, + }, + }, + LogPersister: &fakeLogPersister{}, + StageConfig: config.PipelineStage{ + ScriptRunStageOptions: &config.ScriptRunStageOptions{ + Run: "exec sleep 10", + }, + }, + }, + appDir: os.TempDir(), + } + + ctx, cancel := context.WithCancel(context.Background()) + + // Run command in a goroutine + done := make(chan model.StageStatus) + go func() { + done <- e.executeCommand(ctx) + }() + + // Cancel almost immediately + time.Sleep(100 * time.Millisecond) + cancel() + + select { + case status := <-done: + assert.Equal(t, model.StageStatus_STAGE_FAILURE, status, "Expected command to fail on cancellation") + case <-time.After(2 * time.Second): + t.Fatal("executeCommand did not return after context cancellation") + } +} diff --git a/pkg/app/piped/executor/scriptrun/scriptrun.go b/pkg/app/piped/executor/scriptrun/scriptrun.go index 24a7aa5913..61838dda2a 100644 --- a/pkg/app/piped/executor/scriptrun/scriptrun.go +++ b/pkg/app/piped/executor/scriptrun/scriptrun.go @@ -15,6 +15,7 @@ package scriptrun import ( + "context" "encoding/json" "os" "os/exec" @@ -59,10 +60,12 @@ func (e *Executor) Execute(sig executor.StopSignal) model.StageStatus { e.appDir = ds.AppDir timeout := e.StageConfig.ScriptRunStageOptions.Timeout.Duration() + ctx, cancel := context.WithCancel(sig.Context()) + defer cancel() c := make(chan model.StageStatus, 1) go func() { - c <- e.executeCommand() + c <- e.executeCommand(ctx) }() timer := time.NewTimer(timeout) @@ -92,7 +95,7 @@ func (e *Executor) Execute(sig executor.StopSignal) model.StageStatus { } } -func (e *Executor) executeCommand() model.StageStatus { +func (e *Executor) executeCommand(ctx context.Context) model.StageStatus { opts := e.StageConfig.ScriptRunStageOptions e.LogPersister.Infof("Runnnig commands...") @@ -118,7 +121,8 @@ func (e *Executor) executeCommand() model.StageStatus { envs = append(envs, key+"="+value) } - cmd := exec.Command("/bin/sh", "-l", "-c", opts.Run) + cmd := exec.CommandContext(ctx, "/bin/sh", "-l", "-c", opts.Run) + cmd.WaitDelay = time.Second cmd.Dir = e.appDir cmd.Env = append(os.Environ(), envs...) cmd.Stdout = e.LogPersister