diff --git a/engine/packages/epoxy/src/ops/kv/get_local.rs b/engine/packages/epoxy/src/ops/kv/get_local.rs index 1c9ec16d17..c2fd540150 100644 --- a/engine/packages/epoxy/src/ops/kv/get_local.rs +++ b/engine/packages/epoxy/src/ops/kv/get_local.rs @@ -1,8 +1,11 @@ -use anyhow::*; +use anyhow::Result; use epoxy_protocol::protocol::ReplicaId; use gas::prelude::*; +use universaldb::utils::{FormalKey, IsolationLevel::Serializable}; -use super::read_value; +use crate::keys::{ + self, CommittedValue, KvOptimisticCacheKey, KvValueKey, LegacyCommittedValueKey, +}; #[derive(Debug)] pub struct Input { @@ -10,26 +13,98 @@ pub struct Input { pub key: Vec, } +#[operation] +pub async fn epoxy_kv_get_local( + ctx: &OperationCtx, + input: &Input, +) -> Result> { + Ok(read_local_value(ctx, input.replica_id, &input.key, false) + .await? + .value) +} + #[derive(Debug)] -pub struct Output { - pub value: Option>, - pub version: Option, - pub mutable: bool, +pub(crate) struct LocalValueRead { + pub value: Option, + pub cache_value: Option, } -#[operation] -pub async fn epoxy_kv_get_local(ctx: &OperationCtx, input: &Input) -> Result { - let committed_value = - read_value::read_local_value(ctx, input.replica_id, input.key.clone(), false) - .await? - .value; - - Ok(Output { - value: committed_value.as_ref().map(|value| value.value.clone()), - version: committed_value.as_ref().map(|value| value.version), - mutable: committed_value - .as_ref() - .map(|value| value.mutable) - .unwrap_or(false), - }) +/// Reads a committed value from the local replica with dual-read fallback. +/// +/// This performs a cascading lookup across storage generations so that values written +/// before the v2 migration remain readable without a full data migration: +/// +/// 1. **V2 value** (`EPOXY_V2/replica/{id}/kv/{key}/value`). The current write path. +/// 2. **Legacy committed value** (`EPOXY_V1/replica/{id}/kv/{key}/committed_value`). Written by +/// the original EPaxos protocol. Deserialized as raw bytes with version 0 and mutable=false. +/// 3. **Optimistic cache** (`EPOXY_V2/replica/{id}/kv/{key}/cache`). Only checked when +/// `include_cache` is true. Contains values fetched from remote replicas for the optimistic +/// read path. +/// +/// The first path that returns a value wins. This lets the background backfill migrate data +/// at its own pace without blocking reads. +pub(crate) async fn read_local_value( + ctx: &OperationCtx, + replica_id: ReplicaId, + key: &[u8], + include_cache: bool, +) -> Result { + ctx.udb()? + .run(|tx| { + async move { + let value_key = KvValueKey::new(key.to_vec()); + let legacy_value_key = LegacyCommittedValueKey::new(key.to_vec()); + let cache_key = KvOptimisticCacheKey::new(key.to_vec()); + let packed_value_key = keys::subspace(replica_id).pack(&value_key); + let packed_legacy_value_key = + keys::legacy_subspace(replica_id).pack(&legacy_value_key); + let packed_cache_key = keys::subspace(replica_id).pack(&cache_key); + + let (local_value, legacy_value, cache_value) = tokio::try_join!( + tx.get(&packed_value_key, Serializable), + tx.get(&packed_legacy_value_key, Serializable), + async { + if include_cache { + tx.get(&packed_cache_key, Serializable).await + } else { + Ok(None) + } + }, + )?; + + // V2 committed value (current write path) + if let Some(value) = local_value { + return Ok(LocalValueRead { + value: Some(value_key.deserialize(&value)?), + cache_value: None, + }); + } + + // Legacy committed value (original EPaxos raw bytes) + if let Some(value) = legacy_value { + return Ok(LocalValueRead { + value: Some(CommittedValue { + value: legacy_value_key.deserialize(&value)?, + version: 0, + mutable: false, + }), + cache_value: None, + }); + } + + if let Some(value) = cache_value { + return Ok(LocalValueRead { + value: None, + cache_value: Some(cache_key.deserialize(&value)?), + }); + } + + Ok(LocalValueRead { + value: None, + cache_value: None, + }) + } + }) + .custom_instrument(tracing::info_span!("read_local_value_tx")) + .await } diff --git a/engine/packages/epoxy/src/ops/kv/get_optimistic.rs b/engine/packages/epoxy/src/ops/kv/get_optimistic.rs index fe0b9a67af..fb54da7f7f 100644 --- a/engine/packages/epoxy/src/ops/kv/get_optimistic.rs +++ b/engine/packages/epoxy/src/ops/kv/get_optimistic.rs @@ -9,7 +9,7 @@ use crate::{ utils, }; -use super::read_value; +use super::get_local::read_local_value; #[derive(Debug)] pub struct Input { @@ -50,20 +50,14 @@ pub struct Output { /// best-effort lookup. #[operation] pub async fn epoxy_kv_get_optimistic(ctx: &OperationCtx, input: &Input) -> Result { - let local_read = read_value::read_local_value( + let local_read = read_local_value( ctx, input.replica_id, - input.key.clone(), + &input.key, input.caching_behavior == protocol::CachingBehavior::Optimistic, ) .await?; - if local_read.value.is_some() { - return Ok(Output { - value: local_read.value.map(|value| value.value), - }); - } - - if let Some(value) = local_read.cache_value { + if let Some(value) = local_read.value.or(local_read.cache_value) { return Ok(Output { value: Some(value.value), }); @@ -114,22 +108,21 @@ pub async fn epoxy_kv_get_optimistic(ctx: &OperationCtx, input: &Input) -> Resul ) .await?; - for response in responses { - if let Some(value) = response { - let value = CommittedValue { - value: value.value, - version: value.version, - mutable: value.mutable, - }; - - if input.caching_behavior == protocol::CachingBehavior::Optimistic { - cache_fanout_value(ctx, input.replica_id, input.key.clone(), value.clone()).await?; - } + // Should only have 1 response + if let Some(value) = responses.first().and_then(|r| r.response) { + let value = CommittedValue { + value: value.value, + version: value.version, + mutable: value.mutable, + }; - return Ok(Output { - value: Some(value.value), - }); + if input.caching_behavior == protocol::CachingBehavior::Optimistic { + cache_fanout_value(ctx, input.replica_id, input.key.clone(), value.clone()).await?; } + + return Ok(Output { + value: Some(value.value), + }); } // No value found in any datacenter diff --git a/engine/packages/epoxy/src/ops/kv/mod.rs b/engine/packages/epoxy/src/ops/kv/mod.rs index ecdbf3b33b..aac85283e3 100644 --- a/engine/packages/epoxy/src/ops/kv/mod.rs +++ b/engine/packages/epoxy/src/ops/kv/mod.rs @@ -1,4 +1,3 @@ pub mod get_local; pub mod get_optimistic; pub mod purge_local; -pub(crate) mod read_value; diff --git a/engine/packages/epoxy/src/ops/kv/read_value.rs b/engine/packages/epoxy/src/ops/kv/read_value.rs deleted file mode 100644 index 3d36af6e0a..0000000000 --- a/engine/packages/epoxy/src/ops/kv/read_value.rs +++ /dev/null @@ -1,92 +0,0 @@ -use anyhow::Result; -use epoxy_protocol::protocol::ReplicaId; -use gas::prelude::*; -use universaldb::utils::{FormalKey, IsolationLevel::Serializable}; - -use crate::keys::{ - self, CommittedValue, KvOptimisticCacheKey, KvValueKey, LegacyCommittedValueKey, -}; - -#[derive(Debug)] -pub(crate) struct LocalValueRead { - pub value: Option, - pub cache_value: Option, -} - -/// Reads a committed value from the local replica with dual-read fallback. -/// -/// This performs a cascading lookup across storage generations so that values written -/// before the v2 migration remain readable without a full data migration: -/// -/// 1. **V2 value** (`EPOXY_V2/replica/{id}/kv/{key}/value`). The current write path. -/// 2. **Legacy committed value** (`EPOXY_V1/replica/{id}/kv/{key}/committed_value`). Written by -/// the original EPaxos protocol. Deserialized as raw bytes with version 0 and mutable=false. -/// 3. **Optimistic cache** (`EPOXY_V2/replica/{id}/kv/{key}/cache`). Only checked when -/// `include_cache` is true. Contains values fetched from remote replicas for the optimistic -/// read path. -/// -/// The first path that returns a value wins. This lets the background backfill migrate data -/// at its own pace without blocking reads. -pub(crate) async fn read_local_value( - ctx: &OperationCtx, - replica_id: ReplicaId, - key: Vec, - include_cache: bool, -) -> Result { - let value_key = KvValueKey::new(key.clone()); - let legacy_value_key = LegacyCommittedValueKey::new(key.clone()); - let cache_key = KvOptimisticCacheKey::new(key); - let subspace = keys::subspace(replica_id); - let legacy_subspace = keys::legacy_subspace(replica_id); - let packed_value_key = subspace.pack(&value_key); - let packed_legacy_value_key = legacy_subspace.pack(&legacy_value_key); - let packed_cache_key = subspace.pack(&cache_key); - - ctx.udb()? - .run(|tx| { - let packed_value_key = packed_value_key.clone(); - let packed_legacy_value_key = packed_legacy_value_key.clone(); - let packed_cache_key = packed_cache_key.clone(); - let value_key = value_key.clone(); - let legacy_value_key = legacy_value_key.clone(); - let cache_key = cache_key.clone(); - - async move { - // V2 committed value (current write path) - if let Some(value) = tx.get(&packed_value_key, Serializable).await? { - return Ok(LocalValueRead { - value: Some(value_key.deserialize(&value)?), - cache_value: None, - }); - } - - // Legacy committed value (original EPaxos raw bytes) - if let Some(value) = tx.get(&packed_legacy_value_key, Serializable).await? { - return Ok(LocalValueRead { - value: Some(CommittedValue { - value: legacy_value_key.deserialize(&value)?, - version: 0, - mutable: false, - }), - cache_value: None, - }); - } - - let cache_value = if include_cache { - tx.get(&packed_cache_key, Serializable) - .await? - .map(|value| cache_key.deserialize(&value)) - .transpose()? - } else { - None - }; - - Ok(LocalValueRead { - value: None, - cache_value, - }) - } - }) - .custom_instrument(tracing::info_span!("read_local_value_tx")) - .await -} diff --git a/engine/packages/epoxy/src/replica/message_request.rs b/engine/packages/epoxy/src/replica/message_request.rs index c2cf1054b6..4c75941127 100644 --- a/engine/packages/epoxy/src/replica/message_request.rs +++ b/engine/packages/epoxy/src/replica/message_request.rs @@ -129,10 +129,10 @@ async fn message_request_inner( .await?; protocol::ResponseKind::KvGetResponse(protocol::KvGetResponse { - value: result.value.map(|value| protocol::CommittedValue { - value, - version: result.version.unwrap_or(0), - mutable: result.mutable, + value: result.map(|value| protocol::CommittedValue { + value: value.value, + version: value.version, + mutable: value.mutable, }), }) }