Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 129 additions & 10 deletions launchdarkly-server-sdk/src/data_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,19 +142,29 @@ impl DataSource for StreamingDataSource {
}
}
},
Some(Err(es::Error::Eof)) => {
// Clean end-of-stream from the server;
// routine for streams that rotate connections.
debug!("event stream eof, will reconnect");
continue;
},
Some(Err(e)) => {
match e {
es::Error::Eof => {
continue;
}
_ => {
error!("unhandled error on event stream: {e:?}");
break;
}
}
// Keep polling. The eventsource-client will
// either reconnect on the next poll or end
// the stream (we'll observe that as `None`
// on the next iteration). Breaking here
// would drop this task and leave the data
// source silently dysfunctional.
warn!("error on event stream, will keep polling: {e:?}");
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might change the language here. Customers will think polling means non-streaming.

continue;
},
None => {
error!("unexpected end of event stream");
// The stream is exhausted only when the
// eventsource-client has given up reconnecting.
// Signal initialization failure so the caller
// can observe the permanent failure.
error!("event stream closed permanently");
notify_init.call_once(|| (init_complete)(false));
break;
}
};
Expand Down Expand Up @@ -433,6 +443,115 @@ mod tests {
mock.assert()
}

// When the SSE stream returns a non-Eof error, the streaming data
// source should keep polling so the eventsource-client can reconnect.
// A naive `break` would drop the spawned task and leave the data
// source silently dysfunctional.
#[tokio::test(flavor = "multi_thread")]
async fn streaming_source_recovers_from_non_eof_stream_error() {
let mut server = mockito::Server::new_async().await;

// Bytes 0xFF 0xFE form an invalid UTF-8 sequence; the eventsource
// parser rejects the line as `Error::InvalidLine`, which exercises
// the catch-all error arm of the fetch loop.
let invalid_body: &[u8] = b"\xff\xfe:bad\n\n";
let mock = server
.mock("GET", "/all")
.with_status(200)
.with_body(invalid_body)
.expect_at_least(2)
.create_async()
.await;

let (shutdown_tx, _) = broadcast::channel::<()>(1);
let init_outcome = Arc::new(Mutex::new(None::<bool>));

let streaming = StreamingDataSource::new(
&server.url(),
"sdk-key",
Duration::from_millis(10),
&None,
launchdarkly_sdk_transport::HyperTransport::new().expect("Failed to create transport"),
)
.unwrap();

let data_store = Arc::new(RwLock::new(InMemoryDataStore::new()));

let init_outcome_clone = init_outcome.clone();
streaming.subscribe(
data_store,
Arc::new(move |success| {
*init_outcome_clone.lock().unwrap() = Some(success);
}),
shutdown_tx.subscribe(),
);

// Wait long enough for the initial request, the parser error, and
// several reconnect attempts (initial_reconnect_delay is 10ms).
tokio::time::sleep(Duration::from_millis(500)).await;
let _ = shutdown_tx.send(());

// While the eventsource-client is still actively reconnecting, the
// SDK should treat the situation as transient and not signal init
// failure. (Permanent failure is exercised by the 401 test.)
assert_eq!(
*init_outcome.lock().unwrap(),
None,
"init_complete should not be called while the stream is still retrying"
);
mock.assert();
}

// An unrecoverable HTTP status (e.g. 401) should stop the streaming
// data source from making further requests and signal init failure
// to the caller, rather than retrying indefinitely.
#[tokio::test(flavor = "multi_thread")]
async fn streaming_source_stops_on_unrecoverable_http_status() {
let mut server = mockito::Server::new_async().await;

let mock = server
.mock("GET", "/all")
.with_status(401)
.expect(1)
.create_async()
.await;

let (shutdown_tx, _) = broadcast::channel::<()>(1);
let init_outcome = Arc::new(Mutex::new(None::<bool>));

let streaming = StreamingDataSource::new(
&server.url(),
"sdk-key",
Duration::from_millis(10),
&None,
launchdarkly_sdk_transport::HyperTransport::new().expect("Failed to create transport"),
)
.unwrap();

let data_store = Arc::new(RwLock::new(InMemoryDataStore::new()));

let init_outcome_clone = init_outcome.clone();
streaming.subscribe(
data_store,
Arc::new(move |success| {
*init_outcome_clone.lock().unwrap() = Some(success);
}),
shutdown_tx.subscribe(),
);

// Wait long enough that any reconnect attempts would have happened
// (initial_reconnect_delay is 10ms).
tokio::time::sleep(Duration::from_millis(500)).await;
let _ = shutdown_tx.send(());

assert_eq!(
*init_outcome.lock().unwrap(),
Some(false),
"expected init_complete(false) on unrecoverable HTTP status"
);
mock.assert();
}

#[test_case(Some("application-id/abc:application-sha/xyz".into()), "application-id/abc:application-sha/xyz")]
#[test_case(None, Matcher::Missing)]
#[tokio::test(flavor = "multi_thread")]
Expand Down
Loading