From 620d2197940cceeca2ef0cf5fc6b89c1bf4ae3ba Mon Sep 17 00:00:00 2001 From: Ben Lovell Date: Fri, 4 Jul 2025 16:00:16 +0200 Subject: [PATCH 1/6] Prevent accidental merges directly to `main` with same script as in main repo --- .github/workflows/pr-base-check.yaml | 34 ++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) create mode 100644 .github/workflows/pr-base-check.yaml diff --git a/.github/workflows/pr-base-check.yaml b/.github/workflows/pr-base-check.yaml new file mode 100644 index 00000000..c72e111f --- /dev/null +++ b/.github/workflows/pr-base-check.yaml @@ -0,0 +1,34 @@ +name: PR Base Branch Check + +on: + pull_request: + types: [opened, edited, synchronize] + +jobs: + check-base-branch: + name: Check PR Base Branch + runs-on: ubuntu-latest + steps: + - name: Check if PR targets main instead of develop + if: github.event.pull_request.base.ref == 'main' && github.event.pull_request.head.ref != 'develop' + uses: actions/github-script@v7 + with: + script: | + const message = `⚠️ **WARNING: This PR targets \`main\` instead of \`develop\`** + + This PR is targeting \`main\` which will trigger a production deployment when merged. + + If this is a regular feature/fix PR, please change the base branch to \`develop\`. + If this is intentional (e.g., hotfix), you can ignore this warning. + + Current base: \`${context.payload.pull_request.base.ref}\` + Recommended base: \`develop\``; + + await github.rest.issues.createComment({ + owner: context.repo.owner, + repo: context.repo.repo, + issue_number: context.payload.pull_request.number, + body: message + }); + + core.warning('PR targets main branch - please verify this is intentional'); \ No newline at end of file From ed0ec087181f179edb3363286193a48033141fa7 Mon Sep 17 00:00:00 2001 From: Brad Heller Date: Fri, 18 Jul 2025 15:28:51 +0200 Subject: [PATCH 2/6] Use `uv` to manage Python runtime and state (#66) * feat: Integrate `uv` into `tower-runtime` * chore: Plumb in `tower-uv` to `tower-runtime` * chore: Tuning to make `tower-runtime` work a bit easier * chore: Run the whole execution process in a thread * chore: Fix some issues from rebase * chore: Remove some build warnings and check for cancellation * chore: Clean up some formatting. * chore: Some final reintegration tests and/or cleanups. * chore: Only use our package working dir when required. * chore: Move our UV version * chore: Add a CI step to check for OpenSSL * chore: Slightly different syntax for cmdlet * chore: Naught dependency detection, now with 100% more Claude * chore: Shell needs to be bash! Of course! * chore: Extract script into file --- .github/check-for-naughty-dependencies.sh | 13 + .github/workflows/test-rust.yml | 2 + Cargo.lock | 79 +++- Cargo.toml | 3 + crates/tower-runtime/Cargo.toml | 2 + crates/tower-runtime/src/errors.rs | 14 + crates/tower-runtime/src/local.rs | 486 +++++++++------------- crates/tower-uv/Cargo.toml | 16 + crates/tower-uv/src/install.rs | 308 ++++++++++++++ crates/tower-uv/src/lib.rs | 158 +++++++ 10 files changed, 791 insertions(+), 290 deletions(-) create mode 100644 .github/check-for-naughty-dependencies.sh create mode 100644 crates/tower-uv/Cargo.toml create mode 100644 crates/tower-uv/src/install.rs create mode 100644 crates/tower-uv/src/lib.rs diff --git a/.github/check-for-naughty-dependencies.sh b/.github/check-for-naughty-dependencies.sh new file mode 100644 index 00000000..2e4d9632 --- /dev/null +++ b/.github/check-for-naughty-dependencies.sh @@ -0,0 +1,13 @@ +echo "Checking for openssl-sys in dependency tree..." +if cargo tree -i openssl-sys >/dev/null 2>&1; then + echo "openssl-sys is present in the dependency tree. Please evict it." + exit 1 +fi + +echo "Checking for native-tls in dependency tree..." +if cargo tree -i native-tls >/dev/null 2>&1; then + echo "native-tls is present in the dependency tree. Please evict it." + exit 1 +fi + +echo "✅ No naughty dependencies found" diff --git a/.github/workflows/test-rust.yml b/.github/workflows/test-rust.yml index 8268147f..044d7125 100644 --- a/.github/workflows/test-rust.yml +++ b/.github/workflows/test-rust.yml @@ -51,6 +51,8 @@ jobs: env: RUST_LOG: debug + - run: bash .github/check-for-naughty-dependencies.sh + # integration-test: # runs-on: ${{ matrix.os }} # strategy: diff --git a/Cargo.lock b/Cargo.lock index 2e545ea4..6e25f5b2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -140,11 +140,27 @@ checksum = "59a194f9d963d8099596278594b3107448656ba73831c9d8c783e613ce86da64" dependencies = [ "flate2", "futures-core", + "futures-io", "memchr", "pin-project-lite", "tokio", ] +[[package]] +name = "async_zip" +version = "0.0.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "527207465fb6dcafbf661b0d4a51d0d2306c9d0c2975423079a6caa807930daf" +dependencies = [ + "async-compression", + "crc32fast", + "futures-lite", + "pin-project", + "thiserror 1.0.69", + "tokio", + "tokio-util", +] + [[package]] name = "autocfg" version = "1.4.0" @@ -675,6 +691,12 @@ dependencies = [ "str-buf", ] +[[package]] +name = "fastrand" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" + [[package]] name = "fd-lock" version = "3.0.13" @@ -771,6 +793,19 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" +[[package]] +name = "futures-lite" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f5edaec856126859abb19ed65f39e90fea3a9574b9707f13539acf4abf7eb532" +dependencies = [ + "fastrand", + "futures-core", + "futures-io", + "parking", + "pin-project-lite", +] + [[package]] name = "futures-macro" version = "0.3.31" @@ -1563,6 +1598,12 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" +[[package]] +name = "parking" +version = "2.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba" + [[package]] name = "parking_lot" version = "0.12.3" @@ -1611,6 +1652,26 @@ version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" +[[package]] +name = "pin-project" +version = "1.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "677f1add503faace112b9f1373e43e9e054bfdd22ff1a63c1bc485eaec6a6a8a" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e918e4ff8c4549eb882f14b3a4bc8c8bc93de829416eacf579f1207a8fbf861" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.100", +] + [[package]] name = "pin-project-lite" version = "0.2.16" @@ -1747,7 +1808,7 @@ dependencies = [ "once_cell", "socket2", "tracing", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -2648,6 +2709,7 @@ checksum = "6b9590b93e6fcc1739458317cccd391ad3955e2bde8913edf6f95f9e65a8f034" dependencies = [ "bytes", "futures-core", + "futures-io", "futures-sink", "pin-project-lite", "tokio", @@ -2787,8 +2849,10 @@ dependencies = [ "chrono", "snafu", "tokio", + "tokio-util", "tower-package", "tower-telemetry", + "tower-uv", ] [[package]] @@ -2806,6 +2870,19 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "tower-uv" +version = "0.3.20" +dependencies = [ + "async-compression", + "async_zip", + "futures-lite", + "reqwest", + "tokio", + "tokio-tar", + "tower-telemetry", +] + [[package]] name = "tower-version" version = "0.3.20" diff --git a/Cargo.toml b/Cargo.toml index 151b0644..116d1a0d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,6 +15,7 @@ repository = "https://github.com/tower/tower-cli" aes-gcm = "0.10" anyhow = "1.0.95" async-compression = { version = "0.4", features = ["tokio", "gzip"] } +async_zip = { version = "0.0.16", features = ["tokio", "tokio-fs", "deflate"] } base64 = "0.22" bytes = "1" chrono = { version = "0.4", features = ["serde"] } @@ -26,6 +27,7 @@ crypto = { path = "crates/crypto" } dirs = "5" futures = "0.3" futures-util = "0.3" +futures-lite = "2.6" glob = "0.3" http = "1.1" indicatif = "0.17" @@ -53,6 +55,7 @@ tower-cmd = { path = "crates/tower-cmd" } tower-package = { path = "crates/tower-package" } tower-runtime = { path = "crates/tower-runtime" } tower-telemetry = { path = "crates/tower-telemetry" } +tower-uv = { path = "crates/tower-uv" } tracing = { version = "0.1" } tracing-appender = "0.2" tracing-subscriber = { version = "0.3", features = ["json", "env-filter"] } diff --git a/crates/tower-runtime/Cargo.toml b/crates/tower-runtime/Cargo.toml index 184a5be6..99770ec3 100644 --- a/crates/tower-runtime/Cargo.toml +++ b/crates/tower-runtime/Cargo.toml @@ -9,6 +9,8 @@ license = { workspace = true } [dependencies] chrono = { workspace = true } tokio = { workspace = true } +tokio-util = { workspace = true } snafu = { workspace = true } tower-package = { workspace = true } tower-telemetry = { workspace = true } +tower-uv = { workspace = true } diff --git a/crates/tower-runtime/src/errors.rs b/crates/tower-runtime/src/errors.rs index 78ac4785..5162fc6d 100644 --- a/crates/tower-runtime/src/errors.rs +++ b/crates/tower-runtime/src/errors.rs @@ -61,6 +61,9 @@ pub enum Error { #[snafu(display("running Tower apps on this platform is not supported"))] UnsupportedPlatform, + + #[snafu(display("cancelled"))] + Cancelled, } impl From for Error { @@ -74,3 +77,14 @@ impl From for Error { Error::UnsupportedPlatform } } + +impl From for Error { + fn from(err: tower_uv::Error) -> Self { + match err { + tower_uv::Error::IoError(_) => Error::SpawnFailed, + tower_uv::Error::NotFound(_) => Error::SpawnFailed, + tower_uv::Error::PermissionDenied(_) => Error::SpawnFailed, + tower_uv::Error::Other(_) => Error::SpawnFailed, + } + } +} diff --git a/crates/tower-runtime/src/local.rs b/crates/tower-runtime/src/local.rs index 9a82efd4..76c16124 100644 --- a/crates/tower-runtime/src/local.rs +++ b/crates/tower-runtime/src/local.rs @@ -1,7 +1,6 @@ -use std::path::{Path, PathBuf}; +use std::path::PathBuf; use std::env; use std::process::Stdio; -use std::sync::Arc; use std::collections::HashMap; #[cfg(unix)] @@ -17,15 +16,23 @@ use crate::{ use tokio::{ fs, io::{AsyncRead, BufReader, AsyncBufReadExt}, - time::{timeout, Duration}, - sync::Mutex, process::{Child, Command}, - sync::oneshot, - sync::oneshot::error::TryRecvError, + sync::{ + Mutex, + oneshot::{ + self, + error::TryRecvError, + }, + }, + task::JoinHandle, + time::{timeout, Duration}, }; +use tokio_util::sync::CancellationToken; + use tower_package::{Manifest, Package}; use tower_telemetry::debug; +use tower_uv::Uv; use crate::{ FD, @@ -35,17 +42,17 @@ use crate::{ }; pub struct LocalApp { - ctx: tower_telemetry::Context, - - // LocalApp needs to take ownership of the package as a way of taking responsibility for it's - // lifetime and, most importantly, it's contents. The compiler complains that we never actually - // use this struct member, so we allow the dead_code attribute to silence the warning. - #[allow(dead_code)] - package: Option, - - child: Option>>, status: Mutex>, + + // waiter is what we use to communicate that the overall process is finished by the execution + // handle. waiter: Mutex>, + + // terminator is what we use to flag that we want to terminate the child process. + terminator: Mutex, + + // execute_handle keeps track of the current state of the execution lifecycle. + execute_handle: Option>>, } // Helper function to check if a file is executable @@ -90,32 +97,6 @@ async fn find_executable_in_path(executable_name: &str) -> Option { None } -async fn find_pip(dir: PathBuf) -> Result { - if let Some(path) = find_executable_in_path_buf("pip", dir).await { - Ok(path) - } else { - Err(Error::MissingPip) - } -} - -async fn find_python(dir: Option) -> Result { - if let Some(dir) = dir { - // find a local python - if let Some(path) = find_executable_in_path_buf("python", dir).await { - Ok(path) - } else { - Err(Error::MissingPython) - } - } else { - // find the system installed python - if let Some(path) = find_executable_in_path("python").await { - Ok(path) - } else { - Err(Error::MissingPython) - } - } -} - async fn find_bash() -> Result { if let Some(path) = find_executable_in_path("bash").await { Ok(path) @@ -124,168 +105,152 @@ async fn find_bash() -> Result { } } +async fn execute_local_app(opts: StartOptions, sx: oneshot::Sender, cancel_token: CancellationToken) -> Result<(), Error> { + let ctx = opts.ctx.clone(); + let package = opts.package; + let environment = opts.environment; + let package_path = package.unpacked_path + .clone() + .unwrap() + .to_path_buf(); + + // set for later on. + let working_dir = if package.manifest.version == Some(2) { + package_path.join(&package.manifest.app_dir_name) + } else { + package_path.to_path_buf() + }; -impl App for LocalApp { - async fn start(opts: StartOptions) -> Result { - let ctx = opts.ctx.clone(); - let package = opts.package; - let environment = opts.environment; - debug!(ctx: &ctx, "executing app with version {:?}", package.manifest.version); - - // This is the base path of where the package was unpacked. - let package_path = package.unpacked_path - .clone() - .unwrap() - .to_path_buf(); - - // We'll need the Python path for later on. - let mut python_path = find_python(None).await?; - debug!(ctx: &ctx, "using system python at {:?}", python_path); - - // set for later on. - let working_dir = if package.manifest.version == Some(2) { - package_path.join(&package.manifest.app_dir_name) - } else { - opts.cwd.unwrap_or(package_path.to_path_buf()) - }; - - let mut is_virtualenv = false; - - if Path::new(&working_dir.join("requirements.txt")).exists() { - debug!(ctx: &ctx, "requirements.txt file found. installing dependencies"); - - // There's a requirements.txt, so we'll create a new virtualenv and install the files - // taht we want in there. - let res = Command::new(python_path) - .current_dir(&working_dir) - .arg("-m") - .arg("venv") - .arg(".venv") - .kill_on_drop(true) - .spawn(); - - if let Ok(mut child) = res { - // Wait for the child to complete entirely. - child.wait().await.expect("child failed to exit"); - } else { - return Err(Error::VirtualEnvCreationFailed); - } + debug!(ctx: &ctx, " - working directory: {:?}", &working_dir); - let pip_path = find_pip(working_dir.join(".venv").join("bin")).await?; + let manifest = &package.manifest; + let secrets = opts.secrets; + let params = opts.parameters; + let mut other_env_vars = opts.env_vars; - // We need to update our local python, too - // - // TODO: Find a better way to operate in the context of a virtual env here. - python_path = find_python(Some(working_dir.join(".venv").join("bin"))).await?; - debug!(ctx: &ctx, "using virtualenv python at {:?}", python_path); + if !package.manifest.import_paths.is_empty() { + debug!(ctx: &ctx, "adding import paths to PYTHONPATH: {:?}", package.manifest.import_paths); - is_virtualenv = true; + let import_paths = package.manifest.import_paths + .iter() + .map(|p| package_path.join(p)) + .collect::>(); - let res = Command::new(pip_path) - .current_dir(&working_dir) - .arg("install") - .arg("-r") - .arg(working_dir.join("requirements.txt")) - .stdin(Stdio::null()) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()) - .kill_on_drop(true) - .spawn(); + let import_paths = std::env::join_paths(import_paths)? + .to_string_lossy() + .to_string(); - if let Ok(mut child) = res { - if let Some(ref sender) = opts.output_sender { - // Let's also send our logs to this output channel. - let stdout = child.stdout.take().expect("no stdout"); - tokio::spawn(drain_output(FD::Stdout, Channel::Setup, sender.clone(), BufReader::new(stdout))); + if other_env_vars.contains_key("PYTHONPATH") { + // If we already have a PYTHONPATH, we need to append to it. + let existing = other_env_vars.get("PYTHONPATH").unwrap(); + let pythonpath = std::env::join_paths(vec![existing, &import_paths])? + .to_string_lossy() + .to_string(); - let stderr = child.stderr.take().expect("no stderr"); - tokio::spawn(drain_output(FD::Stderr, Channel::Setup, sender.clone(), BufReader::new(stderr))); + other_env_vars.insert("PYTHONPATH".to_string(), pythonpath); + } else { + // Otherwise, we just set it. + other_env_vars.insert("PYTHONPATH".to_string(), import_paths); + } + } - } + // We insert these checks for cancellation along the way to see if the process was + // terminated by someone. + // + // We do this before instantiating `Uv` because that can be somewhat time consuming. Likewise + // this stops us from instantiating a bash process. + if cancel_token.is_cancelled() { + // if there's a waiter, we want them to know that the process was cancelled so we have + // to return something on the relevant channel. + let _ = sx.send(-1); + return Err(Error::Cancelled); + } - debug!(ctx: &ctx, "waiting for dependency installation to complete"); + if is_bash_package(&package) { + let child = execute_bash_program( + &ctx, + &environment, + working_dir, + package_path, + &manifest, + secrets, + params, + other_env_vars, + ).await?; + + let _ = sx.send(wait_for_process(ctx.clone(), &cancel_token, child).await); + } else { + let uv = Uv::new().await?; + let env_vars = make_env_vars(&ctx, &environment, &package_path, &secrets, ¶ms, &other_env_vars); + + // Now we also need to find the program to execute. + let program_path = working_dir.join(&manifest.invoke); + + // Check once more if the process was cancelled before we do a uv sync. The sync itself, + // once started, will take a while and we have logic for checking for cancellation. + if cancel_token.is_cancelled() { + // again tell any waiters that we cancelled. + let _ = sx.send(-1); + return Err(Error::Cancelled); + } - // Wait for the child to complete entirely. - child.wait().await.expect("child failed to exit"); - } - } else { - debug!(ctx: &ctx, "missing requirements.txt file found. no dependencies to install"); + let mut child = uv.sync(&working_dir, &env_vars).await?; + + // Drain the logs to the output channel. + if let Some(ref sender) = opts.output_sender { + let stdout = child.stdout.take().expect("no stdout"); + tokio::spawn(drain_output(FD::Stdout, Channel::Setup, sender.clone(), BufReader::new(stdout))); + + let stderr = child.stderr.take().expect("no stderr"); + tokio::spawn(drain_output(FD::Stderr, Channel::Setup, sender.clone(), BufReader::new(stderr))); } - debug!(ctx: &ctx, " - working directory: {:?}", &working_dir); + // Let's wait for the setup to finish. We don't care about the results. + wait_for_process(ctx.clone(), &cancel_token, child).await; - let res = if package.manifest.invoke.ends_with(".sh") { - let manifest = &package.manifest; - let secrets = opts.secrets; - let params= opts.parameters; - let other_env_vars = opts.env_vars; + // Check once more to see if the process was cancelled, this will bail us out early. + if cancel_token.is_cancelled() { + // if there's a waiter, we want them to know that the process was cancelled so we have + // to return something on the relevant channel. + let _ = sx.send(-1); + return Err(Error::Cancelled); + } - Self::execute_bash_program(&ctx, &environment, working_dir, is_virtualenv, package_path, &manifest, secrets, params, other_env_vars).await - } else { - let manifest = &package.manifest; - let secrets = opts.secrets; - let params= opts.parameters; - let mut other_env_vars = opts.env_vars; - - if !package.manifest.import_paths.is_empty() { - debug!(ctx: &ctx, "adding import paths to PYTHONPATH: {:?}", package.manifest.import_paths); - - let import_paths = package.manifest.import_paths - .iter() - .map(|p| package_path.join(p)) - .collect::>(); - - let import_paths = std::env::join_paths(import_paths)? - .to_string_lossy() - .to_string(); - - if other_env_vars.contains_key("PYTHONPATH") { - // If we already have a PYTHONPATH, we need to append to it. - let existing = other_env_vars.get("PYTHONPATH").unwrap(); - let pythonpath = std::env::join_paths(vec![existing, &import_paths])? - .to_string_lossy() - .to_string(); - - other_env_vars.insert("PYTHONPATH".to_string(), pythonpath); - } else { - // Otherwise, we just set it. - other_env_vars.insert("PYTHONPATH".to_string(), import_paths); - } - } + let mut child = uv.run(&working_dir, &program_path, &env_vars).await?; - // We need to resolve the program - let program_path = working_dir.join(manifest.invoke.clone()); + // Drain the logs to the output channel. + if let Some(ref sender) = opts.output_sender { + let stdout = child.stdout.take().expect("no stdout"); + tokio::spawn(drain_output(FD::Stdout, Channel::Program, sender.clone(), BufReader::new(stdout))); - Self::execute_python_program(&ctx, &environment, working_dir, is_virtualenv, python_path, program_path, &manifest, secrets, params, other_env_vars).await - }; + let stderr = child.stderr.take().expect("no stderr"); + tokio::spawn(drain_output(FD::Stderr, Channel::Program, sender.clone(), BufReader::new(stderr))); + } - if let Ok(mut child) = res { - if let Some(ref sender) = opts.output_sender { - // Let's also send our logs to this output channel. - let stdout = child.stdout.take().expect("no stdout"); - tokio::spawn(drain_output(FD::Stdout, Channel::Setup, sender.clone(), BufReader::new(stdout))); + let _ = sx.send(wait_for_process(ctx.clone(), &cancel_token, child).await); + } - let stderr = child.stderr.take().expect("no stderr"); - tokio::spawn(drain_output(FD::Stderr, Channel::Setup, sender.clone(), BufReader::new(stderr))); + // Everything was properly executed I suppose. + return Ok(()) +} - } +impl App for LocalApp { + async fn start(opts: StartOptions) -> Result { + let cancel_token = CancellationToken::new(); + let terminator = Mutex::new(cancel_token.clone()); - let child = Arc::new(Mutex::new(child)); - let (sx, rx) = oneshot::channel::(); + let (sx, rx) = oneshot::channel::(); + let waiter = Mutex::new(rx); - tokio::spawn(wait_for_process(ctx.clone(), sx, Arc::clone(&child))); + let handle = tokio::spawn(execute_local_app(opts, sx, cancel_token)); + let execute_handle = Some(handle); - Ok(Self { - ctx, - package: Some(package), - child: Some(child), - waiter: Mutex::new(rx), - status: Mutex::new(None), - }) - } else { - debug!(ctx: &ctx, "failed to spawn process: {}", res.err().unwrap()); - Err(Error::SpawnFailed) - } + Ok(Self { + execute_handle, + terminator, + waiter, + status: Mutex::new(None), + }) } async fn status(&self) -> Result { @@ -316,90 +281,45 @@ impl App for LocalApp { } async fn terminate(&mut self) -> Result<(), Error> { - if let Some(proc) = &mut self.child { - let mut child = proc.lock().await; + let terminator = self.terminator.lock().await; + terminator.cancel(); - if let Err(err) = child.kill().await { - debug!(ctx: &self.ctx, "failed to terminate app: {}", err); - Err(Error::TerminateFailed) - } else { - Ok(()) - } - } else { - // Nothing to terminate. Should this be an error? - Ok(()) - } - } -} + // Now we should wait for the join handle to finish. + if let Some(execute_handle) = self.execute_handle.take() { + let _ = execute_handle.await; + self.execute_handle = None; + } -impl LocalApp { - async fn execute_python_program( - ctx: &tower_telemetry::Context, - env: &str, - cwd: PathBuf, - is_virtualenv: bool, - python_path: PathBuf, - program_path: PathBuf, - manifest: &Manifest, - secrets: HashMap, - params: HashMap, - other_env_vars: HashMap, - ) -> Result { - let env_vars = make_env_vars( - &ctx, - env, - &cwd, - is_virtualenv, - &secrets, - ¶ms, - &other_env_vars, - ); - - debug!(ctx: &ctx, " - python script {}", manifest.invoke); - debug!(ctx: &ctx, " - python path {}", env_vars.get("PYTHONPATH").unwrap()); - - let child = Command::new(python_path) - .current_dir(&cwd) - .arg("-u") - .arg(program_path) - .stdin(Stdio::null()) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()) - .envs(env_vars) - .kill_on_drop(true) - .spawn()?; - - Ok(child) + Ok(()) } +} - async fn execute_bash_program( - ctx: &tower_telemetry::Context, - env: &str, - cwd: PathBuf, - is_virtualenv: bool, - package_path: PathBuf, - manifest: &Manifest, - secrets: HashMap, - params: HashMap, - other_env_vars: HashMap, - ) -> Result { - let bash_path = find_bash().await?; - debug!(ctx: &ctx, "using bash at {:?}", bash_path); - - debug!(ctx: &ctx, " - bash script {}", manifest.invoke); - - let child = Command::new(bash_path) - .current_dir(&cwd) - .arg(package_path.join(manifest.invoke.clone())) - .stdin(Stdio::null()) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()) - .envs(make_env_vars(&ctx, env, &cwd, is_virtualenv, &secrets, ¶ms, &other_env_vars)) - .kill_on_drop(true) - .spawn()?; - - Ok(child) - } +async fn execute_bash_program( + ctx: &tower_telemetry::Context, + env: &str, + cwd: PathBuf, + package_path: PathBuf, + manifest: &Manifest, + secrets: HashMap, + params: HashMap, + other_env_vars: HashMap, +) -> Result { + let bash_path = find_bash().await?; + debug!(ctx: &ctx, "using bash at {:?}", bash_path); + + debug!(ctx: &ctx, " - bash script {}", manifest.invoke); + + let child = Command::new(bash_path) + .current_dir(&cwd) + .arg(package_path.join(manifest.invoke.clone())) + .stdin(Stdio::null()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .envs(make_env_vars(&ctx, env, &cwd, &secrets, ¶ms, &other_env_vars)) + .kill_on_drop(true) + .spawn()?; + + Ok(child) } fn make_env_var_key(src: &str) -> String { @@ -411,7 +331,7 @@ fn make_env_var_key(src: &str) -> String { } } -fn make_env_vars(ctx: &tower_telemetry::Context, env: &str, cwd: &PathBuf, is_virtualenv: bool, secs: &HashMap, params: &HashMap, other_env_vars: &HashMap) -> HashMap { +fn make_env_vars(ctx: &tower_telemetry::Context, env: &str, cwd: &PathBuf, secs: &HashMap, params: &HashMap, other_env_vars: &HashMap) -> HashMap { let mut res = HashMap::new(); debug!(ctx: &ctx, "converting {} env variables", (params.len() + secs.len())); @@ -431,28 +351,6 @@ fn make_env_vars(ctx: &tower_telemetry::Context, env: &str, cwd: &PathBuf, is_vi res.insert(key.to_string(), value.to_string()); } - // If we're in a virtual environment, we need to add the bin directory to the PATH so that we - // can find any executables that were installed there. - if is_virtualenv { - let venv_dir = cwd.join(".venv"); - let venv_path = venv_dir - .to_string_lossy() - .to_string(); - - let bin_path = venv_dir.join("bin") - .to_string_lossy() - .to_string(); - - if let Ok(path) = std::env::var("PATH") { - res.insert("PATH".to_string(), format!("{}:{}", bin_path, path)); - } else { - res.insert("PATH".to_string(), bin_path); - } - - // We also insert a VIRTUAL_ENV path such that we can - res.insert("VIRTUAL_ENV".to_string(), venv_path); - } - // We also need a PYTHONPATH that is set to the current working directory to help with the // dependency resolution problem at runtime. let pythonpath = cwd.to_string_lossy().to_string(); @@ -476,16 +374,22 @@ fn make_env_vars(ctx: &tower_telemetry::Context, env: &str, cwd: &PathBuf, is_vi res.insert("TOWER_ENVIRONMENT".to_string(), env.to_string()); } + res.insert("PYTHONUNBUFFERED".to_string(), "x".to_string()); + res } -async fn wait_for_process(ctx: tower_telemetry::Context, sx: oneshot::Sender, proc: Arc>) { +async fn wait_for_process(ctx: tower_telemetry::Context, cancel_token: &CancellationToken, mut child: Child) -> i32 { let code = loop { - let mut child = proc.lock().await; - let timeout = timeout(Duration::from_millis(250), child.wait()).await; + if cancel_token.is_cancelled() { + debug!(ctx: &ctx, "process cancelled, terminating child process"); + let _ = child.kill().await; + break -1; // return -1 to indicate that the process was cancelled. + } - if let Ok(res) = timeout { + let timeout = timeout(Duration::from_millis(25), child.wait()).await; + if let Ok(res) = timeout { if let Ok(status) = res { break status.code().expect("no status code"); } else { @@ -499,7 +403,7 @@ async fn wait_for_process(ctx: tower_telemetry::Context, sx: oneshot::Sender(fd: FD, channel: Channel, output: OutputSender, input: BufReader) { @@ -517,3 +421,7 @@ async fn drain_output(fd: FD, channel: Channel, output: Ou } } + +fn is_bash_package(package: &Package) -> bool { + return package.manifest.invoke.ends_with(".sh") +} diff --git a/crates/tower-uv/Cargo.toml b/crates/tower-uv/Cargo.toml new file mode 100644 index 00000000..ffd63f59 --- /dev/null +++ b/crates/tower-uv/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "tower-uv" +version = { workspace = true } +authors = { workspace = true } +edition = { workspace = true } +rust-version = { workspace = true } +license = { workspace = true } + +[dependencies] +async-compression = { workspace = true } +async_zip = { workspace = true } +futures-lite = { workspace = true } +reqwest = { workspace = true } +tokio = { workspace = true } +tokio-tar = { workspace = true } +tower-telemetry = { workspace = true } diff --git a/crates/tower-uv/src/install.rs b/crates/tower-uv/src/install.rs new file mode 100644 index 00000000..25677da3 --- /dev/null +++ b/crates/tower-uv/src/install.rs @@ -0,0 +1,308 @@ +use std::env; +use std::path::PathBuf; + +use tokio_tar::Archive; +use tokio::process::Command; +use async_compression::tokio::bufread::GzipDecoder; +use async_zip::tokio::read::seek::ZipFileReader; +use futures_lite::io::AsyncReadExt; + +use tower_telemetry::debug; + +// Copy the UV_VERSION locally to make this a bit more ergonomic. +const UV_VERSION: &str = crate::UV_VERSION; + +#[derive(Debug)] +pub enum Error { + IoError(std::io::Error), + Other(String), +} + +impl From for Error { + fn from(err: std::io::Error) -> Self { + Error::IoError(err) + } +} + +impl From for Error { + fn from(err: String) -> Self { + Error::Other(err) + } +} + +pub fn get_default_uv_bin_dir() -> Result { + Ok(PathBuf::from(".tower/bin")) +} + +#[derive(Debug)] +pub struct ArchiveSelector; + +impl ArchiveSelector { + /// Get the appropriate archive name for the current platform + pub async fn get_archive_name() -> Result { + let arch = env::consts::ARCH; + let os = env::consts::OS; + + match (arch, os) { + // macOS + ("aarch64", "macos") => Ok("uv-aarch64-apple-darwin.tar.gz".to_string()), + ("x86_64", "macos") => Ok("uv-x86_64-apple-darwin.tar.gz".to_string()), + + // Windows + ("aarch64", "windows") => Ok("uv-aarch64-pc-windows-msvc.zip".to_string()), + ("x86_64", "windows") => Ok("uv-x86_64-pc-windows-msvc.zip".to_string()), + ("x86", "windows") => Ok("uv-i686-pc-windows-msvc.zip".to_string()), + + // Linux + ("aarch64", "linux") => { + if Self::is_musl_target() { + Ok("uv-aarch64-unknown-linux-musl.tar.gz".to_string()) + } else if Self::check_glibc(2, 28).await { + Ok("uv-aarch64-unknown-linux-gnu.tar.gz".to_string()) + } else { + Ok("uv-aarch64-unknown-linux-musl.tar.gz".to_string()) + } + } + ("x86_64", "linux") => { + if Self::is_musl_target() { + Ok("uv-x86_64-unknown-linux-musl.tar.gz".to_string()) + } else if Self::check_glibc(2, 17).await { + Ok("uv-x86_64-unknown-linux-gnu.tar.gz".to_string()) + } else { + Ok("uv-x86_64-unknown-linux-musl.tar.gz".to_string()) + } + } + ("x86", "linux") => { + if Self::is_musl_target() { + Ok("uv-i686-unknown-linux-musl.tar.gz".to_string()) + } else if Self::check_glibc(2, 17).await { + Ok("uv-i686-unknown-linux-gnu.tar.gz".to_string()) + } else { + Ok("uv-i686-unknown-linux-musl.tar.gz".to_string()) + } + } + ("arm", "linux") => { + // ARM v6 - only musl available + Ok("uv-arm-unknown-linux-musleabihf.tar.gz".to_string()) + } + ("armv7", "linux") => { + if Self::is_musl_target() { + Ok("uv-armv7-unknown-linux-musleabihf.tar.gz".to_string()) + } else if Self::check_glibc(2, 17).await { + Ok("uv-armv7-unknown-linux-gnueabihf.tar.gz".to_string()) + } else { + Ok("uv-armv7-unknown-linux-musleabihf.tar.gz".to_string()) + } + } + ("powerpc64", "linux") => { + if Self::check_glibc(2, 17).await { + Ok("uv-powerpc64-unknown-linux-gnu.tar.gz".to_string()) + } else { + Err("PowerPC64 requires glibc 2.17 or newer".to_string()) + } + } + ("powerpc64le", "linux") => { + if Self::check_glibc(2, 17).await { + Ok("uv-powerpc64le-unknown-linux-gnu.tar.gz".to_string()) + } else { + Err("PowerPC64LE requires glibc 2.17 or newer".to_string()) + } + } + ("riscv64", "linux") => { + if Self::check_glibc(2, 31).await { + Ok("uv-riscv64gc-unknown-linux-gnu.tar.gz".to_string()) + } else { + Err("RISC-V 64 requires glibc 2.31 or newer".to_string()) + } + } + ("s390x", "linux") => { + if Self::check_glibc(2, 17).await { + Ok("uv-s390x-unknown-linux-gnu.tar.gz".to_string()) + } else { + Err("s390x requires glibc 2.17 or newer".to_string()) + } + } + + _ => Err(format!("Unsupported platform: {} {}", arch, os)), + } + } + + /// Check if the current target uses musl libc + fn is_musl_target() -> bool { + // Check if we're compiled with musl + cfg!(target_env = "musl") + } + + /// Check if glibc version meets minimum requirements + async fn check_glibc(major: u32, minor: u32) -> bool { + // Only check glibc on Linux with gnu env + if !cfg!(target_os = "linux") || cfg!(target_env = "musl") { + return false; + } + + // Try to get glibc version using ldd + if let Ok(output) = Command::new("ldd") + .arg("--version") + .output() + .await + { + if let Ok(version_str) = String::from_utf8(output.stdout) { + return Self::parse_glibc_version(&version_str, major, minor); + } + } + + // Fallback: try to read from /lib/libc.so.6 + if let Ok(output) = Command::new("/lib/libc.so.6") + .output() + .await + { + if let Ok(version_str) = String::from_utf8(output.stdout) { + return Self::parse_glibc_version(&version_str, major, minor); + } + } + + // If we can't determine the version, assume it's old + false + } + + /// Parse glibc version string and compare with required version + fn parse_glibc_version(version_str: &str, req_major: u32, req_minor: u32) -> bool { + // Look for version pattern like "2.17" or "2.31" + for line in version_str.lines() { + if let Some(version_part) = line.split_whitespace() + .find(|part| part.contains('.') && part.chars().next().unwrap_or('0').is_ascii_digit()) + { + let version_clean = version_part.trim_matches(|c: char| !c.is_ascii_digit() && c != '.'); + let parts: Vec<&str> = version_clean.split('.').collect(); + + if parts.len() >= 2 { + if let (Ok(major), Ok(minor)) = (parts[0].parse::(), parts[1].parse::()) { + return major > req_major || (major == req_major && minor >= req_minor); + } + } + } + } + false + } +} + +fn extract_package_name(archive: String) -> String { + // Remove .tar.gz or .zip extension + archive + .strip_suffix(".tar.gz") + .or(archive.strip_suffix(".zip")) + .unwrap_or(&archive) + .to_string() +} + +async fn download_uv_archive(path: &PathBuf, archive: String) -> Result { + debug!("Downloading UV archive: {}", archive); + let url = format!("https://github.com/astral-sh/uv/releases/download/{}/{}", UV_VERSION, archive); + + // Create the directory if it doesn't exist + std::fs::create_dir_all(&path).map_err(Error::IoError)?; + + // Download the file + let response = reqwest::get(url) + .await + .map_err(|e| Error::Other(e.to_string()))?; + + let bytes = response.bytes() + .await + .map_err(|e| Error::Other(e.to_string()))?; + + // Determine archive type from extension + if archive.ends_with(".tar.gz") { + let cursor = std::io::Cursor::new(bytes); + let tar = GzipDecoder::new(cursor); + + // Extract the tar.gz archive + Archive::new(tar) + .unpack(path) + .await?; + + let package_name = extract_package_name(archive.clone()); + Ok(path.join(package_name).join("uv")) + } else if archive.ends_with(".zip") { + // Write zip data to a temporary file since async-zip works with files + let temp_path = path.join("temp.zip"); + tokio::fs::write(&temp_path, bytes).await?; + + // Open the zip file using seek reader with compression support + let file = tokio::fs::File::open(&temp_path).await?; + let mut zip = ZipFileReader::with_tokio(file) + .await + .map_err(|e| Error::Other(format!("Failed to open zip file: {}", e)))?; + + let package_name = extract_package_name(archive.clone()); + let uv_path = "uv".to_string(); + let uv_exe_path = "uv.exe".to_string(); + + // Find the UV executable entry + let entries = zip.file().entries(); + let entry_index = entries + .iter() + .enumerate() + .find(|(_, entry)| { + let name = entry.filename().as_str().unwrap_or(""); + name == uv_path || name == uv_exe_path + }) + .map(|(index, _)| index) + .ok_or_else(|| Error::Other("UV executable not found in archive".to_string()))?; + + // Create the package directory + let target_dir = path.join(&package_name); + std::fs::create_dir_all(&target_dir)?; + + // Extract the file with proper error handling for compression + let filename = entries[entry_index].filename().as_str().unwrap_or("uv").to_string(); + let is_exe = filename.ends_with(".exe"); + let target_path = target_dir.join(if is_exe { "uv.exe" } else { "uv" }); + + let mut reader = zip.reader_with_entry(entry_index) + .await + .map_err(|e| Error::Other(format!("Failed to create entry reader for {}: {}", filename, e)))?; + let mut file = tokio::fs::File::create(&target_path).await?; + + // Manually copy data since ZipEntryReader doesn't implement AsyncRead + let mut buffer = [0u8; 8192]; + let mut total_bytes = 0; + loop { + let bytes_read = reader.read(&mut buffer) + .await + .map_err(|e| Error::Other(format!("Failed to read from zip entry: {}", e)))?; + if bytes_read == 0 { + break; + } + tokio::io::AsyncWriteExt::write_all(&mut file, &buffer[..bytes_read]) + .await + .map_err(|e| Error::Other(format!("Failed to write to output file: {}", e)))?; + total_bytes += bytes_read; + } + + debug!("Successfully extracted {} bytes to {:?}", total_bytes, target_path); + + // Make the file executable on Unix systems + #[cfg(unix)] + { + use std::os::unix::fs::PermissionsExt; + let mut perms = std::fs::metadata(&target_path)?.permissions(); + perms.set_mode(0o755); + std::fs::set_permissions(&target_path, perms)?; + } + + // Clean up temporary zip file + tokio::fs::remove_file(temp_path).await?; + + Ok(target_path) + } else { + return Err(Error::Other(format!("Unsupported archive format: {}", archive))); + } +} + +pub async fn download_uv_for_arch(path: &PathBuf) -> Result { + let archive = ArchiveSelector::get_archive_name().await?; + let path = download_uv_archive(path, archive).await?; + debug!("Downloaded UV to: {:?}", path); + Ok(path) +} diff --git a/crates/tower-uv/src/lib.rs b/crates/tower-uv/src/lib.rs new file mode 100644 index 00000000..1998b21b --- /dev/null +++ b/crates/tower-uv/src/lib.rs @@ -0,0 +1,158 @@ +use std::path::PathBuf; +use std::collections::HashMap; +use std::process::Stdio; +use tokio::process::{Command, Child}; +use tower_telemetry::debug; + +mod install; + +// UV_VERSION is the version of UV to download and install when setting up a local UV deployment. +pub const UV_VERSION: &str = "0.7.13"; + +#[derive(Debug)] +pub enum Error { + IoError(std::io::Error), + NotFound(String), + PermissionDenied(String), + Other(String), +} + +impl From for Error { + fn from(err: std::io::Error) -> Self { + // Convert std::fs::Error to your custom Error type + Error::IoError(err) + } +} + +impl From for Error { + fn from(err: install::Error) -> Self { + match err { + install::Error::IoError(e) => Error::IoError(e), + install::Error::Other(msg) => Error::Other(msg), + } + } +} + +async fn find_uv_binary() -> Option { + if let Ok(default_path) = install::get_default_uv_bin_dir() { + // Check if the default path exists + if default_path.exists() { + let uv_path = default_path.join("uv"); + if uv_path.exists() { + return Some(uv_path); + } + } + } + + // First, check if uv is already in the PATH + let output = Command::new("which") + .arg("uv") + .output() + .await; + + if let Ok(output) = output { + let path_str = String::from_utf8_lossy(&output.stdout); + let path = PathBuf::from(path_str.trim()); + + // If this is a path that actually exists, then we assume that it's `uv` and we can + // continue. + if path.exists() { + Some(path) + } else { + None + } + } else { + None + } +} + +async fn find_or_setup_uv() -> Result { + // If we get here, uv wasn't found in PATH, so let's download it + if let Some(path) = find_uv_binary().await { + Ok(path) + } else { + let path = install::get_default_uv_bin_dir()?; + + // Create the directory if it doesn't exist + std::fs::create_dir_all(&path).map_err(Error::IoError)?; + + let parent = path.parent() + .ok_or_else(|| Error::NotFound("Parent directory not found".to_string()))? + .to_path_buf(); + + // We download this code to the UV directory + let exe = install::download_uv_for_arch(&parent).await?; + + // Target is the UV binary we want. + let target = path.join("uv"); + + // Copy the `uv` binary into the default directory + std::fs::copy(&exe, &target) + .map_err(|e| Error::IoError(e))?; + + Ok(target) + } +} + +pub struct Uv { + pub uv_path: PathBuf, +} + +impl Uv { + pub async fn new() -> Result { + let uv_path = find_or_setup_uv().await?; + Ok(Uv { uv_path }) + } + + pub async fn venv(&self, cwd: &PathBuf, env_vars: &HashMap) -> Result { + debug!("Executing UV ({:?}) venv in {:?}", &self.uv_path, cwd); + + let child = Command::new(&self.uv_path) + .stdin(Stdio::null()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .current_dir(cwd) + .arg("venv") + .envs(env_vars) + .spawn()?; + + Ok(child) + } + + pub async fn sync(&self, cwd: &PathBuf, env_vars: &HashMap) -> Result { + debug!("Executing UV ({:?}) sync in {:?}", &self.uv_path, cwd); + + let child = Command::new(&self.uv_path) + .stdin(Stdio::null()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .current_dir(cwd) + .arg("--color") + .arg("never") + .arg("--no-progress") + .arg("sync") + .envs(env_vars) + .spawn()?; + + Ok(child) + } + + pub async fn run(&self, cwd: &PathBuf, program: &PathBuf, env_vars: &HashMap) -> Result { + debug!("Executing UV ({:?}) run {:?} in {:?}", &self.uv_path, program, cwd); + + let child = Command::new(&self.uv_path) + .stdin(Stdio::null()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .current_dir(cwd) + .arg("--color") + .arg("never") + .arg("--no-progress") + .arg("run") + .arg(program) + .envs(env_vars) + .spawn()?; + + Ok(child) + } +} From a958c428c9a4c6327e2782223ec291cdaeddfe26 Mon Sep 17 00:00:00 2001 From: Brad Heller Date: Fri, 18 Jul 2025 16:14:14 +0200 Subject: [PATCH 3/6] chore: Bump version to v0.3.21-rc1 --- Cargo.lock | 22 +++++++++++----------- Cargo.toml | 4 +++- pyproject.toml | 4 +++- uv.lock | 2 +- 4 files changed, 18 insertions(+), 14 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6e25f5b2..3d4d2b93 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -387,7 +387,7 @@ dependencies = [ [[package]] name = "config" -version = "0.3.20" +version = "0.3.21-rc.1" dependencies = [ "chrono", "clap", @@ -474,7 +474,7 @@ checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" [[package]] name = "crypto" -version = "0.3.20" +version = "0.3.21-rc.1" dependencies = [ "aes-gcm", "base64", @@ -2513,7 +2513,7 @@ dependencies = [ [[package]] name = "testutils" -version = "0.3.20" +version = "0.3.21-rc.1" dependencies = [ "pem", "rsa", @@ -2751,7 +2751,7 @@ dependencies = [ [[package]] name = "tower" -version = "0.3.20" +version = "0.3.21-rc.1" dependencies = [ "tokio", "tower-api", @@ -2775,7 +2775,7 @@ dependencies = [ [[package]] name = "tower-api" -version = "0.3.20" +version = "0.3.21-rc.1" dependencies = [ "reqwest", "serde", @@ -2787,7 +2787,7 @@ dependencies = [ [[package]] name = "tower-cmd" -version = "0.3.20" +version = "0.3.21-rc.1" dependencies = [ "anyhow", "bytes", @@ -2826,7 +2826,7 @@ checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e" [[package]] name = "tower-package" -version = "0.3.20" +version = "0.3.21-rc.1" dependencies = [ "async-compression", "config", @@ -2844,7 +2844,7 @@ dependencies = [ [[package]] name = "tower-runtime" -version = "0.3.20" +version = "0.3.21-rc.1" dependencies = [ "chrono", "snafu", @@ -2863,7 +2863,7 @@ checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3" [[package]] name = "tower-telemetry" -version = "0.3.20" +version = "0.3.21-rc.1" dependencies = [ "tracing", "tracing-appender", @@ -2872,7 +2872,7 @@ dependencies = [ [[package]] name = "tower-uv" -version = "0.3.20" +version = "0.3.21-rc.1" dependencies = [ "async-compression", "async_zip", @@ -2885,7 +2885,7 @@ dependencies = [ [[package]] name = "tower-version" -version = "0.3.20" +version = "0.3.21-rc.1" dependencies = [ "anyhow", "chrono", diff --git a/Cargo.toml b/Cargo.toml index 116d1a0d..c94262bf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,7 +4,9 @@ resolver = "2" [workspace.package] edition = "2021" -version = "0.3.20" +version = "0.3.21-rc.1" + + description = "Tower is the best way to host Python data apps in production" rust-version = "1.81" authors = ["Brad Heller "] diff --git a/pyproject.toml b/pyproject.toml index a5aa638e..49b21c76 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,9 @@ build-backend = "maturin" [project] name = "tower" -version = "0.3.20" +version = "0.3.21rc1" + + description = "Tower CLI and runtime environment for Tower." authors = [{ name = "Tower Computing Inc.", email = "brad@tower.dev" }] readme = "README.md" diff --git a/uv.lock b/uv.lock index 80d9007f..cdb7efc8 100644 --- a/uv.lock +++ b/uv.lock @@ -1201,7 +1201,7 @@ wheels = [ [[package]] name = "tower" -version = "0.3.20" +version = "0.3.21rc1" source = { editable = "." } dependencies = [ { name = "attrs" }, From 79cbe85be1ce35b267c01740c6aca3910f821fc6 Mon Sep 17 00:00:00 2001 From: Brad Heller Date: Mon, 21 Jul 2025 15:57:38 +0200 Subject: [PATCH 4/6] More refined dependency management for different app types (#70) * fix: Don't run `sync` if there's a missing pyproject.toml * chore: Simple integration tests for local execution * Update crates/tower-runtime/tests/local_test.rs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update crates/tower-uv/src/lib.rs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * chore: Reduce lock contention on reading output * chore: Attempts to resolve `uv` problems on first download * chore: Concurrent test runs cause problems! * chore: Only allow one UV setup to proceed at a time. * chore: Allow tests to proceed concurrently * chore: We need to actually hold the lock * chore: Remove debug output for now * chore: Add support for installing apps that use requirements.txt * Update crates/tower-uv/tests/install_test.rs Co-authored-by: Ben Lovell * chore: Feedback from @socksy * chore: More feedback from @socksy --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Co-authored-by: Ben Lovell --- .github/workflows/test-rust.yml | 2 - Cargo.lock | 2 + crates/tower-cmd/src/run.rs | 8 +- crates/tower-runtime/Cargo.toml | 3 + crates/tower-runtime/src/errors.rs | 3 + crates/tower-runtime/src/lib.rs | 25 +-- crates/tower-runtime/src/local.rs | 70 +++++-- .../example-apps/01-hello-world/Towerfile | 3 + .../tests/example-apps/01-hello-world/task.py | 2 + .../tests/example-apps/02-use-faker/README.md | 0 .../tests/example-apps/02-use-faker/Towerfile | 3 + .../tests/example-apps/02-use-faker/main.py | 9 + .../example-apps/02-use-faker/pyproject.toml | 9 + .../tests/example-apps/02-use-faker/uv.lock | 35 ++++ .../example-apps/03-legacy-app/Towerfile | 3 + .../03-legacy-app/requirements.txt | 2 + .../tests/example-apps/03-legacy-app/task.py | 2 + crates/tower-runtime/tests/local_test.rs | 185 ++++++++++++++++++ crates/tower-uv/Cargo.toml | 1 + crates/tower-uv/src/install.rs | 75 ++++++- crates/tower-uv/src/lib.rs | 132 ++++++------- crates/tower-uv/tests/install_test.rs | 15 ++ 22 files changed, 474 insertions(+), 115 deletions(-) create mode 100644 crates/tower-runtime/tests/example-apps/01-hello-world/Towerfile create mode 100644 crates/tower-runtime/tests/example-apps/01-hello-world/task.py create mode 100644 crates/tower-runtime/tests/example-apps/02-use-faker/README.md create mode 100644 crates/tower-runtime/tests/example-apps/02-use-faker/Towerfile create mode 100644 crates/tower-runtime/tests/example-apps/02-use-faker/main.py create mode 100644 crates/tower-runtime/tests/example-apps/02-use-faker/pyproject.toml create mode 100644 crates/tower-runtime/tests/example-apps/02-use-faker/uv.lock create mode 100644 crates/tower-runtime/tests/example-apps/03-legacy-app/Towerfile create mode 100644 crates/tower-runtime/tests/example-apps/03-legacy-app/requirements.txt create mode 100644 crates/tower-runtime/tests/example-apps/03-legacy-app/task.py create mode 100644 crates/tower-runtime/tests/local_test.rs create mode 100644 crates/tower-uv/tests/install_test.rs diff --git a/.github/workflows/test-rust.yml b/.github/workflows/test-rust.yml index 044d7125..a03ea32f 100644 --- a/.github/workflows/test-rust.yml +++ b/.github/workflows/test-rust.yml @@ -48,8 +48,6 @@ jobs: if: github.ref_name != 'main' run: > cargo test --all-features - env: - RUST_LOG: debug - run: bash .github/check-for-naughty-dependencies.sh diff --git a/Cargo.lock b/Cargo.lock index 3d4d2b93..d578b393 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2847,6 +2847,7 @@ name = "tower-runtime" version = "0.3.21-rc.1" dependencies = [ "chrono", + "config", "snafu", "tokio", "tokio-util", @@ -2876,6 +2877,7 @@ version = "0.3.21-rc.1" dependencies = [ "async-compression", "async_zip", + "dirs", "futures-lite", "reqwest", "tokio", diff --git a/crates/tower-cmd/src/run.rs b/crates/tower-cmd/src/run.rs index aad608a7..f1f72681 100644 --- a/crates/tower-cmd/src/run.rs +++ b/crates/tower-cmd/src/run.rs @@ -6,6 +6,8 @@ use tower_package::{Package, PackageSpec}; use tower_runtime::{local::LocalApp, App, AppLauncher, OutputReceiver}; use tower_telemetry::{Context, debug}; +use tokio::sync::mpsc::unbounded_channel; + use crate::{ output, api, @@ -133,7 +135,7 @@ async fn do_run_local(config: Config, path: PathBuf, env: &str, mut params: Hash std::process::exit(1); } - let (sender, receiver) = tower_runtime::create_output_stream(); + let (sender, receiver) = unbounded_channel(); output::success(&format!("Launching app `{}`", towerfile.app.name)); let output_task = tokio::spawn(monitor_output(receiver)); @@ -342,9 +344,9 @@ async fn build_package(towerfile: &Towerfile) -> Package { /// monitor_output is a helper function that will monitor the output of a given output channel and /// plops it down on stdout. -async fn monitor_output(output: OutputReceiver) { +async fn monitor_output(mut output: OutputReceiver) { loop { - if let Some(line) = output.lock().await.recv().await { + if let Some(line) = output.recv().await { let ts = &line.time; let msg = &line.line; output::log_line(&ts.to_rfc3339(), msg, output::LogLineType::Local); diff --git a/crates/tower-runtime/Cargo.toml b/crates/tower-runtime/Cargo.toml index 99770ec3..7bed0858 100644 --- a/crates/tower-runtime/Cargo.toml +++ b/crates/tower-runtime/Cargo.toml @@ -14,3 +14,6 @@ snafu = { workspace = true } tower-package = { workspace = true } tower-telemetry = { workspace = true } tower-uv = { workspace = true } + +[dev-dependencies] +config = { workspace = true } diff --git a/crates/tower-runtime/src/errors.rs b/crates/tower-runtime/src/errors.rs index 5162fc6d..73c1b7cc 100644 --- a/crates/tower-runtime/src/errors.rs +++ b/crates/tower-runtime/src/errors.rs @@ -85,6 +85,9 @@ impl From for Error { tower_uv::Error::NotFound(_) => Error::SpawnFailed, tower_uv::Error::PermissionDenied(_) => Error::SpawnFailed, tower_uv::Error::Other(_) => Error::SpawnFailed, + tower_uv::Error::MissingPyprojectToml => Error::SpawnFailed, + tower_uv::Error::InvalidUv => Error::SpawnFailed, + tower_uv::Error::UnsupportedPlatform => Error::UnsupportedPlatform, } } } diff --git a/crates/tower-runtime/src/lib.rs b/crates/tower-runtime/src/lib.rs index 4384521d..bae6807a 100644 --- a/crates/tower-runtime/src/lib.rs +++ b/crates/tower-runtime/src/lib.rs @@ -1,12 +1,9 @@ use std::path::PathBuf; use std::future::Future; -use std::sync::Arc; use std::collections::HashMap; -use tokio::sync::Mutex; use tokio::sync::mpsc::{ UnboundedReceiver, UnboundedSender, - unbounded_channel, }; use chrono::{DateTime, Utc}; @@ -41,7 +38,7 @@ pub struct Output { pub line: String, } -#[derive(Copy, Clone)] +#[derive(Copy, Clone, PartialEq)] pub enum Status { None, Running, @@ -49,13 +46,9 @@ pub enum Status { Crashed { code: i32 }, } -type SharedReceiver = Arc>>; - -type SharedSender = Arc>>; - -pub type OutputReceiver = SharedReceiver; +pub type OutputReceiver = UnboundedReceiver; -pub type OutputSender = SharedSender; +pub type OutputSender = UnboundedSender; pub trait App { // start will start the process @@ -81,14 +74,6 @@ impl std::default::Default for AppLauncher { } } -pub fn create_output_stream() -> (OutputSender, OutputReceiver) { - let (sender, receiver) = unbounded_channel::(); - - let output_sender = Arc::new(Mutex::new(sender)); - let output_receiver = Arc::new(Mutex::new(receiver)); - (output_sender, output_receiver) -} - impl AppLauncher { pub async fn launch( &mut self, @@ -104,7 +89,7 @@ impl AppLauncher { let opts = StartOptions { ctx, - output_sender: Some(output_sender), + output_sender, cwd: Some(cwd), environment, secrets, @@ -152,7 +137,7 @@ pub struct StartOptions { pub secrets: HashMap, pub parameters: HashMap, pub env_vars: HashMap, - pub output_sender: Option, + pub output_sender: OutputSender, } pub struct ExecuteOptions { diff --git a/crates/tower-runtime/src/local.rs b/crates/tower-runtime/src/local.rs index 76c16124..56977443 100644 --- a/crates/tower-runtime/src/local.rs +++ b/crates/tower-runtime/src/local.rs @@ -186,28 +186,65 @@ async fn execute_local_app(opts: StartOptions, sx: oneshot::Sender, cancel_ // Now we also need to find the program to execute. let program_path = working_dir.join(&manifest.invoke); - // Check once more if the process was cancelled before we do a uv sync. The sync itself, - // once started, will take a while and we have logic for checking for cancellation. + // Quickly do a check to see if there was a cancellation before we do a subprocess spawn to + // ensure everything is in place. if cancel_token.is_cancelled() { // again tell any waiters that we cancelled. let _ = sx.send(-1); return Err(Error::Cancelled); } - let mut child = uv.sync(&working_dir, &env_vars).await?; + let mut child = uv.venv(&working_dir, &env_vars).await?; // Drain the logs to the output channel. - if let Some(ref sender) = opts.output_sender { - let stdout = child.stdout.take().expect("no stdout"); - tokio::spawn(drain_output(FD::Stdout, Channel::Setup, sender.clone(), BufReader::new(stdout))); + let stdout = child.stdout.take().expect("no stdout"); + tokio::spawn(drain_output(FD::Stdout, Channel::Setup, opts.output_sender.clone(), BufReader::new(stdout))); - let stderr = child.stderr.take().expect("no stderr"); - tokio::spawn(drain_output(FD::Stderr, Channel::Setup, sender.clone(), BufReader::new(stderr))); - } + let stderr = child.stderr.take().expect("no stderr"); + tokio::spawn(drain_output(FD::Stderr, Channel::Setup, opts.output_sender.clone(), BufReader::new(stderr))); - // Let's wait for the setup to finish. We don't care about the results. + // Wait for venv to finish up. wait_for_process(ctx.clone(), &cancel_token, child).await; + // Check once more if the process was cancelled before we do a uv sync. The sync itself, + // once started, will take a while and we have logic for checking for cancellation. + if cancel_token.is_cancelled() { + // again tell any waiters that we cancelled. + let _ = sx.send(-1); + return Err(Error::Cancelled); + } + + match uv.sync(&working_dir, &env_vars).await { + Err(e) => { + // If we were missing a pyproject.toml, then that's fine for us--we'll just + // continue execution. + // + // Note that we do a match here instead of an if. That's because of the way + // tower_uv::Error is implemented. Namely, it doesn't implement PartialEq and can't + // do so due to it's dependency on std::io::Error. + match e { + tower_uv::Error::MissingPyprojectToml => { + debug!(ctx: &ctx, "no pyproject.toml found, continuing without sync"); + }, + _ => { + // If we got any other error, we want to return it. + return Err(e.into()); + } + } + }, + Ok(mut child) => { + // Drain the logs to the output channel. + let stdout = child.stdout.take().expect("no stdout"); + tokio::spawn(drain_output(FD::Stdout, Channel::Setup, opts.output_sender.clone(), BufReader::new(stdout))); + + let stderr = child.stderr.take().expect("no stderr"); + tokio::spawn(drain_output(FD::Stderr, Channel::Setup, opts.output_sender.clone(), BufReader::new(stderr))); + + // Let's wait for the setup to finish. We don't care about the results. + wait_for_process(ctx.clone(), &cancel_token, child).await; + } + } + // Check once more to see if the process was cancelled, this will bail us out early. if cancel_token.is_cancelled() { // if there's a waiter, we want them to know that the process was cancelled so we have @@ -219,13 +256,11 @@ async fn execute_local_app(opts: StartOptions, sx: oneshot::Sender, cancel_ let mut child = uv.run(&working_dir, &program_path, &env_vars).await?; // Drain the logs to the output channel. - if let Some(ref sender) = opts.output_sender { - let stdout = child.stdout.take().expect("no stdout"); - tokio::spawn(drain_output(FD::Stdout, Channel::Program, sender.clone(), BufReader::new(stdout))); + let stdout = child.stdout.take().expect("no stdout"); + tokio::spawn(drain_output(FD::Stdout, Channel::Program, opts.output_sender.clone(), BufReader::new(stdout))); - let stderr = child.stderr.take().expect("no stderr"); - tokio::spawn(drain_output(FD::Stderr, Channel::Program, sender.clone(), BufReader::new(stderr))); - } + let stderr = child.stderr.take().expect("no stderr"); + tokio::spawn(drain_output(FD::Stderr, Channel::Program, opts.output_sender.clone(), BufReader::new(stderr))); let _ = sx.send(wait_for_process(ctx.clone(), &cancel_token, child).await); } @@ -410,8 +445,6 @@ async fn drain_output(fd: FD, channel: Channel, output: Ou let mut lines = input.lines(); while let Some(line) = lines.next_line().await.expect("line iteration fialed") { - let output = output.lock().await; - let _ = output.send(Output{ channel, fd, @@ -421,7 +454,6 @@ async fn drain_output(fd: FD, channel: Channel, output: Ou } } - fn is_bash_package(package: &Package) -> bool { return package.manifest.invoke.ends_with(".sh") } diff --git a/crates/tower-runtime/tests/example-apps/01-hello-world/Towerfile b/crates/tower-runtime/tests/example-apps/01-hello-world/Towerfile new file mode 100644 index 00000000..9406d9bc --- /dev/null +++ b/crates/tower-runtime/tests/example-apps/01-hello-world/Towerfile @@ -0,0 +1,3 @@ +[app] +name = "01-hello-world" +script = "./task.py" diff --git a/crates/tower-runtime/tests/example-apps/01-hello-world/task.py b/crates/tower-runtime/tests/example-apps/01-hello-world/task.py new file mode 100644 index 00000000..215234df --- /dev/null +++ b/crates/tower-runtime/tests/example-apps/01-hello-world/task.py @@ -0,0 +1,2 @@ +for i in range(0, 5): + print("Hello, world!") diff --git a/crates/tower-runtime/tests/example-apps/02-use-faker/README.md b/crates/tower-runtime/tests/example-apps/02-use-faker/README.md new file mode 100644 index 00000000..e69de29b diff --git a/crates/tower-runtime/tests/example-apps/02-use-faker/Towerfile b/crates/tower-runtime/tests/example-apps/02-use-faker/Towerfile new file mode 100644 index 00000000..97e276cf --- /dev/null +++ b/crates/tower-runtime/tests/example-apps/02-use-faker/Towerfile @@ -0,0 +1,3 @@ +[app] +name = "02-use-faker" +script = "./main.py" diff --git a/crates/tower-runtime/tests/example-apps/02-use-faker/main.py b/crates/tower-runtime/tests/example-apps/02-use-faker/main.py new file mode 100644 index 00000000..95b8a506 --- /dev/null +++ b/crates/tower-runtime/tests/example-apps/02-use-faker/main.py @@ -0,0 +1,9 @@ +from faker import Faker + +def main(): + fake = Faker() + print(fake.name()) + + +if __name__ == "__main__": + main() diff --git a/crates/tower-runtime/tests/example-apps/02-use-faker/pyproject.toml b/crates/tower-runtime/tests/example-apps/02-use-faker/pyproject.toml new file mode 100644 index 00000000..f88cfd39 --- /dev/null +++ b/crates/tower-runtime/tests/example-apps/02-use-faker/pyproject.toml @@ -0,0 +1,9 @@ +[project] +name = "02-get-current-ip" +version = "0.1.0" +description = "Add your description here" +readme = "README.md" +requires-python = ">=3.12" +dependencies = [ + "faker>=37.4.2", +] diff --git a/crates/tower-runtime/tests/example-apps/02-use-faker/uv.lock b/crates/tower-runtime/tests/example-apps/02-use-faker/uv.lock new file mode 100644 index 00000000..391ed29e --- /dev/null +++ b/crates/tower-runtime/tests/example-apps/02-use-faker/uv.lock @@ -0,0 +1,35 @@ +version = 1 +revision = 2 +requires-python = ">=3.12" + +[[package]] +name = "02-get-current-ip" +version = "0.1.0" +source = { virtual = "." } +dependencies = [ + { name = "faker" }, +] + +[package.metadata] +requires-dist = [{ name = "faker", specifier = ">=37.4.2" }] + +[[package]] +name = "faker" +version = "37.4.2" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "tzdata" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/65/95/da573e055608e180086e2ac3208f8c15d8b44220912f565a9821b9bff33a/faker-37.4.2.tar.gz", hash = "sha256:8e281bbaea30e5658895b8bea21cc50d27aaf3a43db3f2694409ca5701c56b0a", size = 1902890, upload-time = "2025-07-15T16:38:24.803Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/26/1c/b909a055be556c11f13cf058cfa0e152f9754d803ff3694a937efe300709/faker-37.4.2-py3-none-any.whl", hash = "sha256:b70ed1af57bfe988cbcd0afd95f4768c51eaf4e1ce8a30962e127ac5c139c93f", size = 1943179, upload-time = "2025-07-15T16:38:23.053Z" }, +] + +[[package]] +name = "tzdata" +version = "2025.2" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/95/32/1a225d6164441be760d75c2c42e2780dc0873fe382da3e98a2e1e48361e5/tzdata-2025.2.tar.gz", hash = "sha256:b60a638fcc0daffadf82fe0f57e53d06bdec2f36c4df66280ae79bce6bd6f2b9", size = 196380, upload-time = "2025-03-23T13:54:43.652Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/5c/23/c7abc0ca0a1526a0774eca151daeb8de62ec457e77262b66b359c3c7679e/tzdata-2025.2-py2.py3-none-any.whl", hash = "sha256:1a403fada01ff9221ca8044d701868fa132215d84beb92242d9acd2147f667a8", size = 347839, upload-time = "2025-03-23T13:54:41.845Z" }, +] diff --git a/crates/tower-runtime/tests/example-apps/03-legacy-app/Towerfile b/crates/tower-runtime/tests/example-apps/03-legacy-app/Towerfile new file mode 100644 index 00000000..f2e741ef --- /dev/null +++ b/crates/tower-runtime/tests/example-apps/03-legacy-app/Towerfile @@ -0,0 +1,3 @@ +[app] +name = "03-legacy-app" +script = "task.py" diff --git a/crates/tower-runtime/tests/example-apps/03-legacy-app/requirements.txt b/crates/tower-runtime/tests/example-apps/03-legacy-app/requirements.txt new file mode 100644 index 00000000..79b039ba --- /dev/null +++ b/crates/tower-runtime/tests/example-apps/03-legacy-app/requirements.txt @@ -0,0 +1,2 @@ +dlt[snowflake,filesystem]==1.4.0 +pandas diff --git a/crates/tower-runtime/tests/example-apps/03-legacy-app/task.py b/crates/tower-runtime/tests/example-apps/03-legacy-app/task.py new file mode 100644 index 00000000..2005385d --- /dev/null +++ b/crates/tower-runtime/tests/example-apps/03-legacy-app/task.py @@ -0,0 +1,2 @@ +import dlt +print(dlt.version.__version__) diff --git a/crates/tower-runtime/tests/local_test.rs b/crates/tower-runtime/tests/local_test.rs new file mode 100644 index 00000000..8457d4ad --- /dev/null +++ b/crates/tower-runtime/tests/local_test.rs @@ -0,0 +1,185 @@ +use std::collections::HashMap; +use std::path::PathBuf; + +use tower_runtime::{ + App, + StartOptions, + Status, + local::LocalApp, +}; + +use config::Towerfile; +use tower_package::{Package, PackageSpec}; +use tower_telemetry::{self, debug}; + +use tokio::sync::mpsc::unbounded_channel; + +fn get_example_app_dir(name: &str) -> PathBuf { + // This is where the root of the app lives. + let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + path.push("tests"); + path.push("example-apps"); + path.push(name); + + if !path.exists() { + panic!("Example app directory does not exist: {}", path.display()); + } + + path +} + +async fn build_package_from_dir(dir: &PathBuf) -> Package { + let towerfile = Towerfile::from_path(dir.join("Towerfile")).expect("Failed to load Towerfile"); + let spec = PackageSpec::from_towerfile(&towerfile); + let mut package = Package::build(spec).await.expect("Failed to build package from directory"); + package.unpack().await.expect("Failed to unpack package"); + package +} + +#[tokio::test] +async fn test_running_hello_world() { + tower_telemetry::enable_logging( + tower_telemetry::LogLevel::Debug, + tower_telemetry::LogFormat::Plain, + tower_telemetry::LogDestination::Stdout, + ); + + debug!("Running 01-hello-world"); + let hello_world_dir = get_example_app_dir("01-hello-world"); + let package = build_package_from_dir(&hello_world_dir).await; + let (sender, mut receiver) = unbounded_channel(); + + // We need to create the package, which will load the app + let opts = StartOptions{ + ctx: tower_telemetry::Context::new(), + package, + output_sender: sender, + cwd: None, + environment: "local".to_string(), + secrets: HashMap::new(), + parameters: HashMap::new(), + env_vars: HashMap::new(), + }; + + // Start the app using the LocalApp runtime + let app = LocalApp::start(opts).await.expect("Failed to start app"); + + // The status should be running + let status = app.status().await.expect("Failed to get app status"); + assert!(status == Status::Running, "App should be running"); + + while let Some(output) = receiver.recv().await { + let valid_line = output.line.contains("Hello, world!") || + output.line.contains("Using CPython") || + output.line.contains("Creating virtual environment") || + output.line.contains("Activate with"); + + assert!(valid_line, "Log should contain 'Hello, world!' or a setup line"); + } + + // check the status once more, should be done. + let status = app.status().await.expect("Failed to get app status"); + assert!(status == Status::Exited, "App should be running"); +} + +#[tokio::test] +async fn test_running_use_faker() { + debug!("Running 02-use-faker"); + // This test is a simple test that outputs some text to the console; however, this time it has + // a dependency defined in pyproject.toml, which means that it'll have to do a uv sync first. + let use_faker_dir = get_example_app_dir("02-use-faker"); + let package = build_package_from_dir(&use_faker_dir).await; + let (sender, mut receiver) = unbounded_channel(); + + // We need to create the package, which will load the app + let opts = StartOptions{ + ctx: tower_telemetry::Context::new(), + package, + output_sender: sender, + cwd: None, + environment: "local".to_string(), + secrets: HashMap::new(), + parameters: HashMap::new(), + env_vars: HashMap::new(), + }; + + // Start the app using the LocalApp runtime + let app = LocalApp::start(opts).await.expect("Failed to start app"); + + // The status should be running + let status = app.status().await.expect("Failed to get app status"); + assert!(status == Status::Running, "App should be running"); + + let mut count_setup = 0; + let mut count_stdout = 0; + + while let Some(output) = receiver.recv().await { + debug!("Received output: {:?}", output.line); + match output.channel { + tower_runtime::Channel::Setup => { + count_setup += 1; + }, + tower_runtime::Channel::Program => { + count_stdout += 1; + } + } + } + + assert!(count_setup > 0, "There should be some setup output"); + assert!(count_stdout > 0, "should be more than one output"); + + // check the status once more, should be done. + let status = app.status().await.expect("Failed to get app status"); + assert!(status == Status::Exited, "App should be running"); +} + +#[tokio::test] +async fn test_running_legacy_app() { + debug!("Running 03-legacy-app"); + // This test is a simple test that outputs some text to the console; however, this time it has + // a dependency defined in pyproject.toml, which means that it'll have to do a uv sync first. + let legacy_app_dir = get_example_app_dir("03-legacy-app"); + let package = build_package_from_dir(&legacy_app_dir).await; + let (sender, mut receiver) = unbounded_channel(); + + // We need to create the package, which will load the app + let opts = StartOptions{ + ctx: tower_telemetry::Context::new(), + package, + output_sender: sender, + cwd: None, + environment: "local".to_string(), + secrets: HashMap::new(), + parameters: HashMap::new(), + env_vars: HashMap::new(), + }; + + // Start the app using the LocalApp runtime + let app = LocalApp::start(opts).await.expect("Failed to start app"); + + // The status should be running + let status = app.status().await.expect("Failed to get app status"); + assert!(status == Status::Running, "App should be running"); + + let mut count_setup = 0; + let mut count_stdout = 0; + + while let Some(output) = receiver.recv().await { + debug!("Received output: {:?}", output.line); + match output.channel { + tower_runtime::Channel::Setup => { + count_setup += 1; + }, + tower_runtime::Channel::Program => { + count_stdout += 1; + } + } + } + + assert!(count_setup > 0, "There should be some setup output"); + assert!(count_stdout > 0, "should be more than one output"); + + // check the status once more, should be done. + let status = app.status().await.expect("Failed to get app status"); + assert!(status == Status::Exited, "App should be running"); +} diff --git a/crates/tower-uv/Cargo.toml b/crates/tower-uv/Cargo.toml index ffd63f59..b85e9d0c 100644 --- a/crates/tower-uv/Cargo.toml +++ b/crates/tower-uv/Cargo.toml @@ -9,6 +9,7 @@ license = { workspace = true } [dependencies] async-compression = { workspace = true } async_zip = { workspace = true } +dirs = { workspace = true } futures-lite = { workspace = true } reqwest = { workspace = true } tokio = { workspace = true } diff --git a/crates/tower-uv/src/install.rs b/crates/tower-uv/src/install.rs index 25677da3..d3c052c0 100644 --- a/crates/tower-uv/src/install.rs +++ b/crates/tower-uv/src/install.rs @@ -1,8 +1,12 @@ use std::env; use std::path::PathBuf; +use std::sync::OnceLock; use tokio_tar::Archive; -use tokio::process::Command; +use tokio::{ + sync::Mutex, + process::Command, +}; use async_compression::tokio::bufread::GzipDecoder; use async_zip::tokio::read::seek::ZipFileReader; use futures_lite::io::AsyncReadExt; @@ -12,8 +16,16 @@ use tower_telemetry::debug; // Copy the UV_VERSION locally to make this a bit more ergonomic. const UV_VERSION: &str = crate::UV_VERSION; +static GLOBAL_LOCK: OnceLock> = OnceLock::new(); + +fn get_global_lock() -> &'static Mutex<()> { + GLOBAL_LOCK.get_or_init(|| Mutex::new(())) +} + #[derive(Debug)] pub enum Error { + NotFound(String), + UnsupportedPlatform, IoError(std::io::Error), Other(String), } @@ -31,7 +43,8 @@ impl From for Error { } pub fn get_default_uv_bin_dir() -> Result { - Ok(PathBuf::from(".tower/bin")) + let dir = dirs::data_local_dir().ok_or(Error::UnsupportedPlatform)?; + Ok(dir.join("tower").join("bin")) } #[derive(Debug)] @@ -301,8 +314,66 @@ async fn download_uv_archive(path: &PathBuf, archive: String) -> Result Result { + debug!("Starting download of UV for current architecture"); let archive = ArchiveSelector::get_archive_name().await?; let path = download_uv_archive(path, archive).await?; debug!("Downloaded UV to: {:?}", path); Ok(path) } + +async fn find_uv_binary() -> Option { + if let Ok(default_path) = get_default_uv_bin_dir() { + // Check if the default path exists + if default_path.exists() { + let uv_path = default_path.join("uv"); + if uv_path.exists() { + return Some(uv_path); + } + } + } + + None +} + +pub async fn find_or_setup_uv() -> Result { + // We only allow setup in the process space at a given time. + let _guard = get_global_lock().lock().await; + + // If we get here, uv wasn't found in PATH, so let's download it + if let Some(path) = find_uv_binary().await { + debug!("UV binary found at {:?}", path); + Ok(path) + } else { + let path = get_default_uv_bin_dir()?; + debug!("UV binary not found in PATH, setting up UV at {:?}", path); + + // Create the directory if it doesn't exist + std::fs::create_dir_all(&path).map_err(Error::IoError)?; + + let parent = path.parent() + .ok_or_else(|| Error::NotFound("Parent directory not found".to_string()))? + .to_path_buf(); + + // We download this code to the UV directory + let exe = download_uv_for_arch(&parent).await?; + + // Target is the UV binary we want. + let target = path.join("uv"); + + // Copy the `uv` binary into the default directory + tokio::fs::copy(&exe, &target) + .await?; + + #[cfg(unix)] + { + use std::os::unix::fs::PermissionsExt; + let mut perms = std::fs::metadata(&target)?.permissions(); + perms.set_mode(0o755); + std::fs::set_permissions(&target, perms)?; + } + + debug!("Copied UV binary from {:?} to {:?}", exe, target); + + Ok(target) + } +} diff --git a/crates/tower-uv/src/lib.rs b/crates/tower-uv/src/lib.rs index 1998b21b..1ebf0aee 100644 --- a/crates/tower-uv/src/lib.rs +++ b/crates/tower-uv/src/lib.rs @@ -4,7 +4,7 @@ use std::process::Stdio; use tokio::process::{Command, Child}; use tower_telemetry::debug; -mod install; +pub mod install; // UV_VERSION is the version of UV to download and install when setting up a local UV deployment. pub const UV_VERSION: &str = "0.7.13"; @@ -15,6 +15,9 @@ pub enum Error { NotFound(String), PermissionDenied(String), Other(String), + MissingPyprojectToml, + InvalidUv, + UnsupportedPlatform, } impl From for Error { @@ -27,70 +30,29 @@ impl From for Error { impl From for Error { fn from(err: install::Error) -> Self { match err { + install::Error::NotFound(msg) => Error::NotFound(msg), + install::Error::UnsupportedPlatform => Error::UnsupportedPlatform, install::Error::IoError(e) => Error::IoError(e), install::Error::Other(msg) => Error::Other(msg), } } } -async fn find_uv_binary() -> Option { - if let Ok(default_path) = install::get_default_uv_bin_dir() { - // Check if the default path exists - if default_path.exists() { - let uv_path = default_path.join("uv"); - if uv_path.exists() { - return Some(uv_path); - } - } - } - - // First, check if uv is already in the PATH - let output = Command::new("which") - .arg("uv") +async fn test_uv_path(path: &PathBuf) -> Result<(), Error> { + let res = Command::new(&path) + .arg("--color") + .arg("never") + .arg("--no-progress") + .arg("--help") .output() .await; - if let Ok(output) = output { - let path_str = String::from_utf8_lossy(&output.stdout); - let path = PathBuf::from(path_str.trim()); - - // If this is a path that actually exists, then we assume that it's `uv` and we can - // continue. - if path.exists() { - Some(path) - } else { - None + match res { + Ok(_) => Ok(()), + Err(e) => { + debug!("Testing UV failed: {:?}", e); + Err(Error::InvalidUv) } - } else { - None - } -} - -async fn find_or_setup_uv() -> Result { - // If we get here, uv wasn't found in PATH, so let's download it - if let Some(path) = find_uv_binary().await { - Ok(path) - } else { - let path = install::get_default_uv_bin_dir()?; - - // Create the directory if it doesn't exist - std::fs::create_dir_all(&path).map_err(Error::IoError)?; - - let parent = path.parent() - .ok_or_else(|| Error::NotFound("Parent directory not found".to_string()))? - .to_path_buf(); - - // We download this code to the UV directory - let exe = install::download_uv_for_arch(&parent).await?; - - // Target is the UV binary we want. - let target = path.join("uv"); - - // Copy the `uv` binary into the default directory - std::fs::copy(&exe, &target) - .map_err(|e| Error::IoError(e))?; - - Ok(target) } } @@ -100,7 +62,8 @@ pub struct Uv { impl Uv { pub async fn new() -> Result { - let uv_path = find_or_setup_uv().await?; + let uv_path = install::find_or_setup_uv().await?; + test_uv_path(&uv_path).await?; Ok(Uv { uv_path }) } @@ -120,21 +83,48 @@ impl Uv { } pub async fn sync(&self, cwd: &PathBuf, env_vars: &HashMap) -> Result { - debug!("Executing UV ({:?}) sync in {:?}", &self.uv_path, cwd); + // We need to figure out which sync strategy to apply. If there is a pyproject.toml, then + // that's easy. + if cwd.join("pyproject.toml").exists() { + debug!("Executing UV ({:?}) sync in {:?}", &self.uv_path, cwd); + let child = Command::new(&self.uv_path) + .stdin(Stdio::null()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .current_dir(cwd) + .arg("--color") + .arg("never") + .arg("--no-progress") + .arg("sync") + .envs(env_vars) + .spawn()?; + + Ok(child) + } else if cwd.join("requirements.txt").exists() { + debug!("Executing UV ({:?}) sync with requirements in {:?}", &self.uv_path, cwd); + + // If there is a requirements.txt, then we can use that to sync. + let child = Command::new(&self.uv_path) + .stdin(Stdio::null()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .current_dir(cwd) + .arg("--color") + .arg("never") + .arg("pip") + .arg("install") + .arg("-r") + .arg(cwd.join("requirements.txt")) + .envs(env_vars) + .spawn()?; + + Ok(child) + } else { + // If there is no pyproject.toml or requirements.txt, then we can't sync. + Err(Error::MissingPyprojectToml) + } - let child = Command::new(&self.uv_path) - .stdin(Stdio::null()) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()) - .current_dir(cwd) - .arg("--color") - .arg("never") - .arg("--no-progress") - .arg("sync") - .envs(env_vars) - .spawn()?; - Ok(child) } pub async fn run(&self, cwd: &PathBuf, program: &PathBuf, env_vars: &HashMap) -> Result { @@ -155,4 +145,8 @@ impl Uv { Ok(child) } + + pub async fn is_valid(&self) -> bool { + test_uv_path(&self.uv_path).await.is_ok() + } } diff --git a/crates/tower-uv/tests/install_test.rs b/crates/tower-uv/tests/install_test.rs new file mode 100644 index 00000000..82fe493c --- /dev/null +++ b/crates/tower-uv/tests/install_test.rs @@ -0,0 +1,15 @@ +use tower_uv::{ + install::get_default_uv_bin_dir, + Uv, +}; + +#[tokio::test] +async fn test_installing_uv() { + // Ensure there is no `uv` in the directory that we will install it to by default. + let default_uv_bin_dir = get_default_uv_bin_dir().unwrap(); + let _ = tokio::fs::remove_dir_all(&default_uv_bin_dir).await; + + // Now if we instantiate a Uv instance, it should install the `uv` binary. + let uv = Uv::new().await.expect("Failed to create a Uv instance"); + assert!(uv.is_valid().await); +} From 19ce94b0a1a1f54cf854312a37d27f57a9a00056 Mon Sep 17 00:00:00 2001 From: Brad Heller Date: Mon, 21 Jul 2025 16:02:55 +0200 Subject: [PATCH 5/6] chore: Bump version to v0.3.21-rc.2 --- Cargo.lock | 22 +++++++++++----------- Cargo.toml | 3 ++- pyproject.toml | 3 ++- uv.lock | 2 +- 4 files changed, 16 insertions(+), 14 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d578b393..900e63f3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -387,7 +387,7 @@ dependencies = [ [[package]] name = "config" -version = "0.3.21-rc.1" +version = "0.3.22-rc.2" dependencies = [ "chrono", "clap", @@ -474,7 +474,7 @@ checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" [[package]] name = "crypto" -version = "0.3.21-rc.1" +version = "0.3.22-rc.2" dependencies = [ "aes-gcm", "base64", @@ -2513,7 +2513,7 @@ dependencies = [ [[package]] name = "testutils" -version = "0.3.21-rc.1" +version = "0.3.22-rc.2" dependencies = [ "pem", "rsa", @@ -2751,7 +2751,7 @@ dependencies = [ [[package]] name = "tower" -version = "0.3.21-rc.1" +version = "0.3.22-rc.2" dependencies = [ "tokio", "tower-api", @@ -2775,7 +2775,7 @@ dependencies = [ [[package]] name = "tower-api" -version = "0.3.21-rc.1" +version = "0.3.22-rc.2" dependencies = [ "reqwest", "serde", @@ -2787,7 +2787,7 @@ dependencies = [ [[package]] name = "tower-cmd" -version = "0.3.21-rc.1" +version = "0.3.22-rc.2" dependencies = [ "anyhow", "bytes", @@ -2826,7 +2826,7 @@ checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e" [[package]] name = "tower-package" -version = "0.3.21-rc.1" +version = "0.3.22-rc.2" dependencies = [ "async-compression", "config", @@ -2844,7 +2844,7 @@ dependencies = [ [[package]] name = "tower-runtime" -version = "0.3.21-rc.1" +version = "0.3.22-rc.2" dependencies = [ "chrono", "config", @@ -2864,7 +2864,7 @@ checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3" [[package]] name = "tower-telemetry" -version = "0.3.21-rc.1" +version = "0.3.22-rc.2" dependencies = [ "tracing", "tracing-appender", @@ -2873,7 +2873,7 @@ dependencies = [ [[package]] name = "tower-uv" -version = "0.3.21-rc.1" +version = "0.3.22-rc.2" dependencies = [ "async-compression", "async_zip", @@ -2887,7 +2887,7 @@ dependencies = [ [[package]] name = "tower-version" -version = "0.3.21-rc.1" +version = "0.3.22-rc.2" dependencies = [ "anyhow", "chrono", diff --git a/Cargo.toml b/Cargo.toml index c94262bf..9870f590 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,7 +4,8 @@ resolver = "2" [workspace.package] edition = "2021" -version = "0.3.21-rc.1" +version = "0.3.22-rc.2" + description = "Tower is the best way to host Python data apps in production" diff --git a/pyproject.toml b/pyproject.toml index 49b21c76..ae9c897e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,8 @@ build-backend = "maturin" [project] name = "tower" -version = "0.3.21rc1" +version = "0.3.22rc2" + description = "Tower CLI and runtime environment for Tower." diff --git a/uv.lock b/uv.lock index cdb7efc8..f372558a 100644 --- a/uv.lock +++ b/uv.lock @@ -1201,7 +1201,7 @@ wheels = [ [[package]] name = "tower" -version = "0.3.21rc1" +version = "0.3.22rc2" source = { editable = "." } dependencies = [ { name = "attrs" }, From 6bf9ed78654c46626a792cb12d2598e78a9e1a7a Mon Sep 17 00:00:00 2001 From: Brad Heller Date: Mon, 21 Jul 2025 16:10:07 +0200 Subject: [PATCH 6/6] chore: Actually update release to v0.3.21-rc.2 --- Cargo.lock | 22 +++++++++++----------- Cargo.toml | 5 +---- pyproject.toml | 5 +---- uv.lock | 2 +- 4 files changed, 14 insertions(+), 20 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 900e63f3..23bee215 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -387,7 +387,7 @@ dependencies = [ [[package]] name = "config" -version = "0.3.22-rc.2" +version = "0.3.21-rc.2" dependencies = [ "chrono", "clap", @@ -474,7 +474,7 @@ checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" [[package]] name = "crypto" -version = "0.3.22-rc.2" +version = "0.3.21-rc.2" dependencies = [ "aes-gcm", "base64", @@ -2513,7 +2513,7 @@ dependencies = [ [[package]] name = "testutils" -version = "0.3.22-rc.2" +version = "0.3.21-rc.2" dependencies = [ "pem", "rsa", @@ -2751,7 +2751,7 @@ dependencies = [ [[package]] name = "tower" -version = "0.3.22-rc.2" +version = "0.3.21-rc.2" dependencies = [ "tokio", "tower-api", @@ -2775,7 +2775,7 @@ dependencies = [ [[package]] name = "tower-api" -version = "0.3.22-rc.2" +version = "0.3.21-rc.2" dependencies = [ "reqwest", "serde", @@ -2787,7 +2787,7 @@ dependencies = [ [[package]] name = "tower-cmd" -version = "0.3.22-rc.2" +version = "0.3.21-rc.2" dependencies = [ "anyhow", "bytes", @@ -2826,7 +2826,7 @@ checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e" [[package]] name = "tower-package" -version = "0.3.22-rc.2" +version = "0.3.21-rc.2" dependencies = [ "async-compression", "config", @@ -2844,7 +2844,7 @@ dependencies = [ [[package]] name = "tower-runtime" -version = "0.3.22-rc.2" +version = "0.3.21-rc.2" dependencies = [ "chrono", "config", @@ -2864,7 +2864,7 @@ checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3" [[package]] name = "tower-telemetry" -version = "0.3.22-rc.2" +version = "0.3.21-rc.2" dependencies = [ "tracing", "tracing-appender", @@ -2873,7 +2873,7 @@ dependencies = [ [[package]] name = "tower-uv" -version = "0.3.22-rc.2" +version = "0.3.21-rc.2" dependencies = [ "async-compression", "async_zip", @@ -2887,7 +2887,7 @@ dependencies = [ [[package]] name = "tower-version" -version = "0.3.22-rc.2" +version = "0.3.21-rc.2" dependencies = [ "anyhow", "chrono", diff --git a/Cargo.toml b/Cargo.toml index 9870f590..9e234ba0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,10 +4,7 @@ resolver = "2" [workspace.package] edition = "2021" -version = "0.3.22-rc.2" - - - +version = "0.3.21-rc.2" description = "Tower is the best way to host Python data apps in production" rust-version = "1.81" authors = ["Brad Heller "] diff --git a/pyproject.toml b/pyproject.toml index ae9c897e..41fd16d1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,10 +4,7 @@ build-backend = "maturin" [project] name = "tower" -version = "0.3.22rc2" - - - +version = "0.3.21rc2" description = "Tower CLI and runtime environment for Tower." authors = [{ name = "Tower Computing Inc.", email = "brad@tower.dev" }] readme = "README.md" diff --git a/uv.lock b/uv.lock index f372558a..523da39c 100644 --- a/uv.lock +++ b/uv.lock @@ -1201,7 +1201,7 @@ wheels = [ [[package]] name = "tower" -version = "0.3.22rc2" +version = "0.3.21rc2" source = { editable = "." } dependencies = [ { name = "attrs" },