From 2c8aca5d5ddab33c0decc45164da596ecb3b2fdb Mon Sep 17 00:00:00 2001 From: Ryan Lamb <4955475+kinyoklion@users.noreply.github.com> Date: Fri, 8 May 2026 14:38:07 -0700 Subject: [PATCH 1/3] fix: keep StreamingDataSource alive on non-Eof stream errors The fetch loop matched any error other than `Eof` (and recoverable HTTP responses) with a catch-all `_ => break`, dropping the spawned task and leaving the data source silently dysfunctional. Because no one was polling the eventsource-client's stream anymore, its reconnect logic never ran. The `None` branch also broke without notifying the caller, so callers waiting on initialization had no signal that the stream had closed permanently. Replace the catch-all `break` with `continue` so the eventsource-client gets to reconnect on the next poll, and call `init_complete(false)` in the `None` branch so the caller can observe permanent stream closure. Adds `streaming_source_recovers_from_non_eof_stream_error` test -- sends invalid UTF-8 to trigger `Error::InvalidLine` from the parser and asserts the mock receives at least two requests. Pairs with launchdarkly/rust-eventsource-client#134, which fixes the matching state-transition gap on the eventsource-client side. Together they restore the reconnection contract: the eventsource-client owns reconnection, and the SDK keeps polling and trusts it. Closes launchdarkly/rust-server-sdk#116. --- launchdarkly-server-sdk/src/data_source.rs | 68 ++++++++++++++++++---- 1 file changed, 58 insertions(+), 10 deletions(-) diff --git a/launchdarkly-server-sdk/src/data_source.rs b/launchdarkly-server-sdk/src/data_source.rs index 47b5c88..7b92202 100644 --- a/launchdarkly-server-sdk/src/data_source.rs +++ b/launchdarkly-server-sdk/src/data_source.rs @@ -143,18 +143,19 @@ impl DataSource for StreamingDataSource { } }, Some(Err(e)) => { - match e { - es::Error::Eof => { - continue; - } - _ => { - error!("unhandled error on event stream: {e:?}"); - break; - } - } + // The eventsource-client owns reconnection; + // breaking here would drop this task and leave + // the data source silently dysfunctional. + warn!("recoverable error on event stream, will retry: {e:?}"); + continue; }, None => { - error!("unexpected end of event stream"); + // The stream is exhausted only when the + // eventsource-client has given up reconnecting. + // Signal initialization failure so the caller + // can observe the permanent failure. + error!("event stream closed permanently"); + notify_init.call_once(|| (init_complete)(false)); break; } }; @@ -433,6 +434,53 @@ mod tests { mock.assert() } + // When the SSE stream returns a non-Eof error, the streaming data + // source should keep polling so the eventsource-client can reconnect. + // A naive `break` would drop the spawned task and leave the data + // source silently dysfunctional. + #[tokio::test(flavor = "multi_thread")] + async fn streaming_source_recovers_from_non_eof_stream_error() { + let mut server = mockito::Server::new_async().await; + + // Bytes 0xFF 0xFE form an invalid UTF-8 sequence; the eventsource + // parser rejects the line as `Error::InvalidLine`, which exercises + // the catch-all error arm of the fetch loop. + let invalid_body: &[u8] = b"\xff\xfe:bad\n\n"; + let mock = server + .mock("GET", "/all") + .with_status(200) + .with_body(invalid_body) + .expect_at_least(2) + .create_async() + .await; + + let (shutdown_tx, _) = broadcast::channel::<()>(1); + + let streaming = StreamingDataSource::new( + &server.url(), + "sdk-key", + Duration::from_millis(10), + &None, + launchdarkly_sdk_transport::HyperTransport::new().expect("Failed to create transport"), + ) + .unwrap(); + + let data_store = Arc::new(RwLock::new(InMemoryDataStore::new())); + + streaming.subscribe( + data_store, + Arc::new(move |_success| {}), + shutdown_tx.subscribe(), + ); + + // Wait long enough for the initial request, the parser error, and + // several reconnect attempts (initial_reconnect_delay is 10ms). + tokio::time::sleep(Duration::from_millis(500)).await; + let _ = shutdown_tx.send(()); + + mock.assert(); + } + #[test_case(Some("application-id/abc:application-sha/xyz".into()), "application-id/abc:application-sha/xyz")] #[test_case(None, Matcher::Missing)] #[tokio::test(flavor = "multi_thread")] From e3ea4ac2dce1fe92977af68e0ec91572cb0a5e12 Mon Sep 17 00:00:00 2001 From: Ryan Lamb <4955475+kinyoklion@users.noreply.github.com> Date: Fri, 8 May 2026 14:45:19 -0700 Subject: [PATCH 2/3] test: pin StreamingDataSource shutdown on unrecoverable HTTP status The 401 path (and any other status that `is_http_error_recoverable` rejects) should stop the streaming fetch loop and signal init failure to the caller. The catch-all-error change in this PR sits directly next to that branch, so add a behavioral test exercising the 401 path end-to-end through the fetch loop -- existing coverage was only of the predicate function. --- launchdarkly-server-sdk/src/data_source.rs | 50 ++++++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/launchdarkly-server-sdk/src/data_source.rs b/launchdarkly-server-sdk/src/data_source.rs index 7b92202..61b10b5 100644 --- a/launchdarkly-server-sdk/src/data_source.rs +++ b/launchdarkly-server-sdk/src/data_source.rs @@ -481,6 +481,56 @@ mod tests { mock.assert(); } + // An unrecoverable HTTP status (e.g. 401) should stop the streaming + // data source from making further requests and signal init failure + // to the caller, rather than retrying indefinitely. + #[tokio::test(flavor = "multi_thread")] + async fn streaming_source_stops_on_unrecoverable_http_status() { + let mut server = mockito::Server::new_async().await; + + let mock = server + .mock("GET", "/all") + .with_status(401) + .expect(1) + .create_async() + .await; + + let (shutdown_tx, _) = broadcast::channel::<()>(1); + let init_outcome = Arc::new(Mutex::new(None::)); + + let streaming = StreamingDataSource::new( + &server.url(), + "sdk-key", + Duration::from_millis(10), + &None, + launchdarkly_sdk_transport::HyperTransport::new().expect("Failed to create transport"), + ) + .unwrap(); + + let data_store = Arc::new(RwLock::new(InMemoryDataStore::new())); + + let init_outcome_clone = init_outcome.clone(); + streaming.subscribe( + data_store, + Arc::new(move |success| { + *init_outcome_clone.lock().unwrap() = Some(success); + }), + shutdown_tx.subscribe(), + ); + + // Wait long enough that any reconnect attempts would have happened + // (initial_reconnect_delay is 10ms). + tokio::time::sleep(Duration::from_millis(500)).await; + let _ = shutdown_tx.send(()); + + assert_eq!( + *init_outcome.lock().unwrap(), + Some(false), + "expected init_complete(false) on unrecoverable HTTP status" + ); + mock.assert(); + } + #[test_case(Some("application-id/abc:application-sha/xyz".into()), "application-id/abc:application-sha/xyz")] #[test_case(None, Matcher::Missing)] #[tokio::test(flavor = "multi_thread")] From d9e5bd77ac52f74217835c982fecd0f0890a49bf Mon Sep 17 00:00:00 2001 From: Ryan Lamb <4955475+kinyoklion@users.noreply.github.com> Date: Fri, 8 May 2026 15:34:38 -0700 Subject: [PATCH 3/3] test/log: split Eof out, reword catch-all, pin no-init mid-retry Three follow-ups from review of #168: * Routes `Error::Eof` to `debug!` so healthy stream rotations don't spam `warn!` lines on every reconnect. * Rewords the catch-all comment and log: some non-Eof errors converge via the eventsource-client's `StreamClosed` transition (observed as `None` on the next poll), not in-stream reconnect, so "will retry" was misleading. * Strengthens `streaming_source_recovers_from_non_eof_stream_error` with an `Arc>>` capture asserting `init_complete` is *not* called during the retry window. Permanent-failure init is already covered by `streaming_source_stops_on_unrecoverable_http_status`. --- launchdarkly-server-sdk/src/data_source.rs | 31 ++++++++++++++++++---- 1 file changed, 26 insertions(+), 5 deletions(-) diff --git a/launchdarkly-server-sdk/src/data_source.rs b/launchdarkly-server-sdk/src/data_source.rs index 61b10b5..aae4f88 100644 --- a/launchdarkly-server-sdk/src/data_source.rs +++ b/launchdarkly-server-sdk/src/data_source.rs @@ -142,11 +142,20 @@ impl DataSource for StreamingDataSource { } } }, + Some(Err(es::Error::Eof)) => { + // Clean end-of-stream from the server; + // routine for streams that rotate connections. + debug!("event stream eof, will reconnect"); + continue; + }, Some(Err(e)) => { - // The eventsource-client owns reconnection; - // breaking here would drop this task and leave - // the data source silently dysfunctional. - warn!("recoverable error on event stream, will retry: {e:?}"); + // Keep polling. The eventsource-client will + // either reconnect on the next poll or end + // the stream (we'll observe that as `None` + // on the next iteration). Breaking here + // would drop this task and leave the data + // source silently dysfunctional. + warn!("error on event stream, will keep polling: {e:?}"); continue; }, None => { @@ -455,6 +464,7 @@ mod tests { .await; let (shutdown_tx, _) = broadcast::channel::<()>(1); + let init_outcome = Arc::new(Mutex::new(None::)); let streaming = StreamingDataSource::new( &server.url(), @@ -467,9 +477,12 @@ mod tests { let data_store = Arc::new(RwLock::new(InMemoryDataStore::new())); + let init_outcome_clone = init_outcome.clone(); streaming.subscribe( data_store, - Arc::new(move |_success| {}), + Arc::new(move |success| { + *init_outcome_clone.lock().unwrap() = Some(success); + }), shutdown_tx.subscribe(), ); @@ -478,6 +491,14 @@ mod tests { tokio::time::sleep(Duration::from_millis(500)).await; let _ = shutdown_tx.send(()); + // While the eventsource-client is still actively reconnecting, the + // SDK should treat the situation as transient and not signal init + // failure. (Permanent failure is exercised by the 401 test.) + assert_eq!( + *init_outcome.lock().unwrap(), + None, + "init_complete should not be called while the stream is still retrying" + ); mock.assert(); }