From 574d5ba5bb27650b3e98c1f03bd090fb1d55a9ca Mon Sep 17 00:00:00 2001 From: Brad Heller Date: Sun, 7 Sep 2025 16:44:35 +0100 Subject: [PATCH 1/6] Remove async code from the Drop handler --- crates/tower-runtime/src/local.rs | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/crates/tower-runtime/src/local.rs b/crates/tower-runtime/src/local.rs index a9da7dfb..62fbef56 100644 --- a/crates/tower-runtime/src/local.rs +++ b/crates/tower-runtime/src/local.rs @@ -17,6 +17,7 @@ use tokio::{ fs, io::{AsyncRead, BufReader, AsyncBufReadExt}, process::{Child, Command}, + runtime::Handle, sync::{ Mutex, oneshot::{ @@ -49,7 +50,7 @@ pub struct LocalApp { waiter: Mutex>, // terminator is what we use to flag that we want to terminate the child process. - terminator: Mutex, + terminator: CancellationToken, // execute_handle keeps track of the current state of the execution lifecycle. execute_handle: Option>>, @@ -271,20 +272,28 @@ async fn execute_local_app(opts: StartOptions, sx: oneshot::Sender, cancel_ impl Drop for LocalApp { fn drop(&mut self) { - // We want to ensure that we cancel the process if it is still running. - let _ = self.terminate(); + // CancellationToken::cancel() is not async + self.terminator.cancel(); + + // Optionally spawn a task to wait for the handle + if let Some(execute_handle) = self.execute_handle.take() { + if let Ok(handle) = Handle::try_current() { + handle.spawn(async move { + let _ = execute_handle.await; + }); + } + } } } impl App for LocalApp { async fn start(opts: StartOptions) -> Result { - let cancel_token = CancellationToken::new(); - let terminator = Mutex::new(cancel_token.clone()); + let terminator = CancellationToken::new(); let (sx, rx) = oneshot::channel::(); let waiter = Mutex::new(rx); - let handle = tokio::spawn(execute_local_app(opts, sx, cancel_token)); + let handle = tokio::spawn(execute_local_app(opts, sx, terminator.clone())); let execute_handle = Some(handle); Ok(Self { @@ -323,8 +332,7 @@ impl App for LocalApp { } async fn terminate(&mut self) -> Result<(), Error> { - let terminator = self.terminator.lock().await; - terminator.cancel(); + self.terminator.cancel(); // Now we should wait for the join handle to finish. if let Some(execute_handle) = self.execute_handle.take() { From 9ea11603c25f679a7f137bb01e0cf2032180cb5b Mon Sep 17 00:00:00 2001 From: Brad Heller Date: Sun, 7 Sep 2025 17:46:48 +0100 Subject: [PATCH 2/6] feat: More graceful process shutdown to be gentle to uv --- Cargo.lock | 1 + Cargo.toml | 1 + crates/tower-runtime/Cargo.toml | 3 ++- crates/tower-runtime/src/local.rs | 37 ++++++++++++++++++++++++++++++- crates/tower-uv/src/lib.rs | 3 +++ 5 files changed, 43 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c9ca53dd..fb02b52a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2993,6 +2993,7 @@ version = "0.3.27" dependencies = [ "chrono", "config", + "nix 0.30.1", "snafu", "tokio", "tokio-util", diff --git a/Cargo.toml b/Cargo.toml index 5d636161..be9648fe 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,6 +38,7 @@ futures-lite = "2.6" glob = "0.3" http = "1.1" indicatif = "0.17" +nix = { version = "0.30", features = ["signal"] } pem = "3" promptly = "0.3" rand = "0.8" diff --git a/crates/tower-runtime/Cargo.toml b/crates/tower-runtime/Cargo.toml index 7bed0858..8ab42fb5 100644 --- a/crates/tower-runtime/Cargo.toml +++ b/crates/tower-runtime/Cargo.toml @@ -8,9 +8,10 @@ license = { workspace = true } [dependencies] chrono = { workspace = true } +nix = { workspace = true } +snafu = { workspace = true } tokio = { workspace = true } tokio-util = { workspace = true } -snafu = { workspace = true } tower-package = { workspace = true } tower-telemetry = { workspace = true } tower-uv = { workspace = true } diff --git a/crates/tower-runtime/src/local.rs b/crates/tower-runtime/src/local.rs index 62fbef56..11943efb 100644 --- a/crates/tower-runtime/src/local.rs +++ b/crates/tower-runtime/src/local.rs @@ -429,11 +429,46 @@ fn make_env_vars(ctx: &tower_telemetry::Context, env: &str, cwd: &PathBuf, secs: res } +#[cfg(unix)] +async fn kill_child_process(ctx: &tower_telemetry::Context, mut child: Child) { + let pid = child.id().unwrap(); + + // We first send a SIGTERM to ensure that the child processes are terminated. Using SIGKILL + // (default behavior in Child::kill) can leave orphaned processes behind. + nix::sys::signal::killpg( + nix::unistd::Pid::from_raw(pid as i32), + nix::sys::signal::Signal::SIGTERM + ).ok(); + + // If it doesn't die after 5 seconds then we'll forcefullt kill it. + let timeout = tokio::time::timeout( + Duration::from_secs(5), + + // Wait for the child process to finish. + child.wait() + ).await; + + if timeout.is_err() { + nix::sys::signal::killpg( + nix::unistd::Pid::from_raw(pid as i32), + nix::sys::signal::Signal::SIGKILL + ).ok(); + } +} + +#[cfg(not(unix))] +async fn kill_child_process(ctx: &tower_telemetry::Context, mut child: Child) { + match child.kill().await { + Ok(_) => debug!(ctx: &ctx, "child process killed successfully"), + Err(e) => debug!(ctx: &ctx, "failed to kill child process: {}", e), + }; +} + async fn wait_for_process(ctx: tower_telemetry::Context, cancel_token: &CancellationToken, mut child: Child) -> i32 { let code = loop { if cancel_token.is_cancelled() { debug!(ctx: &ctx, "process cancelled, terminating child process"); - let _ = child.kill().await; + kill_child_process(&ctx, child).await; break -1; // return -1 to indicate that the process was cancelled. } diff --git a/crates/tower-uv/src/lib.rs b/crates/tower-uv/src/lib.rs index 878faecd..13314b80 100644 --- a/crates/tower-uv/src/lib.rs +++ b/crates/tower-uv/src/lib.rs @@ -106,6 +106,7 @@ impl Uv { .arg("--no-progress") .arg("sync") .envs(env_vars) + .process_group(0) .spawn()?; Ok(child) @@ -126,6 +127,7 @@ impl Uv { .arg("-r") .arg(cwd.join("requirements.txt")) .envs(env_vars) + .process_group(0) .spawn()?; Ok(child) @@ -152,6 +154,7 @@ impl Uv { .arg("run") .arg(program) .envs(env_vars) + .process_group(0) .spawn()?; Ok(child) From 816b2041ee9da7731423c2ef02e0d1b670ce2a9d Mon Sep 17 00:00:00 2001 From: Brad Heller Date: Mon, 8 Sep 2025 08:21:39 +0100 Subject: [PATCH 3/6] chore: Refactor kill_child_process based on @copilot feedback --- crates/tower-runtime/src/local.rs | 45 ++++++++++++++++++++++--------- 1 file changed, 32 insertions(+), 13 deletions(-) diff --git a/crates/tower-runtime/src/local.rs b/crates/tower-runtime/src/local.rs index 11943efb..5d6598bf 100644 --- a/crates/tower-runtime/src/local.rs +++ b/crates/tower-runtime/src/local.rs @@ -29,10 +29,18 @@ use tokio::{ time::{timeout, Duration}, }; +use nix::{ + unistd::Pid, + sys::signal::{ + Signal, + killpg, + }, +}; + use tokio_util::sync::CancellationToken; use tower_package::{Manifest, Package}; -use tower_telemetry::debug; +use tower_telemetry::{debug, error}; use tower_uv::Uv; use crate::{ @@ -431,27 +439,38 @@ fn make_env_vars(ctx: &tower_telemetry::Context, env: &str, cwd: &PathBuf, secs: #[cfg(unix)] async fn kill_child_process(ctx: &tower_telemetry::Context, mut child: Child) { - let pid = child.id().unwrap(); + let pid = match child.id() { + Some(pid) => pid, + None => { + // We didn't get anything, so we can't do anything. Let's just exist with a debug + // message. + error!(ctx: &ctx, "child process has no pid, cannot kill"); + return; + } + }; + + // This is the actual converted pid. + let pid = Pid::from_raw(pid as i32); // We first send a SIGTERM to ensure that the child processes are terminated. Using SIGKILL // (default behavior in Child::kill) can leave orphaned processes behind. - nix::sys::signal::killpg( - nix::unistd::Pid::from_raw(pid as i32), - nix::sys::signal::Signal::SIGTERM + killpg( + pid, + Signal::SIGTERM ).ok(); - // If it doesn't die after 5 seconds then we'll forcefullt kill it. - let timeout = tokio::time::timeout( - Duration::from_secs(5), - - // Wait for the child process to finish. + // If it doesn't die after 2 seconds then we'll forcefully kill it. This timeout should be less + // than the overall timeout for the process (which should likely live on the context as a + // deadline). + let timeout = timeout( + Duration::from_secs(2), child.wait() ).await; if timeout.is_err() { - nix::sys::signal::killpg( - nix::unistd::Pid::from_raw(pid as i32), - nix::sys::signal::Signal::SIGKILL + killpg( + pid, + Signal::SIGKILL ).ok(); } } From 94946c40db2fecbeeb100200f540f3e5a8e52db5 Mon Sep 17 00:00:00 2001 From: Brad Heller Date: Mon, 8 Sep 2025 08:27:09 +0100 Subject: [PATCH 4/6] chore: Only use process_group on unix systems --- crates/tower-uv/src/lib.rs | 44 +++++++++++++++++++++++++------------- 1 file changed, 29 insertions(+), 15 deletions(-) diff --git a/crates/tower-uv/src/lib.rs b/crates/tower-uv/src/lib.rs index 13314b80..e86cf65f 100644 --- a/crates/tower-uv/src/lib.rs +++ b/crates/tower-uv/src/lib.rs @@ -95,8 +95,8 @@ impl Uv { // that's easy. if cwd.join("pyproject.toml").exists() { debug!("Executing UV ({:?}) sync in {:?}", &self.uv_path, cwd); - let child = Command::new(&self.uv_path) - .kill_on_drop(true) + let mut cmd = Command::new(&self.uv_path); + cmd.kill_on_drop(true) .stdin(Stdio::null()) .stdout(Stdio::piped()) .stderr(Stdio::piped()) @@ -105,17 +105,22 @@ impl Uv { .arg("never") .arg("--no-progress") .arg("sync") - .envs(env_vars) - .process_group(0) - .spawn()?; + .envs(env_vars); + + #[cfg(unix)] + { + cmd.process_group(0); + } + + let child = cmd.spawn()?; Ok(child) } else if cwd.join("requirements.txt").exists() { debug!("Executing UV ({:?}) sync with requirements in {:?}", &self.uv_path, cwd); // If there is a requirements.txt, then we can use that to sync. - let child = Command::new(&self.uv_path) - .kill_on_drop(true) + let mut cmd = Command::new(&self.uv_path); + cmd.kill_on_drop(true) .stdin(Stdio::null()) .stdout(Stdio::piped()) .stderr(Stdio::piped()) @@ -126,9 +131,14 @@ impl Uv { .arg("install") .arg("-r") .arg(cwd.join("requirements.txt")) - .envs(env_vars) - .process_group(0) - .spawn()?; + .envs(env_vars); + + #[cfg(unix)] + { + cmd.process_group(0); + } + + let child = cmd.spawn()?; Ok(child) } else { @@ -142,8 +152,8 @@ impl Uv { pub async fn run(&self, cwd: &PathBuf, program: &PathBuf, env_vars: &HashMap) -> Result { debug!("Executing UV ({:?}) run {:?} in {:?}", &self.uv_path, program, cwd); - let child = Command::new(&self.uv_path) - .kill_on_drop(true) + let mut cmd = Command::new(&self.uv_path); + cmd.kill_on_drop(true) .stdin(Stdio::null()) .stdout(Stdio::piped()) .stderr(Stdio::piped()) @@ -153,10 +163,14 @@ impl Uv { .arg("--no-progress") .arg("run") .arg(program) - .envs(env_vars) - .process_group(0) - .spawn()?; + .envs(env_vars); + + #[cfg(unix)] + { + cmd.process_group(0); + } + let child = cmd.spawn()?; Ok(child) } From 0b6daa5d163fe983c79f58aa489ba018ab7e8438 Mon Sep 17 00:00:00 2001 From: Brad Heller Date: Mon, 8 Sep 2025 09:27:31 +0100 Subject: [PATCH 5/6] chore: Only import the nix::* content in Unix land --- crates/tower-runtime/src/local.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/tower-runtime/src/local.rs b/crates/tower-runtime/src/local.rs index 5d6598bf..79a1cb77 100644 --- a/crates/tower-runtime/src/local.rs +++ b/crates/tower-runtime/src/local.rs @@ -29,6 +29,7 @@ use tokio::{ time::{timeout, Duration}, }; +#[cfg(unix)] use nix::{ unistd::Pid, sys::signal::{ From 3e66c7af1bc0b79baf55003696df2d83647fa9ea Mon Sep 17 00:00:00 2001 From: Brad Heller Date: Mon, 8 Sep 2025 13:57:33 +0100 Subject: [PATCH 6/6] Update crates/tower-runtime/src/local.rs Co-authored-by: Konstantinos St --- crates/tower-runtime/src/local.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/tower-runtime/src/local.rs b/crates/tower-runtime/src/local.rs index 79a1cb77..82121f31 100644 --- a/crates/tower-runtime/src/local.rs +++ b/crates/tower-runtime/src/local.rs @@ -443,7 +443,7 @@ async fn kill_child_process(ctx: &tower_telemetry::Context, mut child: Child) { let pid = match child.id() { Some(pid) => pid, None => { - // We didn't get anything, so we can't do anything. Let's just exist with a debug + // We didn't get anything, so we can't do anything. Let's just exit with a debug // message. error!(ctx: &ctx, "child process has no pid, cannot kill"); return;