diff --git a/Cargo.lock b/Cargo.lock index c9ca53dd..fb02b52a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2993,6 +2993,7 @@ version = "0.3.27" dependencies = [ "chrono", "config", + "nix 0.30.1", "snafu", "tokio", "tokio-util", diff --git a/Cargo.toml b/Cargo.toml index 5d636161..be9648fe 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,6 +38,7 @@ futures-lite = "2.6" glob = "0.3" http = "1.1" indicatif = "0.17" +nix = { version = "0.30", features = ["signal"] } pem = "3" promptly = "0.3" rand = "0.8" diff --git a/crates/tower-runtime/Cargo.toml b/crates/tower-runtime/Cargo.toml index 7bed0858..8ab42fb5 100644 --- a/crates/tower-runtime/Cargo.toml +++ b/crates/tower-runtime/Cargo.toml @@ -8,9 +8,10 @@ license = { workspace = true } [dependencies] chrono = { workspace = true } +nix = { workspace = true } +snafu = { workspace = true } tokio = { workspace = true } tokio-util = { workspace = true } -snafu = { workspace = true } tower-package = { workspace = true } tower-telemetry = { workspace = true } tower-uv = { workspace = true } diff --git a/crates/tower-runtime/src/local.rs b/crates/tower-runtime/src/local.rs index a9da7dfb..82121f31 100644 --- a/crates/tower-runtime/src/local.rs +++ b/crates/tower-runtime/src/local.rs @@ -17,6 +17,7 @@ use tokio::{ fs, io::{AsyncRead, BufReader, AsyncBufReadExt}, process::{Child, Command}, + runtime::Handle, sync::{ Mutex, oneshot::{ @@ -28,10 +29,19 @@ use tokio::{ time::{timeout, Duration}, }; +#[cfg(unix)] +use nix::{ + unistd::Pid, + sys::signal::{ + Signal, + killpg, + }, +}; + use tokio_util::sync::CancellationToken; use tower_package::{Manifest, Package}; -use tower_telemetry::debug; +use tower_telemetry::{debug, error}; use tower_uv::Uv; use crate::{ @@ -49,7 +59,7 @@ pub struct LocalApp { waiter: Mutex>, // terminator is what we use to flag that we want to terminate the child process. - terminator: Mutex, + terminator: CancellationToken, // execute_handle keeps track of the current state of the execution lifecycle. 
execute_handle: Option>>, @@ -271,20 +281,28 @@ async fn execute_local_app(opts: StartOptions, sx: oneshot::Sender, cancel_ impl Drop for LocalApp { fn drop(&mut self) { - // We want to ensure that we cancel the process if it is still running. - let _ = self.terminate(); + // CancellationToken::cancel() is not async + self.terminator.cancel(); + + // Optionally spawn a task to wait for the handle + if let Some(execute_handle) = self.execute_handle.take() { + if let Ok(handle) = Handle::try_current() { + handle.spawn(async move { + let _ = execute_handle.await; + }); + } + } } } impl App for LocalApp { async fn start(opts: StartOptions) -> Result { - let cancel_token = CancellationToken::new(); - let terminator = Mutex::new(cancel_token.clone()); + let terminator = CancellationToken::new(); let (sx, rx) = oneshot::channel::(); let waiter = Mutex::new(rx); - let handle = tokio::spawn(execute_local_app(opts, sx, cancel_token)); + let handle = tokio::spawn(execute_local_app(opts, sx, terminator.clone())); let execute_handle = Some(handle); Ok(Self { @@ -323,8 +341,7 @@ impl App for LocalApp { } async fn terminate(&mut self) -> Result<(), Error> { - let terminator = self.terminator.lock().await; - terminator.cancel(); + self.terminator.cancel(); // Now we should wait for the join handle to finish. if let Some(execute_handle) = self.execute_handle.take() { @@ -421,11 +438,57 @@ fn make_env_vars(ctx: &tower_telemetry::Context, env: &str, cwd: &PathBuf, secs: res } +#[cfg(unix)] +async fn kill_child_process(ctx: &tower_telemetry::Context, mut child: Child) { + let pid = match child.id() { + Some(pid) => pid, + None => { + // We didn't get anything, so we can't do anything. Let's just log an error and + // return. + error!(ctx: &ctx, "child process has no pid, cannot kill"); + return; + } + }; + + // This is the actual converted pid. + let pid = Pid::from_raw(pid as i32); + + // We first send a SIGTERM to ensure that the child processes are terminated.
Using SIGKILL + // (default behavior in Child::kill) can leave orphaned processes behind. + killpg( + pid, + Signal::SIGTERM + ).ok(); + + // If it doesn't die after 2 seconds then we'll forcefully kill it. This timeout should be less + // than the overall timeout for the process (which should likely live on the context as a + // deadline). + let timeout = timeout( + Duration::from_secs(2), + child.wait() + ).await; + + if timeout.is_err() { + killpg( + pid, + Signal::SIGKILL + ).ok(); + } +} + +#[cfg(not(unix))] +async fn kill_child_process(ctx: &tower_telemetry::Context, mut child: Child) { + match child.kill().await { + Ok(_) => debug!(ctx: &ctx, "child process killed successfully"), + Err(e) => debug!(ctx: &ctx, "failed to kill child process: {}", e), + }; +} + async fn wait_for_process(ctx: tower_telemetry::Context, cancel_token: &CancellationToken, mut child: Child) -> i32 { let code = loop { if cancel_token.is_cancelled() { debug!(ctx: &ctx, "process cancelled, terminating child process"); - let _ = child.kill().await; + kill_child_process(&ctx, child).await; break -1; // return -1 to indicate that the process was cancelled. } diff --git a/crates/tower-uv/src/lib.rs b/crates/tower-uv/src/lib.rs index 878faecd..e86cf65f 100644 --- a/crates/tower-uv/src/lib.rs +++ b/crates/tower-uv/src/lib.rs @@ -95,8 +95,8 @@ impl Uv { // that's easy. 
if cwd.join("pyproject.toml").exists() { debug!("Executing UV ({:?}) sync in {:?}", &self.uv_path, cwd); - let child = Command::new(&self.uv_path) - .kill_on_drop(true) + let mut cmd = Command::new(&self.uv_path); + cmd.kill_on_drop(true) .stdin(Stdio::null()) .stdout(Stdio::piped()) .stderr(Stdio::piped()) @@ -105,16 +105,22 @@ impl Uv { .arg("never") .arg("--no-progress") .arg("sync") - .envs(env_vars) - .spawn()?; + .envs(env_vars); + + #[cfg(unix)] + { + cmd.process_group(0); + } + + let child = cmd.spawn()?; Ok(child) } else if cwd.join("requirements.txt").exists() { debug!("Executing UV ({:?}) sync with requirements in {:?}", &self.uv_path, cwd); // If there is a requirements.txt, then we can use that to sync. - let child = Command::new(&self.uv_path) - .kill_on_drop(true) + let mut cmd = Command::new(&self.uv_path); + cmd.kill_on_drop(true) .stdin(Stdio::null()) .stdout(Stdio::piped()) .stderr(Stdio::piped()) @@ -125,8 +131,14 @@ impl Uv { .arg("install") .arg("-r") .arg(cwd.join("requirements.txt")) - .envs(env_vars) - .spawn()?; + .envs(env_vars); + + #[cfg(unix)] + { + cmd.process_group(0); + } + + let child = cmd.spawn()?; Ok(child) } else { @@ -140,8 +152,8 @@ impl Uv { pub async fn run(&self, cwd: &PathBuf, program: &PathBuf, env_vars: &HashMap) -> Result { debug!("Executing UV ({:?}) run {:?} in {:?}", &self.uv_path, program, cwd); - let child = Command::new(&self.uv_path) - .kill_on_drop(true) + let mut cmd = Command::new(&self.uv_path); + cmd.kill_on_drop(true) .stdin(Stdio::null()) .stdout(Stdio::piped()) .stderr(Stdio::piped()) @@ -151,9 +163,14 @@ impl Uv { .arg("--no-progress") .arg("run") .arg(program) - .envs(env_vars) - .spawn()?; + .envs(env_vars); + + #[cfg(unix)] + { + cmd.process_group(0); + } + let child = cmd.spawn()?; Ok(child) }