Skip to content

Commit fd79baf

Browse files
authored
Add new --follow mode for tower apps logs with resilient streaming and tests (#171)
* feat (cli): add follow mode for apps logs * fix(cli): improve follow retry handling * refactor: reset follow backoff and improve log errors * reset monitor failures and cancel follow watchers * harden apps logs follow and add regression tests for - deduplicate log lines across reconnects / might be an issue - add unit test for deduplication and out of order logs * fix linter issues * wait for run start before log streaming
1 parent f342526 commit fd79baf

6 files changed

Lines changed: 582 additions & 7 deletions

File tree

crates/tower-cmd/src/api.rs

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -367,6 +367,17 @@ pub enum LogStreamError {
367367
Unknown,
368368
}
369369

370+
impl std::fmt::Display for LogStreamError {
371+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
372+
match self {
373+
LogStreamError::Reqwest(err) => write!(f, "{err}"),
374+
LogStreamError::Unknown => write!(f, "unknown log stream error"),
375+
}
376+
}
377+
}
378+
379+
impl std::error::Error for LogStreamError {}
380+
370381
impl From<reqwest_eventsource::CannotCloneRequestError> for LogStreamError {
371382
fn from(err: reqwest_eventsource::CannotCloneRequestError) -> Self {
372383
debug!("Failed to clone request {:?}", err);
@@ -399,10 +410,30 @@ async fn drain_run_logs_stream(mut source: EventSource, tx: mpsc::Sender<LogStre
399410
}
400411
"warning" => {
401412
let event_warning = serde_json::from_str(&message.data);
402-
if let Ok(event) = event_warning {
403-
tx.send(LogStreamEvent::EventWarning(event)).await.ok();
404-
} else {
405-
debug!("Failed to parse warning message: {:?}", message.data);
413+
match event_warning {
414+
Ok(event) => {
415+
tx.send(LogStreamEvent::EventWarning(event)).await.ok();
416+
}
417+
Err(err) => {
418+
let warning_data = serde_json::from_str(&message.data);
419+
match warning_data {
420+
Ok(data) => {
421+
let event = tower_api::models::EventWarning {
422+
data,
423+
event: tower_api::models::event_warning::Event::Warning,
424+
id: None,
425+
retry: None,
426+
};
427+
tx.send(LogStreamEvent::EventWarning(event)).await.ok();
428+
}
429+
Err(_) => {
430+
debug!(
431+
"Failed to parse warning message: {:?}. Error: {}",
432+
message.data, err
433+
);
434+
}
435+
}
436+
}
406437
}
407438
}
408439
_ => {

0 commit comments

Comments
 (0)