@@ -68,6 +68,7 @@ class StreamingSyncImplementation implements StreamingSync {
6868 bool _safeToClose = true ;
6969
7070 final Mutex syncMutex, crudMutex;
71+ Completer <void >? _activeCrudUpload;
7172
7273 final Map <String , String > _userAgentHeaders;
7374
@@ -135,7 +136,7 @@ class StreamingSyncImplementation implements StreamingSync {
135136 try {
136137 _abort = AbortController ();
137138 clientId = await adapter.getClientId ();
138- crudLoop ();
139+ _crudLoop ();
139140 var invalidCredentials = false ;
140141 while (! aborted) {
141142 _updateStatus (connecting: true );
@@ -176,8 +177,8 @@ class StreamingSyncImplementation implements StreamingSync {
176177 }
177178 }
178179
179- Future <void > crudLoop () async {
180- await uploadAllCrud ();
180+ Future <void > _crudLoop () async {
181+ await _uploadAllCrud ();
181182
182183 // Trigger a CRUD upload whenever the upstream trigger fires
183184 // as-well-as whenever the sync stream reconnects.
@@ -187,11 +188,13 @@ class StreamingSyncImplementation implements StreamingSync {
187188 // The stream here is closed on abort.
188189 await for (var _ in mergeStreams (
189190 [crudUpdateTriggerStream, _internalCrudTriggerController.stream])) {
190- await uploadAllCrud ();
191+ await _uploadAllCrud ();
191192 }
192193 }
193194
194- Future <void > uploadAllCrud () async {
195+ Future <void > _uploadAllCrud () {
196+ assert (_activeCrudUpload == null );
197+ final completer = _activeCrudUpload = Completer ();
195198 return crudMutex.lock (() async {
196199 // Keep track of the first item in the CRUD queue for the last `uploadCrud` iteration.
197200 CrudEntry ? checkedCrudItem;
@@ -244,7 +247,11 @@ class StreamingSyncImplementation implements StreamingSync {
244247 _updateStatus (uploading: false );
245248 }
246249 }
247- }, timeout: retryDelay);
250+ }, timeout: retryDelay).whenComplete (() {
251+ assert (identical (_activeCrudUpload, completer));
252+ _activeCrudUpload = null ;
253+ completer.complete ();
254+ });
248255 }
249256
250257 Future <String > getWriteCheckpoint () async {
@@ -336,7 +343,7 @@ class StreamingSyncImplementation implements StreamingSync {
336343 return (initialRequests, localDescriptions);
337344 }
338345
339- Future <bool > streamingSyncIteration (
346+ Future <void > streamingSyncIteration (
340347 {AbortController ? abortController}) async {
341348 adapter.startSession ();
342349
@@ -379,51 +386,27 @@ class StreamingSyncImplementation implements StreamingSync {
379386 await adapter.removeBuckets ([...bucketsToDelete]);
380387 _updateStatus (downloading: true );
381388 case StreamingSyncCheckpointComplete ():
382- final result = await adapter.syncLocalDatabase (targetCheckpoint! );
383- if (! result.checkpointValid) {
384- // This means checksums failed. Start again with a new checkpoint.
385- // TODO: better back-off
386- // await new Promise((resolve) => setTimeout(resolve, 50));
387- return false ;
388- } else if (! result.ready) {
389- // Checksums valid, but need more data for a consistent checkpoint.
390- // Continue waiting.
391- } else {
392- appliedCheckpoint = targetCheckpoint;
393-
394- final now = DateTime .now ();
395- _updateStatus (
396- downloading: false ,
397- downloadError: _noError,
398- lastSyncedAt: now,
399- priorityStatusEntries: [
400- if (appliedCheckpoint.checksums.isNotEmpty)
401- (
402- hasSynced: true ,
403- lastSyncedAt: now,
404- priority: maxBy (
405- appliedCheckpoint.checksums
406- .map ((cs) => BucketPriority (cs.priority)),
407- (priority) => priority,
408- compare: BucketPriority .comparator,
409- )! ,
410- )
411- ],
412- );
389+ final result =
390+ await _applyCheckpoint (targetCheckpoint! , abortController);
391+ if (result.abort) {
392+ return ;
413393 }
414-
415394 validatedCheckpoint = targetCheckpoint;
395+ if (result.didApply) {
396+ appliedCheckpoint = targetCheckpoint;
397+ }
416398 case StreamingSyncCheckpointPartiallyComplete (: final bucketPriority):
417399 final result = await adapter.syncLocalDatabase (targetCheckpoint! ,
418400 forPriority: bucketPriority);
419401 if (! result.checkpointValid) {
420402 // This means checksums failed. Start again with a new checkpoint.
421403 // TODO: better back-off
422404 // await new Promise((resolve) => setTimeout(resolve, 50));
423- return false ;
405+ return ;
424406 } else if (! result.ready) {
425- // Checksums valid, but need more data for a consistent checkpoint.
426- // Continue waiting.
407+ // If we have pending uploads, we can't complete new checkpoints
408+ // outside of priority 0. We'll resolve this for a complete
409+ // checkpoint later.
427410 } else {
428411 _updateStatusForPriority ((
429412 priority: BucketPriority (bucketPriority),
@@ -494,22 +477,13 @@ class StreamingSyncImplementation implements StreamingSync {
494477 downloadError: _noError,
495478 lastSyncedAt: DateTime .now ());
496479 } else if (validatedCheckpoint == targetCheckpoint) {
497- final result = await adapter.syncLocalDatabase (targetCheckpoint! );
498- if (! result.checkpointValid) {
499- // This means checksums failed. Start again with a new checkpoint.
500- // TODO: better back-off
501- // await new Promise((resolve) => setTimeout(resolve, 50));
502- return false ;
503- } else if (! result.ready) {
504- // Checksums valid, but need more data for a consistent checkpoint.
505- // Continue waiting.
506- } else {
480+ final result =
481+ await _applyCheckpoint (targetCheckpoint! , abortController);
482+ if (result.abort) {
483+ return ;
484+ }
485+ if (result.didApply) {
507486 appliedCheckpoint = targetCheckpoint;
508-
509- _updateStatus (
510- downloading: false ,
511- downloadError: _noError,
512- lastSyncedAt: DateTime .now ());
513487 }
514488 }
515489 }
@@ -519,7 +493,65 @@ class StreamingSyncImplementation implements StreamingSync {
519493 break ;
520494 }
521495 }
522- return true ;
496+ }
497+
498+ Future <({bool abort, bool didApply})> _applyCheckpoint (
499+ Checkpoint targetCheckpoint, AbortController ? abortController) async {
500+ var result = await adapter.syncLocalDatabase (targetCheckpoint);
501+ final pendingUpload = _activeCrudUpload;
502+
503+ if (! result.checkpointValid) {
504+ // This means checksums failed. Start again with a new checkpoint.
505+ // TODO: better back-off
506+ // await new Promise((resolve) => setTimeout(resolve, 50));
507+ return const (abort: true , didApply: false );
508+ } else if (! result.ready && pendingUpload != null ) {
509+ // We have pending entries in the local upload queue or are waiting to
510+ // confirm a write checkpoint, which prevented this checkpoint from
511+ // applying. Wait for that to complete and try again.
512+ isolateLogger.fine ('Could not apply checkpoint due to local data. '
513+ 'Waiting for in-progress upload before retrying...' );
514+ await Future .any ([
515+ pendingUpload.future,
516+ if (abortController case final controller? ) controller.onAbort,
517+ ]);
518+
519+ if (abortController? .aborted == true ) {
520+ return const (abort: true , didApply: false );
521+ }
522+
523+ // Try again now that uploads have completed.
524+ result = await adapter.syncLocalDatabase (targetCheckpoint);
525+ }
526+
527+ if (result.checkpointValid && result.ready) {
528+ isolateLogger.fine ('validated checkpoint: $targetCheckpoint ' );
529+ final now = DateTime .now ();
530+ _updateStatus (
531+ downloading: false ,
532+ downloadError: _noError,
533+ lastSyncedAt: now,
534+ priorityStatusEntries: [
535+ if (targetCheckpoint.checksums.isNotEmpty)
536+ (
537+ hasSynced: true ,
538+ lastSyncedAt: now,
539+ priority: maxBy (
540+ targetCheckpoint.checksums
541+ .map ((cs) => BucketPriority (cs.priority)),
542+ (priority) => priority,
543+ compare: BucketPriority .comparator,
544+ )! ,
545+ )
546+ ],
547+ );
548+
549+ return const (abort: false , didApply: true );
550+ } else {
551+ isolateLogger.fine (
552+ 'Could not apply checkpoint. Waiting for next sync complete line' );
553+ return const (abort: false , didApply: false );
554+ }
523555 }
524556
525557 Stream <StreamingSyncLine > streamingSyncRequest (
0 commit comments