From e38f31a6362d8c3b6d86dec4ec9ae9cf86ed8284 Mon Sep 17 00:00:00 2001 From: Brad Heller Date: Mon, 8 Sep 2025 14:44:26 +0100 Subject: [PATCH 1/3] Remove async code from the Drop handler and make cancellation more gentle for `uv` (#90) * Remove async code from the Drop handler * feat: More graceful process shutdown to be gentle to uv * chore: Refactor kill_child_process based on @copilot feedback * chore: Only use process_group on unix systems * chore: Only import the nix::* content in Unix land * Update crates/tower-runtime/src/local.rs Co-authored-by: Konstantinos St --------- Co-authored-by: Konstantinos St --- Cargo.lock | 1 + Cargo.toml | 1 + crates/tower-runtime/Cargo.toml | 3 +- crates/tower-runtime/src/local.rs | 83 +++++++++++++++++++++++++++---- crates/tower-uv/src/lib.rs | 41 ++++++++++----- 5 files changed, 106 insertions(+), 23 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c9ca53dd..fb02b52a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2993,6 +2993,7 @@ version = "0.3.27" dependencies = [ "chrono", "config", + "nix 0.30.1", "snafu", "tokio", "tokio-util", diff --git a/Cargo.toml b/Cargo.toml index 5d636161..be9648fe 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,6 +38,7 @@ futures-lite = "2.6" glob = "0.3" http = "1.1" indicatif = "0.17" +nix = { version = "0.30", features = ["signal"] } pem = "3" promptly = "0.3" rand = "0.8" diff --git a/crates/tower-runtime/Cargo.toml b/crates/tower-runtime/Cargo.toml index 7bed0858..8ab42fb5 100644 --- a/crates/tower-runtime/Cargo.toml +++ b/crates/tower-runtime/Cargo.toml @@ -8,9 +8,10 @@ license = { workspace = true } [dependencies] chrono = { workspace = true } +nix = { workspace = true } +snafu = { workspace = true } tokio = { workspace = true } tokio-util = { workspace = true } -snafu = { workspace = true } tower-package = { workspace = true } tower-telemetry = { workspace = true } tower-uv = { workspace = true } diff --git a/crates/tower-runtime/src/local.rs b/crates/tower-runtime/src/local.rs index a9da7dfb..82121f31 100644 --- a/crates/tower-runtime/src/local.rs +++ b/crates/tower-runtime/src/local.rs @@ -17,6 +17,7 @@ use tokio::{ fs, io::{AsyncRead, BufReader, AsyncBufReadExt}, process::{Child, Command}, + runtime::Handle, sync::{ Mutex, oneshot::{ @@ -28,10 +29,19 @@ use tokio::{ time::{timeout, Duration}, }; +#[cfg(unix)] +use nix::{ + unistd::Pid, + sys::signal::{ + Signal, + killpg, + }, +}; + use tokio_util::sync::CancellationToken; use tower_package::{Manifest, Package}; -use tower_telemetry::debug; +use tower_telemetry::{debug, error}; use tower_uv::Uv; use crate::{ @@ -49,7 +59,7 @@ pub struct LocalApp { waiter: Mutex>, // terminator is what we use to flag that we want to terminate the child process. - terminator: Mutex, + terminator: CancellationToken, // execute_handle keeps track of the current state of the execution lifecycle. execute_handle: Option>>, @@ -271,20 +281,28 @@ async fn execute_local_app(opts: StartOptions, sx: oneshot::Sender, cancel_ impl Drop for LocalApp { fn drop(&mut self) { - // We want to ensure that we cancel the process if it is still running. - let _ = self.terminate(); + // CancellationToken::cancel() is not async + self.terminator.cancel(); + + // Optionally spawn a task to wait for the handle + if let Some(execute_handle) = self.execute_handle.take() { + if let Ok(handle) = Handle::try_current() { + handle.spawn(async move { + let _ = execute_handle.await; + }); + } + } } } impl App for LocalApp { async fn start(opts: StartOptions) -> Result { - let cancel_token = CancellationToken::new(); - let terminator = Mutex::new(cancel_token.clone()); + let terminator = CancellationToken::new(); let (sx, rx) = oneshot::channel::(); let waiter = Mutex::new(rx); - let handle = tokio::spawn(execute_local_app(opts, sx, cancel_token)); + let handle = tokio::spawn(execute_local_app(opts, sx, terminator.clone())); let execute_handle = Some(handle); Ok(Self { @@ -323,8 +341,7 @@ impl App for LocalApp { } async fn terminate(&mut self) -> Result<(), Error> { - let terminator = self.terminator.lock().await; - terminator.cancel(); + self.terminator.cancel(); // Now we should wait for the join handle to finish. if let Some(execute_handle) = self.execute_handle.take() { @@ -421,11 +438,57 @@ fn make_env_vars(ctx: &tower_telemetry::Context, env: &str, cwd: &PathBuf, secs: res } +#[cfg(unix)] +async fn kill_child_process(ctx: &tower_telemetry::Context, mut child: Child) { + let pid = match child.id() { + Some(pid) => pid, + None => { + // We didn't get anything, so we can't do anything. Let's just exit with a debug + // message. + error!(ctx: &ctx, "child process has no pid, cannot kill"); + return; + } + }; + + // This is the actual converted pid. + let pid = Pid::from_raw(pid as i32); + + // We first send a SIGTERM to ensure that the child processes are terminated. Using SIGKILL + // (default behavior in Child::kill) can leave orphaned processes behind. + killpg( + pid, + Signal::SIGTERM + ).ok(); + + // If it doesn't die after 2 seconds then we'll forcefully kill it. This timeout should be less + // than the overall timeout for the process (which should likely live on the context as a + // deadline). + let timeout = timeout( + Duration::from_secs(2), + child.wait() + ).await; + + if timeout.is_err() { + killpg( + pid, + Signal::SIGKILL + ).ok(); + } +} + +#[cfg(not(unix))] +async fn kill_child_process(ctx: &tower_telemetry::Context, mut child: Child) { + match child.kill().await { + Ok(_) => debug!(ctx: &ctx, "child process killed successfully"), + Err(e) => debug!(ctx: &ctx, "failed to kill child process: {}", e), + }; +} + async fn wait_for_process(ctx: tower_telemetry::Context, cancel_token: &CancellationToken, mut child: Child) -> i32 { let code = loop { if cancel_token.is_cancelled() { debug!(ctx: &ctx, "process cancelled, terminating child process"); - let _ = child.kill().await; + kill_child_process(&ctx, child).await; break -1; // return -1 to indicate that the process was cancelled. } diff --git a/crates/tower-uv/src/lib.rs b/crates/tower-uv/src/lib.rs index 878faecd..e86cf65f 100644 --- a/crates/tower-uv/src/lib.rs +++ b/crates/tower-uv/src/lib.rs @@ -95,8 +95,8 @@ impl Uv { // that's easy. if cwd.join("pyproject.toml").exists() { debug!("Executing UV ({:?}) sync in {:?}", &self.uv_path, cwd); - let child = Command::new(&self.uv_path) - .kill_on_drop(true) + let mut cmd = Command::new(&self.uv_path); + cmd.kill_on_drop(true) .stdin(Stdio::null()) .stdout(Stdio::piped()) .stderr(Stdio::piped()) @@ -105,16 +105,22 @@ impl Uv { .arg("never") .arg("--no-progress") .arg("sync") - .envs(env_vars) - .spawn()?; + .envs(env_vars); + + #[cfg(unix)] + { + cmd.process_group(0); + } + + let child = cmd.spawn()?; Ok(child) } else if cwd.join("requirements.txt").exists() { debug!("Executing UV ({:?}) sync with requirements in {:?}", &self.uv_path, cwd); // If there is a requirements.txt, then we can use that to sync. - let child = Command::new(&self.uv_path) - .kill_on_drop(true) + let mut cmd = Command::new(&self.uv_path); + cmd.kill_on_drop(true) .stdin(Stdio::null()) .stdout(Stdio::piped()) .stderr(Stdio::piped()) @@ -125,8 +131,14 @@ impl Uv { .arg("install") .arg("-r") .arg(cwd.join("requirements.txt")) - .envs(env_vars) - .spawn()?; + .envs(env_vars); + + #[cfg(unix)] + { + cmd.process_group(0); + } + + let child = cmd.spawn()?; Ok(child) } else { @@ -140,8 +152,8 @@ impl Uv { pub async fn run(&self, cwd: &PathBuf, program: &PathBuf, env_vars: &HashMap) -> Result { debug!("Executing UV ({:?}) run {:?} in {:?}", &self.uv_path, program, cwd); - let child = Command::new(&self.uv_path) - .kill_on_drop(true) + let mut cmd = Command::new(&self.uv_path); + cmd.kill_on_drop(true) .stdin(Stdio::null()) .stdout(Stdio::piped()) .stderr(Stdio::piped()) @@ -151,9 +163,14 @@ impl Uv { .arg("--no-progress") .arg("run") .arg(program) - .envs(env_vars) - .spawn()?; + .envs(env_vars); + + #[cfg(unix)] + { + cmd.process_group(0); + } + let child = cmd.spawn()?; Ok(child) } From bb539fd7e6018be083c96d92ea494f168f5903c0 Mon Sep 17 00:00:00 2001 From: Brad Heller Date: Mon, 8 Sep 2025 14:46:51 +0100 Subject: [PATCH 2/3] Bump version to v0.3.28 --- Cargo.lock | 22 +++++++++++----------- Cargo.toml | 3 ++- pyproject.toml | 3 ++- uv.lock | 2 +- 4 files changed, 16 insertions(+), 14 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index fb02b52a..d6ed7c9d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -387,7 +387,7 @@ dependencies = [ [[package]] name = "config" -version = "0.3.27" +version = "0.3.28" dependencies = [ "chrono", "clap", @@ -474,7 +474,7 @@ checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" [[package]] name = "crypto" -version = "0.3.27" +version = "0.3.28" dependencies = [ "aes-gcm", "base64", @@ -2629,7 +2629,7 @@ dependencies = [ [[package]] name = "testutils" -version = "0.3.27" +version = "0.3.28" dependencies = [ "pem", "rsa", @@ -2875,7 +2875,7 @@ checksum = "5d99f8c9a7727884afe522e9bd5edbfc91a3312b36a77b5fb8926e4c31a41801" [[package]] name = "tower" -version = "0.3.27" +version = "0.3.28" dependencies = [ "tokio", "tower-api", @@ -2899,7 +2899,7 @@ dependencies = [ [[package]] name = "tower-api" -version = "0.3.27" +version = "0.3.28" dependencies = [ "reqwest", "serde", @@ -2911,7 +2911,7 @@ dependencies = [ [[package]] name = "tower-cmd" -version = "0.3.27" +version = "0.3.28" dependencies = [ "anyhow", "bytes", @@ -2970,7 +2970,7 @@ checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e" [[package]] name = "tower-package" -version = "0.3.27" +version = "0.3.28" dependencies = [ "async-compression", "config", @@ -2989,7 +2989,7 @@ dependencies = [ [[package]] name = "tower-runtime" -version = "0.3.27" +version = "0.3.28" dependencies = [ "chrono", "config", @@ -3010,7 +3010,7 @@ checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3" [[package]] name = "tower-telemetry" -version = "0.3.27" +version = "0.3.28" dependencies = [ "tracing", "tracing-appender", @@ -3019,7 +3019,7 @@ dependencies = [ [[package]] name = "tower-uv" -version = "0.3.27" +version = "0.3.28" dependencies = [ "async-compression", "async_zip", @@ -3033,7 +3033,7 @@ dependencies = [ [[package]] name = "tower-version" -version = "0.3.27" +version = "0.3.28" dependencies = [ "anyhow", "chrono", diff --git a/Cargo.toml b/Cargo.toml index be9648fe..8e3f20c4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,7 +4,8 @@ resolver = "2" [workspace.package] edition = "2021" -version = "0.3.27" +version = "0.3.28" + diff --git a/pyproject.toml b/pyproject.toml index 9603cd92..c655e0c8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,8 @@ build-backend = "maturin" [project] name = "tower" -version = "0.3.27" +version = "0.3.28" + diff --git a/uv.lock b/uv.lock index ea82fc30..1179969b 100644 --- a/uv.lock +++ b/uv.lock @@ -1217,7 +1217,7 @@ wheels = [ [[package]] name = "tower" -version = "0.3.27" +version = "0.3.28" source = { editable = "." } dependencies = [ { name = "attrs" }, From adf5049e10b7830cc09f5c99a8f45b504443aaff Mon Sep 17 00:00:00 2001 From: Brad Heller Date: Mon, 8 Sep 2025 15:08:06 +0100 Subject: [PATCH 3/3] chore: Avoid unneeded import on Windows --- crates/tower-runtime/src/local.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/tower-runtime/src/local.rs b/crates/tower-runtime/src/local.rs index 82121f31..3ddc779d 100644 --- a/crates/tower-runtime/src/local.rs +++ b/crates/tower-runtime/src/local.rs @@ -41,7 +41,7 @@ use nix::{ use tokio_util::sync::CancellationToken; use tower_package::{Manifest, Package}; -use tower_telemetry::{debug, error}; +use tower_telemetry::debug; use tower_uv::Uv; use crate::{ @@ -445,7 +445,7 @@ async fn kill_child_process(ctx: &tower_telemetry::Context, mut child: Child) { None => { // We didn't get anything, so we can't do anything. Let's just exit with a debug // message. - error!(ctx: &ctx, "child process has no pid, cannot kill"); + tower_telemetry::error!(ctx: &ctx, "child process has no pid, cannot kill"); return; } };