diff --git a/PowerSync/PowerSync.Common/CHANGELOG.md b/PowerSync/PowerSync.Common/CHANGELOG.md index 4fc0f47..5a0ad07 100644 --- a/PowerSync/PowerSync.Common/CHANGELOG.md +++ b/PowerSync/PowerSync.Common/CHANGELOG.md @@ -1,5 +1,14 @@ # PowerSync.Common Changelog +## 0.0.6-alpha.1 +- Dropping support for the legacy C# sync implementation. +- Add `trackPreviousValues` option on `TableOptions` which sets `CrudEntry.PreviousValues` to previous values on updates. +- Add `trackMetadata` option on `TableOptions` which adds a `_metadata` column that can be used for updates. The configured metadata is available through `CrudEntry.Metadata`. +- Add `ignoreEmptyUpdates` option on `TableOptions` which skips creating CRUD entries for updates that don't change any values. +- Reporting progress information about downloaded rows. Sync progress is available through `SyncStatus.DownloadProgress()`. +- Support bucket priorities. +- Report `PriorityStatusEntries` on `SyncStatus`. + ## 0.0.5-alpha.1 - Using the latest (0.4.9) version of the core extension, it introduces support for the Rust Sync implementation and also makes it the default - users can still opt out and use the legacy C# sync implementation as option when calling `connect()`. diff --git a/PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs b/PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs index 9f17882..c0db402 100644 --- a/PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs +++ b/PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs @@ -80,8 +80,6 @@ public interface IPowerSyncDatabase : IEventStream public class PowerSyncDatabase : EventStream, IPowerSyncDatabase { - private static readonly int FULL_SYNC_PRIORITY = 2147483647; - public IDBAdapter Database; private Schema schema; @@ -156,9 +154,36 @@ public async Task WaitForReady() await isReadyTask; } - public async Task WaitForFirstSync(CancellationToken? cancellationToken = null) + public class PrioritySyncRequest + { + public CancellationToken? Token { get; set; } + public int? Priority { get; set; } + } + + /// + /// Wait for the first sync operation to complete. + /// + /// + /// An object providing a cancellation token and a priority target. + /// When a priority target is set, the task may complete when all buckets with the given (or higher) + /// priorities have been synchronized. This can be earlier than a complete sync. + /// + /// A task which will complete once the first full sync has completed. + public async Task WaitForFirstSync(PrioritySyncRequest? request = null) { - if (CurrentStatus.HasSynced == true) + var priority = request?.Priority ?? null; + var cancellationToken = request?.Token ?? null; + + bool StatusMatches(SyncStatus status) + { + if (priority == null) + { + return status.HasSynced == true; + } + return status.StatusForPriority(priority.Value).HasSynced == true; + } + + if (StatusMatches(CurrentStatus)) { return; } @@ -166,11 +191,11 @@ public async Task WaitForFirstSync(CancellationToken? cancellationToken = null) var tcs = new TaskCompletionSource(); var cts = new CancellationTokenSource(); - var _ = Task.Run(() => + _ = Task.Run(() => { foreach (var update in Listen(cts.Token)) { - if (update.StatusChanged?.HasSynced == true) + if (update.StatusChanged != null && StatusMatches(update.StatusChanged!)) { cts.Cancel(); tcs.SetResult(true); @@ -192,7 +217,7 @@ protected async Task Initialize() await BucketStorageAdapter.Init(); await LoadVersion(); await UpdateSchema(schema); - await UpdateHasSynced(); + await ResolveOfflineSyncStatus(); await Database.Execute("PRAGMA RECURSIVE_TRIGGERS=TRUE"); Ready = true; Emit(new PowerSyncDBEvent { Initialized = true }); @@ -216,48 +241,29 @@ private async Task LoadVersion() catch (Exception e) { throw new Exception( - $"Unsupported PowerSync extension version. Need >=0.2.0 <1.0.0, got: {sdkVersion}. Details: {e.Message}" + $"Unsupported PowerSync extension version. Need >=0.4.10 <1.0.0, got: {sdkVersion}. Details: {e.Message}" ); } - // Validate version is >= 0.2.0 and < 1.0.0 - if (versionInts[0] != 0 || versionInts[1] < 2 || versionInts[2] < 0) + // Validate version is >= 0.4.10 and < 1.0.0 + if (versionInts[0] != 0 || versionInts[1] < 4 || (versionInts[1] == 4 && versionInts[2] < 10)) { - throw new Exception($"Unsupported PowerSync extension version. Need >=0.2.0 <1.0.0, got: {sdkVersion}"); + throw new Exception($"Unsupported PowerSync extension version. Need >=0.4.10 <1.0.0, got: {sdkVersion}"); } } - private record LastSyncedResult(int? priority, string? last_synced_at); - - protected async Task UpdateHasSynced() + private record OfflineSyncStatusResult(string r); + protected async Task ResolveOfflineSyncStatus() { - var results = await Database.GetAll( - "SELECT priority, last_synced_at FROM ps_sync_state ORDER BY priority DESC" - ); + var result = await Database.Get("SELECT powersync_offline_sync_status() as r"); + var parsed = JsonConvert.DeserializeObject(result.r); - DateTime? lastCompleteSync = null; + var parsedSyncStatus = CoreInstructionHelpers.CoreStatusToSyncStatus(parsed!); + var updatedStatus = CurrentStatus.CreateUpdatedStatus(parsedSyncStatus); - // TODO: Will be altered/extended when reporting individual sync priority statuses is supported - foreach (var result in results) + if (!updatedStatus.IsEqual(CurrentStatus)) { - var parsedDate = DateTime.Parse(result.last_synced_at + "Z"); - - if (result.priority == FULL_SYNC_PRIORITY) - { - // This lowest-possible priority represents a complete sync. - lastCompleteSync = parsedDate; - } - } - - var hasSynced = lastCompleteSync != null; - if (hasSynced != CurrentStatus.HasSynced) - { - CurrentStatus = new SyncStatus(new SyncStatusOptions(CurrentStatus.Options) - { - HasSynced = hasSynced, - LastSyncedAt = lastCompleteSync, - }); - + CurrentStatus = updatedStatus; Emit(new PowerSyncDBEvent { StatusChanged = CurrentStatus }); } } diff --git a/PowerSync/PowerSync.Common/Client/Sync/Bucket/BucketStorageAdapter.cs b/PowerSync/PowerSync.Common/Client/Sync/Bucket/BucketStorageAdapter.cs index 4611bb2..272e747 100644 --- a/PowerSync/PowerSync.Common/Client/Sync/Bucket/BucketStorageAdapter.cs +++ b/PowerSync/PowerSync.Common/Client/Sync/Bucket/BucketStorageAdapter.cs @@ -95,15 +95,6 @@ public class BucketStorageEvent public interface IBucketStorageAdapter : IEventStream { Task Init(); - Task SaveSyncData(SyncDataBatch batch); - Task RemoveBuckets(string[] buckets); - Task SetTargetCheckpoint(Checkpoint checkpoint); - - void StartSession(); - - Task GetBucketStates(); - - Task SyncLocalDatabase(Checkpoint checkpoint); Task NextCrudItem(); Task HasCrud(); @@ -112,12 +103,6 @@ public interface IBucketStorageAdapter : IEventStream Task HasCompletedSync(); Task UpdateLocalTarget(Func> callback); - /// - /// Exposed for tests only. - /// - Task AutoCompact(); - Task ForceCompact(); - string GetMaxOpId(); /// diff --git a/PowerSync/PowerSync.Common/Client/Sync/Bucket/SqliteBucketStorage.cs b/PowerSync/PowerSync.Common/Client/Sync/Bucket/SqliteBucketStorage.cs index 78c549d..d35c1f3 100644 --- a/PowerSync/PowerSync.Common/Client/Sync/Bucket/SqliteBucketStorage.cs +++ b/PowerSync/PowerSync.Common/Client/Sync/Bucket/SqliteBucketStorage.cs @@ -25,9 +25,6 @@ public class SqliteBucketStorage : EventStream, IBucketStora private readonly HashSet tableNames; private string? clientId; - private static readonly int COMPACT_OPERATION_INTERVAL = 1000; - private int compactCounter = COMPACT_OPERATION_INTERVAL; - private ILogger logger; private CancellationTokenSource updateCts; @@ -95,50 +92,6 @@ public string GetMaxOpId() return MAX_OP_ID; } - public void StartSession() { } - - public async Task GetBucketStates() - { - return - await db.GetAll("SELECT name as bucket, cast(last_op as TEXT) as op_id FROM ps_buckets WHERE pending_delete = 0 AND name != '$local'"); - } - - public async Task SaveSyncData(SyncDataBatch batch) - { - await db.WriteTransaction(async tx => - { - int count = 0; - foreach (var b in batch.Buckets) - { - var result = await tx.Execute("INSERT INTO powersync_operations(op, data) VALUES(?, ?)", - ["save", JsonConvert.SerializeObject(new { buckets = new[] { JsonConvert.DeserializeObject(b.ToJSON()) } })]); - logger.LogDebug("saveSyncData {message}", JsonConvert.SerializeObject(result)); - count += b.Data.Length; - } - compactCounter += count; - }); - } - - public async Task RemoveBuckets(string[] buckets) - { - foreach (var bucket in buckets) - { - await DeleteBucket(bucket); - } - } - - private async Task DeleteBucket(string bucket) - { - await db.WriteTransaction(async tx => - { - await tx.Execute("INSERT INTO powersync_operations(op, data) VALUES(?, ?)", - ["delete_bucket", bucket]); - }); - - logger.LogDebug("Done deleting bucket"); - pendingBucketDeletes = true; - } - private record LastSyncedResult(string? synced_at); public async Task HasCompletedSync() { @@ -150,71 +103,6 @@ public async Task HasCompletedSync() return hasCompletedSync; } - public async Task SyncLocalDatabase(Checkpoint checkpoint) - { - var validation = await ValidateChecksums(checkpoint); - if (!validation.CheckpointValid) - { - logger.LogError("Checksums failed for {failures}", JsonConvert.SerializeObject(validation.CheckpointFailures)); - foreach (var failedBucket in validation.CheckpointFailures ?? []) - { - await DeleteBucket(failedBucket); - } - return new SyncLocalDatabaseResult - { - Ready = false, - CheckpointValid = false, - CheckpointFailures = validation.CheckpointFailures - }; - } - - var bucketNames = checkpoint.Buckets.Select(b => b.Bucket).ToArray(); - await db.WriteTransaction(async tx => - { - await tx.Execute( - "UPDATE ps_buckets SET last_op = ? WHERE name IN (SELECT json_each.value FROM json_each(?))", - [checkpoint.LastOpId, JsonConvert.SerializeObject(bucketNames)] - ); - - if (checkpoint.WriteCheckpoint != null) - { - await tx.Execute( - "UPDATE ps_buckets SET last_op = ? WHERE name = '$local'", - [checkpoint.WriteCheckpoint] - ); - } - }); - - var valid = await UpdateObjectsFromBuckets(checkpoint); - if (!valid) - { - logger.LogDebug("Not at a consistent checkpoint - cannot update local db"); - return new SyncLocalDatabaseResult - { - Ready = false, - CheckpointValid = true - }; - } - - await ForceCompact(); - - return new SyncLocalDatabaseResult - { - Ready = true, - CheckpointValid = true - }; - } - - private async Task UpdateObjectsFromBuckets(Checkpoint checkpoint) - { - return await db.WriteTransaction(async tx => - { - var result = await tx.Execute("INSERT INTO powersync_operations(op, data) VALUES(?, ?)", - ["sync_local", ""]); - - return result.InsertId == 1; - }); - } private record ResultResult(object result); @@ -227,75 +115,6 @@ public class ResultDetail public List? FailedBuckets { get; set; } } - public async Task ValidateChecksums( - Checkpoint checkpoint) - { - var result = await db.Get("SELECT powersync_validate_checkpoint(?) as result", - [JsonConvert.SerializeObject(checkpoint)]); - - logger.LogDebug("validateChecksums result item {message}", JsonConvert.SerializeObject(result)); - - if (result == null) return new SyncLocalDatabaseResult { CheckpointValid = false, Ready = false }; - - var resultDetail = JsonConvert.DeserializeObject(result.result.ToString() ?? "{}"); - - if (resultDetail?.Valid == true) - { - return new SyncLocalDatabaseResult { Ready = true, CheckpointValid = true }; - } - else - { - return new SyncLocalDatabaseResult - { - CheckpointValid = false, - Ready = false, - CheckpointFailures = resultDetail?.FailedBuckets?.ToArray() ?? [] - }; - } - } - - /// - /// Force a compact operation, primarily for testing purposes. - /// - public async Task ForceCompact() - { - compactCounter = COMPACT_OPERATION_INTERVAL; - pendingBucketDeletes = true; - - await AutoCompact(); - } - - public async Task AutoCompact() - { - await DeletePendingBuckets(); - await ClearRemoveOps(); - } - - private async Task DeletePendingBuckets() - { - if (!pendingBucketDeletes) return; - - await db.WriteTransaction(async tx => - { - await tx.Execute("INSERT INTO powersync_operations(op, data) VALUES (?, ?)", - ["delete_pending_buckets", ""]); - }); - - pendingBucketDeletes = false; - } - - private async Task ClearRemoveOps() - { - if (compactCounter < COMPACT_OPERATION_INTERVAL) return; - - await db.WriteTransaction(async tx => - { - await tx.Execute("INSERT INTO powersync_operations(op, data) VALUES (?, ?)", - ["clear_remove_ops", ""]); - }); - - compactCounter = 0; - } private record TargetOpResult(string target_op); private record SequenceResult(int seq); @@ -431,12 +250,6 @@ public async Task HasCrud() return await db.GetOptional("SELECT 1 as ignore FROM ps_crud LIMIT 1") != null; } - public async Task SetTargetCheckpoint(Checkpoint checkpoint) - { - // No Op - await Task.CompletedTask; - } - record ControlResult(string? r); public async Task Control(string op, object? payload = null) diff --git a/PowerSync/PowerSync.Common/Client/Sync/Stream/CoreInstructions.cs b/PowerSync/PowerSync.Common/Client/Sync/Stream/CoreInstructions.cs index 7bfef98..8a31406 100644 --- a/PowerSync/PowerSync.Common/Client/Sync/Stream/CoreInstructions.cs +++ b/PowerSync/PowerSync.Common/Client/Sync/Stream/CoreInstructions.cs @@ -1,5 +1,6 @@ using Newtonsoft.Json.Linq; using Newtonsoft.Json; +using PowerSync.Common.DB.Crud; namespace PowerSync.Common.Client.Sync.Stream; @@ -83,6 +84,7 @@ public class CoreSyncStatus [JsonProperty("downloading")] public DownloadProgress? Downloading { get; set; } + } public class SyncPriorityStatus @@ -126,4 +128,45 @@ public class FetchCredentials : Instruction public class CloseSyncStream : Instruction { } public class FlushFileSystem : Instruction { } -public class DidCompleteSync : Instruction { } \ No newline at end of file +public class DidCompleteSync : Instruction { } + +public class CoreInstructionHelpers +{ + public static DB.Crud.SyncPriorityStatus PriorityToStatus(SyncPriorityStatus status) + { + return new DB.Crud.SyncPriorityStatus + { + Priority = status.Priority, + HasSynced = status.HasSynced ?? null, + LastSyncedAt = status?.LastSyncedAt != null ? DateTimeOffset.FromUnixTimeSeconds(status!.LastSyncedAt).DateTime : null + }; + } + + public static DB.Crud.SyncStatus CoreStatusToSyncStatus(CoreSyncStatus status) + { + return new DB.Crud.SyncStatus(CoreStatusToSyncStatusOptions(status)); + } + + public static DB.Crud.SyncStatusOptions CoreStatusToSyncStatusOptions(CoreSyncStatus status) + { + var coreCompleteSync = + status.PriorityStatus.FirstOrDefault(s => s.Priority == SyncProgress.FULL_SYNC_PRIORITY); + var completeSync = coreCompleteSync != null ? PriorityToStatus(coreCompleteSync) : null; + + return new DB.Crud.SyncStatusOptions + { + Connected = status.Connected, + Connecting = status.Connecting, + DataFlow = new DB.Crud.SyncDataFlowStatus + { + // We expose downloading as a boolean field, the core extension reports download information as a nullable + // download status. When that status is non-null, a download is in progress. + Downloading = status.Downloading != null, + DownloadProgress = status.Downloading?.Buckets + }, + LastSyncedAt = completeSync?.LastSyncedAt, + HasSynced = completeSync?.HasSynced, + PriorityStatusEntries = status.PriorityStatus.Select(PriorityToStatus).ToArray() + }; + } +} \ No newline at end of file diff --git a/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs b/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs index 62466cc..bf45272 100644 --- a/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs +++ b/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs @@ -25,15 +25,7 @@ public enum SyncClientImplementation /// The Rust sync client stores sync data in a format that is slightly different than the one used /// by the old C# implementation. When adopting the RUST client on existing /// databases, the PowerSync SDK will migrate the format automatically. - RUST, - /// - /// Decodes and handles sync lines received from the sync service in C#. - /// - /// This is the legacy option. - /// - /// The explicit choice to use the C#-based sync implementation will be removed from a future version of the SDK. - /// - C_SHARP + RUST } public class AdditionalConnectionOptions(int? retryDelayMs = null, int? crudUploadThrottleMs = null) @@ -423,7 +415,7 @@ protected async Task StreamingSyncIteration(Cancel } else { - return await LegacyStreamingSyncIteration(signal, resolvedOptions); + throw new NotImplementedException("C_SHARP sync client implementation is no longer supported."); } } }); @@ -537,29 +529,7 @@ async Task HandleInstruction(Instruction instruction) } break; case UpdateSyncStatus syncStatus: - var info = syncStatus.Status; - var coreCompleteSync = - info.PriorityStatus.FirstOrDefault(s => s.Priority == SyncProgress.FULL_SYNC_PRIORITY); - var completeSync = coreCompleteSync != null ? CoreStatusToSyncStatus(coreCompleteSync) : null; - - UpdateSyncStatus(new SyncStatusOptions - { - Connected = info.Connected, - Connecting = info.Connecting, - LastSyncedAt = completeSync?.LastSyncedAt, - HasSynced = completeSync?.HasSynced, - PriorityStatusEntries = info.PriorityStatus.Select(CoreStatusToSyncStatus).ToArray(), - DataFlow = new SyncDataFlowStatus - { - Downloading = info.Downloading != null, - DownloadProgress = info.Downloading?.Buckets - } - }, - new UpdateSyncStatusOptions - { - ClearDownloadError = true, - } - ); + UpdateSyncStatus(CoreInstructionHelpers.CoreStatusToSyncStatusOptions(syncStatus.Status)); break; case EstablishSyncStream establishSyncStream: if (receivingLines != null) @@ -641,281 +611,6 @@ async Task HandleInstruction(Instruction instruction) return new StreamingSyncIterationResult { ImmediateRestart = hideDisconnectOnRestart }; } - protected async Task LegacyStreamingSyncIteration(CancellationToken signal, RequiredPowerSyncConnectionOptions resolvedOptions) - { - logger.LogWarning("The legacy sync client implementation is deprecated and will be removed in a future release."); - logger.LogDebug("Streaming sync iteration started"); - Options.Adapter.StartSession(); - var bucketEntries = await Options.Adapter.GetBucketStates(); - var initialBuckets = new Dictionary(); - - foreach (var entry in bucketEntries) - { - initialBuckets[entry.Bucket] = entry.OpId; - } - - var req = initialBuckets - .Select(kvp => new BucketRequest - { - Name = kvp.Key, - After = kvp.Value - }) - .ToList(); - - var targetCheckpoint = (Checkpoint?)null; - var validatedCheckpoint = (Checkpoint?)null; - var appliedCheckpoint = (Checkpoint?)null; - - var bucketSet = new HashSet(initialBuckets.Keys); - - var clientId = await Options.Adapter.GetClientId(); - - logger.LogDebug("Requesting stream from server"); - - var syncOptions = new SyncStreamOptions - { - Path = "/sync/stream", - CancellationToken = signal, - Data = new StreamingSyncRequest - { - Buckets = req, - IncludeChecksum = true, - RawData = true, - Parameters = resolvedOptions.Params, - ClientId = clientId - } - }; - - var stream = Options.Remote.PostStream(syncOptions); - var first = true; - await foreach (var line in stream) - { - if (first) - { - first = false; - logger.LogDebug("Stream established. Processing events"); - } - - if (line == null) - { - logger.LogDebug("Stream has closed while waiting"); - // The stream has closed while waiting - return new StreamingSyncIterationResult { LegacyRetry = true }; - } - - // A connection is active and messages are being received - if (!SyncStatus.Connected) - { - // There is a connection now - UpdateSyncStatus(new SyncStatusOptions - { - Connected = true - }); - TriggerCrudUpload(); - } - - if (line is StreamingSyncCheckpoint syncCheckpoint) - { - logger.LogDebug("Sync checkpoint: {message}", syncCheckpoint); - - targetCheckpoint = syncCheckpoint.Checkpoint; - var bucketsToDelete = new HashSet(bucketSet); - var newBuckets = new HashSet(); - - foreach (var checksum in syncCheckpoint.Checkpoint.Buckets) - { - newBuckets.Add(checksum.Bucket); - bucketsToDelete.Remove(checksum.Bucket); - } - if (bucketsToDelete.Count > 0) - { - logger.LogDebug("Removing buckets: {message}", string.Join(", ", bucketsToDelete)); - } - - bucketSet = newBuckets; - await Options.Adapter.RemoveBuckets([.. bucketsToDelete]); - await Options.Adapter.SetTargetCheckpoint(targetCheckpoint); - } - else if (line is StreamingSyncCheckpointComplete checkpointComplete) - { - logger.LogDebug("Checkpoint complete: {message}", targetCheckpoint); - - var result = await Options.Adapter.SyncLocalDatabase(targetCheckpoint!); - - if (!result.CheckpointValid) - { - // This means checksums failed. Start again with a new checkpoint. - await Task.Delay(50); - return new StreamingSyncIterationResult { LegacyRetry = true }; - } - else if (!result.Ready) - { - // Checksums valid, but need more data for a consistent checkpoint. - // Continue waiting. - // Landing here the whole time - } - else - { - appliedCheckpoint = targetCheckpoint; - logger.LogDebug("Validated checkpoint: {message}", appliedCheckpoint); - - UpdateSyncStatus(new SyncStatusOptions - { - Connected = true, - LastSyncedAt = DateTime.Now, - DataFlow = new SyncDataFlowStatus - { - Downloading = false - } - }, new UpdateSyncStatusOptions - { - ClearDownloadError = true - }); - - } - - validatedCheckpoint = targetCheckpoint; - } - else if (line is StreamingSyncCheckpointDiff checkpointDiff) - { - // TODO: It may be faster to just keep track of the diff, instead of the entire checkpoint - if (targetCheckpoint == null) - { - throw new Exception("Checkpoint diff without previous checkpoint"); - } - - var diff = checkpointDiff.CheckpointDiff; - var newBuckets = new Dictionary(); - - foreach (var checksum in targetCheckpoint.Buckets) - { - newBuckets[checksum.Bucket] = checksum; - } - - foreach (var checksum in diff.UpdatedBuckets) - { - newBuckets[checksum.Bucket] = checksum; - } - - foreach (var bucket in diff.RemovedBuckets) - { - newBuckets.Remove(bucket); - } - - var newWriteCheckpoint = !string.IsNullOrEmpty(diff.WriteCheckpoint) ? diff.WriteCheckpoint : null; - var newCheckpoint = new Checkpoint - { - LastOpId = diff.LastOpId, - Buckets = [.. newBuckets.Values], - WriteCheckpoint = newWriteCheckpoint - }; - - targetCheckpoint = newCheckpoint; - - bucketSet = [.. newBuckets.Keys]; - - var bucketsToDelete = diff.RemovedBuckets.ToArray(); - if (bucketsToDelete.Length > 0) - { - logger.LogDebug("Remove buckets: {message}", string.Join(", ", bucketsToDelete)); - } - - // Perform async operations - await Options.Adapter.RemoveBuckets(bucketsToDelete); - await Options.Adapter.SetTargetCheckpoint(targetCheckpoint); - } - else if (line is StreamingSyncDataJSON dataJSON) - { - UpdateSyncStatus(new SyncStatusOptions - { - DataFlow = new SyncDataFlowStatus - { - Downloading = true - } - }); - await Options.Adapter.SaveSyncData(new SyncDataBatch([SyncDataBucket.FromRow(dataJSON.Data)])); - } - else if (line is StreamingSyncKeepalive keepalive) - { - var remainingSeconds = keepalive.TokenExpiresIn; - if (remainingSeconds == 0) - { - // Connection would be closed automatically right after this - logger.LogDebug("Token expiring; reconnect"); - Options.Remote.InvalidateCredentials(); - - // For a rare case where the backend connector does not update the token - // (uses the same one), this should have some delay. - // - await DelayRetry(); - return new StreamingSyncIterationResult { LegacyRetry = true }; - } - else if (remainingSeconds < 30) - { - logger.LogDebug("Token will expire soon; reconnect"); - // Pre-emptively refresh the token - Options.Remote.InvalidateCredentials(); - return new StreamingSyncIterationResult { LegacyRetry = true }; - } - TriggerCrudUpload(); - } - else - { - logger.LogDebug("Sync complete"); - - if (targetCheckpoint == appliedCheckpoint) - { - UpdateSyncStatus(new SyncStatusOptions - { - Connected = true, - LastSyncedAt = DateTime.Now, - }, - new UpdateSyncStatusOptions - { - ClearDownloadError = true - } - ); - } - else if (validatedCheckpoint == targetCheckpoint) - { - var result = await Options.Adapter.SyncLocalDatabase(targetCheckpoint!); - if (!result.CheckpointValid) - { - // This means checksums failed. Start again with a new checkpoint. - await Task.Delay(50); - return new StreamingSyncIterationResult { LegacyRetry = false }; - } - else if (!result.Ready) - { - // Checksums valid, but need more data for a consistent checkpoint. - // Continue waiting. - } - else - { - appliedCheckpoint = targetCheckpoint; - UpdateSyncStatus(new SyncStatusOptions - { - Connected = true, - LastSyncedAt = DateTime.Now, - DataFlow = new SyncDataFlowStatus - { - Downloading = false, - } - }, - new UpdateSyncStatusOptions - { - ClearDownloadError = true - }); - } - } - } - } - - logger.LogDebug("Stream input empty"); - // Connection closed. Likely due to auth issue. - return new StreamingSyncIterationResult { LegacyRetry = true }; - } - public new void Close() { crudUpdateCts?.Cancel(); @@ -1043,10 +738,12 @@ protected void UpdateSyncStatus(SyncStatusOptions options, UpdateSyncStatusOptio Connected = options.Connected ?? SyncStatus.Connected, Connecting = !options.Connected.GetValueOrDefault() && (options.Connecting ?? SyncStatus.Connecting), LastSyncedAt = options.LastSyncedAt ?? SyncStatus.LastSyncedAt, + PriorityStatusEntries = options.PriorityStatusEntries ?? SyncStatus.PriorityStatusEntries, DataFlow = new SyncDataFlowStatus { Uploading = options.DataFlow?.Uploading ?? SyncStatus.DataFlowStatus.Uploading, Downloading = options.DataFlow?.Downloading ?? SyncStatus.DataFlowStatus.Downloading, + DownloadProgress = options.DataFlow?.DownloadProgress ?? SyncStatus.DataFlowStatus.DownloadProgress, DownloadError = updateOptions?.ClearDownloadError == true ? null : options.DataFlow?.DownloadError ?? SyncStatus.DataFlowStatus.DownloadError, UploadError = updateOptions?.ClearUploadError == true ? null : options.DataFlow?.UploadError ?? SyncStatus.DataFlowStatus.UploadError, } @@ -1069,16 +766,6 @@ protected void UpdateSyncStatus(SyncStatusOptions options, UpdateSyncStatusOptio } } - private static DB.Crud.SyncPriorityStatus CoreStatusToSyncStatus(SyncPriorityStatus status) - { - return new DB.Crud.SyncPriorityStatus - { - Priority = status.Priority, - HasSynced = status.HasSynced ?? null, - LastSyncedAt = status?.LastSyncedAt != null ? new DateTime(status!.LastSyncedAt) : null - }; - } - private async Task DelayRetry() { if (Options.RetryDelayMs.HasValue) @@ -1088,6 +775,7 @@ private async Task DelayRetry() } } + enum LockType { CRUD, diff --git a/PowerSync/PowerSync.Common/DB/Crud/CrudEntry.cs b/PowerSync/PowerSync.Common/DB/Crud/CrudEntry.cs index 5fe8f02..6807a92 100644 --- a/PowerSync/PowerSync.Common/DB/Crud/CrudEntry.cs +++ b/PowerSync/PowerSync.Common/DB/Crud/CrudEntry.cs @@ -32,20 +32,8 @@ public class CrudEntryDataJSON [JsonProperty("data")] public Dictionary? Data { get; set; } - [JsonProperty("op")] - public UpdateType Op { get; set; } - - [JsonProperty("type")] - public string Type { get; set; } = null!; - - [JsonProperty("id")] - public string Id { get; set; } = null!; -} - -public class CrudEntryOutputJSON -{ - [JsonProperty("op_id")] - public int OpId { get; set; } + [JsonProperty("old")] + public Dictionary? Old { get; set; } [JsonProperty("op")] public UpdateType Op { get; set; } @@ -56,22 +44,42 @@ public class CrudEntryOutputJSON [JsonProperty("id")] public string Id { get; set; } = null!; - [JsonProperty("tx_id")] - public long? TransactionId { get; set; } - - [JsonProperty("data")] - public Dictionary? Data { get; set; } + [JsonProperty("metadata")] + public string? Metadata { get; set; } } -public class CrudEntry(int clientId, UpdateType op, string table, string id, long? transactionId = null, Dictionary? opData = null) +public class CrudEntry( + int clientId, + UpdateType op, + string table, + string id, + long? transactionId = null, + Dictionary? opData = null, + Dictionary? previousValues = null, + string? metadata = null +) { public int ClientId { get; private set; } = clientId; public string Id { get; private set; } = id; public UpdateType Op { get; private set; } = op; + public Dictionary? OpData { get; private set; } = opData; public string Table { get; private set; } = table; public long? TransactionId { get; private set; } = transactionId; + /// + /// Previous values before this change. + /// + public Dictionary? PreviousValues { get; private set; } = previousValues; + + /// + /// Client-side metadata attached with this write. + /// + /// This field is only available when the `trackMetadata` option was set to `true` when creating a table + /// and the insert or update statement set the `_metadata` column. + /// + public string? Metadata { get; private set; } = metadata; + public static CrudEntry FromRow(CrudEntryJSON dbRow) { var data = JsonConvert.DeserializeObject(dbRow.Data) @@ -83,7 +91,9 @@ public static CrudEntry FromRow(CrudEntryJSON dbRow) data.Type, data.Id, dbRow.TransactionId, - data.Data + data.Data, + data.Old, + data.Metadata ); } diff --git a/PowerSync/PowerSync.Common/DB/Crud/SyncProgress.cs b/PowerSync/PowerSync.Common/DB/Crud/SyncProgress.cs index b709a2c..ac349ac 100644 --- a/PowerSync/PowerSync.Common/DB/Crud/SyncProgress.cs +++ b/PowerSync/PowerSync.Common/DB/Crud/SyncProgress.cs @@ -68,7 +68,7 @@ public class ProgressWithOperations public int TotalOperations { get; set; } /// - /// The numnber of operations that have already been downloaded. + /// The number of operations that have already been downloaded. /// public int DownloadedOperations { get; set; } diff --git a/PowerSync/PowerSync.Common/DB/Crud/SyncStatus.cs b/PowerSync/PowerSync.Common/DB/Crud/SyncStatus.cs index cdda36a..43da269 100644 --- a/PowerSync/PowerSync.Common/DB/Crud/SyncStatus.cs +++ b/PowerSync/PowerSync.Common/DB/Crud/SyncStatus.cs @@ -100,7 +100,7 @@ public class SyncStatus(SyncStatusOptions options) /// public SyncPriorityStatus[] PriorityStatusEntries => (Options.PriorityStatusEntries ?? []) - .OrderBy(entry => entry.Priority) + .OrderBy(x => x, Comparer.Create(ComparePriorities)) .ToArray(); /// @@ -152,9 +152,30 @@ public SyncPriorityStatus StatusForPriority(int priority) }; } + /// + /// Creates an updated SyncStatus by merging the current status with the provided updated status. + /// + public SyncStatus CreateUpdatedStatus(SyncStatus updatedStatus) + { + var updatedOptions = updatedStatus.Options; + var currentOptions = Options; + + var parsedOptions = new SyncStatusOptions + { + Connected = updatedOptions.Connected ?? currentOptions.Connected, + Connecting = updatedOptions.Connecting ?? currentOptions.Connecting, + LastSyncedAt = updatedOptions.LastSyncedAt ?? currentOptions.LastSyncedAt, + HasSynced = updatedOptions.HasSynced ?? currentOptions.HasSynced, + PriorityStatusEntries = updatedOptions.PriorityStatusEntries ?? currentOptions.PriorityStatusEntries, + DataFlow = updatedOptions.DataFlow ?? currentOptions.DataFlow, + }; + + return new SyncStatus(parsedOptions); + } + private string SerializeObject() { - return JsonConvert.SerializeObject(new { Options = Options, UploadErrorMessage = Options.DataFlow?.UploadError?.Message, DownloadErrorMessage = DataFlowStatus.DownloadError?.Message }); + return JsonConvert.SerializeObject(new { Options, UploadErrorMessage = Options.DataFlow?.UploadError?.Message, DownloadErrorMessage = DataFlowStatus.DownloadError?.Message }); } public bool IsEqual(SyncStatus status) @@ -173,4 +194,9 @@ public string ToJSON() { return SerializeObject(); } + + private static int ComparePriorities(SyncPriorityStatus a, SyncPriorityStatus b) + { + return b.Priority - a.Priority; // Reverse because higher priorities have lower numbers + } } \ No newline at end of file diff --git a/PowerSync/PowerSync.Common/DB/IDBAdapter.cs b/PowerSync/PowerSync.Common/DB/IDBAdapter.cs index b0c906b..a31ee49 100644 --- a/PowerSync/PowerSync.Common/DB/IDBAdapter.cs +++ b/PowerSync/PowerSync.Common/DB/IDBAdapter.cs @@ -30,13 +30,13 @@ public class QueryRows public interface IDBGetUtils { // Execute a read-only query and return results. - Task GetAll(string sql, params object[]? parameters); + Task GetAll(string sql, object[]? parameters = null); // Execute a read-only query and return the first result, or null if the ResultSet is empty. - Task GetOptional(string sql, params object[]? parameters); + Task GetOptional(string sql, object[]? parameters = null); // Execute a read-only query and return the first result, error if the ResultSet is empty. - Task Get(string sql, params object[]? parameters); + Task Get(string sql, object[]? parameters = null); } public interface ILockContext : IDBGetUtils diff --git a/PowerSync/PowerSync.Common/DB/Schema/Table.cs b/PowerSync/PowerSync.Common/DB/Schema/Table.cs index cd9d2b1..d0c56c7 100644 --- a/PowerSync/PowerSync.Common/DB/Schema/Table.cs +++ b/PowerSync/PowerSync.Common/DB/Schema/Table.cs @@ -7,7 +7,11 @@ public class TableOptions( Dictionary>? indexes = null, bool? localOnly = null, bool? insertOnly = null, - string? viewName = null) + string? viewName = null, + bool? trackMetadata = null, + TrackPreviousOptions? trackPreviousValues = null, + bool? ignoreEmptyUpdates = null +) { public Dictionary> Indexes { get; set; } = indexes ?? []; @@ -15,7 +19,44 @@ public class TableOptions( public bool InsertOnly { get; set; } = insertOnly ?? false; - public string? ViewName { get; set; } = viewName; + public string? ViewName { get; } = viewName; + + /// + /// Whether to add a hidden `_metadata` column that will be enabled for updates to attach custom + /// information about writes that will be reported through [CrudEntry.metadata]. + /// + public bool TrackMetadata { get; set; } = trackMetadata ?? false; + + /// + /// When set to a non-null value, track old values of columns + /// + public TrackPreviousOptions? TrackPreviousValues { get; set; } = trackPreviousValues ?? null; + + /// + /// Whether an `UPDATE` statement that doesn't change any values should be ignored when creating + /// CRUD entries. + /// + public bool IgnoreEmptyUpdates { get; set; } = ignoreEmptyUpdates ?? false; +} + +/// +/// Whether to include previous column values when PowerSync tracks local changes. +/// Including old values may be helpful for some backend connector implementations, +/// which is why it can be enabled on a per-table or per-column basis. +/// +public class TrackPreviousOptions +{ + /// + /// When defined, a list of column names for which old values should be tracked. + /// + [JsonProperty("columns")] + public List? Columns { get; set; } + + /// + /// When enabled, only include values that have actually been changed by an update. + /// + [JsonProperty("onlyWhenChanged")] + public bool? OnlyWhenChanged { get; set; } } public class Table @@ -35,16 +76,21 @@ public Table(Dictionary columns, TableOptions? options = nul { ConvertedColumns = [.. columns.Select(kv => new Column(new ColumnOptions(kv.Key, kv.Value)))]; - ConvertedIndexes = [.. (Options?.Indexes ?? []) + ConvertedIndexes = + [ + .. (Options?.Indexes ?? []) .Select(kv => new Index(new IndexOptions( kv.Key, - [.. kv.Value.Select(name => - new IndexedColumn(new IndexColumnOptions( - name.Replace("-", ""), !name.StartsWith("-"))) - )] + [ + .. kv.Value.Select(name => + new IndexedColumn(new IndexColumnOptions( + name.Replace("-", ""), !name.StartsWith("-"))) + ) + ] )) - )]; + ) + ]; Options = options ?? new TableOptions(); @@ -61,7 +107,18 @@ public void Validate() if (Columns.Count > Column.MAX_AMOUNT_OF_COLUMNS) { - throw new Exception($"Table has too many columns. The maximum number of columns is {Column.MAX_AMOUNT_OF_COLUMNS}."); + throw new Exception( + $"Table has too many columns. The maximum number of columns is {Column.MAX_AMOUNT_OF_COLUMNS}."); + } + + if (Options.TrackMetadata && Options.LocalOnly) + { + throw new Exception("Can't include metadata for local-only tables."); + } + + if (Options.TrackPreviousValues != null && Options.LocalOnly) + { + throw new Exception("Can't include old values for local-only tables."); } var columnNames = new HashSet { "id" }; @@ -103,15 +160,27 @@ public void Validate() public string ToJSON(string Name = "") { + var trackPrevious = Options.TrackPreviousValues; + var jsonObject = new { view_name = Options.ViewName ?? Name, local_only = Options.LocalOnly, insert_only = Options.InsertOnly, columns = ConvertedColumns.Select(c => JsonConvert.DeserializeObject(c.ToJSON())).ToList(), - indexes = ConvertedIndexes.Select(e => JsonConvert.DeserializeObject(e.ToJSON(this))).ToList() + indexes = ConvertedIndexes.Select(e => JsonConvert.DeserializeObject(e.ToJSON(this))).ToList(), + + include_metadata = Options.TrackMetadata, + ignore_empty_update = Options.IgnoreEmptyUpdates, + include_old = (object)(trackPrevious switch + { + null => false, + { Columns: null } => true, + { Columns: var cols } => cols + }), + include_old_only_when_changed = trackPrevious?.OnlyWhenChanged ?? false }; return JsonConvert.SerializeObject(jsonObject); } -} +} \ No newline at end of file diff --git a/PowerSync/PowerSync.Maui/CHANGELOG.md b/PowerSync/PowerSync.Maui/CHANGELOG.md index ee6b964..20aa5b1 100644 --- a/PowerSync/PowerSync.Maui/CHANGELOG.md +++ b/PowerSync/PowerSync.Maui/CHANGELOG.md @@ -1,5 +1,8 @@ # PowerSync.Maui Changelog +## 0.0.4-alpha.1 +- Upstream PowerSync.Common version bump (See Powersync.Common changelog for more information) + ## 0.0.3-alpha.1 - Upstream PowerSync.Common version bump - Using the latest (0.4.9) version of the core extension, it introduces support for the Rust Sync implementation and also makes it the default - users can still opt out and use the legacy C# sync implementation as option when calling `connect()`. diff --git a/Tests/PowerSync/PowerSync.Common.IntegrationTests/NodeClient.cs b/Tests/PowerSync/PowerSync.Common.IntegrationTests/NodeClient.cs index eab9a3e..9ec8b1d 100644 --- a/Tests/PowerSync/PowerSync.Common.IntegrationTests/NodeClient.cs +++ b/Tests/PowerSync/PowerSync.Common.IntegrationTests/NodeClient.cs @@ -32,6 +32,7 @@ public Task CreateList(string id, string name) { return CreateItem("lists", id, name); } + async Task CreateItem(string table, string id, string name) { var data = new Dictionary @@ -74,6 +75,55 @@ public Task DeleteList(string id) return DeleteItem("lists", id); } + public Task CreateTodo(string id, string listId, string description) + { + return CreateTodoItem("todos", id, listId, description); + } + + async Task CreateTodoItem(string table, string id, string listId, string description) + { + var data = new Dictionary + { + { "created_at", DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss") }, + { "description", description }, + { "list_id", listId }, + { "created_by", _userId }, + { "completed", 0 }, + }; + + var batch = new[] + { + new + { + op = UpdateType.PUT.ToString(), + table = table, + id = id, + data = data + } + }; + + var payload = JsonSerializer.Serialize(new { batch }); + var content = new StringContent(payload, Encoding.UTF8, "application/json"); + + HttpResponseMessage response = await _httpClient.PostAsync($"{_backendUrl}/api/data", content); + + if (!response.IsSuccessStatusCode) + { + Console.WriteLine(await response.Content.ReadAsStringAsync()); + throw new Exception( + $"Failed to create todo. Status: {response.StatusCode}, " + + $"Response: {await response.Content.ReadAsStringAsync()}" + ); + } + + return await response.Content.ReadAsStringAsync(); + } + + public Task DeleteTodo(string id) + { + return DeleteItem("todos", id); + } + async Task DeleteItem(string table, string id) { var batch = new[] diff --git a/Tests/PowerSync/PowerSync.Common.IntegrationTests/SyncIntegrationTests.cs b/Tests/PowerSync/PowerSync.Common.IntegrationTests/SyncIntegrationTests.cs index 6e2da0e..ee5e03d 100644 --- a/Tests/PowerSync/PowerSync.Common.IntegrationTests/SyncIntegrationTests.cs +++ b/Tests/PowerSync/PowerSync.Common.IntegrationTests/SyncIntegrationTests.cs @@ -1,8 +1,4 @@ -using Newtonsoft.Json; using PowerSync.Common.Client; -using System.Data.Common; -using System.Diagnostics; -using System.Threading.Tasks; using Microsoft.Extensions.Logging; using PowerSync.Common.Client.Sync.Stream; @@ -14,6 +10,8 @@ public class SyncIntegrationTests : IAsyncLifetime { private record ListResult(string id, string name, string owner_id, string created_at); + private record TodoResult(string id, string list_id, string content, string owner_id, string created_at); + private string userId = Uuid(); private NodeClient nodeClient = default!; @@ -42,8 +40,6 @@ public async Task InitializeAsync() await db.Init(); var connector = new NodeConnector(userId); - await ClearAllData(); - Console.WriteLine($"Using User ID: {userId}"); try { @@ -220,8 +216,67 @@ await db.Execute("insert into lists (id, name, owner_id, created_at) values (uui await backendInsertWatch.Task; } + + /// + /// Helper that requires manual setup of the data to verify that download progress updates are working. + /// Ensure backend has 5000+ entries, then run this test to see progress updates in the console. + /// + // [IntegrationFact(Timeout = 10000)] + // public async Task InitialSyncDownloadProgressTest() + // { + // ILoggerFactory loggerFactory = LoggerFactory.Create(builder => + // { + // builder.AddConsole(); + // builder.SetMinimumLevel(LogLevel.Information); + // }); + + // var logger = loggerFactory.CreateLogger("PowerSyncLogger"); + + // nodeClient = new NodeClient(userId); + // db = new PowerSyncDatabase(new PowerSyncDatabaseOptions + // { + // Database = new SQLOpenOptions { DbFilename = "powersync-sync-progress-tests.db" }, + // Schema = TestSchema.PowerSyncSchema, + // Logger = logger + + // }); + // await db.Init(); + // await db.DisconnectAndClear(); + + + // var clearListener = db.RunListener((update) => + // { + // if (update.StatusChanged != null) + // { + // try + // { + // Console.WriteLine("Total: " + update.StatusChanged.DownloadProgress()?.TotalOperations + " Downloaded: " + update.StatusChanged.DownloadProgress()?.DownloadedOperations); + // Console.WriteLine("Synced: " + Math.Round((decimal)((update.StatusChanged.DownloadProgress()?.DownloadedFraction ?? 0) * 100)) + "%"); + + // } + // catch (Exception ex) + // { + // Console.WriteLine("Exception reading DownloadProgress: " + ex); + // } + // } + // }); + + // var connector = new NodeConnector(userId); + // await db.Connect(connector); + // await db.WaitForFirstSync(); + + + // clearListener.Dispose(); + // await db.DisconnectAndClear(); + // await db.Close(); + // } + private async Task ClearAllData() { + if (db.Closed) + { + return; + } // Inefficient but simple way to clear all data, avoiding payload limitations var results = await db.GetAll("select * from lists"); foreach (var item in results) diff --git a/Tests/PowerSync/PowerSync.Common.Tests/Client/Sync/BucketStorageTests.cs b/Tests/PowerSync/PowerSync.Common.Tests/Client/Sync/BucketStorageTests.cs deleted file mode 100644 index 7a2f9bf..0000000 --- a/Tests/PowerSync/PowerSync.Common.Tests/Client/Sync/BucketStorageTests.cs +++ /dev/null @@ -1,1019 +0,0 @@ -namespace PowerSync.Common.Tests.Client.Sync; - -using System.Threading.Tasks; - -using Microsoft.Data.Sqlite; -using Microsoft.Extensions.Logging; -using Newtonsoft.Json; -using PowerSync.Common.Client; -using PowerSync.Common.Client.Sync.Bucket; -using PowerSync.Common.DB.Schema; - -class TestData -{ - public static OplogEntry putAsset1_1 = OplogEntry.FromRow(new OplogEntryJSON - { - OpId = "1", - Op = new OpType(OpTypeEnum.PUT).ToJSON(), - ObjectType = "assets", - ObjectId = "O1", - Data = JsonConvert.SerializeObject(new { description = "bar" }), - Checksum = 1 - }); - - public static OplogEntry putAsset2_2 = OplogEntry.FromRow(new OplogEntryJSON - { - OpId = "2", - Op = new OpType(OpTypeEnum.PUT).ToJSON(), - ObjectType = "assets", - ObjectId = "O2", - Data = JsonConvert.SerializeObject(new { description = "bar" }), - Checksum = 2 - }); - - public static OplogEntry putAsset1_3 = OplogEntry.FromRow(new OplogEntryJSON - { - OpId = "3", - Op = new OpType(OpTypeEnum.PUT).ToJSON(), - ObjectType = "assets", - ObjectId = "O1", - Data = JsonConvert.SerializeObject(new { description = "bard" }), - Checksum = 3 - }); - - public static OplogEntry removeAsset1_4 = OplogEntry.FromRow(new OplogEntryJSON - { - OpId = "4", - Op = new OpType(OpTypeEnum.REMOVE).ToJSON(), - ObjectType = "assets", - ObjectId = "O1", - Checksum = 4 - }); - - public static OplogEntry removeAsset1_5 = OplogEntry.FromRow(new OplogEntryJSON - { - OpId = "5", - Op = new OpType(OpTypeEnum.REMOVE).ToJSON(), - ObjectType = "assets", - ObjectId = "O1", - Checksum = 5 - }); -} - -public class BucketStorageTests : IAsyncLifetime -{ - private PowerSyncDatabase db = default!; - private IBucketStorageAdapter bucketStorage = default!; - - public async Task InitializeAsync() - { - db = new PowerSyncDatabase(new PowerSyncDatabaseOptions - { - Database = new SQLOpenOptions { DbFilename = "powersync.db" }, - Schema = TestSchema.AppSchema, - }); - await db.Init(); - bucketStorage = new SqliteBucketStorage(db.Database, createLogger()); - - } - - public async Task DisposeAsync() - { - await db.DisconnectAndClear(); - await db.Close(); - bucketStorage.Close(); - } - - private record IdResult(string id); - private record DescriptionResult(string description); - private record AssetResult(string id, string description, string? make = null); - - static async Task ExpectAsset1_3(PowerSyncDatabase database) - { - var result = await database.GetAll("SELECT id, description, make FROM assets WHERE id = 'O1'"); - Assert.Equal(new AssetResult("O1", "bard", null), result[0]); - } - - static async Task ExpectNoAsset1(PowerSyncDatabase database) - { - var result = await database.GetAll("SELECT id, description, make FROM assets WHERE id = 'O1'"); - Assert.Empty(result); - } - - static async Task ExpectNoAssets(PowerSyncDatabase database) - { - var result = await database.GetAll("SELECT id, description, make FROM assets"); - Assert.Empty(result); - } - - async Task SyncLocalChecked(Checkpoint checkpoint) - { - var result = await bucketStorage.SyncLocalDatabase(checkpoint); - Assert.Equal(new SyncLocalDatabaseResult { Ready = true, CheckpointValid = true }, result); - } - - private ILogger createLogger() - { - ILoggerFactory loggerFactory = LoggerFactory.Create(builder => - { - builder.AddConsole(); // Enable console logging - builder.SetMinimumLevel(LogLevel.Debug); - }); - - return loggerFactory.CreateLogger("TestLogger"); - } - - [Fact] - public async Task BasicSetup() - { - await db.WaitForReady(); - var initialBucketStates = await bucketStorage.GetBucketStates(); - Assert.Empty(initialBucketStates); - - await bucketStorage.SaveSyncData(new SyncDataBatch([new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset2_2, TestData.putAsset1_3], false)])); - - var bucketStates = await bucketStorage.GetBucketStates(); - - Assert.Collection(bucketStates, state => - { - Assert.Equal("bucket1", state.Bucket); - Assert.Equal("3", state.OpId); - }); - - await SyncLocalChecked(new Checkpoint - { - LastOpId = "3", - Buckets = [new BucketChecksum { Bucket = "bucket1", Checksum = 6 }] - }); - - await ExpectAsset1_3(db); - } - - [Fact] - public async Task ShouldGetObjectFromMultipleBuckets() - { - await bucketStorage.SaveSyncData( - new SyncDataBatch( - [new SyncDataBucket("bucket1", [TestData.putAsset1_3], false), new SyncDataBucket("bucket2", [TestData.putAsset1_3], false)]) - ); - - await SyncLocalChecked(new Checkpoint - { - LastOpId = "3", - Buckets = [new BucketChecksum { Bucket = "bucket1", Checksum = 3 }, new BucketChecksum { Bucket = "bucket2", Checksum = 3 }] - }); - - await ExpectAsset1_3(db); - } - - [Fact] - public async Task ShouldPrioritizeLaterUpdates() - { - // Test behavior when the same object is present in multiple buckets. - // In this case, there are two different versions in the different buckets. - // While we should not get this with our server implementation, the client still specifies this behavior: - // The largest op_id wins. - - await bucketStorage.SaveSyncData( - new SyncDataBatch( - [new SyncDataBucket("bucket1", [TestData.putAsset1_3], false), new SyncDataBucket("bucket2", [TestData.putAsset1_1], false)]) - ); - - await SyncLocalChecked(new Checkpoint - { - LastOpId = "3", - Buckets = [new BucketChecksum { Bucket = "bucket1", Checksum = 3 }, new BucketChecksum { Bucket = "bucket2", Checksum = 1 }] - }); - - await ExpectAsset1_3(db); - } - - [Fact] - public async Task ShouldIgnoreRemoveFromOneBucket() - { - // When we have 1 PUT and 1 REMOVE, the object must be kept.); - await bucketStorage.SaveSyncData( - new SyncDataBatch([new SyncDataBucket("bucket1", [TestData.putAsset1_3], false), new SyncDataBucket("bucket2", [TestData.putAsset1_3, TestData.removeAsset1_4], false)]) - ); - - await SyncLocalChecked(new Checkpoint - { - LastOpId = "4", - Buckets = [new BucketChecksum { Bucket = "bucket1", Checksum = 3 }, new BucketChecksum { Bucket = "bucket2", Checksum = 7 }] - }); - - await ExpectAsset1_3(db); - } - - [Fact] - public async Task ShouldRemoveWhenRemovedFromAllBuckets() - { - // When we only have REMOVE left for an object, it must be deleted. - await bucketStorage.SaveSyncData( - new SyncDataBatch([new SyncDataBucket("bucket1", [TestData.putAsset1_3, TestData.removeAsset1_5], false), new SyncDataBucket("bucket2", [TestData.putAsset1_3, TestData.removeAsset1_4], false)]) - ); - - await SyncLocalChecked(new Checkpoint - { - LastOpId = "5", - Buckets = [new BucketChecksum { Bucket = "bucket1", Checksum = 8 }, new BucketChecksum { Bucket = "bucket2", Checksum = 7 }] - }); - - await ExpectNoAssets(db); - } - - [Fact] - public async Task ShouldUseSubkeys() - { - // Subkeys cause this to be treated as a separate entity in the oplog, - // but the same entity in the local database. - - var put4 = OplogEntry.FromRow(new OplogEntryJSON - { - OpId = "4", - Op = new OpType(OpTypeEnum.PUT).ToJSON(), - Subkey = "b", - ObjectType = "assets", - ObjectId = "O1", - Data = JsonConvert.SerializeObject(new { description = "B" }), - Checksum = 4 - }); - - var remove5 = OplogEntry.FromRow(new OplogEntryJSON - { - OpId = "5", - Op = new OpType(OpTypeEnum.REMOVE).ToJSON(), - Subkey = "b", - ObjectType = "assets", - ObjectId = "O1", - Checksum = 5 - }); - - await bucketStorage.SaveSyncData( - new SyncDataBatch([new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset1_3, put4], false)]) - ); - - await SyncLocalChecked(new Checkpoint - { - LastOpId = "4", - Buckets = [new BucketChecksum { Bucket = "bucket1", Checksum = 8 }] - }); - - var result = await db.GetAll("SELECT id, description, make FROM assets WHERE id = 'O1'"); - Assert.Equal(new AssetResult("O1", "B", null), result[0]); - - await bucketStorage.SaveSyncData(new SyncDataBatch([new SyncDataBucket("bucket1", [remove5], false)])); - - await SyncLocalChecked(new Checkpoint - { - LastOpId = "5", - Buckets = [new BucketChecksum { Bucket = "bucket1", Checksum = 13 }] - }); - - await ExpectAsset1_3(db); - } - - [Fact] - public async Task ShouldFailChecksumValidation() - { - // Simple checksum validation - await bucketStorage.SaveSyncData( - new SyncDataBatch([new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset2_2, TestData.putAsset1_3], false)]) - ); - - var result = await bucketStorage.SyncLocalDatabase(new Checkpoint - { - LastOpId = "3", - Buckets = [new BucketChecksum { Bucket = "bucket1", Checksum = 10 }, new BucketChecksum { Bucket = "bucket2", Checksum = 1 }] - }); - - var expected = new SyncLocalDatabaseResult - { - Ready = false, - CheckpointValid = false, - CheckpointFailures = ["bucket1", "bucket2"] - }; - - Assert.Equal(expected, result); - - await ExpectNoAssets(db); - } - - [Fact] - public async Task ShouldDeleteBuckets() - { - await bucketStorage.SaveSyncData( - new SyncDataBatch( - [new SyncDataBucket("bucket1", [TestData.putAsset1_3], false), new SyncDataBucket("bucket2", [TestData.putAsset1_3], false)]) - ); - - await bucketStorage.RemoveBuckets(["bucket2"]); - // The delete only takes effect after syncLocal. - - await SyncLocalChecked(new Checkpoint - { - LastOpId = "3", - Buckets = [new BucketChecksum { Bucket = "bucket1", Checksum = 3 }] - }); - - // Bucket is deleted, but object is still present in other buckets. - await ExpectAsset1_3(db); - - await bucketStorage.RemoveBuckets(["bucket1"]); - await SyncLocalChecked(new Checkpoint - { - LastOpId = "3", - Buckets = [] - }); - - // Both buckets deleted - object removed. - await ExpectNoAssets(db); - } - - [Fact] - public async Task ShouldDeleteAndRecreateBuckets() - { - // Save some data - await bucketStorage.SaveSyncData( - new SyncDataBatch([new SyncDataBucket("bucket1", [TestData.putAsset1_1], false)]) - ); - - // Delete the bucket - await bucketStorage.RemoveBuckets(["bucket1"]); - - // Save some data again - await bucketStorage.SaveSyncData( - new SyncDataBatch([new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset1_3], false)]) - ); - - // Delete again - await bucketStorage.RemoveBuckets(["bucket1"]); - - // Final save of data - await bucketStorage.SaveSyncData( - new SyncDataBatch([new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset1_3], false)]) - ); - - // Check that the data is there - await SyncLocalChecked(new Checkpoint - { - LastOpId = "3", - Buckets = [new BucketChecksum { Bucket = "bucket1", Checksum = 4 }] - }); - - await ExpectAsset1_3(db); - - // Now final delete - await bucketStorage.RemoveBuckets(["bucket1"]); - await SyncLocalChecked(new Checkpoint - { - LastOpId = "3", - Buckets = [] - }); - - await ExpectNoAssets(db); - } - - [Fact] - public async Task ShouldHandleMove() - { - await bucketStorage.SaveSyncData( - new SyncDataBatch( - [ - new SyncDataBucket("bucket1", - [ - OplogEntry.FromRow(new OplogEntryJSON - { - OpId = "1", - Op = new OpType(OpTypeEnum.MOVE).ToJSON(), - Checksum = 1 - }) - ], false) - ]) - ); - - await bucketStorage.SaveSyncData( - new SyncDataBatch([new SyncDataBucket("bucket1", [TestData.putAsset1_3], false)]) - ); - - await SyncLocalChecked(new Checkpoint - { - LastOpId = "3", - Buckets = [new BucketChecksum { Bucket = "bucket1", Checksum = 4 }] - }); - - await ExpectAsset1_3(db); - } - - [Fact] - public async Task ShouldHandleClear() - { - // Save some data - await bucketStorage.SaveSyncData( - new SyncDataBatch( - [ - new SyncDataBucket("bucket1", [TestData.putAsset1_1], false) - ]) - ); - - await SyncLocalChecked(new Checkpoint - { - LastOpId = "1", - Buckets = - [ - new BucketChecksum { Bucket = "bucket1", Checksum = 1 } - ] - }); - - // CLEAR, then save new data - await bucketStorage.SaveSyncData( - new SyncDataBatch( - [ - new SyncDataBucket("bucket1", - [ - OplogEntry.FromRow(new OplogEntryJSON - { - OpId = "2", - Op = new OpType(OpTypeEnum.CLEAR).ToJSON(), - Checksum = 2 - }), - OplogEntry.FromRow(new OplogEntryJSON - { - OpId = "3", - Op = new OpType(OpTypeEnum.PUT).ToJSON(), - Checksum = 3, - Data = TestData.putAsset2_2.Data, - ObjectId = TestData.putAsset2_2.ObjectId, - ObjectType = TestData.putAsset2_2.ObjectType - }) - ], false) - ]) - ); - - await SyncLocalChecked(new Checkpoint - { - LastOpId = "3", - // 2 + 3. 1 is replaced with 2. - Buckets = [new BucketChecksum { Bucket = "bucket1", Checksum = 5 }] - }); - - await ExpectNoAsset1(db); - - var result = await db.Get("SELECT id, description FROM assets WHERE id = 'O2'"); - - Assert.Equal(new AssetResult("O2", "bar"), result); - } - - [Fact] - public async Task UpdateWithNewTypes() - { - var dbName = "test-bucket-storage-new-types.db"; - var powersync = new PowerSyncDatabase(new PowerSyncDatabaseOptions - { - Database = new SQLOpenOptions { DbFilename = dbName }, - Schema = new Schema([]), - }); - await powersync.Init(); - bucketStorage = new SqliteBucketStorage(powersync.Database); - - await bucketStorage.SaveSyncData( - new SyncDataBatch( - [new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset2_2, TestData.putAsset1_3], false)]) - ); - - await SyncLocalChecked(new Checkpoint - { - LastOpId = "4", - Buckets = [new BucketChecksum { Bucket = "bucket1", Checksum = 6 }] - }); - - // Ensure an exception is thrown due to missing table - await Assert.ThrowsAsync(async () => - await powersync.GetAll("SELECT * FROM assets")); - - await powersync.Close(); - - powersync = new PowerSyncDatabase(new PowerSyncDatabaseOptions - { - Database = new SQLOpenOptions { DbFilename = dbName }, - Schema = TestSchema.AppSchema, - }); - await powersync.Init(); - - await ExpectAsset1_3(powersync); - - await powersync.DisconnectAndClear(); - await powersync.Close(); - } - - [Fact] - public async Task ShouldRemoveTypes() - { - var dbName = "test-bucket-storage-remove-types.db"; - - // Create database with initial schema - var powersync = new PowerSyncDatabase(new PowerSyncDatabaseOptions - { - Database = new SQLOpenOptions { DbFilename = dbName }, - Schema = TestSchema.AppSchema, - }); - - await powersync.Init(); - bucketStorage = new SqliteBucketStorage(powersync.Database); - - await bucketStorage.SaveSyncData( - new SyncDataBatch( - [ - new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset2_2, TestData.putAsset1_3], false) - ]) - ); - - await SyncLocalChecked(new Checkpoint - { - LastOpId = "3", - Buckets = - [ - new BucketChecksum { Bucket = "bucket1", Checksum = 6 } - ] - }); - - await ExpectAsset1_3(powersync); - await powersync.Close(); - - // Now open another instance with an empty schema - powersync = new PowerSyncDatabase(new PowerSyncDatabaseOptions - { - Database = new SQLOpenOptions { DbFilename = dbName }, - Schema = new Schema([]), - }); - await powersync.Init(); - - await Assert.ThrowsAsync(async () => - await powersync.Execute("SELECT * FROM assets")); - - await powersync.Close(); - - // Reopen database with the original schema - powersync = new PowerSyncDatabase(new PowerSyncDatabaseOptions - { - Database = new SQLOpenOptions { DbFilename = dbName }, - Schema = TestSchema.AppSchema, - }); - await powersync.Init(); - - await ExpectAsset1_3(powersync); - - await powersync.DisconnectAndClear(); - await powersync.Close(); - } - - private record OplogStats(string Type, string Id, int Count); - - [Fact] - public async Task ShouldCompact() - { - // Test compacting behavior. - // This test relies heavily on internals and will have to be updated when the compact implementation is updated. - - await bucketStorage.SaveSyncData( - new SyncDataBatch( - [ - new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset2_2, TestData.removeAsset1_4], false) - ]) - ); - - await SyncLocalChecked(new Checkpoint - { - LastOpId = "4", - WriteCheckpoint = "4", - Buckets = [new BucketChecksum { Bucket = "bucket1", Checksum = 7 }] - }); - - await bucketStorage.ForceCompact(); - - await SyncLocalChecked(new Checkpoint - { - LastOpId = "4", - WriteCheckpoint = "4", - Buckets = [new BucketChecksum { Bucket = "bucket1", Checksum = 7 }] - }); - - var stats = await db.GetAll( - "SELECT row_type as Type, row_id as Id, count(*) as Count FROM ps_oplog GROUP BY row_type, row_id ORDER BY row_type, row_id" - ); - - var expectedStats = new List { new("assets", "O2", 1) }; - - Assert.Equal(expectedStats, stats); - } - - [Fact] - public async Task ShouldNotSyncLocalDbWithPendingCrud_ServerRemoved() - { - await bucketStorage.SaveSyncData( - new SyncDataBatch( - [ - new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset2_2, TestData.putAsset1_3], false) - ]) - ); - - await SyncLocalChecked(new Checkpoint - { - LastOpId = "3", - Buckets = - [ - new BucketChecksum { Bucket = "bucket1", Checksum = 6 } - ] - }); - - // Local save - await db.Execute("INSERT INTO assets(id) VALUES(?)", ["O3"]); - - var insertedResult = await db.GetAll("SELECT id FROM assets WHERE id = 'O3'"); - Assert.Equal(new IdResult("O3"), insertedResult[0]); - - // At this point, we have data in the CRUD table and are not able to sync the local DB. - var result = await bucketStorage.SyncLocalDatabase(new Checkpoint - { - LastOpId = "3", - WriteCheckpoint = "3", - Buckets = - [ - new BucketChecksum { Bucket = "bucket1", Checksum = 6 } - ] - }); - - var expectedResult = new SyncLocalDatabaseResult - { - Ready = false, - CheckpointValid = true - }; - - Assert.Equal(expectedResult, result); - - var batch = await bucketStorage.GetCrudBatch(); - if (batch != null) - { - await batch.Complete(""); - } - - await bucketStorage.UpdateLocalTarget(() => Task.FromResult("4")); - - // At this point, the data has been uploaded but not synced back yet. - var result3 = await bucketStorage.SyncLocalDatabase(new Checkpoint - { - LastOpId = "3", - WriteCheckpoint = "3", - Buckets = [new BucketChecksum { Bucket = "bucket1", Checksum = 6 }] - }); - - Assert.Equal(expectedResult, result3); - - // The data must still be present locally. - var stillPresentResult = await db.GetAll("SELECT id FROM assets WHERE id = 'O3'"); - Assert.Equal(new IdResult("O3"), stillPresentResult[0]); - - await bucketStorage.SaveSyncData( - new SyncDataBatch( - [ - new SyncDataBucket("bucket1", Array.Empty(), false) - ]) - ); - - // Now we have synced the data back (or lack of data in this case), - // so we can do a local sync. - await SyncLocalChecked(new Checkpoint - { - LastOpId = "5", - WriteCheckpoint = "5", - Buckets = [new BucketChecksum { Bucket = "bucket1", Checksum = 6 }] - }); - - // Since the object was not in the sync response, it is deleted. - var deletedResult = await db.GetAll("SELECT id FROM assets WHERE id = 'O3'"); - Assert.Empty(deletedResult); - } - - [Fact] - public async Task ShouldNotSyncLocalDbWithPendingCrud_WhenMoreCrudIsAdded_1() - { - await bucketStorage.SaveSyncData( - new SyncDataBatch( - [ - new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset2_2, TestData.putAsset1_3], false) - ]) - ); - - await SyncLocalChecked(new Checkpoint - { - LastOpId = "3", - WriteCheckpoint = "3", - Buckets = - [ - new BucketChecksum { Bucket = "bucket1", Checksum = 6 } - ] - }); - - // Local save - await db.Execute("INSERT INTO assets(id) VALUES(?)", ["O3"]); - - var batch = await bucketStorage.GetCrudBatch(); - if (batch != null) - { - await batch.Complete(""); - } - - await bucketStorage.UpdateLocalTarget(() => Task.FromResult("4")); - - var result3 = await bucketStorage.SyncLocalDatabase(new Checkpoint - { - LastOpId = "3", - WriteCheckpoint = "3", - Buckets = [new BucketChecksum { Bucket = "bucket1", Checksum = 6 }] - }); - - var expectedResult = new SyncLocalDatabaseResult - { - Ready = false, - CheckpointValid = true - }; - - Assert.Equal(expectedResult, result3); - - await bucketStorage.SaveSyncData( - new SyncDataBatch( - [ - new SyncDataBucket("bucket1", Array.Empty(), false) - ]) - ); - - // Add more data before SyncLocalDatabase. - await db.Execute("INSERT INTO assets(id) VALUES(?)", ["O4"]); - - var result4 = await bucketStorage.SyncLocalDatabase(new Checkpoint - { - LastOpId = "5", - WriteCheckpoint = "5", - Buckets = - [ - new BucketChecksum { Bucket = "bucket1", Checksum = 6 } - ] - }); - - Assert.Equal(expectedResult, result4); - } - - [Fact] - public async Task ShouldNotSyncLocalDbWithPendingCrud_WhenMoreCrudIsAdded_2() - { - await bucketStorage.SaveSyncData( - new SyncDataBatch( - [ - new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset2_2, TestData.putAsset1_3], false) - ]) - ); - - await SyncLocalChecked(new Checkpoint - { - LastOpId = "3", - WriteCheckpoint = "3", - Buckets = [new BucketChecksum { Bucket = "bucket1", Checksum = 6 }] - }); - - // Local save - await db.Execute("INSERT INTO assets(id) VALUES(?)", ["O3"]); - - var batch = await bucketStorage.GetCrudBatch(); - - // Add more data before calling complete() - await db.Execute("INSERT INTO assets(id) VALUES(?)", ["O4"]); - if (batch != null) - { - await batch.Complete(""); - } - - await bucketStorage.UpdateLocalTarget(() => Task.FromResult("4")); - - await bucketStorage.SaveSyncData( - new SyncDataBatch( - [ - new SyncDataBucket("bucket1", [], false) - ]) - ); - - var result4 = await bucketStorage.SyncLocalDatabase(new Checkpoint - { - LastOpId = "5", - WriteCheckpoint = "5", - Buckets = - [ - new BucketChecksum { Bucket = "bucket1", Checksum = 6 } - ] - }); - - var expected = new SyncLocalDatabaseResult - { - Ready = false, - CheckpointValid = true - }; - - Assert.Equal(expected, result4); - } - - [Fact] - public async Task ShouldNotSyncLocalDbWithPendingCrud_UpdateOnServer() - { - await bucketStorage.SaveSyncData( - new SyncDataBatch( - [ - new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset2_2, TestData.putAsset1_3], false) - ]) - ); - - await SyncLocalChecked(new Checkpoint - { - LastOpId = "3", - WriteCheckpoint = "3", - Buckets = - [ - new BucketChecksum { Bucket = "bucket1", Checksum = 6 } - ] - }); - - // Local save - await db.Execute("INSERT INTO assets(id) VALUES(?)", ["O3"]); - - var batch = await bucketStorage.GetCrudBatch(); - if (batch != null) - { - await batch.Complete(""); - } - - await bucketStorage.UpdateLocalTarget(() => Task.FromResult("4")); - - await bucketStorage.SaveSyncData( - new SyncDataBatch( - [ - new SyncDataBucket("bucket1", - [ - OplogEntry.FromRow(new OplogEntryJSON - { - OpId = "5", - Op = new OpType(OpTypeEnum.PUT).ToJSON(), - ObjectType = "assets", - ObjectId = "O3", - Checksum = 5, - Data = JsonConvert.SerializeObject(new { description = "server updated" }) - }) - ], false) - ]) - ); - - await SyncLocalChecked(new Checkpoint - { - LastOpId = "5", - WriteCheckpoint = "5", - Buckets = - [ - new BucketChecksum { Bucket = "bucket1", Checksum = 11 } - ] - }); - - var updatedResult = await db.GetAll("SELECT description FROM assets WHERE id = 'O3'"); - Assert.Equal(new DescriptionResult("server updated"), updatedResult[0]); - } - - [Fact] - public async Task ShouldRevertAFailingInsert() - { - await bucketStorage.SaveSyncData( - new SyncDataBatch( - [ - new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset2_2, TestData.putAsset1_3], false) - ]) - ); - - await SyncLocalChecked(new Checkpoint - { - LastOpId = "3", - WriteCheckpoint = "3", - Buckets = [new BucketChecksum { Bucket = "bucket1", Checksum = 6 }] - }); - - // Local insert, later rejected by server - await db.Execute("INSERT INTO assets(id, description) VALUES(?, ?)", ["O3", "inserted"]); - - var batch = await bucketStorage.GetCrudBatch(); - if (batch != null) - { - await batch.Complete(""); - } - - await bucketStorage.UpdateLocalTarget(() => Task.FromResult("4")); - - var insertedResult = await db.GetAll("SELECT description FROM assets WHERE id = 'O3'"); - Assert.Equal(new DescriptionResult("inserted"), insertedResult[0]); - - await SyncLocalChecked(new Checkpoint - { - LastOpId = "3", - WriteCheckpoint = "4", - Buckets = [new BucketChecksum { Bucket = "bucket1", Checksum = 6 }] - }); - - var revertedResult = await db.GetAll("SELECT description FROM assets WHERE id = 'O3'"); - Assert.Empty(revertedResult); - } - - [Fact] - public async Task ShouldRevertAFailingDelete() - { - await bucketStorage.SaveSyncData( - new SyncDataBatch( - [ - new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset2_2, TestData.putAsset1_3], false) - ]) - ); - - await SyncLocalChecked(new Checkpoint - { - LastOpId = "3", - WriteCheckpoint = "3", - Buckets = [new BucketChecksum { Bucket = "bucket1", Checksum = 6 }] - }); - - // Local delete, later rejected by server - await db.Execute("DELETE FROM assets WHERE id = ?", ["O2"]); - - var deletedResult = await db.GetAll("SELECT description FROM assets WHERE id = 'O2'"); - Assert.Empty(deletedResult); // Ensure the record is deleted locally - - // Simulate a permissions error when uploading - data should be preserved - var batch = await bucketStorage.GetCrudBatch(); - if (batch != null) - { - await batch.Complete(""); - } - - await bucketStorage.UpdateLocalTarget(() => Task.FromResult("4")); - - await SyncLocalChecked(new Checkpoint - { - LastOpId = "3", - WriteCheckpoint = "4", - Buckets = [new BucketChecksum { Bucket = "bucket1", Checksum = 6 }] - }); - - var revertedResult = await db.GetAll("SELECT description FROM assets WHERE id = 'O2'"); - Assert.Equal(new DescriptionResult("bar"), revertedResult[0]); - } - - [Fact] - public async Task ShouldRevertAFailingUpdate() - { - await bucketStorage.SaveSyncData( - new SyncDataBatch( - [ - new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset2_2, TestData.putAsset1_3], false) - ]) - ); - - await SyncLocalChecked(new Checkpoint - { - LastOpId = "3", - WriteCheckpoint = "3", - Buckets = - [ - new BucketChecksum { Bucket = "bucket1", Checksum = 6 } - ] - }); - - // Local update, later rejected by server - await db.Execute("UPDATE assets SET description = ? WHERE id = ?", ["updated", "O2"]); - - var updatedResult = await db.GetAll("SELECT description FROM assets WHERE id = 'O2'"); - Assert.Equal(new DescriptionResult("updated"), updatedResult[0]); - - // Simulate a permissions error when uploading - data should be preserved - var batch = await bucketStorage.GetCrudBatch(); - if (batch != null) - { - await batch.Complete(""); - } - - await bucketStorage.UpdateLocalTarget(async () => await Task.FromResult("4")); - - await SyncLocalChecked(new Checkpoint - { - LastOpId = "3", - WriteCheckpoint = "4", - Buckets = - [ - new BucketChecksum { Bucket = "bucket1", Checksum = 6 } - ] - }); - - var revertedResult = await db.GetAll("SELECT description FROM assets WHERE id = 'O2'"); - Assert.Equal(new DescriptionResult("bar"), revertedResult[0]); - } -} \ No newline at end of file diff --git a/Tests/PowerSync/PowerSync.Common.Tests/Client/Sync/CRUDTests.cs b/Tests/PowerSync/PowerSync.Common.Tests/Client/Sync/CRUDTests.cs index c3ce177..f82b2da 100644 --- a/Tests/PowerSync/PowerSync.Common.Tests/Client/Sync/CRUDTests.cs +++ b/Tests/PowerSync/PowerSync.Common.Tests/Client/Sync/CRUDTests.cs @@ -9,6 +9,10 @@ namespace PowerSync.Common.Tests.Client.Sync; using PowerSync.Common.DB.Schema; using PowerSync.Common.Tests.Utils; + +/// +/// dotnet test -v n --framework net8.0 --filter "CRUDTests" +/// public class CRUDTests : IAsyncLifetime { private PowerSyncDatabase db = default!; @@ -32,6 +36,127 @@ public async Task DisposeAsync() DatabaseUtils.CleanDb(dbName); } + private async Task ResetDB(PowerSyncDatabase db) + { + await db.DisconnectAndClear(); + DatabaseUtils.CleanDb(db.Database.Name); + } + + [Fact] + public async Task IncludeMetadataTest() + { + var db = new PowerSyncDatabase(new PowerSyncDatabaseOptions + { + Database = new SQLOpenOptions { DbFilename = "IncludeMetadataTest.db" }, + Schema = TestSchema.GetSchemaWithCustomAssetOptions(new TableOptions + { + TrackMetadata = true + }), + }); + await ResetDB(db); + + await db.Execute("INSERT INTO assets (id, description, _metadata) VALUES(uuid(), 'xxxx', 'so meta');"); + + var batch = await db.GetNextCrudTransaction(); + Assert.Equal("so meta", batch?.Crud[0].Metadata); + } + + [Fact] + public async Task IncludeOldValuesTest() + { + var db = new PowerSyncDatabase(new PowerSyncDatabaseOptions + { + Database = new SQLOpenOptions { DbFilename = "IncludeOldValuesTest.db" }, + Schema = TestSchema.GetSchemaWithCustomAssetOptions(new TableOptions + { + TrackPreviousValues = new TrackPreviousOptions() + }), + }); + await ResetDB(db); + + await db.Execute("INSERT INTO assets (id, description) VALUES(?, ?);", ["a185b7e1-dffa-4a9a-888c-15c0f0cac4b3", "entry"]); + await db.Execute("DELETE FROM ps_crud;"); + await db.Execute("UPDATE assets SET description = ?", ["new name"]); + + var batch = await db.GetNextCrudTransaction(); + Assert.True(batch?.Crud[0].PreviousValues?.ContainsKey("description")); + Assert.Equal("entry", batch?.Crud[0].PreviousValues?["description"]); + } + + [Fact] + public async Task IncludeOldValuesWithColumnFilterTest() + { + var db = new PowerSyncDatabase(new PowerSyncDatabaseOptions + { + Database = new SQLOpenOptions { DbFilename = "IncludeOldValuesWithColumnFilterTest.db" }, + Schema = TestSchema.GetSchemaWithCustomAssetOptions(new TableOptions + { + TrackPreviousValues = new TrackPreviousOptions + { + Columns = new List { "description" } + } + }), + }); + await ResetDB(db); + + await db.Execute("INSERT INTO assets (id, description, make) VALUES(?, ?, ?);", ["a185b7e1-dffa-4a9a-888c-15c0f0cac4b3", "entry", "make1"]); + await db.Execute("DELETE FROM ps_crud;"); + await db.Execute("UPDATE assets SET description = ?, make = ?", ["new name", "make2"]); + + var batch = await db.GetNextCrudTransaction(); + Assert.NotNull(batch?.Crud[0].PreviousValues); + Assert.Equal("entry", batch?.Crud[0].PreviousValues?["description"]); + Assert.False(batch?.Crud[0].PreviousValues!.ContainsKey("make")); + } + + + [Fact] + public async Task IncludeOldValuesWhenChangedTest() + { + var db = new PowerSyncDatabase(new PowerSyncDatabaseOptions + { + Database = new SQLOpenOptions { DbFilename = "oldValuesDb" }, + Schema = TestSchema.GetSchemaWithCustomAssetOptions(new TableOptions + { + TrackPreviousValues = new TrackPreviousOptions + { + OnlyWhenChanged = true + } + }), + }); + await ResetDB(db); + + await db.Execute("INSERT INTO assets (id, description, make) VALUES(uuid(), ?, ?);", ["name", "make1"]); + await db.Execute("DELETE FROM ps_crud;"); + await db.Execute("UPDATE assets SET description = ?", ["new name"]); + + var batch = await db.GetNextCrudTransaction(); + Assert.Single(batch!.Crud); + Assert.NotNull(batch.Crud[0].PreviousValues); + Assert.Equal("name", batch.Crud[0].PreviousValues!["description"]); + Assert.False(batch.Crud[0].PreviousValues!.ContainsKey("make")); + } + + [Fact] + public async Task IgnoreEmptyUpdateTest() + { + var db = new PowerSyncDatabase(new PowerSyncDatabaseOptions + { + Database = new SQLOpenOptions { DbFilename = "IgnoreEmptyUpdateTest.db" }, + Schema = TestSchema.GetSchemaWithCustomAssetOptions(new TableOptions + { + IgnoreEmptyUpdates = true + }), + }); + await ResetDB(db); + await db.Execute("INSERT INTO assets (id, description) VALUES(?, ?);", [testId, "name"]); + await db.Execute("DELETE FROM ps_crud;"); + await db.Execute("UPDATE assets SET description = ?", ["name"]); + + var batch = await db.GetNextCrudTransaction(); + Assert.Null(batch); + } + [Fact] public async Task Insert_RecordCrudEntryTest() { diff --git a/Tests/PowerSync/PowerSync.Common.Tests/TestSchema.cs b/Tests/PowerSync/PowerSync.Common.Tests/TestSchema.cs index 1998f45..9b5d7f9 100644 --- a/Tests/PowerSync/PowerSync.Common.Tests/TestSchema.cs +++ b/Tests/PowerSync/PowerSync.Common.Tests/TestSchema.cs @@ -4,7 +4,7 @@ namespace PowerSync.Common.Tests; public class TestSchema { - public static readonly Table Assets = new Table(new Dictionary + public static readonly Dictionary AssetsColumns = new Dictionary { { "created_at", ColumnType.TEXT }, { "make", ColumnType.TEXT }, @@ -14,10 +14,12 @@ public class TestSchema { "user_id", ColumnType.TEXT }, { "customer_id", ColumnType.TEXT }, { "description", ColumnType.TEXT }, - }, new TableOptions - { - Indexes = new Dictionary> { { "makemodel", new List { "make", "model" } } } - }); + }; + + public static readonly Table Assets = new Table(AssetsColumns, new TableOptions + { + Indexes = new Dictionary> { { "makemodel", new List { "make", "model" } } }, + }); public static readonly Table Customers = new Table(new Dictionary { @@ -30,4 +32,15 @@ public class TestSchema { "assets", Assets }, { "customers", Customers } }); + + public static Schema GetSchemaWithCustomAssetOptions(TableOptions? assetOptions = null) + { + var customAssets = new Table(AssetsColumns, assetOptions); + + return new Schema(new Dictionary + { + { "assets", customAssets }, + // { "customers", Customers } + }); + } } \ No newline at end of file diff --git a/Tools/Setup/Setup.cs b/Tools/Setup/Setup.cs index 81fd9c3..17c5426 100644 --- a/Tools/Setup/Setup.cs +++ b/Tools/Setup/Setup.cs @@ -5,9 +5,12 @@ using System.Collections.Generic; using System.IO.Compression; +/// +/// Execute with `dotnet run --project Tools/Setup` +/// public class PowerSyncSetup { - private const string VERSION = "0.4.9"; + private const string VERSION = "0.4.10"; private const string GITHUB_BASE_URL = $"https://github.com/powersync-ja/powersync-sqlite-core/releases/download/v{VERSION}"; private const string MAVEN_BASE_URL = $"https://repo1.maven.org/maven2/com/powersync/powersync-sqlite-core/{VERSION}";