@@ -3,6 +3,7 @@ import 'dart:convert' as convert;
33import 'dart:io' ;
44
55import 'package:http/http.dart' as http;
6+ import 'package:powersync/src/abort_controller.dart' ;
67import 'package:powersync/src/exceptions.dart' ;
78import 'package:powersync/src/log_internal.dart' ;
89
@@ -39,6 +40,8 @@ class StreamingSyncImplementation {
3940
4041 SyncStatus lastStatus = const SyncStatus ();
4142
43+ AbortController ? _abort;
44+
4245 StreamingSyncImplementation (
4346 {required this .adapter,
4447 required this .credentialsCallback,
@@ -50,34 +53,66 @@ class StreamingSyncImplementation {
5053 statusStream = _statusStreamController.stream;
5154 }
5255
56+ /// Close any active streams.
57+ Future <void > abort () async {
58+ // If streamingSync() hasn't been called yet, _abort will be null.
59+ var future = _abort? .abort ();
60+ // This immediately triggers a new iteration in the merged stream, allowing us
61+ // to break immediately.
62+ // However, we still need to close the underlying stream explicitly, otherwise
63+ // the break will wait for the next line of data received on the stream.
64+ _localPingController.add (null );
65+ // According to the documentation, the behavior is undefined when calling
66+ // close() while requests are pending. However, this is no other
67+ // known way to cancel open streams, and this appears to end the stream with
68+ // a consistent ClientException.
69+ _client.close ();
70+ // wait for completeAbort() to be called
71+ await future;
72+ }
73+
74+ bool get aborted {
75+ return _abort? .aborted ?? false ;
76+ }
77+
5378 Future <void > streamingSync () async {
54- crudLoop ();
55- var invalidCredentials = false ;
56- while (true ) {
57- _updateStatus (connecting: true );
58- try {
59- if (invalidCredentials && invalidCredentialsCallback != null ) {
60- // This may error. In that case it will be retried again on the next
61- // iteration.
62- await invalidCredentialsCallback !();
63- invalidCredentials = false ;
64- }
65- await streamingSyncIteration ();
66- // Continue immediately
67- } catch (e, stacktrace) {
68- final message = _syncErrorMessage (e);
69- isolateLogger.warning ('Sync error: $message ' , e, stacktrace);
70- invalidCredentials = true ;
79+ try {
80+ _abort = AbortController ();
81+ crudLoop ();
82+ var invalidCredentials = false ;
83+ while (! aborted) {
84+ _updateStatus (connecting: true );
85+ try {
86+ if (invalidCredentials && invalidCredentialsCallback != null ) {
87+ // This may error. In that case it will be retried again on the next
88+ // iteration.
89+ await invalidCredentialsCallback !();
90+ invalidCredentials = false ;
91+ }
92+ await streamingSyncIteration ();
93+ // Continue immediately
94+ } catch (e, stacktrace) {
95+ if (aborted && e is http.ClientException ) {
96+ // Explicit abort requested - ignore. Example error:
97+ // ClientException: Connection closed while receiving data, uri=http://localhost:8080/sync/stream
98+ return ;
99+ }
100+ final message = _syncErrorMessage (e);
101+ isolateLogger.warning ('Sync error: $message ' , e, stacktrace);
102+ invalidCredentials = true ;
71103
72- _updateStatus (
73- connected: false ,
74- connecting: true ,
75- downloading: false ,
76- downloadError: e);
104+ _updateStatus (
105+ connected: false ,
106+ connecting: true ,
107+ downloading: false ,
108+ downloadError: e);
77109
78- // On error, wait a little before retrying
79- await Future .delayed (retryDelay);
110+ // On error, wait a little before retrying
111+ await Future .delayed (retryDelay);
112+ }
80113 }
114+ } finally {
115+ _abort! .completeAbort ();
81116 }
82117 }
83118
@@ -204,6 +239,10 @@ class StreamingSyncImplementation {
204239 bool haveInvalidated = false ;
205240
206241 await for (var line in merged) {
242+ if (aborted) {
243+ break ;
244+ }
245+
207246 _updateStatus (connected: true , connecting: false );
208247 if (line is Checkpoint ) {
209248 targetCheckpoint = line;
@@ -348,6 +387,9 @@ class StreamingSyncImplementation {
348387
349388 // Note: The response stream is automatically closed when this loop errors
350389 await for (var line in ndjson (res.stream)) {
390+ if (aborted) {
391+ break ;
392+ }
351393 yield parseStreamingSyncLine (line as Map <String , dynamic >);
352394 }
353395 }
0 commit comments