Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 44 additions & 36 deletions PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -156,21 +156,48 @@ public async Task WaitForReady()
await isReadyTask;
}

public async Task WaitForFirstSync(CancellationToken? cancellationToken = null)
public class PrioritySyncRequest
{
if (CurrentStatus.HasSynced == true)
public CancellationToken? Token { get; set; }
public int? Priority { get; set; }
}

/// <summary>
/// Wait for the first sync operation to complete.
/// </summary>
/// <param name="request">
/// An object providing a cancellation token and a priority target.
/// When a priority target is set, the task may complete when all buckets with the given (or higher)
/// priorities have been synchronized. This can be earlier than a complete sync.
/// </param>
/// <returns>A task which will complete once the first full sync has completed.</returns>
public async Task WaitForFirstSync(PrioritySyncRequest? request = null)
{
var priority = request?.Priority ?? null;
var cancellationToken = request?.Token ?? null;

bool StatusMatches(SyncStatus status)
{
if (priority == null)
{
return status.HasSynced == true;
}
return status.StatusForPriority(priority.Value).HasSynced == true;
}

if (StatusMatches(CurrentStatus))
{
return;
}

var tcs = new TaskCompletionSource<bool>();
var cts = new CancellationTokenSource();

var _ = Task.Run(() =>
_ = Task.Run(() =>
{
foreach (var update in Listen(cts.Token))
{
if (update.StatusChanged?.HasSynced == true)
if (update.StatusChanged != null && StatusMatches(update.StatusChanged!))
{
cts.Cancel();
tcs.SetResult(true);
Expand All @@ -192,7 +219,7 @@ protected async Task Initialize()
await BucketStorageAdapter.Init();
await LoadVersion();
await UpdateSchema(schema);
await UpdateHasSynced();
await ResolveOfflineSyncStatus();
await Database.Execute("PRAGMA RECURSIVE_TRIGGERS=TRUE");
Ready = true;
Emit(new PowerSyncDBEvent { Initialized = true });
Expand All @@ -216,48 +243,29 @@ private async Task LoadVersion()
catch (Exception e)
{
throw new Exception(
$"Unsupported PowerSync extension version. Need >=0.2.0 <1.0.0, got: {sdkVersion}. Details: {e.Message}"
$"Unsupported PowerSync extension version. Need >=0.4.10 <1.0.0, got: {sdkVersion}. Details: {e.Message}"
);
}

// Validate version is >= 0.2.0 and < 1.0.0
if (versionInts[0] != 0 || versionInts[1] < 2 || versionInts[2] < 0)
// Validate version is >= 0.4.10 and < 1.0.0
if (versionInts[0] != 0 || versionInts[1] < 4 || (versionInts[1] == 4 && versionInts[2] < 10))
{
throw new Exception($"Unsupported PowerSync extension version. Need >=0.2.0 <1.0.0, got: {sdkVersion}");
throw new Exception($"Unsupported PowerSync extension version. Need >=0.4.10 <1.0.0, got: {sdkVersion}");
}
}

private record LastSyncedResult(int? priority, string? last_synced_at);

protected async Task UpdateHasSynced()
private record OfflineSyncStatusResult(string r);
protected async Task ResolveOfflineSyncStatus()
{
var results = await Database.GetAll<LastSyncedResult>(
"SELECT priority, last_synced_at FROM ps_sync_state ORDER BY priority DESC"
);
var result = await Database.Get<OfflineSyncStatusResult>("SELECT powersync_offline_sync_status() as r");
var parsed = JsonConvert.DeserializeObject<CoreSyncStatus>(result.r);

DateTime? lastCompleteSync = null;
var parsedSyncStatus = CoreInstructionHelpers.CoreStatusToSyncStatus(parsed!);
var updatedStatus = CurrentStatus.CreateUpdatedStatus(parsedSyncStatus);

// TODO: Will be altered/extended when reporting individual sync priority statuses is supported
foreach (var result in results)
if (!updatedStatus.IsEqual(CurrentStatus))
{
var parsedDate = DateTime.Parse(result.last_synced_at + "Z");

if (result.priority == FULL_SYNC_PRIORITY)
{
// This lowest-possible priority represents a complete sync.
lastCompleteSync = parsedDate;
}
}

var hasSynced = lastCompleteSync != null;
if (hasSynced != CurrentStatus.HasSynced)
{
CurrentStatus = new SyncStatus(new SyncStatusOptions(CurrentStatus.Options)
{
HasSynced = hasSynced,
LastSyncedAt = lastCompleteSync,
});

CurrentStatus = updatedStatus;
Emit(new PowerSyncDBEvent { StatusChanged = CurrentStatus });
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,15 +95,6 @@ public class BucketStorageEvent
public interface IBucketStorageAdapter : IEventStream<BucketStorageEvent>
{
Task Init();
Task SaveSyncData(SyncDataBatch batch);
Task RemoveBuckets(string[] buckets);
Task SetTargetCheckpoint(Checkpoint checkpoint);

void StartSession();

Task<BucketState[]> GetBucketStates();

Task<SyncLocalDatabaseResult> SyncLocalDatabase(Checkpoint checkpoint);

Task<CrudEntry?> NextCrudItem();
Task<bool> HasCrud();
Expand All @@ -112,12 +103,6 @@ public interface IBucketStorageAdapter : IEventStream<BucketStorageEvent>
Task<bool> HasCompletedSync();
Task<bool> UpdateLocalTarget(Func<Task<string>> callback);

/// <summary>
/// Exposed for tests only.
/// </summary>
Task AutoCompact();
Task ForceCompact();

string GetMaxOpId();

/// <summary>
Expand Down
187 changes: 0 additions & 187 deletions PowerSync/PowerSync.Common/Client/Sync/Bucket/SqliteBucketStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,6 @@ public class SqliteBucketStorage : EventStream<BucketStorageEvent>, IBucketStora
private readonly HashSet<string> tableNames;
private string? clientId;

private static readonly int COMPACT_OPERATION_INTERVAL = 1000;
private int compactCounter = COMPACT_OPERATION_INTERVAL;

private ILogger logger;

private CancellationTokenSource updateCts;
Expand Down Expand Up @@ -95,50 +92,6 @@ public string GetMaxOpId()
return MAX_OP_ID;
}

public void StartSession() { }

public async Task<BucketState[]> GetBucketStates()
{
return
await db.GetAll<BucketState>("SELECT name as bucket, cast(last_op as TEXT) as op_id FROM ps_buckets WHERE pending_delete = 0 AND name != '$local'");
}

public async Task SaveSyncData(SyncDataBatch batch)
{
await db.WriteTransaction(async tx =>
{
int count = 0;
foreach (var b in batch.Buckets)
{
var result = await tx.Execute("INSERT INTO powersync_operations(op, data) VALUES(?, ?)",
["save", JsonConvert.SerializeObject(new { buckets = new[] { JsonConvert.DeserializeObject(b.ToJSON()) } })]);
logger.LogDebug("saveSyncData {message}", JsonConvert.SerializeObject(result));
count += b.Data.Length;
}
compactCounter += count;
});
}

public async Task RemoveBuckets(string[] buckets)
{
foreach (var bucket in buckets)
{
await DeleteBucket(bucket);
}
}

private async Task DeleteBucket(string bucket)
{
await db.WriteTransaction(async tx =>
{
await tx.Execute("INSERT INTO powersync_operations(op, data) VALUES(?, ?)",
["delete_bucket", bucket]);
});

logger.LogDebug("Done deleting bucket");
pendingBucketDeletes = true;
}

private record LastSyncedResult(string? synced_at);
public async Task<bool> HasCompletedSync()
{
Expand All @@ -150,71 +103,6 @@ public async Task<bool> HasCompletedSync()
return hasCompletedSync;
}

public async Task<SyncLocalDatabaseResult> SyncLocalDatabase(Checkpoint checkpoint)
{
var validation = await ValidateChecksums(checkpoint);
if (!validation.CheckpointValid)
{
logger.LogError("Checksums failed for {failures}", JsonConvert.SerializeObject(validation.CheckpointFailures));
foreach (var failedBucket in validation.CheckpointFailures ?? [])
{
await DeleteBucket(failedBucket);
}
return new SyncLocalDatabaseResult
{
Ready = false,
CheckpointValid = false,
CheckpointFailures = validation.CheckpointFailures
};
}

var bucketNames = checkpoint.Buckets.Select(b => b.Bucket).ToArray();
await db.WriteTransaction(async tx =>
{
await tx.Execute(
"UPDATE ps_buckets SET last_op = ? WHERE name IN (SELECT json_each.value FROM json_each(?))",
[checkpoint.LastOpId, JsonConvert.SerializeObject(bucketNames)]
);

if (checkpoint.WriteCheckpoint != null)
{
await tx.Execute(
"UPDATE ps_buckets SET last_op = ? WHERE name = '$local'",
[checkpoint.WriteCheckpoint]
);
}
});

var valid = await UpdateObjectsFromBuckets(checkpoint);
if (!valid)
{
logger.LogDebug("Not at a consistent checkpoint - cannot update local db");
return new SyncLocalDatabaseResult
{
Ready = false,
CheckpointValid = true
};
}

await ForceCompact();

return new SyncLocalDatabaseResult
{
Ready = true,
CheckpointValid = true
};
}

private async Task<bool> UpdateObjectsFromBuckets(Checkpoint checkpoint)
{
return await db.WriteTransaction(async tx =>
{
var result = await tx.Execute("INSERT INTO powersync_operations(op, data) VALUES(?, ?)",
["sync_local", ""]);

return result.InsertId == 1;
});
}

private record ResultResult(object result);

Expand All @@ -227,75 +115,6 @@ public class ResultDetail
public List<string>? FailedBuckets { get; set; }
}

public async Task<SyncLocalDatabaseResult> ValidateChecksums(
Checkpoint checkpoint)
{
var result = await db.Get<ResultResult>("SELECT powersync_validate_checkpoint(?) as result",
[JsonConvert.SerializeObject(checkpoint)]);

logger.LogDebug("validateChecksums result item {message}", JsonConvert.SerializeObject(result));

if (result == null) return new SyncLocalDatabaseResult { CheckpointValid = false, Ready = false };

var resultDetail = JsonConvert.DeserializeObject<ResultDetail>(result.result.ToString() ?? "{}");

if (resultDetail?.Valid == true)
{
return new SyncLocalDatabaseResult { Ready = true, CheckpointValid = true };
}
else
{
return new SyncLocalDatabaseResult
{
CheckpointValid = false,
Ready = false,
CheckpointFailures = resultDetail?.FailedBuckets?.ToArray() ?? []
};
}
}

/// <summary>
/// Force a compact operation, primarily for testing purposes.
/// </summary>
public async Task ForceCompact()
{
compactCounter = COMPACT_OPERATION_INTERVAL;
pendingBucketDeletes = true;

await AutoCompact();
}

public async Task AutoCompact()
{
await DeletePendingBuckets();
await ClearRemoveOps();
}

private async Task DeletePendingBuckets()
{
if (!pendingBucketDeletes) return;

await db.WriteTransaction(async tx =>
{
await tx.Execute("INSERT INTO powersync_operations(op, data) VALUES (?, ?)",
["delete_pending_buckets", ""]);
});

pendingBucketDeletes = false;
}

private async Task ClearRemoveOps()
{
if (compactCounter < COMPACT_OPERATION_INTERVAL) return;

await db.WriteTransaction(async tx =>
{
await tx.Execute("INSERT INTO powersync_operations(op, data) VALUES (?, ?)",
["clear_remove_ops", ""]);
});

compactCounter = 0;
}

private record TargetOpResult(string target_op);
private record SequenceResult(int seq);
Expand Down Expand Up @@ -431,12 +250,6 @@ public async Task<bool> HasCrud()
return await db.GetOptional<object>("SELECT 1 as ignore FROM ps_crud LIMIT 1") != null;
}

public async Task SetTargetCheckpoint(Checkpoint checkpoint)
{
// No Op
await Task.CompletedTask;
}

record ControlResult(string? r);

public async Task<string> Control(string op, object? payload = null)
Expand Down
Loading