Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions .github/workflows/test-rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,6 @@ jobs:
if: github.ref_name != 'main'
run: >
cargo test --all-features
env:
RUST_LOG: debug

- run: bash .github/check-for-naughty-dependencies.sh

Expand Down
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 5 additions & 3 deletions crates/tower-cmd/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ use tower_package::{Package, PackageSpec};
use tower_runtime::{local::LocalApp, App, AppLauncher, OutputReceiver};
use tower_telemetry::{Context, debug};

use tokio::sync::mpsc::unbounded_channel;

use crate::{
output,
api,
Expand Down Expand Up @@ -133,7 +135,7 @@ async fn do_run_local(config: Config, path: PathBuf, env: &str, mut params: Hash
std::process::exit(1);
}

let (sender, receiver) = tower_runtime::create_output_stream();
let (sender, receiver) = unbounded_channel();

output::success(&format!("Launching app `{}`", towerfile.app.name));
let output_task = tokio::spawn(monitor_output(receiver));
Expand Down Expand Up @@ -342,9 +344,9 @@ async fn build_package(towerfile: &Towerfile) -> Package {

/// monitor_output is a helper function that will monitor the output of a given output channel and
/// plops it down on stdout.
async fn monitor_output(output: OutputReceiver) {
async fn monitor_output(mut output: OutputReceiver) {
loop {
if let Some(line) = output.lock().await.recv().await {
if let Some(line) = output.recv().await {
let ts = &line.time;
let msg = &line.line;
output::log_line(&ts.to_rfc3339(), msg, output::LogLineType::Local);
Expand Down
3 changes: 3 additions & 0 deletions crates/tower-runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,6 @@ snafu = { workspace = true }
tower-package = { workspace = true }
tower-telemetry = { workspace = true }
tower-uv = { workspace = true }

[dev-dependencies]
config = { workspace = true }
3 changes: 3 additions & 0 deletions crates/tower-runtime/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ impl From<tower_uv::Error> for Error {
tower_uv::Error::NotFound(_) => Error::SpawnFailed,
tower_uv::Error::PermissionDenied(_) => Error::SpawnFailed,
tower_uv::Error::Other(_) => Error::SpawnFailed,
tower_uv::Error::MissingPyprojectToml => Error::SpawnFailed,
Copy link

Copilot AI Jul 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The indentation is inconsistent with the surrounding code. The line should be indented to align with the other match arms above it.

Suggested change
tower_uv::Error::MissingPyprojectToml => Error::SpawnFailed,
tower_uv::Error::MissingPyprojectToml => Error::SpawnFailed,

Copilot uses AI. Check for mistakes.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand this one? Maybe there's something messed up in my local enviro config.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isn't it just wrong?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems to be to me? Thought maybe it was some hidden whitespace issue perhaps.

tower_uv::Error::InvalidUv => Error::SpawnFailed,
tower_uv::Error::UnsupportedPlatform => Error::UnsupportedPlatform,
}
}
}
25 changes: 5 additions & 20 deletions crates/tower-runtime/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
use std::path::PathBuf;
use std::future::Future;
use std::sync::Arc;
use std::collections::HashMap;
use tokio::sync::Mutex;
use tokio::sync::mpsc::{
UnboundedReceiver,
UnboundedSender,
unbounded_channel,
};
use chrono::{DateTime, Utc};

Expand Down Expand Up @@ -41,21 +38,17 @@ pub struct Output {
pub line: String,
}

#[derive(Copy, Clone)]
#[derive(Copy, Clone, PartialEq)]
pub enum Status {
None,
Running,
Exited,
Crashed { code: i32 },
}

type SharedReceiver<T> = Arc<Mutex<UnboundedReceiver<T>>>;

type SharedSender<T> = Arc<Mutex<UnboundedSender<T>>>;

pub type OutputReceiver = SharedReceiver<Output>;
pub type OutputReceiver = UnboundedReceiver<Output>;

pub type OutputSender = SharedSender<Output>;
pub type OutputSender = UnboundedSender<Output>;

pub trait App {
// start will start the process
Expand All @@ -81,14 +74,6 @@ impl<A: App> std::default::Default for AppLauncher<A> {
}
}

pub fn create_output_stream() -> (OutputSender, OutputReceiver) {
let (sender, receiver) = unbounded_channel::<Output>();

let output_sender = Arc::new(Mutex::new(sender));
let output_receiver = Arc::new(Mutex::new(receiver));
(output_sender, output_receiver)
}

impl<A: App> AppLauncher<A> {
pub async fn launch(
&mut self,
Expand All @@ -104,7 +89,7 @@ impl<A: App> AppLauncher<A> {

let opts = StartOptions {
ctx,
output_sender: Some(output_sender),
output_sender,
cwd: Some(cwd),
environment,
secrets,
Expand Down Expand Up @@ -152,7 +137,7 @@ pub struct StartOptions {
pub secrets: HashMap<String, String>,
pub parameters: HashMap<String, String>,
pub env_vars: HashMap<String, String>,
pub output_sender: Option<OutputSender>,
pub output_sender: OutputSender,
}

pub struct ExecuteOptions {
Expand Down
70 changes: 51 additions & 19 deletions crates/tower-runtime/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,28 +186,65 @@ async fn execute_local_app(opts: StartOptions, sx: oneshot::Sender<i32>, cancel_
// Now we also need to find the program to execute.
let program_path = working_dir.join(&manifest.invoke);

// Check once more if the process was cancelled before we do a uv sync. The sync itself,
// once started, will take a while and we have logic for checking for cancellation.
// Quickly do a check to see if there was a cancellation before we do a subprocess spawn to
// ensure everything is in place.
if cancel_token.is_cancelled() {
// again tell any waiters that we cancelled.
let _ = sx.send(-1);
return Err(Error::Cancelled);
}

let mut child = uv.sync(&working_dir, &env_vars).await?;
let mut child = uv.venv(&working_dir, &env_vars).await?;

// Drain the logs to the output channel.
if let Some(ref sender) = opts.output_sender {
let stdout = child.stdout.take().expect("no stdout");
tokio::spawn(drain_output(FD::Stdout, Channel::Setup, sender.clone(), BufReader::new(stdout)));
let stdout = child.stdout.take().expect("no stdout");
tokio::spawn(drain_output(FD::Stdout, Channel::Setup, opts.output_sender.clone(), BufReader::new(stdout)));

let stderr = child.stderr.take().expect("no stderr");
tokio::spawn(drain_output(FD::Stderr, Channel::Setup, sender.clone(), BufReader::new(stderr)));
}
let stderr = child.stderr.take().expect("no stderr");
tokio::spawn(drain_output(FD::Stderr, Channel::Setup, opts.output_sender.clone(), BufReader::new(stderr)));

// Let's wait for the setup to finish. We don't care about the results.
// Wait for venv to finish up.
wait_for_process(ctx.clone(), &cancel_token, child).await;

// Check once more if the process was cancelled before we do a uv sync. The sync itself,
// once started, will take a while and we have logic for checking for cancellation.
if cancel_token.is_cancelled() {
// again tell any waiters that we cancelled.
let _ = sx.send(-1);
return Err(Error::Cancelled);
}

match uv.sync(&working_dir, &env_vars).await {
Err(e) => {
// If we were missing a pyproject.toml, then that's fine for us--we'll just
// continue execution.
//
// Note that we do a match here instead of an if. That's because of the way
// tower_uv::Error is implemented. Namely, it doesn't implement PartialEq and can't
// do so due to it's dependency on std::io::Error.
match e {
tower_uv::Error::MissingPyprojectToml => {
debug!(ctx: &ctx, "no pyproject.toml found, continuing without sync");
},
_ => {
// If we got any other error, we want to return it.
return Err(e.into());
}
}
},
Ok(mut child) => {
// Drain the logs to the output channel.
let stdout = child.stdout.take().expect("no stdout");
tokio::spawn(drain_output(FD::Stdout, Channel::Setup, opts.output_sender.clone(), BufReader::new(stdout)));

let stderr = child.stderr.take().expect("no stderr");
tokio::spawn(drain_output(FD::Stderr, Channel::Setup, opts.output_sender.clone(), BufReader::new(stderr)));

// Let's wait for the setup to finish. We don't care about the results.
wait_for_process(ctx.clone(), &cancel_token, child).await;
}
}

// Check once more to see if the process was cancelled, this will bail us out early.
if cancel_token.is_cancelled() {
// if there's a waiter, we want them to know that the process was cancelled so we have
Expand All @@ -219,13 +256,11 @@ async fn execute_local_app(opts: StartOptions, sx: oneshot::Sender<i32>, cancel_
let mut child = uv.run(&working_dir, &program_path, &env_vars).await?;

// Drain the logs to the output channel.
if let Some(ref sender) = opts.output_sender {
let stdout = child.stdout.take().expect("no stdout");
tokio::spawn(drain_output(FD::Stdout, Channel::Program, sender.clone(), BufReader::new(stdout)));
let stdout = child.stdout.take().expect("no stdout");
tokio::spawn(drain_output(FD::Stdout, Channel::Program, opts.output_sender.clone(), BufReader::new(stdout)));

let stderr = child.stderr.take().expect("no stderr");
tokio::spawn(drain_output(FD::Stderr, Channel::Program, sender.clone(), BufReader::new(stderr)));
}
let stderr = child.stderr.take().expect("no stderr");
tokio::spawn(drain_output(FD::Stderr, Channel::Program, opts.output_sender.clone(), BufReader::new(stderr)));

let _ = sx.send(wait_for_process(ctx.clone(), &cancel_token, child).await);
}
Expand Down Expand Up @@ -410,8 +445,6 @@ async fn drain_output<R: AsyncRead + Unpin>(fd: FD, channel: Channel, output: Ou
let mut lines = input.lines();

while let Some(line) = lines.next_line().await.expect("line iteration fialed") {
let output = output.lock().await;

let _ = output.send(Output{
channel,
fd,
Expand All @@ -421,7 +454,6 @@ async fn drain_output<R: AsyncRead + Unpin>(fd: FD, channel: Channel, output: Ou
}
}


fn is_bash_package(package: &Package) -> bool {
return package.manifest.invoke.ends_with(".sh")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[app]
name = "01-hello-world"
script = "./task.py"
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
for i in range(0, 5):
print("Hello, world!")
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[app]
name = "02-use-faker"
script = "./main.py"
9 changes: 9 additions & 0 deletions crates/tower-runtime/tests/example-apps/02-use-faker/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from faker import Faker

def main():
fake = Faker()
print(fake.name())


if __name__ == "__main__":
main()
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
[project]
name = "02-get-current-ip"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.12"
dependencies = [
"faker>=37.4.2",
]
35 changes: 35 additions & 0 deletions crates/tower-runtime/tests/example-apps/02-use-faker/uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[app]
name = "03-legacy-app"
script = "task.py"
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
dlt[snowflake,filesystem]==1.4.0
pandas
2 changes: 2 additions & 0 deletions crates/tower-runtime/tests/example-apps/03-legacy-app/task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
import dlt
print(dlt.version.__version__)
Loading