Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 12 additions & 11 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ resolver = "2"

[workspace.package]
edition = "2021"
version = "0.3.27"
version = "0.3.28"




Expand Down Expand Up @@ -38,6 +39,7 @@ futures-lite = "2.6"
glob = "0.3"
http = "1.1"
indicatif = "0.17"
nix = { version = "0.30", features = ["signal"] }
pem = "3"
promptly = "0.3"
rand = "0.8"
Expand Down
3 changes: 2 additions & 1 deletion crates/tower-runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ license = { workspace = true }

[dependencies]
chrono = { workspace = true }
nix = { workspace = true }
snafu = { workspace = true }
tokio = { workspace = true }
tokio-util = { workspace = true }
snafu = { workspace = true }
tower-package = { workspace = true }
tower-telemetry = { workspace = true }
tower-uv = { workspace = true }
Expand Down
81 changes: 72 additions & 9 deletions crates/tower-runtime/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use tokio::{
fs,
io::{AsyncRead, BufReader, AsyncBufReadExt},
process::{Child, Command},
runtime::Handle,
sync::{
Mutex,
oneshot::{
Expand All @@ -28,6 +29,15 @@ use tokio::{
time::{timeout, Duration},
};

#[cfg(unix)]
use nix::{
unistd::Pid,
sys::signal::{
Signal,
killpg,
},
};

use tokio_util::sync::CancellationToken;

use tower_package::{Manifest, Package};
Expand All @@ -49,7 +59,7 @@ pub struct LocalApp {
waiter: Mutex<oneshot::Receiver<i32>>,

// terminator is what we use to flag that we want to terminate the child process.
terminator: Mutex<CancellationToken>,
terminator: CancellationToken,

// execute_handle keeps track of the current state of the execution lifecycle.
execute_handle: Option<JoinHandle<Result<(), Error>>>,
Expand Down Expand Up @@ -271,20 +281,28 @@ async fn execute_local_app(opts: StartOptions, sx: oneshot::Sender<i32>, cancel_

impl Drop for LocalApp {
fn drop(&mut self) {
// We want to ensure that we cancel the process if it is still running.
let _ = self.terminate();
// CancellationToken::cancel() is not async
self.terminator.cancel();

// Optionally spawn a task to wait for the handle
if let Some(execute_handle) = self.execute_handle.take() {
if let Ok(handle) = Handle::try_current() {
handle.spawn(async move {
let _ = execute_handle.await;
});
}
}
}
}

impl App for LocalApp {
async fn start(opts: StartOptions) -> Result<Self, Error> {
let cancel_token = CancellationToken::new();
let terminator = Mutex::new(cancel_token.clone());
let terminator = CancellationToken::new();

let (sx, rx) = oneshot::channel::<i32>();
let waiter = Mutex::new(rx);

let handle = tokio::spawn(execute_local_app(opts, sx, cancel_token));
let handle = tokio::spawn(execute_local_app(opts, sx, terminator.clone()));
let execute_handle = Some(handle);

Ok(Self {
Expand Down Expand Up @@ -323,8 +341,7 @@ impl App for LocalApp {
}

async fn terminate(&mut self) -> Result<(), Error> {
let terminator = self.terminator.lock().await;
terminator.cancel();
self.terminator.cancel();

// Now we should wait for the join handle to finish.
if let Some(execute_handle) = self.execute_handle.take() {
Expand Down Expand Up @@ -421,11 +438,57 @@ fn make_env_vars(ctx: &tower_telemetry::Context, env: &str, cwd: &PathBuf, secs:
res
}

#[cfg(unix)]
async fn kill_child_process(ctx: &tower_telemetry::Context, mut child: Child) {
let pid = match child.id() {
Some(pid) => pid,
None => {
// We didn't get anything, so we can't do anything. Let's just exit with a debug
// message.
tower_telemetry::error!(ctx: &ctx, "child process has no pid, cannot kill");
return;
}
};

// This is the actual converted pid.
let pid = Pid::from_raw(pid as i32);

// We first send a SIGTERM to ensure that the child processes are terminated. Using SIGKILL
// (default behavior in Child::kill) can leave orphaned processes behind.
killpg(
pid,
Signal::SIGTERM
).ok();

// If it doesn't die after 2 seconds then we'll forcefully kill it. This timeout should be less
// than the overall timeout for the process (which should likely live on the context as a
// deadline).
let timeout = timeout(
Duration::from_secs(2),
child.wait()
).await;

if timeout.is_err() {
killpg(
pid,
Signal::SIGKILL
).ok();
}
}

#[cfg(not(unix))]
async fn kill_child_process(ctx: &tower_telemetry::Context, mut child: Child) {
match child.kill().await {
Ok(_) => debug!(ctx: &ctx, "child process killed successfully"),
Err(e) => debug!(ctx: &ctx, "failed to kill child process: {}", e),
};
}

async fn wait_for_process(ctx: tower_telemetry::Context, cancel_token: &CancellationToken, mut child: Child) -> i32 {
let code = loop {
if cancel_token.is_cancelled() {
debug!(ctx: &ctx, "process cancelled, terminating child process");
let _ = child.kill().await;
kill_child_process(&ctx, child).await;
break -1; // return -1 to indicate that the process was cancelled.
}

Expand Down
41 changes: 29 additions & 12 deletions crates/tower-uv/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ impl Uv {
// that's easy.
if cwd.join("pyproject.toml").exists() {
debug!("Executing UV ({:?}) sync in {:?}", &self.uv_path, cwd);
let child = Command::new(&self.uv_path)
.kill_on_drop(true)
let mut cmd = Command::new(&self.uv_path);
cmd.kill_on_drop(true)
.stdin(Stdio::null())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
Expand All @@ -105,16 +105,22 @@ impl Uv {
.arg("never")
.arg("--no-progress")
.arg("sync")
.envs(env_vars)
.spawn()?;
.envs(env_vars);

#[cfg(unix)]
{
cmd.process_group(0);
}

let child = cmd.spawn()?;

Ok(child)
} else if cwd.join("requirements.txt").exists() {
debug!("Executing UV ({:?}) sync with requirements in {:?}", &self.uv_path, cwd);

// If there is a requirements.txt, then we can use that to sync.
let child = Command::new(&self.uv_path)
.kill_on_drop(true)
let mut cmd = Command::new(&self.uv_path);
cmd.kill_on_drop(true)
.stdin(Stdio::null())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
Expand All @@ -125,8 +131,14 @@ impl Uv {
.arg("install")
.arg("-r")
.arg(cwd.join("requirements.txt"))
.envs(env_vars)
.spawn()?;
.envs(env_vars);

#[cfg(unix)]
{
cmd.process_group(0);
}

let child = cmd.spawn()?;

Ok(child)
} else {
Expand All @@ -140,8 +152,8 @@ impl Uv {
pub async fn run(&self, cwd: &PathBuf, program: &PathBuf, env_vars: &HashMap<String, String>) -> Result<Child, Error> {
debug!("Executing UV ({:?}) run {:?} in {:?}", &self.uv_path, program, cwd);

let child = Command::new(&self.uv_path)
.kill_on_drop(true)
let mut cmd = Command::new(&self.uv_path);
cmd.kill_on_drop(true)
.stdin(Stdio::null())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
Expand All @@ -151,9 +163,14 @@ impl Uv {
.arg("--no-progress")
.arg("run")
.arg(program)
.envs(env_vars)
.spawn()?;
.envs(env_vars);

#[cfg(unix)]
{
cmd.process_group(0);
}

let child = cmd.spawn()?;
Ok(child)
}

Expand Down
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ build-backend = "maturin"

[project]
name = "tower"
version = "0.3.27"
version = "0.3.28"




Expand Down
2 changes: 1 addition & 1 deletion uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading