Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions rust/api/src/kv_store_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ macro_rules! define_kv_store_tests {
create_test!(list_should_honour_page_size_and_key_prefix_if_provided);
create_test!(list_should_return_zero_global_version_when_global_versioning_not_enabled);
create_test!(list_should_limit_max_page_size);
create_test!(list_should_return_results_ordered_by_creation_time);
create_test!(list_should_paginate_by_creation_time_with_prefix);
};
}

Expand Down Expand Up @@ -506,6 +508,59 @@ pub trait KvStoreTestSuite {
Ok(())
}

/// Listing with no page token must return entries in creation order, even
/// when that order disagrees with the lexicographic order of the keys.
async fn list_should_return_results_ordered_by_creation_time() -> Result<(), VssError> {
    let kv_store = Self::create_store().await;
    let ctx = TestContext::new(&kv_store);

    // Write keys in reverse-alphabetical order so that creation order and
    // key order are observably different.
    for (global_version, key) in [(0, "z_first"), (1, "m_second"), (2, "a_third")] {
        ctx.put_objects(Some(global_version), vec![kv(key, "v1", 0)]).await?;
    }

    let listed = ctx.list(None, None, None).await?;
    let observed: Vec<&str> =
        listed.key_versions.iter().map(|entry| entry.key.as_str()).collect();

    // Creation order must win over alphabetical order.
    assert_eq!(observed, vec!["z_first", "m_second", "a_third"]);

    Ok(())
}

/// Paginating with a key prefix and a page size of 1 must walk every
/// matching key across pages, in creation order, skipping non-matching keys.
async fn list_should_paginate_by_creation_time_with_prefix() -> Result<(), VssError> {
    let kv_store = Self::create_store().await;
    let ctx = TestContext::new(&kv_store);

    // Prefixed keys written in reverse-alphabetical order, with one
    // unrelated key interleaved; page_size = 1 forces several pages so the
    // ordering is checked across page boundaries.
    ctx.put_objects(Some(0), vec![kv("pfx_z", "v1", 0)]).await?;
    ctx.put_objects(Some(1), vec![kv("pfx_a", "v1", 0)]).await?;
    ctx.put_objects(Some(2), vec![kv("other", "v1", 0)]).await?;
    ctx.put_objects(Some(3), vec![kv("pfx_m", "v1", 0)]).await?;

    let mut collected: Vec<String> = Vec::new();
    let mut token: Option<String> = None;

    loop {
        // Passing the Option through directly is equivalent to matching on
        // None/Some: the first iteration sends no token, later ones send
        // the token returned by the previous page.
        let page = ctx.list(token.take(), Some(1), Some("pfx_".to_string())).await?;

        if page.key_versions.is_empty() {
            break;
        }

        assert!(page.key_versions.len() <= 1);
        collected.extend(page.key_versions.into_iter().map(|entry| entry.key));
        token = page.next_page_token;
    }

    // Only prefixed keys come back, in creation order; "other" is excluded.
    assert_eq!(collected, vec!["pfx_z", "pfx_a", "pfx_m"]);

    Ok(())
}

async fn list_should_limit_max_page_size() -> Result<(), VssError> {
let kv_store = Self::create_store().await;
let ctx = TestContext::new(&kv_store);
Expand Down
3 changes: 3 additions & 0 deletions rust/impls/src/migrations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ pub(crate) const MIGRATIONS: &[&str] = &[
PRIMARY KEY (user_token, store_id, key)
);",
"ALTER TABLE vss_db DROP CONSTRAINT IF EXISTS vss_db_store_id_check;",
"UPDATE vss_db SET created_at = COALESCE(last_updated_at, NOW()) WHERE created_at IS NULL;",
"ALTER TABLE vss_db ALTER COLUMN created_at SET NOT NULL;",
"CREATE INDEX idx_vss_db_created_at ON vss_db (user_token, store_id, created_at, key);",
];
#[cfg(test)]
pub(crate) const DUMMY_MIGRATION: &str = "SELECT 1 WHERE FALSE;";
76 changes: 64 additions & 12 deletions rust/impls/src/postgres_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,25 @@ pub(crate) struct VssDbRecord {
const KEY_COLUMN: &str = "key";
const VALUE_COLUMN: &str = "value";
const VERSION_COLUMN: &str = "version";
const CREATED_AT_COLUMN: &str = "created_at";

/// Encodes a page token as a 20-character zero-padded epoch-microsecond
/// timestamp immediately followed by the raw key.
fn encode_page_token(created_at: &chrono::DateTime<Utc>, key: &str) -> String {
    let micros = created_at.timestamp_micros();
    format!("{micros:020}{key}")
}

/// Decodes a page token produced by [`encode_page_token`]: the first 20 bytes
/// are a zero-padded epoch-microsecond timestamp, the remainder is the key.
///
/// # Errors
///
/// Returns `VssError::InvalidRequestError` when the token is shorter than 20
/// bytes, the timestamp prefix is not a valid `i64`, the microsecond value is
/// out of `chrono`'s representable range, or byte 20 falls inside a
/// multi-byte UTF-8 character.
fn decode_page_token(token: &str) -> Result<(chrono::DateTime<Utc>, String), VssError> {
    let invalid = || VssError::InvalidRequestError("Invalid page token".to_string());
    // Use `str::get` instead of direct slicing: `token[..20]` panics when
    // byte 20 is not a UTF-8 char boundary, so a malformed client-supplied
    // token (e.g. a multi-byte character spanning the boundary) would crash
    // the server instead of yielding an invalid-request error. `get` also
    // covers the too-short case, replacing the explicit length check.
    let micros_str = token.get(..20).ok_or_else(invalid)?;
    let key = token.get(20..).ok_or_else(invalid)?;
    let micros: i64 = micros_str.parse().map_err(|_| invalid())?;
    let created_at = chrono::DateTime::from_timestamp_micros(micros).ok_or_else(invalid)?;
    Ok((created_at, key.to_string()))
}

/// The maximum number of key versions that can be returned in a single page.
///
Expand Down Expand Up @@ -663,17 +682,24 @@ where

let conn = self.pool.get().await?;

let stmt = "SELECT key, version FROM vss_db WHERE user_token = $1 AND store_id = $2 AND key > $3 AND key LIKE $4 ORDER BY key LIMIT $5";

let key_like = format!("{}%", key_prefix.as_deref().unwrap_or_default());
let page_token_param = page_token.as_deref().unwrap_or_default();
let params: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> =
vec![&user_token, &store_id, &page_token_param, &key_like, &limit];

let rows = conn
.query(stmt, &params)
.await
.map_err(|e| Error::new(ErrorKind::Other, format!("Query error: {}", e)))?;
let rows = if let Some(ref token) = page_token {
let (page_created_at, page_key) = decode_page_token(token)?;
let stmt = "SELECT key, version, created_at FROM vss_db WHERE user_token = $1 AND store_id = $2 AND (created_at, key) > ($3, $4) AND key LIKE $5 ORDER BY created_at, key LIMIT $6";
let params: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> =
vec![&user_token, &store_id, &page_created_at, &page_key, &key_like, &limit];
conn.query(stmt, &params)
.await
.map_err(|e| Error::new(ErrorKind::Other, format!("Query error: {}", e)))?
} else {
let stmt = "SELECT key, version, created_at FROM vss_db WHERE user_token = $1 AND store_id = $2 AND key LIKE $3 ORDER BY created_at, key LIMIT $4";
let params: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> =
vec![&user_token, &store_id, &key_like, &limit];
conn.query(stmt, &params)
.await
.map_err(|e| Error::new(ErrorKind::Other, format!("Query error: {}", e)))?
};

let key_versions: Vec<_> = rows
.iter()
Expand All @@ -686,8 +712,10 @@ where
.collect();

let mut next_page_token = Some("".to_string());
if !key_versions.is_empty() {
next_page_token = key_versions.get(key_versions.len() - 1).map(|kv| kv.key.to_string());
if let Some(last_kv) = key_versions.last() {
let last_created_at =
rows[rows.len() - 1].get::<&str, chrono::DateTime<Utc>>(CREATED_AT_COLUMN);
next_page_token = Some(encode_page_token(&last_created_at, &last_kv.key));
}

Ok(ListKeyVersionsResponse { key_versions, next_page_token, global_version })
Expand All @@ -696,11 +724,12 @@ where

#[cfg(test)]
mod tests {
use super::{drop_database, DUMMY_MIGRATION, MIGRATIONS};
use super::{decode_page_token, drop_database, encode_page_token, DUMMY_MIGRATION, MIGRATIONS};
use crate::postgres_store::PostgresPlaintextBackend;
use api::define_kv_store_tests;
use api::kv_store::KvStore;
use api::types::{DeleteObjectRequest, GetObjectRequest, KeyValue, PutObjectRequest};
use chrono::{TimeZone, Utc as ChronoUtc};

use bytes::Bytes;
use tokio::sync::OnceCell;
Expand Down Expand Up @@ -884,4 +913,27 @@ mod tests {

drop_database(POSTGRES_ENDPOINT, DEFAULT_DB, vss_db, NoTls).await.unwrap();
}

#[test]
fn page_token_roundtrips() {
    // A timestamp/key pair must survive an encode/decode round trip intact.
    let timestamp = ChronoUtc.with_ymd_and_hms(2026, 3, 15, 12, 30, 45).unwrap();
    let original_key = "some/test/key";

    let encoded = encode_page_token(&timestamp, original_key);
    let (decoded_time, decoded_key) =
        decode_page_token(&encoded).expect("freshly encoded token must decode");

    assert_eq!(decoded_time, timestamp);
    assert_eq!(decoded_key, original_key);
}

#[test]
fn page_token_rejects_short_input() {
    // Anything shorter than the 20-byte timestamp prefix is malformed.
    for bad_token in ["tooshort", ""] {
        assert!(decode_page_token(bad_token).is_err());
    }
}

#[test]
fn page_token_rejects_non_numeric_prefix() {
    // The first 20 bytes must parse as an i64 microsecond timestamp; a
    // correctly sized but non-numeric prefix is rejected.
    let bad_token = "abcdefghijklmnopqrstkey";
    assert!(decode_page_token(bad_token).is_err());
}
}
Loading