From dd9a047cb79603b7db9f71e609f6143f2395d981 Mon Sep 17 00:00:00 2001 From: Christiaan Landman Date: Thu, 4 Dec 2025 16:18:07 +0200 Subject: [PATCH 1/4] First run through of catching up on sync priorities and progress. --- .../Client/PowerSyncDatabase.cs | 55 ++++++++++- .../Stream/StreamingSyncImplementation.cs | 3 + .../PowerSync.Common/DB/Crud/CrudEntry.cs | 35 ++++++- .../PowerSync.Common/DB/Crud/SyncStatus.cs | 9 +- PowerSync/PowerSync.Common/DB/IDBAdapter.cs | 6 +- PowerSync/PowerSync.Common/DB/Schema/Table.cs | 96 ++++++++++++++++--- 6 files changed, 179 insertions(+), 25 deletions(-) diff --git a/PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs b/PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs index 9f17882..9e864ab 100644 --- a/PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs +++ b/PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs @@ -156,9 +156,36 @@ public async Task WaitForReady() await isReadyTask; } - public async Task WaitForFirstSync(CancellationToken? cancellationToken = null) + public class PrioritySyncRequest { - if (CurrentStatus.HasSynced == true) + public CancellationToken? Token { get; set; } + public int? Priority { get; set; } + } + + /// + /// Wait for the first sync operation to complete. + /// + /// + /// An object providing a cancellation token and a priority target. + /// When a priority target is set, the task may complete when all buckets with the given (or higher) + /// priorities have been synchronized. This can be earlier than a complete sync. + /// + /// A task which will complete once the first full sync has completed. + public async Task WaitForFirstSync(PrioritySyncRequest? request = null) + { + var priority = request?.Priority ?? null; + var cancellationToken = request?.Token ?? null; + + bool StatusMatches(SyncStatus status) + { + if (priority == null) + { + return status.HasSynced == true; + } + return status.StatusForPriority(priority.Value).HasSynced == true; + } + + if (StatusMatches(CurrentStatus)) { return; } @@ -166,11 +193,11 @@ public async Task WaitForFirstSync(CancellationToken? cancellationToken = null) var tcs = new TaskCompletionSource(); var cts = new CancellationTokenSource(); - var _ = Task.Run(() => + _ = Task.Run(() => { foreach (var update in Listen(cts.Token)) { - if (update.StatusChanged?.HasSynced == true) + if (update.StatusChanged != null && StatusMatches(update.StatusChanged!)) { cts.Cancel(); tcs.SetResult(true); @@ -227,7 +254,22 @@ private async Task LoadVersion() } } - private record LastSyncedResult(int? priority, string? last_synced_at); + // protected async resolveOfflineSyncStatus() { + // const result = await this.database.get<{ r: string }>('SELECT powersync_offline_sync_status() as r'); + // const parsed = JSON.parse(result.r) as CoreSyncStatus; + + // const updatedStatus = new SyncStatus({ + // ...this.currentStatus.toJSON(), + // ...coreStatusToJs(parsed) + // }); + + // if (!updatedStatus.isEqual(this.currentStatus)) { + // this.currentStatus = updatedStatus; + // this.iterateListeners((l) => l.statusChanged?.(this.currentStatus)); + // } + // } + // TODO DELETE + private record LastSyncedResult(int priority, string? last_synced_at); protected async Task UpdateHasSynced() { @@ -236,6 +278,7 @@ protected async Task UpdateHasSynced() ); DateTime? lastCompleteSync = null; + List priorityStatuses = []; // TODO: Will be altered/extended when reporting individual sync priority statuses is supported foreach (var result in results) @@ -262,6 +305,8 @@ protected async Task UpdateHasSynced() } } + + /// /// Replace the schema with a new version. This is for advanced use cases - typically the schema should just be specified once in the constructor. /// Cannot be used while connected - this should only be called before . diff --git a/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs b/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs index 62466cc..4cebd7c 100644 --- a/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs +++ b/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs @@ -1043,10 +1043,12 @@ protected void UpdateSyncStatus(SyncStatusOptions options, UpdateSyncStatusOptio Connected = options.Connected ?? SyncStatus.Connected, Connecting = !options.Connected.GetValueOrDefault() && (options.Connecting ?? SyncStatus.Connecting), LastSyncedAt = options.LastSyncedAt ?? SyncStatus.LastSyncedAt, + PriorityStatusEntries = options.PriorityStatusEntries ?? SyncStatus.PriorityStatusEntries, DataFlow = new SyncDataFlowStatus { Uploading = options.DataFlow?.Uploading ?? SyncStatus.DataFlowStatus.Uploading, Downloading = options.DataFlow?.Downloading ?? SyncStatus.DataFlowStatus.Downloading, + DownloadProgress = options.DataFlow?.DownloadProgress ?? SyncStatus.DataFlowStatus.DownloadProgress DownloadError = updateOptions?.ClearDownloadError == true ? null : options.DataFlow?.DownloadError ?? SyncStatus.DataFlowStatus.DownloadError, UploadError = updateOptions?.ClearUploadError == true ? null : options.DataFlow?.UploadError ?? SyncStatus.DataFlowStatus.UploadError, } @@ -1088,6 +1090,7 @@ private async Task DelayRetry() } } + enum LockType { CRUD, diff --git a/PowerSync/PowerSync.Common/DB/Crud/CrudEntry.cs b/PowerSync/PowerSync.Common/DB/Crud/CrudEntry.cs index 5fe8f02..542c5f3 100644 --- a/PowerSync/PowerSync.Common/DB/Crud/CrudEntry.cs +++ b/PowerSync/PowerSync.Common/DB/Crud/CrudEntry.cs @@ -32,6 +32,9 @@ public class CrudEntryDataJSON [JsonProperty("data")] public Dictionary? Data { get; set; } + [JsonProperty("old")] + public Dictionary? Old { get; set; } + [JsonProperty("op")] public UpdateType Op { get; set; } @@ -40,6 +43,9 @@ public class CrudEntryDataJSON [JsonProperty("id")] public string Id { get; set; } = null!; + + [JsonProperty("metadata")] + public string? Metadata { get; set; } } public class CrudEntryOutputJSON @@ -63,7 +69,17 @@ public class CrudEntryOutputJSON public Dictionary? Data { get; set; } } -public class CrudEntry(int clientId, UpdateType op, string table, string id, long? transactionId = null, Dictionary? opData = null) + +public class CrudEntry( + int clientId, + UpdateType op, + string table, + string id, + long? transactionId = null, + Dictionary? opData = null, + Dictionary? previousValues = null, + string? metadata = null +) { public int ClientId { get; private set; } = clientId; public string Id { get; private set; } = id; @@ -72,6 +88,19 @@ public class CrudEntry(int clientId, UpdateType op, string table, string id, lon public string Table { get; private set; } = table; public long? TransactionId { get; private set; } = transactionId; + /// + /// Previous values before this change. + /// + public Dictionary? PreviousValues { get; private set; } = previousValues; + + /// + /// Client-side metadata attached with this write. + /// + /// This field is only available when the `trackMetadata` option was set to `true` when creating a table + /// and the insert or update statement set the `_metadata` column. + /// + public string? Metadata { get; private set; } = metadata; + public static CrudEntry FromRow(CrudEntryJSON dbRow) { var data = JsonConvert.DeserializeObject(dbRow.Data) @@ -83,7 +112,9 @@ public static CrudEntry FromRow(CrudEntryJSON dbRow) data.Type, data.Id, dbRow.TransactionId, - data.Data + data.Data, + data.Old, + data.Metadata ); } diff --git a/PowerSync/PowerSync.Common/DB/Crud/SyncStatus.cs b/PowerSync/PowerSync.Common/DB/Crud/SyncStatus.cs index cdda36a..c882ee8 100644 --- a/PowerSync/PowerSync.Common/DB/Crud/SyncStatus.cs +++ b/PowerSync/PowerSync.Common/DB/Crud/SyncStatus.cs @@ -100,7 +100,7 @@ public class SyncStatus(SyncStatusOptions options) /// public SyncPriorityStatus[] PriorityStatusEntries => (Options.PriorityStatusEntries ?? []) - .OrderBy(entry => entry.Priority) + .OrderBy(x => x, Comparer.Create(ComparePriorities)) .ToArray(); /// @@ -154,7 +154,7 @@ public SyncPriorityStatus StatusForPriority(int priority) private string SerializeObject() { - return JsonConvert.SerializeObject(new { Options = Options, UploadErrorMessage = Options.DataFlow?.UploadError?.Message, DownloadErrorMessage = DataFlowStatus.DownloadError?.Message }); + return JsonConvert.SerializeObject(new { Options, UploadErrorMessage = Options.DataFlow?.UploadError?.Message, DownloadErrorMessage = DataFlowStatus.DownloadError?.Message }); } public bool IsEqual(SyncStatus status) @@ -173,4 +173,9 @@ public string ToJSON() { return SerializeObject(); } + + private static int ComparePriorities(SyncPriorityStatus a, SyncPriorityStatus b) + { + return b.Priority - a.Priority; // Reverse because higher priorities have lower numbers + } } \ No newline at end of file diff --git a/PowerSync/PowerSync.Common/DB/IDBAdapter.cs b/PowerSync/PowerSync.Common/DB/IDBAdapter.cs index b0c906b..a31ee49 100644 --- a/PowerSync/PowerSync.Common/DB/IDBAdapter.cs +++ b/PowerSync/PowerSync.Common/DB/IDBAdapter.cs @@ -30,13 +30,13 @@ public class QueryRows public interface IDBGetUtils { // Execute a read-only query and return results. - Task GetAll(string sql, params object[]? parameters); + Task GetAll(string sql, object[]? parameters = null); // Execute a read-only query and return the first result, or null if the ResultSet is empty. - Task GetOptional(string sql, params object[]? parameters); + Task GetOptional(string sql, object[]? parameters = null); // Execute a read-only query and return the first result, error if the ResultSet is empty. - Task Get(string sql, params object[]? parameters); + Task Get(string sql, object[]? parameters = null); } public interface ILockContext : IDBGetUtils diff --git a/PowerSync/PowerSync.Common/DB/Schema/Table.cs b/PowerSync/PowerSync.Common/DB/Schema/Table.cs index cd9d2b1..2476e79 100644 --- a/PowerSync/PowerSync.Common/DB/Schema/Table.cs +++ b/PowerSync/PowerSync.Common/DB/Schema/Table.cs @@ -7,15 +7,57 @@ public class TableOptions( Dictionary>? indexes = null, bool? localOnly = null, bool? insertOnly = null, - string? viewName = null) + string? viewName = null, + bool? trackMetadata = null, + TrackPreviousOptions? trackPreviousOptions = null, + bool? ignoreEmptyUpdates = null +) { public Dictionary> Indexes { get; set; } = indexes ?? []; - public bool LocalOnly { get; set; } = localOnly ?? false; + public bool LocalOnly { get; } = localOnly ?? false; - public bool InsertOnly { get; set; } = insertOnly ?? false; + public bool InsertOnly { get; } = insertOnly ?? false; - public string? ViewName { get; set; } = viewName; + public string? ViewName { get; } = viewName; + + /// + /// Whether to add a hidden `_metadata` column that will be enabled for updates to attach custom + /// information about writes that will be reported through [CrudEntry.metadata]. + /// + public bool TrackMetadata { get; } = trackMetadata ?? false; + + /// + /// When set to a non-null value, track old values of columns + /// + public TrackPreviousOptions? TrackPreviousOptions { get; } = trackPreviousOptions ?? null; + + /// + /// Whether an `UPDATE` statement that doesn't change any values should be ignored when creating + /// CRUD entries. + /// + public bool IgnoreEmptyUpdates { get; } = ignoreEmptyUpdates ?? false; +} + +/// +/// Whether to include previous column values when PowerSync tracks local changes. +/// Including old values may be helpful for some backend connector implementations, +/// which is why it can be enabled on a per-table or per-column basis. +/// +public class TrackPreviousOptions +{ + /// + /// When defined, a list of column names for which old values should be tracked. + /// + [JsonProperty("columns")] + public List? Columns { get; set; } + + /// + /// Whether to only include old values when they were changed by an update, instead of always + /// including all old values, + /// + [JsonProperty("onlyWhenChanged")] + public bool? OnlyWhenChanged { get; set; } } public class Table @@ -35,16 +77,21 @@ public Table(Dictionary columns, TableOptions? options = nul { ConvertedColumns = [.. columns.Select(kv => new Column(new ColumnOptions(kv.Key, kv.Value)))]; - ConvertedIndexes = [.. (Options?.Indexes ?? []) + ConvertedIndexes = + [ + .. (Options?.Indexes ?? []) .Select(kv => new Index(new IndexOptions( kv.Key, - [.. kv.Value.Select(name => - new IndexedColumn(new IndexColumnOptions( - name.Replace("-", ""), !name.StartsWith("-"))) - )] + [ + .. kv.Value.Select(name => + new IndexedColumn(new IndexColumnOptions( + name.Replace("-", ""), !name.StartsWith("-"))) + ) + ] )) - )]; + ) + ]; Options = options ?? new TableOptions(); @@ -61,7 +108,18 @@ public void Validate() if (Columns.Count > Column.MAX_AMOUNT_OF_COLUMNS) { - throw new Exception($"Table has too many columns. The maximum number of columns is {Column.MAX_AMOUNT_OF_COLUMNS}."); + throw new Exception( + $"Table has too many columns. The maximum number of columns is {Column.MAX_AMOUNT_OF_COLUMNS}."); + } + + if (Options.TrackMetadata && Options.LocalOnly) + { + throw new Exception("Can't include metadata for local-only tables."); + } + + if (Options.TrackPreviousOptions != null && Options.LocalOnly) + { + throw new Exception("Can't include old values for local-only tables."); } var columnNames = new HashSet { "id" }; @@ -103,15 +161,27 @@ public void Validate() public string ToJSON(string Name = "") { + var trackPrevious = Options.TrackPreviousOptions; + var jsonObject = new { view_name = Options.ViewName ?? Name, local_only = Options.LocalOnly, insert_only = Options.InsertOnly, columns = ConvertedColumns.Select(c => JsonConvert.DeserializeObject(c.ToJSON())).ToList(), - indexes = ConvertedIndexes.Select(e => JsonConvert.DeserializeObject(e.ToJSON(this))).ToList() + indexes = ConvertedIndexes.Select(e => JsonConvert.DeserializeObject(e.ToJSON(this))).ToList(), + + include_metadata = Options.TrackMetadata, + ignore_empty_update = Options.IgnoreEmptyUpdates, + include_old = (object)(trackPrevious switch + { + null => false, + { Columns: null } => true, + { Columns: var cols } => cols + }), + include_old_only_when_changed = trackPrevious?.OnlyWhenChanged ?? false }; return JsonConvert.SerializeObject(jsonObject); } -} +} \ No newline at end of file From 6cb2ce6d97653527e60a75f8c18abe2fe757d703 Mon Sep 17 00:00:00 2001 From: Christiaan Landman Date: Mon, 8 Dec 2025 10:14:51 +0200 Subject: [PATCH 2/4] Updated SyncStatus update handling from core instructions. --- .../Client/Sync/Stream/CoreInstructions.cs | 40 ++++++++++++++++++- .../Stream/StreamingSyncImplementation.cs | 36 +---------------- PowerSync/PowerSync.Common/DB/Schema/Table.cs | 4 +- 3 files changed, 43 insertions(+), 37 deletions(-) diff --git a/PowerSync/PowerSync.Common/Client/Sync/Stream/CoreInstructions.cs b/PowerSync/PowerSync.Common/Client/Sync/Stream/CoreInstructions.cs index 7bfef98..464ad24 100644 --- a/PowerSync/PowerSync.Common/Client/Sync/Stream/CoreInstructions.cs +++ b/PowerSync/PowerSync.Common/Client/Sync/Stream/CoreInstructions.cs @@ -1,5 +1,6 @@ using Newtonsoft.Json.Linq; using Newtonsoft.Json; +using PowerSync.Common.DB.Crud; namespace PowerSync.Common.Client.Sync.Stream; @@ -83,6 +84,7 @@ public class CoreSyncStatus [JsonProperty("downloading")] public DownloadProgress? Downloading { get; set; } + } public class SyncPriorityStatus @@ -126,4 +128,40 @@ public class FetchCredentials : Instruction public class CloseSyncStream : Instruction { } public class FlushFileSystem : Instruction { } -public class DidCompleteSync : Instruction { } \ No newline at end of file +public class DidCompleteSync : Instruction { } + +public class CoreInstructionHelpers +{ + public static DB.Crud.SyncPriorityStatus PriorityToStatus(SyncPriorityStatus status) + { + return new DB.Crud.SyncPriorityStatus + { + Priority = status.Priority, + HasSynced = status.HasSynced ?? null, + LastSyncedAt = status?.LastSyncedAt != null ? new DateTime(status!.LastSyncedAt) : null + }; + } + + public static DB.Crud.SyncStatusOptions CoreStatusToSyncStatus(CoreSyncStatus status) + { + var coreCompleteSync = + status.PriorityStatus.FirstOrDefault(s => s.Priority == SyncProgress.FULL_SYNC_PRIORITY); + var completeSync = coreCompleteSync != null ? PriorityToStatus(coreCompleteSync) : null; + + return new DB.Crud.SyncStatusOptions + { + Connected = status.Connected, + Connecting = status.Connecting, + DataFlow = new DB.Crud.SyncDataFlowStatus + { + // We expose downloading as a boolean field, the core extension reports download information as a nullable + // download status. When that status is non-null, a download is in progress. + Downloading = status.Downloading != null, + DownloadProgress = status.Downloading?.Buckets + }, + LastSyncedAt = completeSync?.LastSyncedAt, + HasSynced = completeSync?.HasSynced, + PriorityStatusEntries = status.PriorityStatus.Select(PriorityToStatus).ToArray() + }; + } +} \ No newline at end of file diff --git a/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs b/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs index 4cebd7c..b1dea71 100644 --- a/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs +++ b/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs @@ -537,29 +537,7 @@ async Task HandleInstruction(Instruction instruction) } break; case UpdateSyncStatus syncStatus: - var info = syncStatus.Status; - var coreCompleteSync = - info.PriorityStatus.FirstOrDefault(s => s.Priority == SyncProgress.FULL_SYNC_PRIORITY); - var completeSync = coreCompleteSync != null ? CoreStatusToSyncStatus(coreCompleteSync) : null; - - UpdateSyncStatus(new SyncStatusOptions - { - Connected = info.Connected, - Connecting = info.Connecting, - LastSyncedAt = completeSync?.LastSyncedAt, - HasSynced = completeSync?.HasSynced, - PriorityStatusEntries = info.PriorityStatus.Select(CoreStatusToSyncStatus).ToArray(), - DataFlow = new SyncDataFlowStatus - { - Downloading = info.Downloading != null, - DownloadProgress = info.Downloading?.Buckets - } - }, - new UpdateSyncStatusOptions - { - ClearDownloadError = true, - } - ); + UpdateSyncStatus(CoreInstructionHelpers.CoreStatusToSyncStatus(syncStatus.Status)); break; case EstablishSyncStream establishSyncStream: if (receivingLines != null) @@ -1048,7 +1026,7 @@ protected void UpdateSyncStatus(SyncStatusOptions options, UpdateSyncStatusOptio { Uploading = options.DataFlow?.Uploading ?? SyncStatus.DataFlowStatus.Uploading, Downloading = options.DataFlow?.Downloading ?? SyncStatus.DataFlowStatus.Downloading, - DownloadProgress = options.DataFlow?.DownloadProgress ?? SyncStatus.DataFlowStatus.DownloadProgress + DownloadProgress = options.DataFlow?.DownloadProgress ?? SyncStatus.DataFlowStatus.DownloadProgress, DownloadError = updateOptions?.ClearDownloadError == true ? null : options.DataFlow?.DownloadError ?? SyncStatus.DataFlowStatus.DownloadError, UploadError = updateOptions?.ClearUploadError == true ? null : options.DataFlow?.UploadError ?? SyncStatus.DataFlowStatus.UploadError, } @@ -1071,16 +1049,6 @@ protected void UpdateSyncStatus(SyncStatusOptions options, UpdateSyncStatusOptio } } - private static DB.Crud.SyncPriorityStatus CoreStatusToSyncStatus(SyncPriorityStatus status) - { - return new DB.Crud.SyncPriorityStatus - { - Priority = status.Priority, - HasSynced = status.HasSynced ?? null, - LastSyncedAt = status?.LastSyncedAt != null ? new DateTime(status!.LastSyncedAt) : null - }; - } - private async Task DelayRetry() { if (Options.RetryDelayMs.HasValue) diff --git a/PowerSync/PowerSync.Common/DB/Schema/Table.cs b/PowerSync/PowerSync.Common/DB/Schema/Table.cs index 2476e79..6a6ca22 100644 --- a/PowerSync/PowerSync.Common/DB/Schema/Table.cs +++ b/PowerSync/PowerSync.Common/DB/Schema/Table.cs @@ -15,9 +15,9 @@ public class TableOptions( { public Dictionary> Indexes { get; set; } = indexes ?? []; - public bool LocalOnly { get; } = localOnly ?? false; + public bool LocalOnly { get; set; } = localOnly ?? false; - public bool InsertOnly { get; } = insertOnly ?? false; + public bool InsertOnly { get; set; } = insertOnly ?? false; public string? ViewName { get; } = viewName; From 1a5da8b06aabfc25b8c9d6c9c36b11e5badbd8fd Mon Sep 17 00:00:00 2001 From: Christiaan Landman Date: Mon, 8 Dec 2025 15:04:15 +0200 Subject: [PATCH 3/4] Added ResolveOfflineSyncStatus, bumped sqlite extension and minimum version. --- .../Client/PowerSyncDatabase.cs | 63 ++++--------------- .../Client/Sync/Stream/CoreInstructions.cs | 7 ++- .../Stream/StreamingSyncImplementation.cs | 2 +- .../PowerSync.Common/DB/Crud/SyncStatus.cs | 21 +++++++ .../SyncIntegrationTests.cs | 2 - Tools/Setup/Setup.cs | 5 +- 6 files changed, 45 insertions(+), 55 deletions(-) diff --git a/PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs b/PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs index 9e864ab..a47760f 100644 --- a/PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs +++ b/PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs @@ -219,7 +219,7 @@ protected async Task Initialize() await BucketStorageAdapter.Init(); await LoadVersion(); await UpdateSchema(schema); - await UpdateHasSynced(); + await ResolveOfflineSyncStatus(); await Database.Execute("PRAGMA RECURSIVE_TRIGGERS=TRUE"); Ready = true; Emit(new PowerSyncDBEvent { Initialized = true }); @@ -243,70 +243,33 @@ private async Task LoadVersion() catch (Exception e) { throw new Exception( - $"Unsupported PowerSync extension version. Need >=0.2.0 <1.0.0, got: {sdkVersion}. Details: {e.Message}" + $"Unsupported PowerSync extension version. Need >=0.4.10 <1.0.0, got: {sdkVersion}. Details: {e.Message}" ); } - // Validate version is >= 0.2.0 and < 1.0.0 - if (versionInts[0] != 0 || versionInts[1] < 2 || versionInts[2] < 0) + // Validate version is >= 0.4.10 and < 1.0.0 + if (versionInts[0] != 0 || versionInts[1] < 4 || (versionInts[1] == 4 && versionInts[2] < 10)) { - throw new Exception($"Unsupported PowerSync extension version. Need >=0.2.0 <1.0.0, got: {sdkVersion}"); + throw new Exception($"Unsupported PowerSync extension version. Need >=0.4.10 <1.0.0, got: {sdkVersion}"); } } - // protected async resolveOfflineSyncStatus() { - // const result = await this.database.get<{ r: string }>('SELECT powersync_offline_sync_status() as r'); - // const parsed = JSON.parse(result.r) as CoreSyncStatus; - - // const updatedStatus = new SyncStatus({ - // ...this.currentStatus.toJSON(), - // ...coreStatusToJs(parsed) - // }); - - // if (!updatedStatus.isEqual(this.currentStatus)) { - // this.currentStatus = updatedStatus; - // this.iterateListeners((l) => l.statusChanged?.(this.currentStatus)); - // } - // } - // TODO DELETE - private record LastSyncedResult(int priority, string? last_synced_at); - - protected async Task UpdateHasSynced() + private record OfflineSyncStatusResult(string r); + protected async Task ResolveOfflineSyncStatus() { - var results = await Database.GetAll( - "SELECT priority, last_synced_at FROM ps_sync_state ORDER BY priority DESC" - ); - - DateTime? lastCompleteSync = null; - List priorityStatuses = []; + var result = await Database.Get("SELECT powersync_offline_sync_status() as r"); + var parsed = JsonConvert.DeserializeObject(result.r); - // TODO: Will be altered/extended when reporting individual sync priority statuses is supported - foreach (var result in results) - { - var parsedDate = DateTime.Parse(result.last_synced_at + "Z"); - - if (result.priority == FULL_SYNC_PRIORITY) - { - // This lowest-possible priority represents a complete sync. - lastCompleteSync = parsedDate; - } - } + var parsedSyncStatus = CoreInstructionHelpers.CoreStatusToSyncStatus(parsed!); + var updatedStatus = CurrentStatus.CreateUpdatedStatus(parsedSyncStatus); - var hasSynced = lastCompleteSync != null; - if (hasSynced != CurrentStatus.HasSynced) + if (!updatedStatus.IsEqual(CurrentStatus)) { - CurrentStatus = new SyncStatus(new SyncStatusOptions(CurrentStatus.Options) - { - HasSynced = hasSynced, - LastSyncedAt = lastCompleteSync, - }); - + CurrentStatus = updatedStatus; Emit(new PowerSyncDBEvent { StatusChanged = CurrentStatus }); } } - - /// /// Replace the schema with a new version. This is for advanced use cases - typically the schema should just be specified once in the constructor. /// Cannot be used while connected - this should only be called before . diff --git a/PowerSync/PowerSync.Common/Client/Sync/Stream/CoreInstructions.cs b/PowerSync/PowerSync.Common/Client/Sync/Stream/CoreInstructions.cs index 464ad24..b27e77d 100644 --- a/PowerSync/PowerSync.Common/Client/Sync/Stream/CoreInstructions.cs +++ b/PowerSync/PowerSync.Common/Client/Sync/Stream/CoreInstructions.cs @@ -142,7 +142,12 @@ public static DB.Crud.SyncPriorityStatus PriorityToStatus(SyncPriorityStatus sta }; } - public static DB.Crud.SyncStatusOptions CoreStatusToSyncStatus(CoreSyncStatus status) + public static DB.Crud.SyncStatus CoreStatusToSyncStatus(CoreSyncStatus status) + { + return new DB.Crud.SyncStatus(CoreStatusToSyncStatusOptions(status)); + } + + public static DB.Crud.SyncStatusOptions CoreStatusToSyncStatusOptions(CoreSyncStatus status) { var coreCompleteSync = status.PriorityStatus.FirstOrDefault(s => s.Priority == SyncProgress.FULL_SYNC_PRIORITY); diff --git a/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs b/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs index b1dea71..ec9fa07 100644 --- a/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs +++ b/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs @@ -537,7 +537,7 @@ async Task HandleInstruction(Instruction instruction) } break; case UpdateSyncStatus syncStatus: - UpdateSyncStatus(CoreInstructionHelpers.CoreStatusToSyncStatus(syncStatus.Status)); + UpdateSyncStatus(CoreInstructionHelpers.CoreStatusToSyncStatusOptions(syncStatus.Status)); break; case EstablishSyncStream establishSyncStream: if (receivingLines != null) diff --git a/PowerSync/PowerSync.Common/DB/Crud/SyncStatus.cs b/PowerSync/PowerSync.Common/DB/Crud/SyncStatus.cs index c882ee8..43da269 100644 --- a/PowerSync/PowerSync.Common/DB/Crud/SyncStatus.cs +++ b/PowerSync/PowerSync.Common/DB/Crud/SyncStatus.cs @@ -152,6 +152,27 @@ public SyncPriorityStatus StatusForPriority(int priority) }; } + /// + /// Creates an updated SyncStatus by merging the current status with the provided updated status. + /// + public SyncStatus CreateUpdatedStatus(SyncStatus updatedStatus) + { + var updatedOptions = updatedStatus.Options; + var currentOptions = Options; + + var parsedOptions = new SyncStatusOptions + { + Connected = updatedOptions.Connected ?? currentOptions.Connected, + Connecting = updatedOptions.Connecting ?? currentOptions.Connecting, + LastSyncedAt = updatedOptions.LastSyncedAt ?? currentOptions.LastSyncedAt, + HasSynced = updatedOptions.HasSynced ?? currentOptions.HasSynced, + PriorityStatusEntries = updatedOptions.PriorityStatusEntries ?? currentOptions.PriorityStatusEntries, + DataFlow = updatedOptions.DataFlow ?? currentOptions.DataFlow, + }; + + return new SyncStatus(parsedOptions); + } + private string SerializeObject() { return JsonConvert.SerializeObject(new { Options, UploadErrorMessage = Options.DataFlow?.UploadError?.Message, DownloadErrorMessage = DataFlowStatus.DownloadError?.Message }); diff --git a/Tests/PowerSync/PowerSync.Common.IntegrationTests/SyncIntegrationTests.cs b/Tests/PowerSync/PowerSync.Common.IntegrationTests/SyncIntegrationTests.cs index 6e2da0e..f4a6308 100644 --- a/Tests/PowerSync/PowerSync.Common.IntegrationTests/SyncIntegrationTests.cs +++ b/Tests/PowerSync/PowerSync.Common.IntegrationTests/SyncIntegrationTests.cs @@ -42,8 +42,6 @@ public async Task InitializeAsync() await db.Init(); var connector = new NodeConnector(userId); - await ClearAllData(); - Console.WriteLine($"Using User ID: {userId}"); try { diff --git a/Tools/Setup/Setup.cs b/Tools/Setup/Setup.cs index 81fd9c3..17c5426 100644 --- a/Tools/Setup/Setup.cs +++ b/Tools/Setup/Setup.cs @@ -5,9 +5,12 @@ using System.Collections.Generic; using System.IO.Compression; +/// +/// Execute with `dotnet run --project Tools/Setup` +/// public class PowerSyncSetup { - private const string VERSION = "0.4.9"; + private const string VERSION = "0.4.10"; private const string GITHUB_BASE_URL = $"https://github.com/powersync-ja/powersync-sqlite-core/releases/download/v{VERSION}"; private const string MAVEN_BASE_URL = $"https://repo1.maven.org/maven2/com/powersync/powersync-sqlite-core/{VERSION}"; From bce30241ff8fb055e5197e1be97c92ff6a0d574c Mon Sep 17 00:00:00 2001 From: Christiaan Landman Date: Mon, 8 Dec 2025 19:50:50 +0200 Subject: [PATCH 4/4] Dropping C_SHARP implementation. --- .../Sync/Bucket/BucketStorageAdapter.cs | 15 - .../Client/Sync/Bucket/SqliteBucketStorage.cs | 187 --- .../Stream/StreamingSyncImplementation.cs | 287 +---- .../Client/Sync/BucketStorageTests.cs | 1019 ----------------- 4 files changed, 2 insertions(+), 1506 deletions(-) delete mode 100644 Tests/PowerSync/PowerSync.Common.Tests/Client/Sync/BucketStorageTests.cs diff --git a/PowerSync/PowerSync.Common/Client/Sync/Bucket/BucketStorageAdapter.cs b/PowerSync/PowerSync.Common/Client/Sync/Bucket/BucketStorageAdapter.cs index 4611bb2..272e747 100644 --- a/PowerSync/PowerSync.Common/Client/Sync/Bucket/BucketStorageAdapter.cs +++ b/PowerSync/PowerSync.Common/Client/Sync/Bucket/BucketStorageAdapter.cs @@ -95,15 +95,6 @@ public class BucketStorageEvent public interface IBucketStorageAdapter : IEventStream { Task Init(); - Task SaveSyncData(SyncDataBatch batch); - Task RemoveBuckets(string[] buckets); - Task SetTargetCheckpoint(Checkpoint checkpoint); - - void StartSession(); - - Task GetBucketStates(); - - Task SyncLocalDatabase(Checkpoint checkpoint); Task NextCrudItem(); Task HasCrud(); @@ -112,12 +103,6 @@ public interface IBucketStorageAdapter : IEventStream Task HasCompletedSync(); Task UpdateLocalTarget(Func> callback); - /// - /// Exposed for tests only. - /// - Task AutoCompact(); - Task ForceCompact(); - string GetMaxOpId(); /// diff --git a/PowerSync/PowerSync.Common/Client/Sync/Bucket/SqliteBucketStorage.cs b/PowerSync/PowerSync.Common/Client/Sync/Bucket/SqliteBucketStorage.cs index 78c549d..d35c1f3 100644 --- a/PowerSync/PowerSync.Common/Client/Sync/Bucket/SqliteBucketStorage.cs +++ b/PowerSync/PowerSync.Common/Client/Sync/Bucket/SqliteBucketStorage.cs @@ -25,9 +25,6 @@ public class SqliteBucketStorage : EventStream, IBucketStora private readonly HashSet tableNames; private string? clientId; - private static readonly int COMPACT_OPERATION_INTERVAL = 1000; - private int compactCounter = COMPACT_OPERATION_INTERVAL; - private ILogger logger; private CancellationTokenSource updateCts; @@ -95,50 +92,6 @@ public string GetMaxOpId() return MAX_OP_ID; } - public void StartSession() { } - - public async Task GetBucketStates() - { - return - await db.GetAll("SELECT name as bucket, cast(last_op as TEXT) as op_id FROM ps_buckets WHERE pending_delete = 0 AND name != '$local'"); - } - - public async Task SaveSyncData(SyncDataBatch batch) - { - await db.WriteTransaction(async tx => - { - int count = 0; - foreach (var b in batch.Buckets) - { - var result = await tx.Execute("INSERT INTO powersync_operations(op, data) VALUES(?, ?)", - ["save", JsonConvert.SerializeObject(new { buckets = new[] { JsonConvert.DeserializeObject(b.ToJSON()) } })]); - logger.LogDebug("saveSyncData {message}", JsonConvert.SerializeObject(result)); - count += b.Data.Length; - } - compactCounter += count; - }); - } - - public async Task RemoveBuckets(string[] buckets) - { - foreach (var bucket in buckets) - { - await DeleteBucket(bucket); - } - } - - private async Task DeleteBucket(string bucket) - { - await db.WriteTransaction(async tx => - { - await tx.Execute("INSERT INTO powersync_operations(op, data) VALUES(?, ?)", - ["delete_bucket", bucket]); - }); - - logger.LogDebug("Done deleting bucket"); - pendingBucketDeletes = true; - } - private record LastSyncedResult(string? synced_at); public async Task HasCompletedSync() { @@ -150,71 +103,6 @@ public async Task HasCompletedSync() return hasCompletedSync; } - public async Task SyncLocalDatabase(Checkpoint checkpoint) - { - var validation = await ValidateChecksums(checkpoint); - if (!validation.CheckpointValid) - { - logger.LogError("Checksums failed for {failures}", JsonConvert.SerializeObject(validation.CheckpointFailures)); - foreach (var failedBucket in validation.CheckpointFailures ?? []) - { - await DeleteBucket(failedBucket); - } - return new SyncLocalDatabaseResult - { - Ready = false, - CheckpointValid = false, - CheckpointFailures = validation.CheckpointFailures - }; - } - - var bucketNames = checkpoint.Buckets.Select(b => b.Bucket).ToArray(); - await db.WriteTransaction(async tx => - { - await tx.Execute( - "UPDATE ps_buckets SET last_op = ? WHERE name IN (SELECT json_each.value FROM json_each(?))", - [checkpoint.LastOpId, JsonConvert.SerializeObject(bucketNames)] - ); - - if (checkpoint.WriteCheckpoint != null) - { - await tx.Execute( - "UPDATE ps_buckets SET last_op = ? WHERE name = '$local'", - [checkpoint.WriteCheckpoint] - ); - } - }); - - var valid = await UpdateObjectsFromBuckets(checkpoint); - if (!valid) - { - logger.LogDebug("Not at a consistent checkpoint - cannot update local db"); - return new SyncLocalDatabaseResult - { - Ready = false, - CheckpointValid = true - }; - } - - await ForceCompact(); - - return new SyncLocalDatabaseResult - { - Ready = true, - CheckpointValid = true - }; - } - - private async Task UpdateObjectsFromBuckets(Checkpoint checkpoint) - { - return await db.WriteTransaction(async tx => - { - var result = await tx.Execute("INSERT INTO powersync_operations(op, data) VALUES(?, ?)", - ["sync_local", ""]); - - return result.InsertId == 1; - }); - } private record ResultResult(object result); @@ -227,75 +115,6 @@ public class ResultDetail public List? FailedBuckets { get; set; } } - public async Task ValidateChecksums( - Checkpoint checkpoint) - { - var result = await db.Get("SELECT powersync_validate_checkpoint(?) as result", - [JsonConvert.SerializeObject(checkpoint)]); - - logger.LogDebug("validateChecksums result item {message}", JsonConvert.SerializeObject(result)); - - if (result == null) return new SyncLocalDatabaseResult { CheckpointValid = false, Ready = false }; - - var resultDetail = JsonConvert.DeserializeObject(result.result.ToString() ?? "{}"); - - if (resultDetail?.Valid == true) - { - return new SyncLocalDatabaseResult { Ready = true, CheckpointValid = true }; - } - else - { - return new SyncLocalDatabaseResult - { - CheckpointValid = false, - Ready = false, - CheckpointFailures = resultDetail?.FailedBuckets?.ToArray() ?? [] - }; - } - } - - /// - /// Force a compact operation, primarily for testing purposes. - /// - public async Task ForceCompact() - { - compactCounter = COMPACT_OPERATION_INTERVAL; - pendingBucketDeletes = true; - - await AutoCompact(); - } - - public async Task AutoCompact() - { - await DeletePendingBuckets(); - await ClearRemoveOps(); - } - - private async Task DeletePendingBuckets() - { - if (!pendingBucketDeletes) return; - - await db.WriteTransaction(async tx => - { - await tx.Execute("INSERT INTO powersync_operations(op, data) VALUES (?, ?)", - ["delete_pending_buckets", ""]); - }); - - pendingBucketDeletes = false; - } - - private async Task ClearRemoveOps() - { - if (compactCounter < COMPACT_OPERATION_INTERVAL) return; - - await db.WriteTransaction(async tx => - { - await tx.Execute("INSERT INTO powersync_operations(op, data) VALUES (?, ?)", - ["clear_remove_ops", ""]); - }); - - compactCounter = 0; - } private record TargetOpResult(string target_op); private record SequenceResult(int seq); @@ -431,12 +250,6 @@ public async Task HasCrud() return await db.GetOptional("SELECT 1 as ignore FROM ps_crud LIMIT 1") != null; } - public async Task SetTargetCheckpoint(Checkpoint checkpoint) - { - // No Op - await Task.CompletedTask; - } - record ControlResult(string? r); public async Task Control(string op, object? payload = null) diff --git a/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs b/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs index ec9fa07..bf45272 100644 --- a/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs +++ b/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs @@ -25,15 +25,7 @@ public enum SyncClientImplementation /// The Rust sync client stores sync data in a format that is slightly different than the one used /// by the old C# implementation. When adopting the RUST client on existing /// databases, the PowerSync SDK will migrate the format automatically. - RUST, - /// - /// Decodes and handles sync lines received from the sync service in C#. - /// - /// This is the legacy option. - /// - /// The explicit choice to use the C#-based sync implementation will be removed from a future version of the SDK. - /// - C_SHARP + RUST } public class AdditionalConnectionOptions(int? retryDelayMs = null, int? crudUploadThrottleMs = null) @@ -423,7 +415,7 @@ protected async Task StreamingSyncIteration(Cancel } else { - return await LegacyStreamingSyncIteration(signal, resolvedOptions); + throw new NotImplementedException("C_SHARP sync client implementation is no longer supported."); } } }); @@ -619,281 +611,6 @@ async Task HandleInstruction(Instruction instruction) return new StreamingSyncIterationResult { ImmediateRestart = hideDisconnectOnRestart }; } - protected async Task LegacyStreamingSyncIteration(CancellationToken signal, RequiredPowerSyncConnectionOptions resolvedOptions) - { - logger.LogWarning("The legacy sync client implementation is deprecated and will be removed in a future release."); - logger.LogDebug("Streaming sync iteration started"); - Options.Adapter.StartSession(); - var bucketEntries = await Options.Adapter.GetBucketStates(); - var initialBuckets = new Dictionary(); - - foreach (var entry in bucketEntries) - { - initialBuckets[entry.Bucket] = entry.OpId; - } - - var req = initialBuckets - .Select(kvp => new BucketRequest - { - Name = kvp.Key, - After = kvp.Value - }) - .ToList(); - - var targetCheckpoint = (Checkpoint?)null; - var validatedCheckpoint = (Checkpoint?)null; - var appliedCheckpoint = (Checkpoint?)null; - - var bucketSet = new HashSet(initialBuckets.Keys); - - var clientId = await Options.Adapter.GetClientId(); - - logger.LogDebug("Requesting stream from server"); - - var syncOptions = new SyncStreamOptions - { - Path = "/sync/stream", - CancellationToken = signal, - Data = new StreamingSyncRequest - { - Buckets = req, - IncludeChecksum = true, - RawData = true, - Parameters = resolvedOptions.Params, - ClientId = clientId - } - }; - - var stream = Options.Remote.PostStream(syncOptions); - var first = true; - await foreach (var line in stream) - { - if (first) - { - first = false; - logger.LogDebug("Stream established. Processing events"); - } - - if (line == null) - { - logger.LogDebug("Stream has closed while waiting"); - // The stream has closed while waiting - return new StreamingSyncIterationResult { LegacyRetry = true }; - } - - // A connection is active and messages are being received - if (!SyncStatus.Connected) - { - // There is a connection now - UpdateSyncStatus(new SyncStatusOptions - { - Connected = true - }); - TriggerCrudUpload(); - } - - if (line is StreamingSyncCheckpoint syncCheckpoint) - { - logger.LogDebug("Sync checkpoint: {message}", syncCheckpoint); - - targetCheckpoint = syncCheckpoint.Checkpoint; - var bucketsToDelete = new HashSet(bucketSet); - var newBuckets = new HashSet(); - - foreach (var checksum in syncCheckpoint.Checkpoint.Buckets) - { - newBuckets.Add(checksum.Bucket); - bucketsToDelete.Remove(checksum.Bucket); - } - if (bucketsToDelete.Count > 0) - { - logger.LogDebug("Removing buckets: {message}", string.Join(", ", bucketsToDelete)); - } - - bucketSet = newBuckets; - await Options.Adapter.RemoveBuckets([.. bucketsToDelete]); - await Options.Adapter.SetTargetCheckpoint(targetCheckpoint); - } - else if (line is StreamingSyncCheckpointComplete checkpointComplete) - { - logger.LogDebug("Checkpoint complete: {message}", targetCheckpoint); - - var result = await Options.Adapter.SyncLocalDatabase(targetCheckpoint!); - - if (!result.CheckpointValid) - { - // This means checksums failed. Start again with a new checkpoint. - await Task.Delay(50); - return new StreamingSyncIterationResult { LegacyRetry = true }; - } - else if (!result.Ready) - { - // Checksums valid, but need more data for a consistent checkpoint. - // Continue waiting. - // Landing here the whole time - } - else - { - appliedCheckpoint = targetCheckpoint; - logger.LogDebug("Validated checkpoint: {message}", appliedCheckpoint); - - UpdateSyncStatus(new SyncStatusOptions - { - Connected = true, - LastSyncedAt = DateTime.Now, - DataFlow = new SyncDataFlowStatus - { - Downloading = false - } - }, new UpdateSyncStatusOptions - { - ClearDownloadError = true - }); - - } - - validatedCheckpoint = targetCheckpoint; - } - else if (line is StreamingSyncCheckpointDiff checkpointDiff) - { - // TODO: It may be faster to just keep track of the diff, instead of the entire checkpoint - if (targetCheckpoint == null) - { - throw new Exception("Checkpoint diff without previous checkpoint"); - } - - var diff = checkpointDiff.CheckpointDiff; - var newBuckets = new Dictionary(); - - foreach (var checksum in targetCheckpoint.Buckets) - { - newBuckets[checksum.Bucket] = checksum; - } - - foreach (var checksum in diff.UpdatedBuckets) - { - newBuckets[checksum.Bucket] = checksum; - } - - foreach (var bucket in diff.RemovedBuckets) - { - newBuckets.Remove(bucket); - } - - var newWriteCheckpoint = !string.IsNullOrEmpty(diff.WriteCheckpoint) ? diff.WriteCheckpoint : null; - var newCheckpoint = new Checkpoint - { - LastOpId = diff.LastOpId, - Buckets = [.. newBuckets.Values], - WriteCheckpoint = newWriteCheckpoint - }; - - targetCheckpoint = newCheckpoint; - - bucketSet = [.. newBuckets.Keys]; - - var bucketsToDelete = diff.RemovedBuckets.ToArray(); - if (bucketsToDelete.Length > 0) - { - logger.LogDebug("Remove buckets: {message}", string.Join(", ", bucketsToDelete)); - } - - // Perform async operations - await Options.Adapter.RemoveBuckets(bucketsToDelete); - await Options.Adapter.SetTargetCheckpoint(targetCheckpoint); - } - else if (line is StreamingSyncDataJSON dataJSON) - { - UpdateSyncStatus(new SyncStatusOptions - { - DataFlow = new SyncDataFlowStatus - { - Downloading = true - } - }); - await Options.Adapter.SaveSyncData(new SyncDataBatch([SyncDataBucket.FromRow(dataJSON.Data)])); - } - else if (line is StreamingSyncKeepalive keepalive) - { - var remainingSeconds = keepalive.TokenExpiresIn; - if (remainingSeconds == 0) - { - // Connection would be closed automatically right after this - logger.LogDebug("Token expiring; reconnect"); - Options.Remote.InvalidateCredentials(); - - // For a rare case where the backend connector does not update the token - // (uses the same one), this should have some delay. - // - await DelayRetry(); - return new StreamingSyncIterationResult { LegacyRetry = true }; - } - else if (remainingSeconds < 30) - { - logger.LogDebug("Token will expire soon; reconnect"); - // Pre-emptively refresh the token - Options.Remote.InvalidateCredentials(); - return new StreamingSyncIterationResult { LegacyRetry = true }; - } - TriggerCrudUpload(); - } - else - { - logger.LogDebug("Sync complete"); - - if (targetCheckpoint == appliedCheckpoint) - { - UpdateSyncStatus(new SyncStatusOptions - { - Connected = true, - LastSyncedAt = DateTime.Now, - }, - new UpdateSyncStatusOptions - { - ClearDownloadError = true - } - ); - } - else if (validatedCheckpoint == targetCheckpoint) - { - var result = await Options.Adapter.SyncLocalDatabase(targetCheckpoint!); - if (!result.CheckpointValid) - { - // This means checksums failed. Start again with a new checkpoint. - await Task.Delay(50); - return new StreamingSyncIterationResult { LegacyRetry = false }; - } - else if (!result.Ready) - { - // Checksums valid, but need more data for a consistent checkpoint. - // Continue waiting. - } - else - { - appliedCheckpoint = targetCheckpoint; - UpdateSyncStatus(new SyncStatusOptions - { - Connected = true, - LastSyncedAt = DateTime.Now, - DataFlow = new SyncDataFlowStatus - { - Downloading = false, - } - }, - new UpdateSyncStatusOptions - { - ClearDownloadError = true - }); - } - } - } - } - - logger.LogDebug("Stream input empty"); - // Connection closed. Likely due to auth issue. - return new StreamingSyncIterationResult { LegacyRetry = true }; - } - public new void Close() { crudUpdateCts?.Cancel(); diff --git a/Tests/PowerSync/PowerSync.Common.Tests/Client/Sync/BucketStorageTests.cs b/Tests/PowerSync/PowerSync.Common.Tests/Client/Sync/BucketStorageTests.cs deleted file mode 100644 index 7a2f9bf..0000000 --- a/Tests/PowerSync/PowerSync.Common.Tests/Client/Sync/BucketStorageTests.cs +++ /dev/null @@ -1,1019 +0,0 @@ -namespace PowerSync.Common.Tests.Client.Sync; - -using System.Threading.Tasks; - -using Microsoft.Data.Sqlite; -using Microsoft.Extensions.Logging; -using Newtonsoft.Json; -using PowerSync.Common.Client; -using PowerSync.Common.Client.Sync.Bucket; -using PowerSync.Common.DB.Schema; - -class TestData -{ - public static OplogEntry putAsset1_1 = OplogEntry.FromRow(new OplogEntryJSON - { - OpId = "1", - Op = new OpType(OpTypeEnum.PUT).ToJSON(), - ObjectType = "assets", - ObjectId = "O1", - Data = JsonConvert.SerializeObject(new { description = "bar" }), - Checksum = 1 - }); - - public static OplogEntry putAsset2_2 = OplogEntry.FromRow(new OplogEntryJSON - { - OpId = "2", - Op = new OpType(OpTypeEnum.PUT).ToJSON(), - ObjectType = "assets", - ObjectId = "O2", - Data = JsonConvert.SerializeObject(new { description = "bar" }), - Checksum = 2 - }); - - public static OplogEntry putAsset1_3 = OplogEntry.FromRow(new OplogEntryJSON - { - OpId = "3", - Op = new OpType(OpTypeEnum.PUT).ToJSON(), - ObjectType = "assets", - ObjectId = "O1", - Data = JsonConvert.SerializeObject(new { description = "bard" }), - Checksum = 3 - }); - - public static OplogEntry removeAsset1_4 = OplogEntry.FromRow(new OplogEntryJSON - { - OpId = "4", - Op = new OpType(OpTypeEnum.REMOVE).ToJSON(), - ObjectType = "assets", - ObjectId = "O1", - Checksum = 4 - }); - - public static OplogEntry removeAsset1_5 = OplogEntry.FromRow(new OplogEntryJSON - { - OpId = "5", - Op = new OpType(OpTypeEnum.REMOVE).ToJSON(), - ObjectType = "assets", - ObjectId = "O1", - Checksum = 5 - }); -} - -public class BucketStorageTests : IAsyncLifetime -{ - private PowerSyncDatabase db = default!; - private IBucketStorageAdapter bucketStorage = default!; - - public async Task InitializeAsync() - { - db = new PowerSyncDatabase(new PowerSyncDatabaseOptions - { - Database = new SQLOpenOptions { DbFilename = "powersync.db" }, - Schema = TestSchema.AppSchema, - }); - await db.Init(); - bucketStorage = new SqliteBucketStorage(db.Database, createLogger()); - - } - - public async Task DisposeAsync() - { - await db.DisconnectAndClear(); - await db.Close(); - bucketStorage.Close(); - } - - private record IdResult(string id); - private record DescriptionResult(string description); - private record AssetResult(string id, string description, string? make = null); - - static async Task ExpectAsset1_3(PowerSyncDatabase database) - { - var result = await database.GetAll("SELECT id, description, make FROM assets WHERE id = 'O1'"); - Assert.Equal(new AssetResult("O1", "bard", null), result[0]); - } - - static async Task ExpectNoAsset1(PowerSyncDatabase database) - { - var result = await database.GetAll("SELECT id, description, make FROM assets WHERE id = 'O1'"); - Assert.Empty(result); - } - - static async Task ExpectNoAssets(PowerSyncDatabase database) - { - var result = await database.GetAll("SELECT id, description, make FROM assets"); - Assert.Empty(result); - } - - async Task SyncLocalChecked(Checkpoint checkpoint) - { - var result = await bucketStorage.SyncLocalDatabase(checkpoint); - Assert.Equal(new SyncLocalDatabaseResult { Ready = true, CheckpointValid = true }, result); - } - - private ILogger createLogger() - { - ILoggerFactory loggerFactory = LoggerFactory.Create(builder => - { - builder.AddConsole(); // Enable console logging - builder.SetMinimumLevel(LogLevel.Debug); - }); - - return loggerFactory.CreateLogger("TestLogger"); - } - - [Fact] - public async Task BasicSetup() - { - await db.WaitForReady(); - var initialBucketStates = await bucketStorage.GetBucketStates(); - Assert.Empty(initialBucketStates); - - await bucketStorage.SaveSyncData(new SyncDataBatch([new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset2_2, TestData.putAsset1_3], false)])); - - var bucketStates = await bucketStorage.GetBucketStates(); - - Assert.Collection(bucketStates, state => - { - Assert.Equal("bucket1", state.Bucket); - Assert.Equal("3", state.OpId); - }); - - await SyncLocalChecked(new Checkpoint - { - LastOpId = "3", - Buckets = [new BucketChecksum { Bucket = "bucket1", Checksum = 6 }] - }); - - await ExpectAsset1_3(db); - } - - [Fact] - public async Task ShouldGetObjectFromMultipleBuckets() - { - await bucketStorage.SaveSyncData( - new SyncDataBatch( - [new SyncDataBucket("bucket1", [TestData.putAsset1_3], false), new SyncDataBucket("bucket2", [TestData.putAsset1_3], false)]) - ); - - await SyncLocalChecked(new Checkpoint - { - LastOpId = "3", - Buckets = [new BucketChecksum { Bucket = "bucket1", Checksum = 3 }, new BucketChecksum { Bucket = "bucket2", Checksum = 3 }] - }); - - await ExpectAsset1_3(db); - } - - [Fact] - public async Task ShouldPrioritizeLaterUpdates() - { - // Test behavior when the same object is present in multiple buckets. - // In this case, there are two different versions in the different buckets. - // While we should not get this with our server implementation, the client still specifies this behavior: - // The largest op_id wins. - - await bucketStorage.SaveSyncData( - new SyncDataBatch( - [new SyncDataBucket("bucket1", [TestData.putAsset1_3], false), new SyncDataBucket("bucket2", [TestData.putAsset1_1], false)]) - ); - - await SyncLocalChecked(new Checkpoint - { - LastOpId = "3", - Buckets = [new BucketChecksum { Bucket = "bucket1", Checksum = 3 }, new BucketChecksum { Bucket = "bucket2", Checksum = 1 }] - }); - - await ExpectAsset1_3(db); - } - - [Fact] - public async Task ShouldIgnoreRemoveFromOneBucket() - { - // When we have 1 PUT and 1 REMOVE, the object must be kept.); - await bucketStorage.SaveSyncData( - new SyncDataBatch([new SyncDataBucket("bucket1", [TestData.putAsset1_3], false), new SyncDataBucket("bucket2", [TestData.putAsset1_3, TestData.removeAsset1_4], false)]) - ); - - await SyncLocalChecked(new Checkpoint - { - LastOpId = "4", - Buckets = [new BucketChecksum { Bucket = "bucket1", Checksum = 3 }, new BucketChecksum { Bucket = "bucket2", Checksum = 7 }] - }); - - await ExpectAsset1_3(db); - } - - [Fact] - public async Task ShouldRemoveWhenRemovedFromAllBuckets() - { - // When we only have REMOVE left for an object, it must be deleted. - await bucketStorage.SaveSyncData( - new SyncDataBatch([new SyncDataBucket("bucket1", [TestData.putAsset1_3, TestData.removeAsset1_5], false), new SyncDataBucket("bucket2", [TestData.putAsset1_3, TestData.removeAsset1_4], false)]) - ); - - await SyncLocalChecked(new Checkpoint - { - LastOpId = "5", - Buckets = [new BucketChecksum { Bucket = "bucket1", Checksum = 8 }, new BucketChecksum { Bucket = "bucket2", Checksum = 7 }] - }); - - await ExpectNoAssets(db); - } - - [Fact] - public async Task ShouldUseSubkeys() - { - // Subkeys cause this to be treated as a separate entity in the oplog, - // but the same entity in the local database. - - var put4 = OplogEntry.FromRow(new OplogEntryJSON - { - OpId = "4", - Op = new OpType(OpTypeEnum.PUT).ToJSON(), - Subkey = "b", - ObjectType = "assets", - ObjectId = "O1", - Data = JsonConvert.SerializeObject(new { description = "B" }), - Checksum = 4 - }); - - var remove5 = OplogEntry.FromRow(new OplogEntryJSON - { - OpId = "5", - Op = new OpType(OpTypeEnum.REMOVE).ToJSON(), - Subkey = "b", - ObjectType = "assets", - ObjectId = "O1", - Checksum = 5 - }); - - await bucketStorage.SaveSyncData( - new SyncDataBatch([new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset1_3, put4], false)]) - ); - - await SyncLocalChecked(new Checkpoint - { - LastOpId = "4", - Buckets = [new BucketChecksum { Bucket = "bucket1", Checksum = 8 }] - }); - - var result = await db.GetAll("SELECT id, description, make FROM assets WHERE id = 'O1'"); - Assert.Equal(new AssetResult("O1", "B", null), result[0]); - - await bucketStorage.SaveSyncData(new SyncDataBatch([new SyncDataBucket("bucket1", [remove5], false)])); - - await SyncLocalChecked(new Checkpoint - { - LastOpId = "5", - Buckets = [new BucketChecksum { Bucket = "bucket1", Checksum = 13 }] - }); - - await ExpectAsset1_3(db); - } - - [Fact] - public async Task ShouldFailChecksumValidation() - { - // Simple checksum validation - await bucketStorage.SaveSyncData( - new SyncDataBatch([new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset2_2, TestData.putAsset1_3], false)]) - ); - - var result = await bucketStorage.SyncLocalDatabase(new Checkpoint - { - LastOpId = "3", - Buckets = [new BucketChecksum { Bucket = "bucket1", Checksum = 10 }, new BucketChecksum { Bucket = "bucket2", Checksum = 1 }] - }); - - var expected = new SyncLocalDatabaseResult - { - Ready = false, - CheckpointValid = false, - CheckpointFailures = ["bucket1", "bucket2"] - }; - - Assert.Equal(expected, result); - - await ExpectNoAssets(db); - } - - [Fact] - public async Task ShouldDeleteBuckets() - { - await bucketStorage.SaveSyncData( - new SyncDataBatch( - [new SyncDataBucket("bucket1", [TestData.putAsset1_3], false), new SyncDataBucket("bucket2", [TestData.putAsset1_3], false)]) - ); - - await bucketStorage.RemoveBuckets(["bucket2"]); - // The delete only takes effect after syncLocal. - - await SyncLocalChecked(new Checkpoint - { - LastOpId = "3", - Buckets = [new BucketChecksum { Bucket = "bucket1", Checksum = 3 }] - }); - - // Bucket is deleted, but object is still present in other buckets. - await ExpectAsset1_3(db); - - await bucketStorage.RemoveBuckets(["bucket1"]); - await SyncLocalChecked(new Checkpoint - { - LastOpId = "3", - Buckets = [] - }); - - // Both buckets deleted - object removed. - await ExpectNoAssets(db); - } - - [Fact] - public async Task ShouldDeleteAndRecreateBuckets() - { - // Save some data - await bucketStorage.SaveSyncData( - new SyncDataBatch([new SyncDataBucket("bucket1", [TestData.putAsset1_1], false)]) - ); - - // Delete the bucket - await bucketStorage.RemoveBuckets(["bucket1"]); - - // Save some data again - await bucketStorage.SaveSyncData( - new SyncDataBatch([new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset1_3], false)]) - ); - - // Delete again - await bucketStorage.RemoveBuckets(["bucket1"]); - - // Final save of data - await bucketStorage.SaveSyncData( - new SyncDataBatch([new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset1_3], false)]) - ); - - // Check that the data is there - await SyncLocalChecked(new Checkpoint - { - LastOpId = "3", - Buckets = [new BucketChecksum { Bucket = "bucket1", Checksum = 4 }] - }); - - await ExpectAsset1_3(db); - - // Now final delete - await bucketStorage.RemoveBuckets(["bucket1"]); - await SyncLocalChecked(new Checkpoint - { - LastOpId = "3", - Buckets = [] - }); - - await ExpectNoAssets(db); - } - - [Fact] - public async Task ShouldHandleMove() - { - await bucketStorage.SaveSyncData( - new SyncDataBatch( - [ - new SyncDataBucket("bucket1", - [ - OplogEntry.FromRow(new OplogEntryJSON - { - OpId = "1", - Op = new OpType(OpTypeEnum.MOVE).ToJSON(), - Checksum = 1 - }) - ], false) - ]) - ); - - await bucketStorage.SaveSyncData( - new SyncDataBatch([new SyncDataBucket("bucket1", [TestData.putAsset1_3], false)]) - ); - - await SyncLocalChecked(new Checkpoint - { - LastOpId = "3", - Buckets = [new BucketChecksum { Bucket = "bucket1", Checksum = 4 }] - }); - - await ExpectAsset1_3(db); - } - - [Fact] - public async Task ShouldHandleClear() - { - // Save some data - await bucketStorage.SaveSyncData( - new SyncDataBatch( - [ - new SyncDataBucket("bucket1", [TestData.putAsset1_1], false) - ]) - ); - - await SyncLocalChecked(new Checkpoint - { - LastOpId = "1", - Buckets = - [ - new BucketChecksum { Bucket = "bucket1", Checksum = 1 } - ] - }); - - // CLEAR, then save new data - await bucketStorage.SaveSyncData( - new SyncDataBatch( - [ - new SyncDataBucket("bucket1", - [ - OplogEntry.FromRow(new OplogEntryJSON - { - OpId = "2", - Op = new OpType(OpTypeEnum.CLEAR).ToJSON(), - Checksum = 2 - }), - OplogEntry.FromRow(new OplogEntryJSON - { - OpId = "3", - Op = new OpType(OpTypeEnum.PUT).ToJSON(), - Checksum = 3, - Data = TestData.putAsset2_2.Data, - ObjectId = TestData.putAsset2_2.ObjectId, - ObjectType = TestData.putAsset2_2.ObjectType - }) - ], false) - ]) - ); - - await SyncLocalChecked(new Checkpoint - { - LastOpId = "3", - // 2 + 3. 1 is replaced with 2. - Buckets = [new BucketChecksum { Bucket = "bucket1", Checksum = 5 }] - }); - - await ExpectNoAsset1(db); - - var result = await db.Get("SELECT id, description FROM assets WHERE id = 'O2'"); - - Assert.Equal(new AssetResult("O2", "bar"), result); - } - - [Fact] - public async Task UpdateWithNewTypes() - { - var dbName = "test-bucket-storage-new-types.db"; - var powersync = new PowerSyncDatabase(new PowerSyncDatabaseOptions - { - Database = new SQLOpenOptions { DbFilename = dbName }, - Schema = new Schema([]), - }); - await powersync.Init(); - bucketStorage = new SqliteBucketStorage(powersync.Database); - - await bucketStorage.SaveSyncData( - new SyncDataBatch( - [new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset2_2, TestData.putAsset1_3], false)]) - ); - - await SyncLocalChecked(new Checkpoint - { - LastOpId = "4", - Buckets = [new BucketChecksum { Bucket = "bucket1", Checksum = 6 }] - }); - - // Ensure an exception is thrown due to missing table - await Assert.ThrowsAsync(async () => - await powersync.GetAll("SELECT * FROM assets")); - - await powersync.Close(); - - powersync = new PowerSyncDatabase(new PowerSyncDatabaseOptions - { - Database = new SQLOpenOptions { DbFilename = dbName }, - Schema = TestSchema.AppSchema, - }); - await powersync.Init(); - - await ExpectAsset1_3(powersync); - - await powersync.DisconnectAndClear(); - await powersync.Close(); - } - - [Fact] - public async Task ShouldRemoveTypes() - { - var dbName = "test-bucket-storage-remove-types.db"; - - // Create database with initial schema - var powersync = new PowerSyncDatabase(new PowerSyncDatabaseOptions - { - Database = new SQLOpenOptions { DbFilename = dbName }, - Schema = TestSchema.AppSchema, - }); - - await powersync.Init(); - bucketStorage = new SqliteBucketStorage(powersync.Database); - - await bucketStorage.SaveSyncData( - new SyncDataBatch( - [ - new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset2_2, TestData.putAsset1_3], false) - ]) - ); - - await SyncLocalChecked(new Checkpoint - { - LastOpId = "3", - Buckets = - [ - new BucketChecksum { Bucket = "bucket1", Checksum = 6 } - ] - }); - - await ExpectAsset1_3(powersync); - await powersync.Close(); - - // Now open another instance with an empty schema - powersync = new PowerSyncDatabase(new PowerSyncDatabaseOptions - { - Database = new SQLOpenOptions { DbFilename = dbName }, - Schema = new Schema([]), - }); - await powersync.Init(); - - await Assert.ThrowsAsync(async () => - await powersync.Execute("SELECT * FROM assets")); - - await powersync.Close(); - - // Reopen database with the original schema - powersync = new PowerSyncDatabase(new PowerSyncDatabaseOptions - { - Database = new SQLOpenOptions { DbFilename = dbName }, - Schema = TestSchema.AppSchema, - }); - await powersync.Init(); - - await ExpectAsset1_3(powersync); - - await powersync.DisconnectAndClear(); - await powersync.Close(); - } - - private record OplogStats(string Type, string Id, int Count); - - [Fact] - public async Task ShouldCompact() - { - // Test compacting behavior. - // This test relies heavily on internals and will have to be updated when the compact implementation is updated. - - await bucketStorage.SaveSyncData( - new SyncDataBatch( - [ - new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset2_2, TestData.removeAsset1_4], false) - ]) - ); - - await SyncLocalChecked(new Checkpoint - { - LastOpId = "4", - WriteCheckpoint = "4", - Buckets = [new BucketChecksum { Bucket = "bucket1", Checksum = 7 }] - }); - - await bucketStorage.ForceCompact(); - - await SyncLocalChecked(new Checkpoint - { - LastOpId = "4", - WriteCheckpoint = "4", - Buckets = [new BucketChecksum { Bucket = "bucket1", Checksum = 7 }] - }); - - var stats = await db.GetAll( - "SELECT row_type as Type, row_id as Id, count(*) as Count FROM ps_oplog GROUP BY row_type, row_id ORDER BY row_type, row_id" - ); - - var expectedStats = new List { new("assets", "O2", 1) }; - - Assert.Equal(expectedStats, stats); - } - - [Fact] - public async Task ShouldNotSyncLocalDbWithPendingCrud_ServerRemoved() - { - await bucketStorage.SaveSyncData( - new SyncDataBatch( - [ - new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset2_2, TestData.putAsset1_3], false) - ]) - ); - - await SyncLocalChecked(new Checkpoint - { - LastOpId = "3", - Buckets = - [ - new BucketChecksum { Bucket = "bucket1", Checksum = 6 } - ] - }); - - // Local save - await db.Execute("INSERT INTO assets(id) VALUES(?)", ["O3"]); - - var insertedResult = await db.GetAll("SELECT id FROM assets WHERE id = 'O3'"); - Assert.Equal(new IdResult("O3"), insertedResult[0]); - - // At this point, we have data in the CRUD table and are not able to sync the local DB. - var result = await bucketStorage.SyncLocalDatabase(new Checkpoint - { - LastOpId = "3", - WriteCheckpoint = "3", - Buckets = - [ - new BucketChecksum { Bucket = "bucket1", Checksum = 6 } - ] - }); - - var expectedResult = new SyncLocalDatabaseResult - { - Ready = false, - CheckpointValid = true - }; - - Assert.Equal(expectedResult, result); - - var batch = await bucketStorage.GetCrudBatch(); - if (batch != null) - { - await batch.Complete(""); - } - - await bucketStorage.UpdateLocalTarget(() => Task.FromResult("4")); - - // At this point, the data has been uploaded but not synced back yet. - var result3 = await bucketStorage.SyncLocalDatabase(new Checkpoint - { - LastOpId = "3", - WriteCheckpoint = "3", - Buckets = [new BucketChecksum { Bucket = "bucket1", Checksum = 6 }] - }); - - Assert.Equal(expectedResult, result3); - - // The data must still be present locally. - var stillPresentResult = await db.GetAll("SELECT id FROM assets WHERE id = 'O3'"); - Assert.Equal(new IdResult("O3"), stillPresentResult[0]); - - await bucketStorage.SaveSyncData( - new SyncDataBatch( - [ - new SyncDataBucket("bucket1", Array.Empty(), false) - ]) - ); - - // Now we have synced the data back (or lack of data in this case), - // so we can do a local sync. - await SyncLocalChecked(new Checkpoint - { - LastOpId = "5", - WriteCheckpoint = "5", - Buckets = [new BucketChecksum { Bucket = "bucket1", Checksum = 6 }] - }); - - // Since the object was not in the sync response, it is deleted. - var deletedResult = await db.GetAll("SELECT id FROM assets WHERE id = 'O3'"); - Assert.Empty(deletedResult); - } - - [Fact] - public async Task ShouldNotSyncLocalDbWithPendingCrud_WhenMoreCrudIsAdded_1() - { - await bucketStorage.SaveSyncData( - new SyncDataBatch( - [ - new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset2_2, TestData.putAsset1_3], false) - ]) - ); - - await SyncLocalChecked(new Checkpoint - { - LastOpId = "3", - WriteCheckpoint = "3", - Buckets = - [ - new BucketChecksum { Bucket = "bucket1", Checksum = 6 } - ] - }); - - // Local save - await db.Execute("INSERT INTO assets(id) VALUES(?)", ["O3"]); - - var batch = await bucketStorage.GetCrudBatch(); - if (batch != null) - { - await batch.Complete(""); - } - - await bucketStorage.UpdateLocalTarget(() => Task.FromResult("4")); - - var result3 = await bucketStorage.SyncLocalDatabase(new Checkpoint - { - LastOpId = "3", - WriteCheckpoint = "3", - Buckets = [new BucketChecksum { Bucket = "bucket1", Checksum = 6 }] - }); - - var expectedResult = new SyncLocalDatabaseResult - { - Ready = false, - CheckpointValid = true - }; - - Assert.Equal(expectedResult, result3); - - await bucketStorage.SaveSyncData( - new SyncDataBatch( - [ - new SyncDataBucket("bucket1", Array.Empty(), false) - ]) - ); - - // Add more data before SyncLocalDatabase. - await db.Execute("INSERT INTO assets(id) VALUES(?)", ["O4"]); - - var result4 = await bucketStorage.SyncLocalDatabase(new Checkpoint - { - LastOpId = "5", - WriteCheckpoint = "5", - Buckets = - [ - new BucketChecksum { Bucket = "bucket1", Checksum = 6 } - ] - }); - - Assert.Equal(expectedResult, result4); - } - - [Fact] - public async Task ShouldNotSyncLocalDbWithPendingCrud_WhenMoreCrudIsAdded_2() - { - await bucketStorage.SaveSyncData( - new SyncDataBatch( - [ - new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset2_2, TestData.putAsset1_3], false) - ]) - ); - - await SyncLocalChecked(new Checkpoint - { - LastOpId = "3", - WriteCheckpoint = "3", - Buckets = [new BucketChecksum { Bucket = "bucket1", Checksum = 6 }] - }); - - // Local save - await db.Execute("INSERT INTO assets(id) VALUES(?)", ["O3"]); - - var batch = await bucketStorage.GetCrudBatch(); - - // Add more data before calling complete() - await db.Execute("INSERT INTO assets(id) VALUES(?)", ["O4"]); - if (batch != null) - { - await batch.Complete(""); - } - - await bucketStorage.UpdateLocalTarget(() => Task.FromResult("4")); - - await bucketStorage.SaveSyncData( - new SyncDataBatch( - [ - new SyncDataBucket("bucket1", [], false) - ]) - ); - - var result4 = await bucketStorage.SyncLocalDatabase(new Checkpoint - { - LastOpId = "5", - WriteCheckpoint = "5", - Buckets = - [ - new BucketChecksum { Bucket = "bucket1", Checksum = 6 } - ] - }); - - var expected = new SyncLocalDatabaseResult - { - Ready = false, - CheckpointValid = true - }; - - Assert.Equal(expected, result4); - } - - [Fact] - public async Task ShouldNotSyncLocalDbWithPendingCrud_UpdateOnServer() - { - await bucketStorage.SaveSyncData( - new SyncDataBatch( - [ - new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset2_2, TestData.putAsset1_3], false) - ]) - ); - - await SyncLocalChecked(new Checkpoint - { - LastOpId = "3", - WriteCheckpoint = "3", - Buckets = - [ - new BucketChecksum { Bucket = "bucket1", Checksum = 6 } - ] - }); - - // Local save - await db.Execute("INSERT INTO assets(id) VALUES(?)", ["O3"]); - - var batch = await bucketStorage.GetCrudBatch(); - if (batch != null) - { - await batch.Complete(""); - } - - await bucketStorage.UpdateLocalTarget(() => Task.FromResult("4")); - - await bucketStorage.SaveSyncData( - new SyncDataBatch( - [ - new SyncDataBucket("bucket1", - [ - OplogEntry.FromRow(new OplogEntryJSON - { - OpId = "5", - Op = new OpType(OpTypeEnum.PUT).ToJSON(), - ObjectType = "assets", - ObjectId = "O3", - Checksum = 5, - Data = JsonConvert.SerializeObject(new { description = "server updated" }) - }) - ], false) - ]) - ); - - await SyncLocalChecked(new Checkpoint - { - LastOpId = "5", - WriteCheckpoint = "5", - Buckets = - [ - new BucketChecksum { Bucket = "bucket1", Checksum = 11 } - ] - }); - - var updatedResult = await db.GetAll("SELECT description FROM assets WHERE id = 'O3'"); - Assert.Equal(new DescriptionResult("server updated"), updatedResult[0]); - } - - [Fact] - public async Task ShouldRevertAFailingInsert() - { - await bucketStorage.SaveSyncData( - new SyncDataBatch( - [ - new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset2_2, TestData.putAsset1_3], false) - ]) - ); - - await SyncLocalChecked(new Checkpoint - { - LastOpId = "3", - WriteCheckpoint = "3", - Buckets = [new BucketChecksum { Bucket = "bucket1", Checksum = 6 }] - }); - - // Local insert, later rejected by server - await db.Execute("INSERT INTO assets(id, description) VALUES(?, ?)", ["O3", "inserted"]); - - var batch = await bucketStorage.GetCrudBatch(); - if (batch != null) - { - await batch.Complete(""); - } - - await bucketStorage.UpdateLocalTarget(() => Task.FromResult("4")); - - var insertedResult = await db.GetAll("SELECT description FROM assets WHERE id = 'O3'"); - Assert.Equal(new DescriptionResult("inserted"), insertedResult[0]); - - await SyncLocalChecked(new Checkpoint - { - LastOpId = "3", - WriteCheckpoint = "4", - Buckets = [new BucketChecksum { Bucket = "bucket1", Checksum = 6 }] - }); - - var revertedResult = await db.GetAll("SELECT description FROM assets WHERE id = 'O3'"); - Assert.Empty(revertedResult); - } - - [Fact] - public async Task ShouldRevertAFailingDelete() - { - await bucketStorage.SaveSyncData( - new SyncDataBatch( - [ - new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset2_2, TestData.putAsset1_3], false) - ]) - ); - - await SyncLocalChecked(new Checkpoint - { - LastOpId = "3", - WriteCheckpoint = "3", - Buckets = [new BucketChecksum { Bucket = "bucket1", Checksum = 6 }] - }); - - // Local delete, later rejected by server - await db.Execute("DELETE FROM assets WHERE id = ?", ["O2"]); - - var deletedResult = await db.GetAll("SELECT description FROM assets WHERE id = 'O2'"); - Assert.Empty(deletedResult); // Ensure the record is deleted locally - - // Simulate a permissions error when uploading - data should be preserved - var batch = await bucketStorage.GetCrudBatch(); - if (batch != null) - { - await batch.Complete(""); - } - - await bucketStorage.UpdateLocalTarget(() => Task.FromResult("4")); - - await SyncLocalChecked(new Checkpoint - { - LastOpId = "3", - WriteCheckpoint = "4", - Buckets = [new BucketChecksum { Bucket = "bucket1", Checksum = 6 }] - }); - - var revertedResult = await db.GetAll("SELECT description FROM assets WHERE id = 'O2'"); - Assert.Equal(new DescriptionResult("bar"), revertedResult[0]); - } - - [Fact] - public async Task ShouldRevertAFailingUpdate() - { - await bucketStorage.SaveSyncData( - new SyncDataBatch( - [ - new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset2_2, TestData.putAsset1_3], false) - ]) - ); - - await SyncLocalChecked(new Checkpoint - { - LastOpId = "3", - WriteCheckpoint = "3", - Buckets = - [ - new BucketChecksum { Bucket = "bucket1", Checksum = 6 } - ] - }); - - // Local update, later rejected by server - await db.Execute("UPDATE assets SET description = ? WHERE id = ?", ["updated", "O2"]); - - var updatedResult = await db.GetAll("SELECT description FROM assets WHERE id = 'O2'"); - Assert.Equal(new DescriptionResult("updated"), updatedResult[0]); - - // Simulate a permissions error when uploading - data should be preserved - var batch = await bucketStorage.GetCrudBatch(); - if (batch != null) - { - await batch.Complete(""); - } - - await bucketStorage.UpdateLocalTarget(async () => await Task.FromResult("4")); - - await SyncLocalChecked(new Checkpoint - { - LastOpId = "3", - WriteCheckpoint = "4", - Buckets = - [ - new BucketChecksum { Bucket = "bucket1", Checksum = 6 } - ] - }); - - var revertedResult = await db.GetAll("SELECT description FROM assets WHERE id = 'O2'"); - Assert.Equal(new DescriptionResult("bar"), revertedResult[0]); - } -} \ No newline at end of file