Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 9 additions & 4 deletions packages/rs-platform-wallet-storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ rusqlite = { version = "0.38", features = [
"backup",
"blob",
"hooks",
"trace",
], optional = true }
refinery = { version = "0.9", default-features = false, features = [
"rusqlite",
Expand Down Expand Up @@ -72,7 +73,9 @@ assert_cmd = "2"
predicates = "3"
static_assertions = "1"
filetime = "0.2"
platform-wallet-storage = { path = ".", features = ["sqlite", "cli", "test-helpers"] }
tracing-test = { version = "0.2", features = ["no-env-filter"] }
serial_test = "3"
platform-wallet-storage = { path = ".", features = ["sqlite", "cli", "__test-helpers"] }

[features]
default = ["sqlite", "cli"]
Expand Down Expand Up @@ -106,6 +109,8 @@ cli = [
# beyond a `// pub mod secrets;` marker in `src/lib.rs`.
secrets = []
# Exposes `lock_conn_for_test` / `config_for_test` accessors on
# `SqlitePersister` so this crate's own integration tests can probe the
# write connection. Downstream code MUST NOT enable this feature.
test-helpers = ["sqlite"]
# `SqlitePersister` so this crate's own integration tests can probe
# the write connection. The double-underscore prefix follows Cargo's
# convention for "MUST NOT enable from downstream" features
# (https://doc.rust-lang.org/cargo/reference/features.html#feature-resolver-version-2).
__test-helpers = ["sqlite"]
50 changes: 48 additions & 2 deletions packages/rs-platform-wallet-storage/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,52 @@ structured so a future `SecretStore` (currently sketched in
safe under a concurrent writer.
- **No private-key material.** See [`SECRETS.md`](./SECRETS.md).
- `Send + Sync`; usable behind `Arc<dyn PlatformWalletPersistence>`.
- Writers use `prepare_cached` so each INSERT/UPDATE is parsed once
per `Connection` lifetime; subsequent flushes hit the cache.

## Flush semantics

`flush()` and `Immediate`-mode `store()` succeed-or-restore: on a
transient SQLite failure (`SQLITE_BUSY` / `SQLITE_LOCKED`) the
buffered changeset is merged back into the per-wallet buffer (LWW
with anything `store()`-d during the failed transaction) and the
call returns a `PersistenceError::Backend(_)` whose payload contains
the marker `flush failed transiently`. **Retry the call** — do not
discard state. Fatal failures (integrity check, encode error, mutex
poison, …) drop the buffer and surface verbatim.

The full classification lives on
[`WalletStorageError::is_transient`](src/sqlite/error.rs); the
boundary mapping into `PersistenceError::Backend(String)` flattens
the `Display` chain so operators can grep for variant names + hex
wallet ids in production logs.

## load() reconstruction

`SqlitePersister::load()` populates `ClientStartState` with every
sub-area that has a wired-up reader today:

| Slot | Reader | Status |
|---|---|---|
| `platform_addresses` | `schema::platform_addrs::load_state` | covered |
| `identities` | `schema::identities::load_state` | covered |
| `contacts` | `schema::contacts::load_state` | covered |
| `asset_locks` | `schema::asset_locks::load_state` | covered |
| `wallets` | — | empty pending upstream `Wallet::from_persisted` |

`ClientStartState` is `#[non_exhaustive]` — initialise via
`Default::default()` and overwrite individual slots; do not
exhaustively destructure. A future slot addition is non-breaking for
callers that respect the marker.

Each reader skips per-row decode failures (corruption tolerance):
the call still returns `Ok(state)` with the partial result, every
skipped row emits a structured `tracing::warn!` with `wallet_id` +
`table` + `error`, and the load summary log carries a
`skipped_rows` counter alongside `wallets_seen`,
`addresses_loaded`, `identities_loaded`, `contacts_loaded`,
`asset_locks_loaded`, `wallets_rehydrated`, and
`wallets_pending_rehydration`.

## Library usage

Expand All @@ -30,7 +76,7 @@ use platform_wallet_storage::{SqlitePersister, SqlitePersisterConfig};
let config = SqlitePersisterConfig::new("/tmp/wallets.db");
let persister: Arc<dyn PlatformWalletPersistence> =
Arc::new(SqlitePersister::open(config)?);
# Ok::<_, platform_wallet_storage::SqlitePersisterError>(())
# Ok::<_, platform_wallet_storage::WalletStorageError>(())
```

The same types are also reachable via their canonical submodule path —
Expand Down Expand Up @@ -71,7 +117,7 @@ validation failure (e.g. corrupt backup source).
| `sqlite` | yes | SQLite persister (`platform_wallet_storage::sqlite`) and all of its native deps (`rusqlite`, `refinery`, `dpp`, `dash-sdk`, `key-wallet`, etc.) |
| `cli` | yes | Maintenance binary `platform-wallet-storage`. Implies `sqlite`. |
| `secrets` | no | Reserved for the future `SecretStore` submodule. No code lands today. |
| `test-helpers` | no | Crate-private `lock_conn_for_test` / `config_for_test` accessors. Downstream MUST NOT enable. |
| `__test-helpers` | no | Crate-private `lock_conn_for_test` / `config_for_test` accessors. The double-underscore prefix follows Cargo's "do not enable from downstream" convention; the methods are also `#[doc(hidden)]`. |

`cargo build -p platform-wallet-storage --no-default-features` builds
the crate with neither the SQLite backend nor the CLI compiled in.
Expand Down
105 changes: 102 additions & 3 deletions packages/rs-platform-wallet-storage/src/sqlite/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,11 @@ impl Buffer {
Ok(())
}

/// Drain (return) the buffered changeset for `wallet_id`. Returns
/// `None` if there is no pending data.
pub fn drain(
/// Move the buffered changeset out for `wallet_id`. Returns
/// `None` when nothing is staged. Callers MUST either commit it
/// (success path) or hand it back via [`Self::restore`] on
/// transient failure — dropping it on error == data loss.
pub fn take_for_flush(
&self,
wallet_id: &WalletId,
) -> Result<Option<PlatformWalletChangeSet>, WalletStorageError> {
Expand All @@ -54,6 +56,47 @@ impl Buffer {
Ok(guard.remove(wallet_id).filter(|cs| !cs.is_empty()))
}

/// Re-merge a previously-taken changeset back into the buffer
/// after a transient flush failure. Uses each sub-changeset's
/// `Merge` impl so any `store(...)` that arrived between the
/// `take_for_flush` and the failure wins on overlapping fields
/// (LWW). No clone: the caller hands ownership back.
pub fn restore(
&self,
wallet_id: WalletId,
cs: PlatformWalletChangeSet,
) -> Result<(), WalletStorageError> {
if cs.is_empty() {
return Ok(());
}
let mut guard = self
.inner
.lock()
.map_err(|_| WalletStorageError::LockPoisoned)?;
// Merge `cs` (older snapshot) FIRST, then re-apply anything
// that arrived later — done by swapping current with `cs` and
// merging the (originally newer) buffered value on top.
let entry = guard.entry(wallet_id).or_default();
let newer = std::mem::take(entry);
*entry = cs;
entry.merge(newer);
Ok(())
}

/// Deprecated alias for [`Self::take_for_flush`]. New call sites
/// MUST use the renamed pair so the take/restore lifecycle is
/// explicit.
#[deprecated(
since = "3.1.0-dev.1",
note = "use take_for_flush + restore for retry-safe semantics; remove in 3.2.0"
)]
pub fn drain(
&self,
wallet_id: &WalletId,
) -> Result<Option<PlatformWalletChangeSet>, WalletStorageError> {
self.take_for_flush(wallet_id)
}

/// Every wallet currently holding buffered data, sorted by id for
/// deterministic flush ordering.
pub fn dirty_wallets(&self) -> Result<Vec<WalletId>, WalletStorageError> {
Expand All @@ -66,3 +109,59 @@ impl Buffer {
Ok(ids)
}
}

#[cfg(test)]
mod tests {
use super::*;
use platform_wallet::changeset::CoreChangeSet;

fn cs_height(synced: u32, last_processed: u32) -> PlatformWalletChangeSet {
PlatformWalletChangeSet {
core: Some(CoreChangeSet {
synced_height: Some(synced),
last_processed_height: Some(last_processed),
..Default::default()
}),
..Default::default()
}
}

#[test]
fn take_then_restore_with_intervening_store_merges_lww() {
let buf = Buffer::new();
let w = [0xAAu8; 32];
// Stage A (older), take it out.
buf.store(w, cs_height(10, 10)).unwrap();
let taken = buf
.take_for_flush(&w)
.unwrap()
.expect("staged value present");
// B arrives during the imagined flush window.
buf.store(w, cs_height(20, 5)).unwrap();
// Restore the taken (older) snapshot — newer must win on the
// monotonic-max merge of `synced_height` / `last_processed_height`.
buf.restore(w, taken).unwrap();
let merged = buf
.take_for_flush(&w)
.unwrap()
.expect("merged value present");
let core = merged.core.expect("core present");
assert_eq!(core.synced_height, Some(20));
assert_eq!(core.last_processed_height, Some(10));
}

#[test]
fn restore_into_empty_slot_inserts() {
let buf = Buffer::new();
let w = [0xBBu8; 32];
// Buffer has nothing for `w`; restore must seed the slot.
buf.restore(w, cs_height(7, 7)).unwrap();
let got = buf
.take_for_flush(&w)
.unwrap()
.expect("restored value present");
let core = got.core.expect("core present");
assert_eq!(core.synced_height, Some(7));
assert_eq!(core.last_processed_height, Some(7));
}
}
Loading
Loading