Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ futures-lite = "2.6"
glob = "0.3"
http = "1.1"
indicatif = "0.17"
nix = { version = "0.30", features = ["signal"] }
pem = "3"
promptly = "0.3"
rand = "0.8"
Expand Down
3 changes: 2 additions & 1 deletion crates/tower-runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ license = { workspace = true }

[dependencies]
chrono = { workspace = true }
nix = { workspace = true }
snafu = { workspace = true }
tokio = { workspace = true }
tokio-util = { workspace = true }
snafu = { workspace = true }
tower-package = { workspace = true }
tower-telemetry = { workspace = true }
tower-uv = { workspace = true }
Expand Down
83 changes: 73 additions & 10 deletions crates/tower-runtime/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use tokio::{
fs,
io::{AsyncRead, BufReader, AsyncBufReadExt},
process::{Child, Command},
runtime::Handle,
sync::{
Mutex,
oneshot::{
Expand All @@ -28,10 +29,19 @@ use tokio::{
time::{timeout, Duration},
};

#[cfg(unix)]
use nix::{
unistd::Pid,
sys::signal::{
Signal,
killpg,
},
};

use tokio_util::sync::CancellationToken;

use tower_package::{Manifest, Package};
use tower_telemetry::debug;
use tower_telemetry::{debug, error};
use tower_uv::Uv;

use crate::{
Expand All @@ -49,7 +59,7 @@ pub struct LocalApp {
waiter: Mutex<oneshot::Receiver<i32>>,

// terminator is what we use to flag that we want to terminate the child process.
terminator: Mutex<CancellationToken>,
terminator: CancellationToken,

// execute_handle keeps track of the current state of the execution lifecycle.
execute_handle: Option<JoinHandle<Result<(), Error>>>,
Expand Down Expand Up @@ -271,20 +281,28 @@ async fn execute_local_app(opts: StartOptions, sx: oneshot::Sender<i32>, cancel_

impl Drop for LocalApp {
fn drop(&mut self) {
// We want to ensure that we cancel the process if it is still running.
let _ = self.terminate();
// CancellationToken::cancel() is not async
self.terminator.cancel();

// Optionally spawn a task to wait for the handle
if let Some(execute_handle) = self.execute_handle.take() {
if let Ok(handle) = Handle::try_current() {
handle.spawn(async move {
let _ = execute_handle.await;
});
Copy link

Copilot AI Sep 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If Handle::try_current() fails, the execute handle is never awaited, potentially leaving the spawned task running indefinitely. Consider handling this case or documenting the expected behavior when no runtime handle is available.

Suggested change
});
});
} else {
eprintln!("Warning: No Tokio runtime found when dropping LocalApp; spawned task may not be awaited and could run indefinitely.");

Copilot uses AI. Check for mistakes.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above.

}
Comment on lines +289 to +293
Copy link

Copilot AI Sep 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The spawned task silently discards any error from the execute handle. Consider logging the error or result to aid in debugging potential issues during cleanup.

Copilot uses AI. Check for mistakes.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is intentional. It's not strictly required and we don't care if it fails really.

}
}
}

impl App for LocalApp {
async fn start(opts: StartOptions) -> Result<Self, Error> {
let cancel_token = CancellationToken::new();
let terminator = Mutex::new(cancel_token.clone());
let terminator = CancellationToken::new();

let (sx, rx) = oneshot::channel::<i32>();
let waiter = Mutex::new(rx);

let handle = tokio::spawn(execute_local_app(opts, sx, cancel_token));
let handle = tokio::spawn(execute_local_app(opts, sx, terminator.clone()));
let execute_handle = Some(handle);

Ok(Self {
Expand Down Expand Up @@ -323,8 +341,7 @@ impl App for LocalApp {
}

async fn terminate(&mut self) -> Result<(), Error> {
let terminator = self.terminator.lock().await;
terminator.cancel();
self.terminator.cancel();

// Now we should wait for the join handle to finish.
if let Some(execute_handle) = self.execute_handle.take() {
Expand Down Expand Up @@ -421,11 +438,57 @@ fn make_env_vars(ctx: &tower_telemetry::Context, env: &str, cwd: &PathBuf, secs:
res
}

#[cfg(unix)]
async fn kill_child_process(ctx: &tower_telemetry::Context, mut child: Child) {
let pid = match child.id() {
Some(pid) => pid,
None => {
// We didn't get anything, so we can't do anything. Let's just exit with a debug
// message.
error!(ctx: &ctx, "child process has no pid, cannot kill");
return;
}
};

// This is the actual converted pid.
let pid = Pid::from_raw(pid as i32);

// We first send a SIGTERM to ensure that the child processes are terminated. Using SIGKILL
// (default behavior in Child::kill) can leave orphaned processes behind.
killpg(
pid,
Signal::SIGTERM
).ok();

// If it doesn't die after 2 seconds then we'll forcefully kill it. This timeout should be less
// than the overall timeout for the process (which should likely live on the context as a
// deadline).
let timeout = timeout(
Duration::from_secs(2),
child.wait()
).await;

if timeout.is_err() {
killpg(
pid,
Signal::SIGKILL
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe a bit old school but i reckon a SIGHUP before SIGKILL might be an idea

).ok();
}
}

#[cfg(not(unix))]
async fn kill_child_process(ctx: &tower_telemetry::Context, mut child: Child) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok so in windows will default back to whatever child.kill() tries to do

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, apparently (untested by me) it's not an issue on Windows.

match child.kill().await {
Ok(_) => debug!(ctx: &ctx, "child process killed successfully"),
Err(e) => debug!(ctx: &ctx, "failed to kill child process: {}", e),
};
}

async fn wait_for_process(ctx: tower_telemetry::Context, cancel_token: &CancellationToken, mut child: Child) -> i32 {
let code = loop {
if cancel_token.is_cancelled() {
debug!(ctx: &ctx, "process cancelled, terminating child process");
let _ = child.kill().await;
kill_child_process(&ctx, child).await;
break -1; // return -1 to indicate that the process was cancelled.
}

Expand Down
41 changes: 29 additions & 12 deletions crates/tower-uv/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ impl Uv {
// that's easy.
if cwd.join("pyproject.toml").exists() {
debug!("Executing UV ({:?}) sync in {:?}", &self.uv_path, cwd);
let child = Command::new(&self.uv_path)
.kill_on_drop(true)
let mut cmd = Command::new(&self.uv_path);
cmd.kill_on_drop(true)
.stdin(Stdio::null())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
Expand All @@ -105,16 +105,22 @@ impl Uv {
.arg("never")
.arg("--no-progress")
.arg("sync")
.envs(env_vars)
.spawn()?;
.envs(env_vars);

#[cfg(unix)]
{
cmd.process_group(0);
}

let child = cmd.spawn()?;

Ok(child)
} else if cwd.join("requirements.txt").exists() {
debug!("Executing UV ({:?}) sync with requirements in {:?}", &self.uv_path, cwd);

// If there is a requirements.txt, then we can use that to sync.
let child = Command::new(&self.uv_path)
.kill_on_drop(true)
let mut cmd = Command::new(&self.uv_path);
cmd.kill_on_drop(true)
.stdin(Stdio::null())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
Expand All @@ -125,8 +131,14 @@ impl Uv {
.arg("install")
.arg("-r")
.arg(cwd.join("requirements.txt"))
.envs(env_vars)
.spawn()?;
.envs(env_vars);

#[cfg(unix)]
{
cmd.process_group(0);
}

let child = cmd.spawn()?;

Ok(child)
} else {
Expand All @@ -140,8 +152,8 @@ impl Uv {
pub async fn run(&self, cwd: &PathBuf, program: &PathBuf, env_vars: &HashMap<String, String>) -> Result<Child, Error> {
debug!("Executing UV ({:?}) run {:?} in {:?}", &self.uv_path, program, cwd);

let child = Command::new(&self.uv_path)
.kill_on_drop(true)
let mut cmd = Command::new(&self.uv_path);
cmd.kill_on_drop(true)
.stdin(Stdio::null())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
Expand All @@ -151,9 +163,14 @@ impl Uv {
.arg("--no-progress")
.arg("run")
.arg(program)
.envs(env_vars)
.spawn()?;
.envs(env_vars);

#[cfg(unix)]
{
cmd.process_group(0);
}

let child = cmd.spawn()?;
Ok(child)
}

Expand Down
Loading