From b7a8a563108e27052a2b4e72423d33d2ef150ad7 Mon Sep 17 00:00:00 2001 From: Brad Heller Date: Sun, 20 Jul 2025 12:01:08 +0200 Subject: [PATCH 01/15] fix: Don't run `sync` if there's a missing pyproject.toml --- crates/tower-runtime/src/errors.rs | 1 + crates/tower-runtime/src/local.rs | 41 ++++++++++++++++++++++-------- crates/tower-uv/src/lib.rs | 7 +++++ 3 files changed, 38 insertions(+), 11 deletions(-) diff --git a/crates/tower-runtime/src/errors.rs b/crates/tower-runtime/src/errors.rs index 5162fc6d..6dd4a193 100644 --- a/crates/tower-runtime/src/errors.rs +++ b/crates/tower-runtime/src/errors.rs @@ -85,6 +85,7 @@ impl From for Error { tower_uv::Error::NotFound(_) => Error::SpawnFailed, tower_uv::Error::PermissionDenied(_) => Error::SpawnFailed, tower_uv::Error::Other(_) => Error::SpawnFailed, + tower_uv::Error::MissingPyprojectToml => Error::SpawnFailed, } } } diff --git a/crates/tower-runtime/src/local.rs b/crates/tower-runtime/src/local.rs index 76c16124..0fc1ca1d 100644 --- a/crates/tower-runtime/src/local.rs +++ b/crates/tower-runtime/src/local.rs @@ -194,20 +194,39 @@ async fn execute_local_app(opts: StartOptions, sx: oneshot::Sender, cancel_ return Err(Error::Cancelled); } - let mut child = uv.sync(&working_dir, &env_vars).await?; - - // Drain the logs to the output channel. - if let Some(ref sender) = opts.output_sender { - let stdout = child.stdout.take().expect("no stdout"); - tokio::spawn(drain_output(FD::Stdout, Channel::Setup, sender.clone(), BufReader::new(stdout))); + match uv.sync(&working_dir, &env_vars).await { + Err(e) => { + // If we were missing a pyproject.toml, then that's fine for us--we'll just + // continue execution. + // + // Note that we do a match here instead of an if. That's because of the way + // tower_uv::Error is implemented. Namely, it doesn't implement PartialEq and can't + // do so due to it's dependency on std::io::Error. + match e { + tower_uv::Error::MissingPyprojectToml => { + debug!(ctx: &ctx, "no pyproject.toml found, continuing without sync"); + }, + _ => { + // If we got any other error, we want to return it. + return Err(e.into()); + } + } + }, + Ok(mut child) => { + // Drain the logs to the output channel. + if let Some(ref sender) = opts.output_sender { + let stdout = child.stdout.take().expect("no stdout"); + tokio::spawn(drain_output(FD::Stdout, Channel::Setup, sender.clone(), BufReader::new(stdout))); + + let stderr = child.stderr.take().expect("no stderr"); + tokio::spawn(drain_output(FD::Stderr, Channel::Setup, sender.clone(), BufReader::new(stderr))); + } - let stderr = child.stderr.take().expect("no stderr"); - tokio::spawn(drain_output(FD::Stderr, Channel::Setup, sender.clone(), BufReader::new(stderr))); + // Let's wait for the setup to finish. We don't care about the results. + wait_for_process(ctx.clone(), &cancel_token, child).await; + } } - // Let's wait for the setup to finish. We don't care about the results. - wait_for_process(ctx.clone(), &cancel_token, child).await; - // Check once more to see if the process was cancelled, this will bail us out early. if cancel_token.is_cancelled() { // if there's a waiter, we want them to know that the process was cancelled so we have diff --git a/crates/tower-uv/src/lib.rs b/crates/tower-uv/src/lib.rs index 1998b21b..7cc15fad 100644 --- a/crates/tower-uv/src/lib.rs +++ b/crates/tower-uv/src/lib.rs @@ -15,6 +15,7 @@ pub enum Error { NotFound(String), PermissionDenied(String), Other(String), + MissingPyprojectToml, } impl From for Error { @@ -120,6 +121,12 @@ impl Uv { } pub async fn sync(&self, cwd: &PathBuf, env_vars: &HashMap) -> Result { + // Make sure there's a pyproject.toml in the cwd. If there isn't wont, then we don't want + // to do this otherwise uv will return an error on the CLI! + if !cwd.join("pyproject.toml").exists() { + return Err(Error::MissingPyprojectToml); + } + debug!("Executing UV ({:?}) sync in {:?}", &self.uv_path, cwd); let child = Command::new(&self.uv_path) From 923392c31e52dafacf2d7c3c38dfab0aec87b79f Mon Sep 17 00:00:00 2001 From: Brad Heller Date: Sun, 20 Jul 2025 12:36:56 +0200 Subject: [PATCH 02/15] chore: Simple integration tests for local execution --- Cargo.lock | 1 + crates/tower-runtime/Cargo.toml | 3 + crates/tower-runtime/src/lib.rs | 2 +- .../example-apps/01-hello-world/Towerfile | 3 + .../tests/example-apps/01-hello-world/task.py | 2 + .../tests/example-apps/02-use-faker/README.md | 0 .../tests/example-apps/02-use-faker/Towerfile | 3 + .../tests/example-apps/02-use-faker/main.py | 9 ++ .../example-apps/02-use-faker/pyproject.toml | 9 ++ .../tests/example-apps/02-use-faker/uv.lock | 35 +++++ crates/tower-runtime/tests/local_test.rs | 124 ++++++++++++++++++ 11 files changed, 190 insertions(+), 1 deletion(-) create mode 100644 crates/tower-runtime/tests/example-apps/01-hello-world/Towerfile create mode 100644 crates/tower-runtime/tests/example-apps/01-hello-world/task.py create mode 100644 crates/tower-runtime/tests/example-apps/02-use-faker/README.md create mode 100644 crates/tower-runtime/tests/example-apps/02-use-faker/Towerfile create mode 100644 crates/tower-runtime/tests/example-apps/02-use-faker/main.py create mode 100644 crates/tower-runtime/tests/example-apps/02-use-faker/pyproject.toml create mode 100644 crates/tower-runtime/tests/example-apps/02-use-faker/uv.lock create mode 100644 crates/tower-runtime/tests/local_test.rs diff --git a/Cargo.lock b/Cargo.lock index 3d4d2b93..8ac228cc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2847,6 +2847,7 @@ name = "tower-runtime" version = "0.3.21-rc.1" dependencies = [ "chrono", + "config", "snafu", "tokio", "tokio-util", diff --git a/crates/tower-runtime/Cargo.toml b/crates/tower-runtime/Cargo.toml index 99770ec3..7bed0858 100644 --- a/crates/tower-runtime/Cargo.toml +++ b/crates/tower-runtime/Cargo.toml @@ -14,3 +14,6 @@ snafu = { workspace = true } tower-package = { workspace = true } tower-telemetry = { workspace = true } tower-uv = { workspace = true } + +[dev-dependencies] +config = { workspace = true } diff --git a/crates/tower-runtime/src/lib.rs b/crates/tower-runtime/src/lib.rs index 4384521d..6f755012 100644 --- a/crates/tower-runtime/src/lib.rs +++ b/crates/tower-runtime/src/lib.rs @@ -41,7 +41,7 @@ pub struct Output { pub line: String, } -#[derive(Copy, Clone)] +#[derive(Copy, Clone, PartialEq)] pub enum Status { None, Running, diff --git a/crates/tower-runtime/tests/example-apps/01-hello-world/Towerfile b/crates/tower-runtime/tests/example-apps/01-hello-world/Towerfile new file mode 100644 index 00000000..9406d9bc --- /dev/null +++ b/crates/tower-runtime/tests/example-apps/01-hello-world/Towerfile @@ -0,0 +1,3 @@ +[app] +name = "01-hello-world" +script = "./task.py" diff --git a/crates/tower-runtime/tests/example-apps/01-hello-world/task.py b/crates/tower-runtime/tests/example-apps/01-hello-world/task.py new file mode 100644 index 00000000..215234df --- /dev/null +++ b/crates/tower-runtime/tests/example-apps/01-hello-world/task.py @@ -0,0 +1,2 @@ +for i in range(0, 5): + print("Hello, world!") diff --git a/crates/tower-runtime/tests/example-apps/02-use-faker/README.md b/crates/tower-runtime/tests/example-apps/02-use-faker/README.md new file mode 100644 index 00000000..e69de29b diff --git a/crates/tower-runtime/tests/example-apps/02-use-faker/Towerfile b/crates/tower-runtime/tests/example-apps/02-use-faker/Towerfile new file mode 100644 index 00000000..97e276cf --- /dev/null +++ b/crates/tower-runtime/tests/example-apps/02-use-faker/Towerfile @@ -0,0 +1,3 @@ +[app] +name = "02-use-faker" +script = "./main.py" diff --git a/crates/tower-runtime/tests/example-apps/02-use-faker/main.py b/crates/tower-runtime/tests/example-apps/02-use-faker/main.py new file mode 100644 index 00000000..95b8a506 --- /dev/null +++ b/crates/tower-runtime/tests/example-apps/02-use-faker/main.py @@ -0,0 +1,9 @@ +from faker import Faker + +def main(): + fake = Faker() + print(fake.name()) + + +if __name__ == "__main__": + main() diff --git a/crates/tower-runtime/tests/example-apps/02-use-faker/pyproject.toml b/crates/tower-runtime/tests/example-apps/02-use-faker/pyproject.toml new file mode 100644 index 00000000..f88cfd39 --- /dev/null +++ b/crates/tower-runtime/tests/example-apps/02-use-faker/pyproject.toml @@ -0,0 +1,9 @@ +[project] +name = "02-get-current-ip" +version = "0.1.0" +description = "Add your description here" +readme = "README.md" +requires-python = ">=3.12" +dependencies = [ + "faker>=37.4.2", +] diff --git a/crates/tower-runtime/tests/example-apps/02-use-faker/uv.lock b/crates/tower-runtime/tests/example-apps/02-use-faker/uv.lock new file mode 100644 index 00000000..391ed29e --- /dev/null +++ b/crates/tower-runtime/tests/example-apps/02-use-faker/uv.lock @@ -0,0 +1,35 @@ +version = 1 +revision = 2 +requires-python = ">=3.12" + +[[package]] +name = "02-get-current-ip" +version = "0.1.0" +source = { virtual = "." } +dependencies = [ + { name = "faker" }, +] + +[package.metadata] +requires-dist = [{ name = "faker", specifier = ">=37.4.2" }] + +[[package]] +name = "faker" +version = "37.4.2" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "tzdata" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/65/95/da573e055608e180086e2ac3208f8c15d8b44220912f565a9821b9bff33a/faker-37.4.2.tar.gz", hash = "sha256:8e281bbaea30e5658895b8bea21cc50d27aaf3a43db3f2694409ca5701c56b0a", size = 1902890, upload-time = "2025-07-15T16:38:24.803Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/26/1c/b909a055be556c11f13cf058cfa0e152f9754d803ff3694a937efe300709/faker-37.4.2-py3-none-any.whl", hash = "sha256:b70ed1af57bfe988cbcd0afd95f4768c51eaf4e1ce8a30962e127ac5c139c93f", size = 1943179, upload-time = "2025-07-15T16:38:23.053Z" }, +] + +[[package]] +name = "tzdata" +version = "2025.2" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/95/32/1a225d6164441be760d75c2c42e2780dc0873fe382da3e98a2e1e48361e5/tzdata-2025.2.tar.gz", hash = "sha256:b60a638fcc0daffadf82fe0f57e53d06bdec2f36c4df66280ae79bce6bd6f2b9", size = 196380, upload-time = "2025-03-23T13:54:43.652Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/5c/23/c7abc0ca0a1526a0774eca151daeb8de62ec457e77262b66b359c3c7679e/tzdata-2025.2-py2.py3-none-any.whl", hash = "sha256:1a403fada01ff9221ca8044d701868fa132215d84beb92242d9acd2147f667a8", size = 347839, upload-time = "2025-03-23T13:54:41.845Z" }, +] diff --git a/crates/tower-runtime/tests/local_test.rs b/crates/tower-runtime/tests/local_test.rs new file mode 100644 index 00000000..c6ad59ce --- /dev/null +++ b/crates/tower-runtime/tests/local_test.rs @@ -0,0 +1,124 @@ +use std::collections::HashMap; +use std::path::PathBuf; + +use tower_runtime::{ + App, + StartOptions, + Status, + create_output_stream, + local::LocalApp, +}; + +use config::Towerfile; +use tower_package::{Package, PackageSpec}; + +fn get_example_app_dir(name: &str) -> PathBuf { + // This is where the root of the app lives. + let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + path.push("tests"); + path.push("example-apps"); + path.push(name); + + if !path.exists() { + panic!("Example app directory does not exist: {}", path.display()); + } + + path +} + +async fn build_package_from_dir(dir: &PathBuf) -> Package { + let towerfile = Towerfile::from_path(dir.join("Towerfile")).expect("Failed to load Towerfile"); + let spec = PackageSpec::from_towerfile(&towerfile); + let mut package = Package::build(spec).await.expect("Failed to build package from directory"); + package.unpack().await.expect("Failed to unpack package"); + package +} + +#[tokio::test] +async fn test_running_hello_world() { + let hello_world_dir = get_example_app_dir("01-hello-world"); + let package = build_package_from_dir(&hello_world_dir).await; + let (sender, receiver) = create_output_stream(); + + // We need to create the package, which will load the app + let opts = StartOptions{ + ctx: tower_telemetry::Context::new(), + package, + output_sender: Some(sender), + cwd: None, + environment: "local".to_string(), + secrets: HashMap::new(), + parameters: HashMap::new(), + env_vars: HashMap::new(), + }; + + // Start the app using the LocalApp runtime + let app = LocalApp::start(opts).await.expect("Failed to start app"); + + // The status should be running + let status = app.status().await.expect("Failed to get app status"); + assert!(status == Status::Running, "App should be running"); + + // Now we should wait for the output + let mut receiver = receiver.lock().await; + + while let Some(output) = receiver.recv().await { + assert!(output.line.contains("Hello, world!"), "Log should contain 'Hello, world!'"); + } + + // check the status once more, should be done. + let status = app.status().await.expect("Failed to get app status"); + assert!(status == Status::Exited, "App should be running"); +} + +#[tokio::test] +async fn test_running_use_faker() { + // This test is a simple test that outputs some text to the console; however, this time it has + // a depedency defined in pyprojec.toml, which means that it'll have to do a uv sync first. + let use_faker_dir = get_example_app_dir("02-use-faker"); + let package = build_package_from_dir(&use_faker_dir).await; + let (sender, receiver) = create_output_stream(); + + // We need to create the package, which will load the app + let opts = StartOptions{ + ctx: tower_telemetry::Context::new(), + package, + output_sender: Some(sender), + cwd: None, + environment: "local".to_string(), + secrets: HashMap::new(), + parameters: HashMap::new(), + env_vars: HashMap::new(), + }; + + // Start the app using the LocalApp runtime + let app = LocalApp::start(opts).await.expect("Failed to start app"); + + // The status should be running + let status = app.status().await.expect("Failed to get app status"); + assert!(status == Status::Running, "App should be running"); + + // Now we should wait for the output + let mut receiver = receiver.lock().await; + + let mut count_setup = 0; + let mut count_stdout = 0; + + while let Some(output) = receiver.recv().await { + match output.channel { + tower_runtime::Channel::Setup => { + count_setup += 1; + }, + tower_runtime::Channel::Program => { + count_stdout += 1; + } + } + } + + assert!(count_setup > 0, "There should be some setup output"); + assert!(count_stdout == 1, "There should be exactly one stdout output"); + + // check the status once more, should be done. + let status = app.status().await.expect("Failed to get app status"); + assert!(status == Status::Exited, "App should be running"); +} From b218d3bb646721f6c714399ce3896868a8b499c4 Mon Sep 17 00:00:00 2001 From: Brad Heller Date: Sun, 20 Jul 2025 12:44:45 +0200 Subject: [PATCH 03/15] Update crates/tower-runtime/tests/local_test.rs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- crates/tower-runtime/tests/local_test.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/tower-runtime/tests/local_test.rs b/crates/tower-runtime/tests/local_test.rs index c6ad59ce..257f1765 100644 --- a/crates/tower-runtime/tests/local_test.rs +++ b/crates/tower-runtime/tests/local_test.rs @@ -74,7 +74,7 @@ async fn test_running_hello_world() { #[tokio::test] async fn test_running_use_faker() { // This test is a simple test that outputs some text to the console; however, this time it has - // a depedency defined in pyprojec.toml, which means that it'll have to do a uv sync first. + // a dependency defined in pyproject.toml, which means that it'll have to do a uv sync first. let use_faker_dir = get_example_app_dir("02-use-faker"); let package = build_package_from_dir(&use_faker_dir).await; let (sender, receiver) = create_output_stream(); From 7b2dc96774cf640d0c0340739035cfaf1d75929d Mon Sep 17 00:00:00 2001 From: Brad Heller Date: Sun, 20 Jul 2025 12:44:51 +0200 Subject: [PATCH 04/15] Update crates/tower-uv/src/lib.rs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- crates/tower-uv/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/tower-uv/src/lib.rs b/crates/tower-uv/src/lib.rs index 7cc15fad..1af7653b 100644 --- a/crates/tower-uv/src/lib.rs +++ b/crates/tower-uv/src/lib.rs @@ -121,7 +121,7 @@ impl Uv { } pub async fn sync(&self, cwd: &PathBuf, env_vars: &HashMap) -> Result { - // Make sure there's a pyproject.toml in the cwd. If there isn't wont, then we don't want + // Make sure there's a pyproject.toml in the cwd. If there isn't one, then we don't want // to do this otherwise uv will return an error on the CLI! if !cwd.join("pyproject.toml").exists() { return Err(Error::MissingPyprojectToml); From 8d2dd86392564af4dc3f6ad200c84f5dfddc52f5 Mon Sep 17 00:00:00 2001 From: Brad Heller Date: Sun, 20 Jul 2025 12:56:33 +0200 Subject: [PATCH 05/15] chore: Reduce lock contention on reading output --- crates/tower-cmd/src/run.rs | 4 ++-- crates/tower-runtime/src/lib.rs | 17 +++++------------ crates/tower-runtime/src/local.rs | 22 ++++++++-------------- crates/tower-runtime/tests/local_test.rs | 14 ++++---------- 4 files changed, 19 insertions(+), 38 deletions(-) diff --git a/crates/tower-cmd/src/run.rs b/crates/tower-cmd/src/run.rs index aad608a7..08f2e7b0 100644 --- a/crates/tower-cmd/src/run.rs +++ b/crates/tower-cmd/src/run.rs @@ -342,9 +342,9 @@ async fn build_package(towerfile: &Towerfile) -> Package { /// monitor_output is a helper function that will monitor the output of a given output channel and /// plops it down on stdout. -async fn monitor_output(output: OutputReceiver) { +async fn monitor_output(mut output: OutputReceiver) { loop { - if let Some(line) = output.lock().await.recv().await { + if let Some(line) = output.recv().await { let ts = &line.time; let msg = &line.line; output::log_line(&ts.to_rfc3339(), msg, output::LogLineType::Local); diff --git a/crates/tower-runtime/src/lib.rs b/crates/tower-runtime/src/lib.rs index 6f755012..97cf56e2 100644 --- a/crates/tower-runtime/src/lib.rs +++ b/crates/tower-runtime/src/lib.rs @@ -49,13 +49,9 @@ pub enum Status { Crashed { code: i32 }, } -type SharedReceiver = Arc>>; +pub type OutputReceiver = UnboundedReceiver; -type SharedSender = Arc>>; - -pub type OutputReceiver = SharedReceiver; - -pub type OutputSender = SharedSender; +pub type OutputSender = UnboundedSender; pub trait App { // start will start the process @@ -83,10 +79,7 @@ impl std::default::Default for AppLauncher { pub fn create_output_stream() -> (OutputSender, OutputReceiver) { let (sender, receiver) = unbounded_channel::(); - - let output_sender = Arc::new(Mutex::new(sender)); - let output_receiver = Arc::new(Mutex::new(receiver)); - (output_sender, output_receiver) + (sender, receiver) } impl AppLauncher { @@ -104,7 +97,7 @@ impl AppLauncher { let opts = StartOptions { ctx, - output_sender: Some(output_sender), + output_sender, cwd: Some(cwd), environment, secrets, @@ -152,7 +145,7 @@ pub struct StartOptions { pub secrets: HashMap, pub parameters: HashMap, pub env_vars: HashMap, - pub output_sender: Option, + pub output_sender: OutputSender, } pub struct ExecuteOptions { diff --git a/crates/tower-runtime/src/local.rs b/crates/tower-runtime/src/local.rs index 0fc1ca1d..4fb9e54e 100644 --- a/crates/tower-runtime/src/local.rs +++ b/crates/tower-runtime/src/local.rs @@ -214,13 +214,11 @@ async fn execute_local_app(opts: StartOptions, sx: oneshot::Sender, cancel_ }, Ok(mut child) => { // Drain the logs to the output channel. - if let Some(ref sender) = opts.output_sender { - let stdout = child.stdout.take().expect("no stdout"); - tokio::spawn(drain_output(FD::Stdout, Channel::Setup, sender.clone(), BufReader::new(stdout))); + let stdout = child.stdout.take().expect("no stdout"); + tokio::spawn(drain_output(FD::Stdout, Channel::Setup, opts.output_sender.clone(), BufReader::new(stdout))); - let stderr = child.stderr.take().expect("no stderr"); - tokio::spawn(drain_output(FD::Stderr, Channel::Setup, sender.clone(), BufReader::new(stderr))); - } + let stderr = child.stderr.take().expect("no stderr"); + tokio::spawn(drain_output(FD::Stderr, Channel::Setup, opts.output_sender.clone(), BufReader::new(stderr))); // Let's wait for the setup to finish. We don't care about the results. wait_for_process(ctx.clone(), &cancel_token, child).await; @@ -238,13 +236,11 @@ async fn execute_local_app(opts: StartOptions, sx: oneshot::Sender, cancel_ let mut child = uv.run(&working_dir, &program_path, &env_vars).await?; // Drain the logs to the output channel. - if let Some(ref sender) = opts.output_sender { - let stdout = child.stdout.take().expect("no stdout"); - tokio::spawn(drain_output(FD::Stdout, Channel::Program, sender.clone(), BufReader::new(stdout))); + let stdout = child.stdout.take().expect("no stdout"); + tokio::spawn(drain_output(FD::Stdout, Channel::Program, opts.output_sender.clone(), BufReader::new(stdout))); - let stderr = child.stderr.take().expect("no stderr"); - tokio::spawn(drain_output(FD::Stderr, Channel::Program, sender.clone(), BufReader::new(stderr))); - } + let stderr = child.stderr.take().expect("no stderr"); + tokio::spawn(drain_output(FD::Stderr, Channel::Program, opts.output_sender.clone(), BufReader::new(stderr))); let _ = sx.send(wait_for_process(ctx.clone(), &cancel_token, child).await); } @@ -429,8 +425,6 @@ async fn drain_output(fd: FD, channel: Channel, output: Ou let mut lines = input.lines(); while let Some(line) = lines.next_line().await.expect("line iteration fialed") { - let output = output.lock().await; - let _ = output.send(Output{ channel, fd, diff --git a/crates/tower-runtime/tests/local_test.rs b/crates/tower-runtime/tests/local_test.rs index 257f1765..eddbc9fc 100644 --- a/crates/tower-runtime/tests/local_test.rs +++ b/crates/tower-runtime/tests/local_test.rs @@ -38,13 +38,13 @@ async fn build_package_from_dir(dir: &PathBuf) -> Package { async fn test_running_hello_world() { let hello_world_dir = get_example_app_dir("01-hello-world"); let package = build_package_from_dir(&hello_world_dir).await; - let (sender, receiver) = create_output_stream(); + let (sender, mut receiver) = create_output_stream(); // We need to create the package, which will load the app let opts = StartOptions{ ctx: tower_telemetry::Context::new(), package, - output_sender: Some(sender), + output_sender: sender, cwd: None, environment: "local".to_string(), secrets: HashMap::new(), @@ -59,9 +59,6 @@ async fn test_running_hello_world() { let status = app.status().await.expect("Failed to get app status"); assert!(status == Status::Running, "App should be running"); - // Now we should wait for the output - let mut receiver = receiver.lock().await; - while let Some(output) = receiver.recv().await { assert!(output.line.contains("Hello, world!"), "Log should contain 'Hello, world!'"); } @@ -77,13 +74,13 @@ async fn test_running_use_faker() { // a dependency defined in pyproject.toml, which means that it'll have to do a uv sync first. let use_faker_dir = get_example_app_dir("02-use-faker"); let package = build_package_from_dir(&use_faker_dir).await; - let (sender, receiver) = create_output_stream(); + let (sender, mut receiver) = create_output_stream(); // We need to create the package, which will load the app let opts = StartOptions{ ctx: tower_telemetry::Context::new(), package, - output_sender: Some(sender), + output_sender: sender, cwd: None, environment: "local".to_string(), secrets: HashMap::new(), @@ -98,9 +95,6 @@ async fn test_running_use_faker() { let status = app.status().await.expect("Failed to get app status"); assert!(status == Status::Running, "App should be running"); - // Now we should wait for the output - let mut receiver = receiver.lock().await; - let mut count_setup = 0; let mut count_stdout = 0; From bd3e7ccd345661eacdb11c1d78a9783465ca1fbd Mon Sep 17 00:00:00 2001 From: Brad Heller Date: Sun, 20 Jul 2025 16:06:16 +0200 Subject: [PATCH 06/15] chore: Attempts to resolve `uv` problems on first download --- Cargo.lock | 1 + crates/tower-runtime/src/errors.rs | 2 ++ crates/tower-runtime/tests/local_test.rs | 9 +++++++++ crates/tower-uv/Cargo.toml | 1 + crates/tower-uv/src/install.rs | 4 +++- crates/tower-uv/src/lib.rs | 25 ++++++++++++++++++++++++ 6 files changed, 41 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 8ac228cc..d578b393 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2877,6 +2877,7 @@ version = "0.3.21-rc.1" dependencies = [ "async-compression", "async_zip", + "dirs", "futures-lite", "reqwest", "tokio", diff --git a/crates/tower-runtime/src/errors.rs b/crates/tower-runtime/src/errors.rs index 6dd4a193..73c1b7cc 100644 --- a/crates/tower-runtime/src/errors.rs +++ b/crates/tower-runtime/src/errors.rs @@ -86,6 +86,8 @@ impl From for Error { tower_uv::Error::PermissionDenied(_) => Error::SpawnFailed, tower_uv::Error::Other(_) => Error::SpawnFailed, tower_uv::Error::MissingPyprojectToml => Error::SpawnFailed, + tower_uv::Error::InvalidUv => Error::SpawnFailed, + tower_uv::Error::UnsupportedPlatform => Error::UnsupportedPlatform, } } } diff --git a/crates/tower-runtime/tests/local_test.rs b/crates/tower-runtime/tests/local_test.rs index eddbc9fc..06628b79 100644 --- a/crates/tower-runtime/tests/local_test.rs +++ b/crates/tower-runtime/tests/local_test.rs @@ -11,6 +11,7 @@ use tower_runtime::{ use config::Towerfile; use tower_package::{Package, PackageSpec}; +use tower_telemetry::{self, debug}; fn get_example_app_dir(name: &str) -> PathBuf { // This is where the root of the app lives. @@ -36,6 +37,13 @@ async fn build_package_from_dir(dir: &PathBuf) -> Package { #[tokio::test] async fn test_running_hello_world() { + tower_telemetry::enable_logging( + tower_telemetry::LogLevel::Debug, + tower_telemetry::LogFormat::Plain, + tower_telemetry::LogDestination::Stdout, + ); + + debug!("Running 01-hello-world"); let hello_world_dir = get_example_app_dir("01-hello-world"); let package = build_package_from_dir(&hello_world_dir).await; let (sender, mut receiver) = create_output_stream(); @@ -70,6 +78,7 @@ async fn test_running_hello_world() { #[tokio::test] async fn test_running_use_faker() { + debug!("Running 02-use-faker"); // This test is a simple test that outputs some text to the console; however, this time it has // a dependency defined in pyproject.toml, which means that it'll have to do a uv sync first. let use_faker_dir = get_example_app_dir("02-use-faker"); diff --git a/crates/tower-uv/Cargo.toml b/crates/tower-uv/Cargo.toml index ffd63f59..b85e9d0c 100644 --- a/crates/tower-uv/Cargo.toml +++ b/crates/tower-uv/Cargo.toml @@ -9,6 +9,7 @@ license = { workspace = true } [dependencies] async-compression = { workspace = true } async_zip = { workspace = true } +dirs = { workspace = true } futures-lite = { workspace = true } reqwest = { workspace = true } tokio = { workspace = true } diff --git a/crates/tower-uv/src/install.rs b/crates/tower-uv/src/install.rs index 25677da3..60eb39fa 100644 --- a/crates/tower-uv/src/install.rs +++ b/crates/tower-uv/src/install.rs @@ -14,6 +14,7 @@ const UV_VERSION: &str = crate::UV_VERSION; #[derive(Debug)] pub enum Error { + UnsupportedPlatform, IoError(std::io::Error), Other(String), } @@ -31,7 +32,8 @@ impl From for Error { } pub fn get_default_uv_bin_dir() -> Result { - Ok(PathBuf::from(".tower/bin")) + let dir = dirs::data_local_dir().ok_or(Error::UnsupportedPlatform)?; + Ok(dir.join("tower").join("bin")) } #[derive(Debug)] diff --git a/crates/tower-uv/src/lib.rs b/crates/tower-uv/src/lib.rs index 1af7653b..521589ec 100644 --- a/crates/tower-uv/src/lib.rs +++ b/crates/tower-uv/src/lib.rs @@ -16,6 +16,8 @@ pub enum Error { PermissionDenied(String), Other(String), MissingPyprojectToml, + InvalidUv, + UnsupportedPlatform, } impl From for Error { @@ -28,6 +30,7 @@ impl From for Error { impl From for Error { fn from(err: install::Error) -> Self { match err { + install::Error::UnsupportedPlatform => Error::UnsupportedPlatform, install::Error::IoError(e) => Error::IoError(e), install::Error::Other(msg) => Error::Other(msg), } @@ -68,11 +71,14 @@ async fn find_uv_binary() -> Option { } async fn find_or_setup_uv() -> Result { + // If FORCE_DOWNLOAD_UV is set, we will always download UV + // // If we get here, uv wasn't found in PATH, so let's download it if let Some(path) = find_uv_binary().await { Ok(path) } else { let path = install::get_default_uv_bin_dir()?; + debug!("UV binary not found in PATH, setting up UV at {:?}", path); // Create the directory if it doesn't exist std::fs::create_dir_all(&path).map_err(Error::IoError)?; @@ -95,6 +101,24 @@ async fn find_or_setup_uv() -> Result { } } +async fn test_uv_path(path: &PathBuf) -> Result<(), Error> { + let res = Command::new(&path) + .arg("--color") + .arg("never") + .arg("--no-progress") + .arg("--help") + .output() + .await; + + match res { + Ok(_) => Ok(()), + Err(e) => { + debug!("Testing UV failed: {:?}", e); + Err(Error::InvalidUv) + } + } +} + pub struct Uv { pub uv_path: PathBuf, } @@ -102,6 +126,7 @@ pub struct Uv { impl Uv { pub async fn new() -> Result { let uv_path = find_or_setup_uv().await?; + test_uv_path(&uv_path).await?; Ok(Uv { uv_path }) } From 5496d1701b05cdeb35b4ccba63b1e6980b9a2ccb Mon Sep 17 00:00:00 2001 From: Brad Heller Date: Sun, 20 Jul 2025 16:36:16 +0200 Subject: [PATCH 07/15] chore: Concurrent test runs cause problems! --- crates/tower-runtime/src/lib.rs | 2 -- crates/tower-runtime/tests/local_test.rs | 6 ++-- crates/tower-uv/src/install.rs | 1 + crates/tower-uv/src/lib.rs | 46 ++++++++++++------------ crates/tower-uv/tests/install_test.rs | 15 ++++++++ 5 files changed, 42 insertions(+), 28 deletions(-) create mode 100644 crates/tower-uv/tests/install_test.rs diff --git a/crates/tower-runtime/src/lib.rs b/crates/tower-runtime/src/lib.rs index 97cf56e2..00f7da98 100644 --- a/crates/tower-runtime/src/lib.rs +++ b/crates/tower-runtime/src/lib.rs @@ -1,8 +1,6 @@ use std::path::PathBuf; use std::future::Future; -use std::sync::Arc; use std::collections::HashMap; -use tokio::sync::Mutex; use tokio::sync::mpsc::{ UnboundedReceiver, UnboundedSender, diff --git a/crates/tower-runtime/tests/local_test.rs b/crates/tower-runtime/tests/local_test.rs index 06628b79..6b26b2ba 100644 --- a/crates/tower-runtime/tests/local_test.rs +++ b/crates/tower-runtime/tests/local_test.rs @@ -35,7 +35,7 @@ async fn build_package_from_dir(dir: &PathBuf) -> Package { package } -#[tokio::test] +#[tokio::test(flavor = "current_thread")] async fn test_running_hello_world() { tower_telemetry::enable_logging( tower_telemetry::LogLevel::Debug, @@ -68,6 +68,7 @@ async fn test_running_hello_world() { assert!(status == Status::Running, "App should be running"); while let Some(output) = receiver.recv().await { + debug!("Received output: {:?}", output.line); assert!(output.line.contains("Hello, world!"), "Log should contain 'Hello, world!'"); } @@ -76,7 +77,7 @@ async fn test_running_hello_world() { assert!(status == Status::Exited, "App should be running"); } -#[tokio::test] +#[tokio::test(flavor = "current_thread")] async fn test_running_use_faker() { debug!("Running 02-use-faker"); // This test is a simple test that outputs some text to the console; however, this time it has @@ -108,6 +109,7 @@ async fn test_running_use_faker() { let mut count_stdout = 0; while let Some(output) = receiver.recv().await { + debug!("Received output: {:?}", output.line); match output.channel { tower_runtime::Channel::Setup => { count_setup += 1; diff --git a/crates/tower-uv/src/install.rs b/crates/tower-uv/src/install.rs index 60eb39fa..73945586 100644 --- a/crates/tower-uv/src/install.rs +++ b/crates/tower-uv/src/install.rs @@ -303,6 +303,7 @@ async fn download_uv_archive(path: &PathBuf, archive: String) -> Result Result { + debug!("Starting download of UV for current architecture"); let archive = ArchiveSelector::get_archive_name().await?; let path = download_uv_archive(path, archive).await?; debug!("Downloaded UV to: {:?}", path); diff --git a/crates/tower-uv/src/lib.rs b/crates/tower-uv/src/lib.rs index 521589ec..58574e46 100644 --- a/crates/tower-uv/src/lib.rs +++ b/crates/tower-uv/src/lib.rs @@ -4,7 +4,7 @@ use std::process::Stdio; use tokio::process::{Command, Child}; use tower_telemetry::debug; -mod install; +pub mod install; // UV_VERSION is the version of UV to download and install when setting up a local UV deployment. pub const UV_VERSION: &str = "0.7.13"; @@ -47,27 +47,8 @@ async fn find_uv_binary() -> Option { } } } - - // First, check if uv is already in the PATH - let output = Command::new("which") - .arg("uv") - .output() - .await; - - if let Ok(output) = output { - let path_str = String::from_utf8_lossy(&output.stdout); - let path = PathBuf::from(path_str.trim()); - - // If this is a path that actually exists, then we assume that it's `uv` and we can - // continue. - if path.exists() { - Some(path) - } else { - None - } - } else { - None - } + + None } async fn find_or_setup_uv() -> Result { @@ -94,8 +75,18 @@ async fn find_or_setup_uv() -> Result { let target = path.join("uv"); // Copy the `uv` binary into the default directory - std::fs::copy(&exe, &target) - .map_err(|e| Error::IoError(e))?; + tokio::fs::copy(&exe, &target) + .await?; + + #[cfg(unix)] + { + use std::os::unix::fs::PermissionsExt; + let mut perms = std::fs::metadata(&target)?.permissions(); + perms.set_mode(0o755); + std::fs::set_permissions(&target, perms)?; + } + + debug!("Copied UV binary from {:?} to {:?}", exe, target); Ok(target) } @@ -187,4 +178,11 @@ impl Uv { Ok(child) } + + pub async fn is_valid(&self) -> bool { + match test_uv_path(&self.uv_path).await { + Ok(_) => true, + Err(_) => false, + } + } } diff --git a/crates/tower-uv/tests/install_test.rs b/crates/tower-uv/tests/install_test.rs new file mode 100644 index 00000000..93511ec6 --- /dev/null +++ b/crates/tower-uv/tests/install_test.rs @@ -0,0 +1,15 @@ +use tower_uv::{ + install::get_default_uv_bin_dir, + Uv, +}; + +#[tokio::test] +async fn test_installing_uv() { + // Ensure there is no `uv` in the directory that we will install it to by default. + let default_uv_bin_dir = get_default_uv_bin_dir().unwrap(); + let _ = tokio::fs::remove_dir_all(&default_uv_bin_dir).await; + + // Now if we instantiate a Uv instance, it should install the `uv` bianry. + let uv = Uv::new().await.expect("Failed to create a Uv instance"); + assert!(uv.is_valid().await); +} From 92277c3073505cb2da3ad5cea4ad2fc5f28a9d8c Mon Sep 17 00:00:00 2001 From: Brad Heller Date: Sun, 20 Jul 2025 16:46:24 +0200 Subject: [PATCH 08/15] chore: Only allow one UV setup to proceed at a time. --- crates/tower-uv/src/install.rs | 69 +++++++++++++++++++++++++++++++++- crates/tower-uv/src/lib.rs | 58 +--------------------------- 2 files changed, 70 insertions(+), 57 deletions(-) diff --git a/crates/tower-uv/src/install.rs b/crates/tower-uv/src/install.rs index 73945586..6a296adb 100644 --- a/crates/tower-uv/src/install.rs +++ b/crates/tower-uv/src/install.rs @@ -1,8 +1,12 @@ use std::env; use std::path::PathBuf; +use std::sync::OnceLock; use tokio_tar::Archive; -use tokio::process::Command; +use tokio::{ + sync::Mutex, + process::Command, +}; use async_compression::tokio::bufread::GzipDecoder; use async_zip::tokio::read::seek::ZipFileReader; use futures_lite::io::AsyncReadExt; @@ -12,8 +16,15 @@ use tower_telemetry::debug; // Copy the UV_VERSION locally to make this a bit more ergonomic. const UV_VERSION: &str = crate::UV_VERSION; +static GLOBAL_LOCK: OnceLock> = OnceLock::new(); + +fn get_global_lock() -> &'static Mutex<()> { + GLOBAL_LOCK.get_or_init(|| Mutex::new(())) +} + #[derive(Debug)] pub enum Error { + NotFound(String), UnsupportedPlatform, IoError(std::io::Error), Other(String), @@ -309,3 +320,59 @@ pub async fn download_uv_for_arch(path: &PathBuf) -> Result { debug!("Downloaded UV to: {:?}", path); Ok(path) } + +async fn find_uv_binary() -> Option { + if let Ok(default_path) = get_default_uv_bin_dir() { + // Check if the default path exists + if default_path.exists() { + let uv_path = default_path.join("uv"); + if uv_path.exists() { + return Some(uv_path); + } + } + } + + None +} + +pub async fn find_or_setup_uv() -> Result { + // We only allow setup in the process space at a given time. + let _ = get_global_lock().lock().await; + + // If we get here, uv wasn't found in PATH, so let's download it + if let Some(path) = find_uv_binary().await { + Ok(path) + } else { + let path = get_default_uv_bin_dir()?; + debug!("UV binary not found in PATH, setting up UV at {:?}", path); + + // Create the directory if it doesn't exist + std::fs::create_dir_all(&path).map_err(Error::IoError)?; + + let parent = path.parent() + .ok_or_else(|| Error::NotFound("Parent directory not found".to_string()))? + .to_path_buf(); + + // We download this code to the UV directory + let exe = download_uv_for_arch(&parent).await?; + + // Target is the UV binary we want. + let target = path.join("uv"); + + // Copy the `uv` binary into the default directory + tokio::fs::copy(&exe, &target) + .await?; + + #[cfg(unix)] + { + use std::os::unix::fs::PermissionsExt; + let mut perms = std::fs::metadata(&target)?.permissions(); + perms.set_mode(0o755); + std::fs::set_permissions(&target, perms)?; + } + + debug!("Copied UV binary from {:?} to {:?}", exe, target); + + Ok(target) + } +} diff --git a/crates/tower-uv/src/lib.rs b/crates/tower-uv/src/lib.rs index 58574e46..11af2c9d 100644 --- a/crates/tower-uv/src/lib.rs +++ b/crates/tower-uv/src/lib.rs @@ -30,6 +30,7 @@ impl From for Error { impl From for Error { fn from(err: install::Error) -> Self { match err { + install::Error::NotFound(msg) => Error::NotFound(msg), install::Error::UnsupportedPlatform => Error::UnsupportedPlatform, install::Error::IoError(e) => Error::IoError(e), install::Error::Other(msg) => Error::Other(msg), @@ -37,61 +38,6 @@ impl From for Error { } } -async fn find_uv_binary() -> Option { - if let Ok(default_path) = install::get_default_uv_bin_dir() { - // Check if the default path exists - if default_path.exists() { - let uv_path = default_path.join("uv"); - if uv_path.exists() { - return Some(uv_path); - } - } - } - - None -} - -async fn find_or_setup_uv() -> Result { - // If FORCE_DOWNLOAD_UV is set, we will always download UV - // - // If we get here, uv wasn't found in PATH, so let's download it - if let Some(path) = find_uv_binary().await { - Ok(path) - } else { - let path = install::get_default_uv_bin_dir()?; - debug!("UV binary not found in PATH, setting up UV at {:?}", path); - - // Create the directory if it doesn't exist - std::fs::create_dir_all(&path).map_err(Error::IoError)?; - - let parent = path.parent() - .ok_or_else(|| Error::NotFound("Parent directory not found".to_string()))? - .to_path_buf(); - - // We download this code to the UV directory - let exe = install::download_uv_for_arch(&parent).await?; - - // Target is the UV binary we want. - let target = path.join("uv"); - - // Copy the `uv` binary into the default directory - tokio::fs::copy(&exe, &target) - .await?; - - #[cfg(unix)] - { - use std::os::unix::fs::PermissionsExt; - let mut perms = std::fs::metadata(&target)?.permissions(); - perms.set_mode(0o755); - std::fs::set_permissions(&target, perms)?; - } - - debug!("Copied UV binary from {:?} to {:?}", exe, target); - - Ok(target) - } -} - async fn test_uv_path(path: &PathBuf) -> Result<(), Error> { let res = Command::new(&path) .arg("--color") @@ -116,7 +62,7 @@ pub struct Uv { impl Uv { pub async fn new() -> Result { - let uv_path = find_or_setup_uv().await?; + let uv_path = install::find_or_setup_uv().await?; test_uv_path(&uv_path).await?; Ok(Uv { uv_path }) } From c3034023cb2b8d43fc0ef953a0678570f7093535 Mon Sep 17 00:00:00 2001 From: Brad Heller Date: Sun, 20 Jul 2025 16:47:06 +0200 Subject: [PATCH 09/15] chore: Allow tests to proceed concurrently --- crates/tower-runtime/tests/local_test.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/tower-runtime/tests/local_test.rs b/crates/tower-runtime/tests/local_test.rs index 6b26b2ba..d36c226b 100644 --- a/crates/tower-runtime/tests/local_test.rs +++ b/crates/tower-runtime/tests/local_test.rs @@ -35,7 +35,7 @@ async fn build_package_from_dir(dir: &PathBuf) -> Package { package } -#[tokio::test(flavor = "current_thread")] +#[tokio::test] async fn test_running_hello_world() { tower_telemetry::enable_logging( tower_telemetry::LogLevel::Debug, @@ -77,7 +77,7 @@ async fn test_running_hello_world() { assert!(status == Status::Exited, "App should be running"); } -#[tokio::test(flavor = "current_thread")] +#[tokio::test] async fn test_running_use_faker() { debug!("Running 02-use-faker"); // This test is a simple test that outputs some text to the console; however, this time it has From 288009f833d98bee5ee8472f8424203ab1466c79 Mon Sep 17 00:00:00 2001 From: Brad Heller Date: Sun, 20 Jul 2025 16:59:02 +0200 Subject: [PATCH 10/15] chore: We need to actually hold the lock --- crates/tower-runtime/tests/local_test.rs | 2 +- crates/tower-uv/src/install.rs | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/crates/tower-runtime/tests/local_test.rs b/crates/tower-runtime/tests/local_test.rs index d36c226b..2ea2ee74 100644 --- a/crates/tower-runtime/tests/local_test.rs +++ b/crates/tower-runtime/tests/local_test.rs @@ -121,7 +121,7 @@ async fn test_running_use_faker() { } assert!(count_setup > 0, "There should be some setup output"); - assert!(count_stdout == 1, "There should be exactly one stdout output"); + assert!(count_stdout > 0, "should be more than one output"); // check the status once more, should be done. let status = app.status().await.expect("Failed to get app status"); diff --git a/crates/tower-uv/src/install.rs b/crates/tower-uv/src/install.rs index 6a296adb..d3c052c0 100644 --- a/crates/tower-uv/src/install.rs +++ b/crates/tower-uv/src/install.rs @@ -337,10 +337,11 @@ async fn find_uv_binary() -> Option { pub async fn find_or_setup_uv() -> Result { // We only allow setup in the process space at a given time. - let _ = get_global_lock().lock().await; + let _guard = get_global_lock().lock().await; // If we get here, uv wasn't found in PATH, so let's download it if let Some(path) = find_uv_binary().await { + debug!("UV binary found at {:?}", path); Ok(path) } else { let path = get_default_uv_bin_dir()?; From 24eeedc3660e27fd3e8de8dc1896b053a467bf73 Mon Sep 17 00:00:00 2001 From: Brad Heller Date: Sun, 20 Jul 2025 17:03:14 +0200 Subject: [PATCH 11/15] chore: Remove debug output for now --- .github/workflows/test-rust.yml | 2 -- 1 file changed, 2 deletions(-) diff --git a/.github/workflows/test-rust.yml b/.github/workflows/test-rust.yml index 044d7125..a03ea32f 100644 --- a/.github/workflows/test-rust.yml +++ b/.github/workflows/test-rust.yml @@ -48,8 +48,6 @@ jobs: if: github.ref_name != 'main' run: > cargo test --all-features - env: - RUST_LOG: debug - run: bash .github/check-for-naughty-dependencies.sh From 76924e017cb3ed77003532db56c019aef842d180 Mon Sep 17 00:00:00 2001 From: Brad Heller Date: Mon, 21 Jul 2025 13:09:37 +0200 Subject: [PATCH 12/15] chore: Add support for installing apps that use requirements.txt --- crates/tower-runtime/src/local.rs | 21 ++++++- .../example-apps/03-legacy-app/Towerfile | 3 + .../03-legacy-app/requirements.txt | 2 + .../tests/example-apps/03-legacy-app/task.py | 2 + crates/tower-runtime/tests/local_test.rs | 59 ++++++++++++++++++- crates/tower-uv/src/lib.rs | 59 +++++++++++++------ 6 files changed, 124 insertions(+), 22 deletions(-) create mode 100644 crates/tower-runtime/tests/example-apps/03-legacy-app/Towerfile create mode 100644 crates/tower-runtime/tests/example-apps/03-legacy-app/requirements.txt create mode 100644 crates/tower-runtime/tests/example-apps/03-legacy-app/task.py diff --git a/crates/tower-runtime/src/local.rs b/crates/tower-runtime/src/local.rs index 4fb9e54e..56977443 100644 --- a/crates/tower-runtime/src/local.rs +++ b/crates/tower-runtime/src/local.rs @@ -186,6 +186,26 @@ async fn execute_local_app(opts: StartOptions, sx: oneshot::Sender, cancel_ // Now we also need to find the program to execute. let program_path = working_dir.join(&manifest.invoke); + // Quickly do a check to see if there was a cancellation before we do a subprocess spawn to + // ensure everything is in place. + if cancel_token.is_cancelled() { + // again tell any waiters that we cancelled. + let _ = sx.send(-1); + return Err(Error::Cancelled); + } + + let mut child = uv.venv(&working_dir, &env_vars).await?; + + // Drain the logs to the output channel. + let stdout = child.stdout.take().expect("no stdout"); + tokio::spawn(drain_output(FD::Stdout, Channel::Setup, opts.output_sender.clone(), BufReader::new(stdout))); + + let stderr = child.stderr.take().expect("no stderr"); + tokio::spawn(drain_output(FD::Stderr, Channel::Setup, opts.output_sender.clone(), BufReader::new(stderr))); + + // Wait for venv to finish up. + wait_for_process(ctx.clone(), &cancel_token, child).await; + // Check once more if the process was cancelled before we do a uv sync. The sync itself, // once started, will take a while and we have logic for checking for cancellation. if cancel_token.is_cancelled() { @@ -434,7 +454,6 @@ async fn drain_output(fd: FD, channel: Channel, output: Ou } } - fn is_bash_package(package: &Package) -> bool { return package.manifest.invoke.ends_with(".sh") } diff --git a/crates/tower-runtime/tests/example-apps/03-legacy-app/Towerfile b/crates/tower-runtime/tests/example-apps/03-legacy-app/Towerfile new file mode 100644 index 00000000..f2e741ef --- /dev/null +++ b/crates/tower-runtime/tests/example-apps/03-legacy-app/Towerfile @@ -0,0 +1,3 @@ +[app] +name = "03-legacy-app" +script = "task.py" diff --git a/crates/tower-runtime/tests/example-apps/03-legacy-app/requirements.txt b/crates/tower-runtime/tests/example-apps/03-legacy-app/requirements.txt new file mode 100644 index 00000000..79b039ba --- /dev/null +++ b/crates/tower-runtime/tests/example-apps/03-legacy-app/requirements.txt @@ -0,0 +1,2 @@ +dlt[snowflake,filesystem]==1.4.0 +pandas diff --git a/crates/tower-runtime/tests/example-apps/03-legacy-app/task.py b/crates/tower-runtime/tests/example-apps/03-legacy-app/task.py new file mode 100644 index 00000000..2005385d --- /dev/null +++ b/crates/tower-runtime/tests/example-apps/03-legacy-app/task.py @@ -0,0 +1,2 @@ +import dlt +print(dlt.version.__version__) diff --git a/crates/tower-runtime/tests/local_test.rs b/crates/tower-runtime/tests/local_test.rs index 2ea2ee74..8cccc9ed 100644 --- a/crates/tower-runtime/tests/local_test.rs +++ b/crates/tower-runtime/tests/local_test.rs @@ -68,8 +68,12 @@ async fn test_running_hello_world() { assert!(status == Status::Running, "App should be running"); while let Some(output) = receiver.recv().await { - debug!("Received output: {:?}", output.line); - assert!(output.line.contains("Hello, world!"), "Log should contain 'Hello, world!'"); + let valid_line = output.line.contains("Hello, world!") || + output.line.contains("Using CPython") || + output.line.contains("Creating virtual environment") || + output.line.contains("Activate with"); + + assert!(valid_line, "Log should contain 'Hello, world!' or a setup line"); } // check the status once more, should be done. @@ -127,3 +131,54 @@ async fn test_running_use_faker() { let status = app.status().await.expect("Failed to get app status"); assert!(status == Status::Exited, "App should be running"); } + +#[tokio::test] +async fn test_running_legacy_app() { + debug!("Running 03-legacy-app"); + // This test is a simple test that outputs some text to the console; however, this time it has + // a dependency defined in pyproject.toml, which means that it'll have to do a uv sync first. + let legacy_app_dir = get_example_app_dir("03-legacy-app"); + let package = build_package_from_dir(&legacy_app_dir).await; + let (sender, mut receiver) = create_output_stream(); + + // We need to create the package, which will load the app + let opts = StartOptions{ + ctx: tower_telemetry::Context::new(), + package, + output_sender: sender, + cwd: None, + environment: "local".to_string(), + secrets: HashMap::new(), + parameters: HashMap::new(), + env_vars: HashMap::new(), + }; + + // Start the app using the LocalApp runtime + let app = LocalApp::start(opts).await.expect("Failed to start app"); + + // The status should be running + let status = app.status().await.expect("Failed to get app status"); + assert!(status == Status::Running, "App should be running"); + + let mut count_setup = 0; + let mut count_stdout = 0; + + while let Some(output) = receiver.recv().await { + debug!("Received output: {:?}", output.line); + match output.channel { + tower_runtime::Channel::Setup => { + count_setup += 1; + }, + tower_runtime::Channel::Program => { + count_stdout += 1; + } + } + } + + assert!(count_setup > 0, "There should be some setup output"); + assert!(count_stdout > 0, "should be more than one output"); + + // check the status once more, should be done. + let status = app.status().await.expect("Failed to get app status"); + assert!(status == Status::Exited, "App should be running"); +} diff --git a/crates/tower-uv/src/lib.rs b/crates/tower-uv/src/lib.rs index 11af2c9d..99b7bbc7 100644 --- a/crates/tower-uv/src/lib.rs +++ b/crates/tower-uv/src/lib.rs @@ -83,27 +83,48 @@ impl Uv { } pub async fn sync(&self, cwd: &PathBuf, env_vars: &HashMap) -> Result { - // Make sure there's a pyproject.toml in the cwd. If there isn't one, then we don't want - // to do this otherwise uv will return an error on the CLI! - if !cwd.join("pyproject.toml").exists() { - return Err(Error::MissingPyprojectToml); - } + // We need to figure out which sync strategy to apply. If there is a pyproject.toml, then + // that's easy. + if cwd.join("pyproject.toml").exists() { + debug!("Executing UV ({:?}) sync in {:?}", &self.uv_path, cwd); + let child = Command::new(&self.uv_path) + .stdin(Stdio::null()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .current_dir(cwd) + .arg("--color") + .arg("never") + .arg("--no-progress") + .arg("sync") + .envs(env_vars) + .spawn()?; + + Ok(child) + } else if cwd.join("requirements.txt").exists() { + debug!("Executing UV ({:?}) sync with requirements in {:?}", &self.uv_path, cwd); + + // If there is a requirements.txt, then we can use that to sync. + let child = Command::new(&self.uv_path) + .stdin(Stdio::null()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .current_dir(cwd) + .arg("--color") + .arg("never") + .arg("pip") + .arg("install") + .arg("-r") + .arg(cwd.join("requirements.txt")) + .envs(env_vars) + .spawn()?; + + Ok(child) + } else { + // If there is no pyproject.toml or requirements.txt, then we can't sync. + Err(Error::MissingPyprojectToml) + } - debug!("Executing UV ({:?}) sync in {:?}", &self.uv_path, cwd); - let child = Command::new(&self.uv_path) - .stdin(Stdio::null()) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()) - .current_dir(cwd) - .arg("--color") - .arg("never") - .arg("--no-progress") - .arg("sync") - .envs(env_vars) - .spawn()?; - - Ok(child) } pub async fn run(&self, cwd: &PathBuf, program: &PathBuf, env_vars: &HashMap) -> Result { From 3cef82ac177bd6996b642b53d42b8489bbd186ab Mon Sep 17 00:00:00 2001 From: Brad Heller Date: Mon, 21 Jul 2025 15:27:49 +0200 Subject: [PATCH 13/15] Update crates/tower-uv/tests/install_test.rs Co-authored-by: Ben Lovell --- crates/tower-uv/tests/install_test.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/tower-uv/tests/install_test.rs b/crates/tower-uv/tests/install_test.rs index 93511ec6..82fe493c 100644 --- a/crates/tower-uv/tests/install_test.rs +++ b/crates/tower-uv/tests/install_test.rs @@ -9,7 +9,7 @@ async fn test_installing_uv() { let default_uv_bin_dir = get_default_uv_bin_dir().unwrap(); let _ = tokio::fs::remove_dir_all(&default_uv_bin_dir).await; - // Now if we instantiate a Uv instance, it should install the `uv` bianry. + // Now if we instantiate a Uv instance, it should install the `uv` binary. let uv = Uv::new().await.expect("Failed to create a Uv instance"); assert!(uv.is_valid().await); } From 39b062d43a81faab59272e798029b6f50d6142e4 Mon Sep 17 00:00:00 2001 From: Brad Heller Date: Mon, 21 Jul 2025 15:28:25 +0200 Subject: [PATCH 14/15] chore: Feedback from @socksy --- crates/tower-uv/src/lib.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/crates/tower-uv/src/lib.rs b/crates/tower-uv/src/lib.rs index 99b7bbc7..1ebf0aee 100644 --- a/crates/tower-uv/src/lib.rs +++ b/crates/tower-uv/src/lib.rs @@ -147,9 +147,6 @@ impl Uv { } pub async fn is_valid(&self) -> bool { - match test_uv_path(&self.uv_path).await { - Ok(_) => true, - Err(_) => false, - } + test_uv_path(&self.uv_path).await.is_ok() } } From f04a4ddbc0187c00bf12899f826285a59ec945ed Mon Sep 17 00:00:00 2001 From: Brad Heller Date: Mon, 21 Jul 2025 15:36:39 +0200 Subject: [PATCH 15/15] chore: More feedback from @socksy --- crates/tower-cmd/src/run.rs | 4 +++- crates/tower-runtime/src/lib.rs | 6 ------ crates/tower-runtime/tests/local_test.rs | 9 +++++---- 3 files changed, 8 insertions(+), 11 deletions(-) diff --git a/crates/tower-cmd/src/run.rs b/crates/tower-cmd/src/run.rs index 08f2e7b0..f1f72681 100644 --- a/crates/tower-cmd/src/run.rs +++ b/crates/tower-cmd/src/run.rs @@ -6,6 +6,8 @@ use tower_package::{Package, PackageSpec}; use tower_runtime::{local::LocalApp, App, AppLauncher, OutputReceiver}; use tower_telemetry::{Context, debug}; +use tokio::sync::mpsc::unbounded_channel; + use crate::{ output, api, @@ -133,7 +135,7 @@ async fn do_run_local(config: Config, path: PathBuf, env: &str, mut params: Hash std::process::exit(1); } - let (sender, receiver) = tower_runtime::create_output_stream(); + let (sender, receiver) = unbounded_channel(); output::success(&format!("Launching app `{}`", towerfile.app.name)); let output_task = tokio::spawn(monitor_output(receiver)); diff --git a/crates/tower-runtime/src/lib.rs b/crates/tower-runtime/src/lib.rs index 00f7da98..bae6807a 100644 --- a/crates/tower-runtime/src/lib.rs +++ b/crates/tower-runtime/src/lib.rs @@ -4,7 +4,6 @@ use std::collections::HashMap; use tokio::sync::mpsc::{ UnboundedReceiver, UnboundedSender, - unbounded_channel, }; use chrono::{DateTime, Utc}; @@ -75,11 +74,6 @@ impl std::default::Default for AppLauncher { } } -pub fn create_output_stream() -> (OutputSender, OutputReceiver) { - let (sender, receiver) = unbounded_channel::(); - (sender, receiver) -} - impl AppLauncher { pub async fn launch( &mut self, diff --git a/crates/tower-runtime/tests/local_test.rs b/crates/tower-runtime/tests/local_test.rs index 8cccc9ed..8457d4ad 100644 --- a/crates/tower-runtime/tests/local_test.rs +++ b/crates/tower-runtime/tests/local_test.rs @@ -5,7 +5,6 @@ use tower_runtime::{ App, StartOptions, Status, - create_output_stream, local::LocalApp, }; @@ -13,6 +12,8 @@ use config::Towerfile; use tower_package::{Package, PackageSpec}; use tower_telemetry::{self, debug}; +use tokio::sync::mpsc::unbounded_channel; + fn get_example_app_dir(name: &str) -> PathBuf { // This is where the root of the app lives. let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR")); @@ -46,7 +47,7 @@ async fn test_running_hello_world() { debug!("Running 01-hello-world"); let hello_world_dir = get_example_app_dir("01-hello-world"); let package = build_package_from_dir(&hello_world_dir).await; - let (sender, mut receiver) = create_output_stream(); + let (sender, mut receiver) = unbounded_channel(); // We need to create the package, which will load the app let opts = StartOptions{ @@ -88,7 +89,7 @@ async fn test_running_use_faker() { // a dependency defined in pyproject.toml, which means that it'll have to do a uv sync first. let use_faker_dir = get_example_app_dir("02-use-faker"); let package = build_package_from_dir(&use_faker_dir).await; - let (sender, mut receiver) = create_output_stream(); + let (sender, mut receiver) = unbounded_channel(); // We need to create the package, which will load the app let opts = StartOptions{ @@ -139,7 +140,7 @@ async fn test_running_legacy_app() { // a dependency defined in pyproject.toml, which means that it'll have to do a uv sync first. let legacy_app_dir = get_example_app_dir("03-legacy-app"); let package = build_package_from_dir(&legacy_app_dir).await; - let (sender, mut receiver) = create_output_stream(); + let (sender, mut receiver) = unbounded_channel(); // We need to create the package, which will load the app let opts = StartOptions{