Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 96 additions & 21 deletions engine/packages/epoxy/src/ops/kv/get_local.rs
Original file line number Diff line number Diff line change
@@ -1,35 +1,110 @@
use anyhow::*;
use anyhow::Result;
use epoxy_protocol::protocol::ReplicaId;
use gas::prelude::*;
use universaldb::utils::{FormalKey, IsolationLevel::Serializable};

use super::read_value;
use crate::keys::{
self, CommittedValue, KvOptimisticCacheKey, KvValueKey, LegacyCommittedValueKey,
};

#[derive(Debug)]
pub struct Input {
pub replica_id: ReplicaId,
pub key: Vec<u8>,
}

#[operation]
pub async fn epoxy_kv_get_local(
ctx: &OperationCtx,
input: &Input,
) -> Result<Option<CommittedValue>> {
Ok(read_local_value(ctx, input.replica_id, &input.key, false)
.await?
.value)
}

#[derive(Debug)]
pub struct Output {
pub value: Option<Vec<u8>>,
pub version: Option<u64>,
pub mutable: bool,
pub(crate) struct LocalValueRead {
pub value: Option<CommittedValue>,
pub cache_value: Option<CommittedValue>,
}

#[operation]
pub async fn epoxy_kv_get_local(ctx: &OperationCtx, input: &Input) -> Result<Output> {
let committed_value =
read_value::read_local_value(ctx, input.replica_id, input.key.clone(), false)
.await?
.value;

Ok(Output {
value: committed_value.as_ref().map(|value| value.value.clone()),
version: committed_value.as_ref().map(|value| value.version),
mutable: committed_value
.as_ref()
.map(|value| value.mutable)
.unwrap_or(false),
})
/// Reads a committed value from the local replica with dual-read fallback.
///
/// This performs a cascading lookup across storage generations so that values written
/// before the v2 migration remain readable without a full data migration:
///
/// 1. **V2 value** (`EPOXY_V2/replica/{id}/kv/{key}/value`). The current write path.
/// 2. **Legacy committed value** (`EPOXY_V1/replica/{id}/kv/{key}/committed_value`). Written by
/// the original EPaxos protocol. Deserialized as raw bytes with version 0 and mutable=false.
/// 3. **Optimistic cache** (`EPOXY_V2/replica/{id}/kv/{key}/cache`). Only checked when
/// `include_cache` is true. Contains values fetched from remote replicas for the optimistic
/// read path.
///
/// The first path that returns a value wins. This lets the background backfill migrate data
/// at its own pace without blocking reads.
pub(crate) async fn read_local_value(
ctx: &OperationCtx,
replica_id: ReplicaId,
key: &[u8],
include_cache: bool,
) -> Result<LocalValueRead> {
ctx.udb()?
.run(|tx| {
async move {
let value_key = KvValueKey::new(key.to_vec());
let legacy_value_key = LegacyCommittedValueKey::new(key.to_vec());
let cache_key = KvOptimisticCacheKey::new(key.to_vec());
let packed_value_key = keys::subspace(replica_id).pack(&value_key);
let packed_legacy_value_key =
keys::legacy_subspace(replica_id).pack(&legacy_value_key);
let packed_cache_key = keys::subspace(replica_id).pack(&cache_key);

let (local_value, legacy_value, cache_value) = tokio::try_join!(
tx.get(&packed_value_key, Serializable),
tx.get(&packed_legacy_value_key, Serializable),
async {
if include_cache {
tx.get(&packed_cache_key, Serializable).await
} else {
Ok(None)
}
},
)?;

// V2 committed value (current write path)
if let Some(value) = local_value {
return Ok(LocalValueRead {
value: Some(value_key.deserialize(&value)?),
cache_value: None,
});
}

// Legacy committed value (original EPaxos raw bytes)
if let Some(value) = legacy_value {
return Ok(LocalValueRead {
value: Some(CommittedValue {
value: legacy_value_key.deserialize(&value)?,
version: 0,
mutable: false,
}),
cache_value: None,
});
}

if let Some(value) = cache_value {
return Ok(LocalValueRead {
value: None,
cache_value: Some(cache_key.deserialize(&value)?),
});
}

Ok(LocalValueRead {
value: None,
cache_value: None,
})
}
})
.custom_instrument(tracing::info_span!("read_local_value_tx"))
.await
}
41 changes: 17 additions & 24 deletions engine/packages/epoxy/src/ops/kv/get_optimistic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::{
utils,
};

use super::read_value;
use super::get_local::read_local_value;

#[derive(Debug)]
pub struct Input {
Expand Down Expand Up @@ -50,20 +50,14 @@ pub struct Output {
/// best-effort lookup.
#[operation]
pub async fn epoxy_kv_get_optimistic(ctx: &OperationCtx, input: &Input) -> Result<Output> {
let local_read = read_value::read_local_value(
let local_read = read_local_value(
ctx,
input.replica_id,
input.key.clone(),
&input.key,
input.caching_behavior == protocol::CachingBehavior::Optimistic,
)
.await?;
if local_read.value.is_some() {
return Ok(Output {
value: local_read.value.map(|value| value.value),
});
}

if let Some(value) = local_read.cache_value {
if let Some(value) = local_read.value.or(local_read.cache_value) {
return Ok(Output {
value: Some(value.value),
});
Expand Down Expand Up @@ -114,22 +108,21 @@ pub async fn epoxy_kv_get_optimistic(ctx: &OperationCtx, input: &Input) -> Resul
)
.await?;

for response in responses {
if let Some(value) = response {
let value = CommittedValue {
value: value.value,
version: value.version,
mutable: value.mutable,
};

if input.caching_behavior == protocol::CachingBehavior::Optimistic {
cache_fanout_value(ctx, input.replica_id, input.key.clone(), value.clone()).await?;
}
// Should only have 1 response
if let Some(value) = responses.first().and_then(|r| r.response) {
let value = CommittedValue {
value: value.value,
version: value.version,
mutable: value.mutable,
};

return Ok(Output {
value: Some(value.value),
});
if input.caching_behavior == protocol::CachingBehavior::Optimistic {
cache_fanout_value(ctx, input.replica_id, input.key.clone(), value.clone()).await?;
}

return Ok(Output {
value: Some(value.value),
});
}

// No value found in any datacenter
Expand Down
1 change: 0 additions & 1 deletion engine/packages/epoxy/src/ops/kv/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
pub mod get_local;
pub mod get_optimistic;
pub mod purge_local;
pub(crate) mod read_value;
92 changes: 0 additions & 92 deletions engine/packages/epoxy/src/ops/kv/read_value.rs

This file was deleted.

8 changes: 4 additions & 4 deletions engine/packages/epoxy/src/replica/message_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,10 @@ async fn message_request_inner(
.await?;

protocol::ResponseKind::KvGetResponse(protocol::KvGetResponse {
value: result.value.map(|value| protocol::CommittedValue {
value,
version: result.version.unwrap_or(0),
mutable: result.mutable,
value: result.map(|value| protocol::CommittedValue {
value: value.value,
version: value.version,
mutable: value.mutable,
}),
})
}
Expand Down
Loading