@@ -8,7 +8,11 @@ use tower_package::{Package, PackageSpec};
88use tower_runtime:: { local:: LocalApp , App , AppLauncher , OutputReceiver , Status } ;
99use tower_telemetry:: { debug, Context } ;
1010
11- use tokio:: sync:: { mpsc:: unbounded_channel, oneshot} ;
11+ use tokio:: sync:: {
12+ mpsc:: { unbounded_channel, Receiver as MpscReceiver } ,
13+ oneshot:: { self , Receiver as OneshotReceiver } ,
14+ } ;
15+ use tokio:: time:: { sleep, timeout, Duration } ;
1216
1317use crate :: { api, output, util:: dates} ;
1418
@@ -243,8 +247,50 @@ pub async fn do_run_remote(
243247 }
244248}
245249
250+ async fn stream_logs_until_complete (
251+ mut log_stream : MpscReceiver < api:: LogStreamEvent > ,
252+ mut run_complete : OneshotReceiver < Run > ,
253+ enable_ctrl_c : bool ,
254+ run_link : & str ,
255+ ) -> Result < Option < Run > , Error > {
256+ loop {
257+ tokio:: select! {
258+ event = log_stream. recv( ) => match event {
259+ Some ( api:: LogStreamEvent :: EventLog ( log) ) => {
260+ output:: remote_log_event( & log) ;
261+ } ,
262+ None => return Ok ( None ) ,
263+ _ => { } ,
264+ } ,
265+ res = & mut run_complete => {
266+ let completed_run = res?;
267+ drain_remaining_logs( log_stream) . await ;
268+ return Ok ( Some ( completed_run) ) ;
269+ } ,
270+ _ = tokio:: signal:: ctrl_c( ) , if enable_ctrl_c => {
271+ output:: write( "Received Ctrl+C, stopping log streaming...\n " ) ;
272+ output:: write( "Note: The run will continue in Tower cloud\n " ) ;
273+ output:: write( & format!( " See more: {}\n " , run_link) ) ;
274+ return Ok ( None ) ;
275+ } ,
276+ }
277+ }
278+ }
279+
280+ async fn drain_remaining_logs ( mut log_stream : MpscReceiver < api:: LogStreamEvent > ) {
281+ let drain_duration = Duration :: from_secs ( 5 ) ;
282+ let _ = timeout ( drain_duration, async {
283+ while let Some ( event) = log_stream. recv ( ) . await {
284+ if let api:: LogStreamEvent :: EventLog ( log) = event {
285+ output:: remote_log_event ( & log) ;
286+ }
287+ }
288+ } )
289+ . await ;
290+ }
291+
246292async fn do_follow_run ( config : Config , run : & Run ) -> Result < ( ) , Error > {
247- let enable_ctrl_c = !output:: get_output_mode ( ) . is_mcp ( ) ; // Disable Ctrl+C in MCP mode
293+ let enable_ctrl_c = !output:: get_output_mode ( ) . is_mcp ( ) ;
248294 let mut spinner = output:: spinner ( "Waiting for run to start..." ) ;
249295 match wait_for_run_start ( & config, & run) . await {
250296 Err ( err) => {
@@ -257,35 +303,24 @@ async fn do_follow_run(config: Config, run: &Run) -> Result<(), Error> {
257303
258304 // We do this here, explicitly, to not double-monitor our API via the
259305 // `wait_for_run_start` function above.
260- let mut run_complete = monitor_run_completion ( & config, run) ;
306+ let run_complete = monitor_run_completion ( & config, run) ;
261307
262308 // Now we follow the logs from the run. We can stream them from the cloud to here using
263309 // the stream_logs API endpoint.
264310 match api:: stream_run_logs ( & config, & run. app_name , run. number ) . await {
265- Ok ( mut output) => loop {
266- let should_exit = tokio:: select! {
267- Some ( event) = output. recv( ) => {
268- if let api:: LogStreamEvent :: EventLog ( log) = & event {
269- let ts = dates:: format_str( & log. reported_at) ;
270- output:: log_line( & ts, & log. content, output:: LogLineType :: Remote ) ;
271- }
272- false
273- } ,
274- res = & mut run_complete => {
275- handle_run_completion( res) ?;
276- true
277- } ,
278- _ = tokio:: signal:: ctrl_c( ) , if enable_ctrl_c => {
279- output:: write( "Received Ctrl+C, stopping log streaming...\n " ) ;
280- output:: write( "Note: The run will continue in Tower cloud\n " ) ;
281- output:: write( & format!( " See more: {}\n " , run. dollar_link) ) ;
282- true
283- } ,
284- } ;
285- if should_exit {
286- break ;
311+ Ok ( log_stream) => {
312+ let completed_run = stream_logs_until_complete (
313+ log_stream,
314+ run_complete,
315+ enable_ctrl_c,
316+ & run. dollar_link ,
317+ )
318+ . await ?;
319+
320+ if let Some ( run) = completed_run {
321+ handle_run_completion ( Ok ( run) ) ?;
287322 }
288- } ,
323+ }
289324 Err ( err) => {
290325 output:: error ( & format ! ( "Failed to stream run logs: {:?}" , err) ) ;
291326 return Err ( Error :: LogStreamFailed ) ;
@@ -297,9 +332,7 @@ async fn do_follow_run(config: Config, run: &Run) -> Result<(), Error> {
297332 Ok ( ( ) )
298333}
299334
300- fn handle_run_completion (
301- res : Result < Run , tokio:: sync:: oneshot:: error:: RecvError > ,
302- ) -> Result < ( ) , Error > {
335+ fn handle_run_completion ( res : Result < Run , oneshot:: error:: RecvError > ) -> Result < ( ) , Error > {
303336 match res {
304337 Ok ( completed_run) => match completed_run. status {
305338 tower_api:: models:: run:: Status :: Errored => {
@@ -540,7 +573,7 @@ async fn monitor_status(app: LocalApp) -> Status {
540573 ) ;
541574 return tower_runtime:: Status :: Running ; // Return a default status for timeout
542575 }
543- tokio :: time :: sleep ( std :: time :: Duration :: from_millis ( 100 ) ) . await ;
576+ sleep ( Duration :: from_millis ( 100 ) ) . await ;
544577 continue ;
545578 }
546579 }
@@ -556,7 +589,7 @@ async fn monitor_status(app: LocalApp) -> Status {
556589 output:: error ( "Failed to get app status after timeout" ) ;
557590 return tower_runtime:: Status :: Running ; // Return a default status for timeout
558591 }
559- tokio :: time :: sleep ( std :: time :: Duration :: from_millis ( 100 ) ) . await ;
592+ sleep ( Duration :: from_millis ( 100 ) ) . await ;
560593 }
561594 }
562595 }
@@ -585,7 +618,7 @@ async fn wait_for_run_start(config: &Config, run: &Run) -> Result<(), Error> {
585618 break ;
586619 } else {
587620 // Wait half a second to to try again.
588- tokio :: time :: sleep ( tokio :: time :: Duration :: from_millis ( 500 ) ) . await ;
621+ sleep ( Duration :: from_millis ( 500 ) ) . await ;
589622 }
590623 }
591624
@@ -602,7 +635,7 @@ async fn wait_for_run_completion(config: &Config, run: &Run) -> Result<Run, Erro
602635 return Ok ( res. run ) ;
603636 } else {
604637 // Wait half a second to to try again.
605- tokio :: time :: sleep ( tokio :: time :: Duration :: from_millis ( 500 ) ) . await ;
638+ sleep ( Duration :: from_millis ( 500 ) ) . await ;
606639 }
607640 }
608641}
0 commit comments