diff --git a/launchdarkly-server-sdk/src/data_source.rs b/launchdarkly-server-sdk/src/data_source.rs index 47b5c88..aae4f88 100644 --- a/launchdarkly-server-sdk/src/data_source.rs +++ b/launchdarkly-server-sdk/src/data_source.rs @@ -142,19 +142,29 @@ impl DataSource for StreamingDataSource { } } }, + Some(Err(es::Error::Eof)) => { + // Clean end-of-stream from the server; + // routine for streams that rotate connections. + debug!("event stream eof, will reconnect"); + continue; + }, Some(Err(e)) => { - match e { - es::Error::Eof => { - continue; - } - _ => { - error!("unhandled error on event stream: {e:?}"); - break; - } - } + // Keep polling. The eventsource-client will + // either reconnect on the next poll or end + // the stream (we'll observe that as `None` + // on the next iteration). Breaking here + // would drop this task and leave the data + // source silently dysfunctional. + warn!("error on event stream, will keep polling: {e:?}"); + continue; }, None => { - error!("unexpected end of event stream"); + // The stream is exhausted only when the + // eventsource-client has given up reconnecting. + // Signal initialization failure so the caller + // can observe the permanent failure. + error!("event stream closed permanently"); + notify_init.call_once(|| (init_complete)(false)); break; } }; @@ -433,6 +443,115 @@ mod tests { mock.assert() } + // When the SSE stream returns a non-Eof error, the streaming data + // source should keep polling so the eventsource-client can reconnect. + // A naive `break` would drop the spawned task and leave the data + // source silently dysfunctional. + #[tokio::test(flavor = "multi_thread")] + async fn streaming_source_recovers_from_non_eof_stream_error() { + let mut server = mockito::Server::new_async().await; + + // Bytes 0xFF 0xFE form an invalid UTF-8 sequence; the eventsource + // parser rejects the line as `Error::InvalidLine`, which exercises + // the catch-all error arm of the fetch loop. + let invalid_body: &[u8] = b"\xff\xfe:bad\n\n"; + let mock = server + .mock("GET", "/all") + .with_status(200) + .with_body(invalid_body) + .expect_at_least(2) + .create_async() + .await; + + let (shutdown_tx, _) = broadcast::channel::<()>(1); + let init_outcome = Arc::new(Mutex::new(None::)); + + let streaming = StreamingDataSource::new( + &server.url(), + "sdk-key", + Duration::from_millis(10), + &None, + launchdarkly_sdk_transport::HyperTransport::new().expect("Failed to create transport"), + ) + .unwrap(); + + let data_store = Arc::new(RwLock::new(InMemoryDataStore::new())); + + let init_outcome_clone = init_outcome.clone(); + streaming.subscribe( + data_store, + Arc::new(move |success| { + *init_outcome_clone.lock().unwrap() = Some(success); + }), + shutdown_tx.subscribe(), + ); + + // Wait long enough for the initial request, the parser error, and + // several reconnect attempts (initial_reconnect_delay is 10ms). + tokio::time::sleep(Duration::from_millis(500)).await; + let _ = shutdown_tx.send(()); + + // While the eventsource-client is still actively reconnecting, the + // SDK should treat the situation as transient and not signal init + // failure. (Permanent failure is exercised by the 401 test.) + assert_eq!( + *init_outcome.lock().unwrap(), + None, + "init_complete should not be called while the stream is still retrying" + ); + mock.assert(); + } + + // An unrecoverable HTTP status (e.g. 401) should stop the streaming + // data source from making further requests and signal init failure + // to the caller, rather than retrying indefinitely. + #[tokio::test(flavor = "multi_thread")] + async fn streaming_source_stops_on_unrecoverable_http_status() { + let mut server = mockito::Server::new_async().await; + + let mock = server + .mock("GET", "/all") + .with_status(401) + .expect(1) + .create_async() + .await; + + let (shutdown_tx, _) = broadcast::channel::<()>(1); + let init_outcome = Arc::new(Mutex::new(None::)); + + let streaming = StreamingDataSource::new( + &server.url(), + "sdk-key", + Duration::from_millis(10), + &None, + launchdarkly_sdk_transport::HyperTransport::new().expect("Failed to create transport"), + ) + .unwrap(); + + let data_store = Arc::new(RwLock::new(InMemoryDataStore::new())); + + let init_outcome_clone = init_outcome.clone(); + streaming.subscribe( + data_store, + Arc::new(move |success| { + *init_outcome_clone.lock().unwrap() = Some(success); + }), + shutdown_tx.subscribe(), + ); + + // Wait long enough that any reconnect attempts would have happened + // (initial_reconnect_delay is 10ms). + tokio::time::sleep(Duration::from_millis(500)).await; + let _ = shutdown_tx.send(()); + + assert_eq!( + *init_outcome.lock().unwrap(), + Some(false), + "expected init_complete(false) on unrecoverable HTTP status" + ); + mock.assert(); + } + #[test_case(Some("application-id/abc:application-sha/xyz".into()), "application-id/abc:application-sha/xyz")] #[test_case(None, Matcher::Missing)] #[tokio::test(flavor = "multi_thread")]