Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions engine/packages/epoxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ tracing.workspace = true
universaldb.workspace = true
url.workspace = true
uuid.workspace = true
vbare.workspace = true

[dev-dependencies]
gas.workspace = true
Expand Down
25 changes: 9 additions & 16 deletions engine/packages/epoxy/src/keys/keys.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,9 @@
use anyhow::Result;
use epoxy_protocol::protocol;
use epoxy_protocol::{PROTOCOL_VERSION, protocol, versioned};
use serde::{Deserialize, Serialize};
use universaldb::prelude::*;
use universaldb::tuple::Versionstamp;

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub struct CommittedValue {
// NOTE: An empty value may exist for cached entries to denote the value was not found on any datacenter
// and cached as such.
pub value: Vec<u8>,
pub version: u64,
pub mutable: bool,
}
use vbare::OwnedVersionedData;

/// In-flight accepted proposal state stored under `kv/{key}/accepted`.
///
Expand Down Expand Up @@ -44,14 +36,15 @@ impl KvValueKey {
}

impl FormalKey for KvValueKey {
type Value = CommittedValue;
type Value = protocol::CommittedValue;

fn deserialize(&self, raw: &[u8]) -> Result<Self::Value> {
serde_bare::from_slice(raw).map_err(Into::into)
versioned::CommittedValue::deserialize_with_embedded_version(raw)
}

fn serialize(&self, value: Self::Value) -> Result<Vec<u8>> {
serde_bare::to_vec(&value).map_err(Into::into)
versioned::CommittedValue::wrap_latest(value)
.serialize_with_embedded_version(PROTOCOL_VERSION)
}
}

Expand Down Expand Up @@ -260,14 +253,14 @@ impl KvOptimisticCacheKey {
}

impl FormalKey for KvOptimisticCacheKey {
type Value = CommittedValue;
type Value = protocol::CachedValue;

fn deserialize(&self, raw: &[u8]) -> Result<Self::Value> {
serde_bare::from_slice(raw).map_err(Into::into)
versioned::CachedValue::deserialize_with_embedded_version(raw)
}

fn serialize(&self, value: Self::Value) -> Result<Vec<u8>> {
serde_bare::to_vec(&value).map_err(Into::into)
versioned::CachedValue::wrap_latest(value).serialize_with_embedded_version(PROTOCOL_VERSION)
}
}

Expand Down
4 changes: 2 additions & 2 deletions engine/packages/epoxy/src/keys/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ pub mod keys;
pub mod replica;

pub use self::keys::{
ChangelogKey, CommittedValue, KvAcceptedKey, KvAcceptedValue, KvBallotKey,
KvOptimisticCacheKey, KvValueKey, LegacyCommittedValueKey,
ChangelogKey, KvAcceptedKey, KvAcceptedValue, KvBallotKey, KvOptimisticCacheKey, KvValueKey,
LegacyCommittedValueKey,
};
pub use self::replica::ConfigKey;

Expand Down
18 changes: 5 additions & 13 deletions engine/packages/epoxy/src/ops/kv/get_local.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
use anyhow::Result;
use epoxy_protocol::protocol::ReplicaId;
use epoxy_protocol::protocol::{CachedValue, CommittedValue, ReplicaId};
use gas::prelude::*;
use universaldb::utils::{FormalKey, IsolationLevel::Serializable};

use crate::keys::{
self, CommittedValue, KvOptimisticCacheKey, KvValueKey, LegacyCommittedValueKey,
};
use crate::keys::{self, KvOptimisticCacheKey, KvValueKey, LegacyCommittedValueKey};

#[derive(Debug)]
pub struct Input {
Expand All @@ -26,7 +24,7 @@ pub async fn epoxy_kv_get_local(
#[derive(Debug)]
pub(crate) struct LocalValueRead {
pub value: Option<CommittedValue>,
pub cache_value: Option<CommittedValue>,
pub cache_value: Option<CachedValue>,
}

/// Reads a committed value from the local replica with dual-read fallback.
Expand Down Expand Up @@ -95,18 +93,12 @@ pub(crate) async fn read_local_value(
if let Some(value) = cache_value {
let cache_value = cache_key.deserialize(&value)?;

// Special case with empty values. These are inserted in kv_get_optimistic with `save_empty`
if cache_value.value.is_empty() {
if let Some(value) = cache_value.value {
return Ok(LocalValueRead {
value: None,
cache_value: None,
cache_value: Some(cache_key.deserialize(&value)?),
});
}

return Ok(LocalValueRead {
value: None,
cache_value: Some(cache_key.deserialize(&value)?),
});
}

Ok(LocalValueRead {
Expand Down
32 changes: 17 additions & 15 deletions engine/packages/epoxy/src/ops/kv/get_optimistic.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
use anyhow::*;
use epoxy_protocol::protocol::{self, ReplicaId};
use epoxy_protocol::protocol::{self, CachedValue, CommittedValue, ReplicaId};
use gas::prelude::*;
use rivet_api_builder::ApiCtx;
use universaldb::prelude::*;

use crate::{
http_client,
keys::{self, CommittedValue},
utils,
};
use crate::{http_client, keys, utils};

use super::get_local::read_local_value;

Expand Down Expand Up @@ -68,10 +64,12 @@ pub async fn epoxy_kv_get_optimistic(ctx: &OperationCtx, input: &Input) -> Resul
input.caching_behavior == protocol::CachingBehavior::Optimistic,
)
.await?;
if let Some(value) = local_read.value.or(local_read.cache_value) {
return Ok(Output {
value: Some(value.value),
});
if let Some(value) = local_read
.value
.map(|v| v.value)
.or(local_read.cache_value.and_then(|v| v.value))
{
return Ok(Output { value: Some(value) });
}

// Request fanout to other datacenters, return first datacenter with any non-none value
Expand Down Expand Up @@ -169,7 +167,13 @@ async fn cache_fanout_value(
}
}

tx.write(&cache_key, value_to_cache.clone())?;
tx.write(
&cache_key,
CachedValue {
value: Some(value_to_cache.value.clone()),
version: value_to_cache.version,
},
)?;
Ok(())
}
})
Expand All @@ -189,11 +193,9 @@ async fn cache_empty_value(ctx: &OperationCtx, replica_id: ReplicaId, key: &[u8]

tx.write(
&cache_key,
CommittedValue {
value: Vec::new(),
CachedValue {
value: None,
version: 0,
// TODO: What should this be set to?
mutable: true,
},
)?;
Ok(())
Expand Down
6 changes: 2 additions & 4 deletions engine/packages/epoxy/src/ops/propose.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use anyhow::{Context, Result, bail};
use epoxy_protocol::protocol::{self, ReplicaId};
use epoxy_protocol::protocol::{self, CommittedValue, ReplicaId};
use futures_util::{StreamExt, stream::FuturesUnordered};
use gas::prelude::*;
use rand::Rng;
Expand All @@ -8,9 +8,7 @@ use serde::{Deserialize, Serialize};
use std::time::{Duration, Instant};

use crate::{
http_client,
keys::CommittedValue,
metrics,
http_client, metrics,
replica::{
ballot::{self, Ballot, BallotSelection},
commit_kv::{self, CommitKvOutcome},
Expand Down
4 changes: 2 additions & 2 deletions engine/packages/epoxy/src/replica/ballot.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use anyhow::{Context, Result};
use epoxy_protocol::protocol;
use epoxy_protocol::protocol::{self, CommittedValue};
use std::cmp::Ordering;
use universaldb::Transaction;
use universaldb::utils::{FormalKey, IsolationLevel::Serializable};

use crate::keys::{self, CommittedValue, KvBallotKey, KvValueKey, LegacyCommittedValueKey};
use crate::keys::{self, KvBallotKey, KvValueKey, LegacyCommittedValueKey};

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct Ballot {
Expand Down
5 changes: 2 additions & 3 deletions engine/packages/epoxy/src/replica/changelog.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use anyhow::{Context, Result, bail};
use epoxy_protocol::protocol;
use epoxy_protocol::protocol::{self, CommittedValue};
use futures_util::TryStreamExt;
use universaldb::{
KeySelector, RangeOption, Transaction,
Expand All @@ -10,8 +10,7 @@ use universaldb::{
};

use crate::keys::{
self, ChangelogKey, CommittedValue, KvAcceptedKey, KvBallotKey, KvOptimisticCacheKey,
KvValueKey,
self, ChangelogKey, KvAcceptedKey, KvBallotKey, KvOptimisticCacheKey, KvValueKey,
};
use crate::metrics;

Expand Down
4 changes: 2 additions & 2 deletions engine/packages/epoxy/src/replica/commit_kv.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use anyhow::Result;
use epoxy_protocol::protocol;
use epoxy_protocol::protocol::{self, CommittedValue};
use universaldb::{Transaction, utils::IsolationLevel::Serializable};

use crate::{
keys::{self, CommittedValue, KvAcceptedKey, KvBallotKey, KvOptimisticCacheKey, KvValueKey},
keys::{self, KvAcceptedKey, KvBallotKey, KvOptimisticCacheKey, KvValueKey},
replica::ballot::Ballot,
};

Expand Down
4 changes: 2 additions & 2 deletions engine/packages/epoxy/src/workflows/backfill.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use anyhow::Result;
use epoxy_protocol::protocol;
use epoxy_protocol::protocol::{self, CommittedValue};
use futures_util::{FutureExt, TryStreamExt};
use gas::prelude::*;
use serde::{Deserialize, Serialize};
Expand All @@ -13,7 +13,7 @@ use universaldb::{
},
};

use crate::keys::{self, CommittedValue, KvValueKey, LegacyCommittedValueKey};
use crate::keys::{self, KvValueKey, LegacyCommittedValueKey};

const DEFAULT_CHUNK_SIZE: usize = 500;

Expand Down
5 changes: 3 additions & 2 deletions engine/packages/epoxy/tests/kv.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
mod common;

use epoxy::ops::propose::{CommandError, ProposalResult};
use epoxy_protocol::protocol::CommittedValue;

use common::{
THREE_REPLICAS, TestCtx,
Expand Down Expand Up @@ -80,7 +81,7 @@ async fn test_kv_operations() {
assert!(matches!(first_result, ProposalResult::Committed));
assert_eq!(
read_v2_committed_value(ctx, replica_id, key).await.unwrap(),
Some(epoxy::keys::CommittedValue {
Some(CommittedValue {
value: b"value1".to_vec(),
version: 1,
mutable: true,
Expand All @@ -92,7 +93,7 @@ async fn test_kv_operations() {
assert!(matches!(second_result, ProposalResult::Committed));
assert_eq!(
read_v2_committed_value(ctx, replica_id, key).await.unwrap(),
Some(epoxy::keys::CommittedValue {
Some(CommittedValue {
value: b"value2".to_vec(),
version: 2,
mutable: true,
Expand Down
10 changes: 5 additions & 5 deletions engine/packages/epoxy/tests/kv_get_optimistic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use common::{
},
};
use epoxy::ops::propose::ProposalResult;
use epoxy_protocol::protocol::{CachingBehavior, ReplicaId};
use epoxy_protocol::protocol::{CachingBehavior, CommittedValue, ReplicaId};

static TEST_LOCK: tokio::sync::Mutex<()> = tokio::sync::Mutex::const_new(());

Expand Down Expand Up @@ -182,7 +182,7 @@ async fn test_kv_get_optimistic_paths() {
test_ctx.get_ctx(writer_replica_id),
writer_replica_id,
key,
epoxy::keys::CommittedValue {
CommittedValue {
value: b"remote-value".to_vec(),
version: 2,
mutable: true,
Expand All @@ -194,7 +194,7 @@ async fn test_kv_get_optimistic_paths() {
test_ctx.get_ctx(reader_replica_id),
reader_replica_id,
key,
epoxy::keys::CommittedValue {
CommittedValue {
value: b"stale-cache".to_vec(),
version: 1,
mutable: true,
Expand All @@ -221,7 +221,7 @@ async fn test_kv_get_optimistic_paths() {
)
.await
.unwrap(),
Some(epoxy::keys::CommittedValue {
Some(CommittedValue {
value: b"stale-cache".to_vec(),
version: 1,
mutable: true,
Expand All @@ -247,7 +247,7 @@ async fn test_kv_get_optimistic_paths() {
follower_ctx,
follower_replica_id,
key,
epoxy::keys::CommittedValue {
CommittedValue {
value: b"value1".to_vec(),
version: 1,
mutable: true,
Expand Down
14 changes: 1 addition & 13 deletions engine/packages/pegboard-envoy/src/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ pub async fn init_conn(
}
.build());
};
let is_serverless = matches!(pool.config.kind, RunnerConfigKind::Serverless { .. });

tracing::debug!(namespace_id=?namespace.namespace_id, "new envoy connection");

Expand All @@ -81,16 +82,6 @@ pub async fn init_conn(
.with_label_values(&[namespace.namespace_id.to_string().as_str(), &pool_name])
.observe(start.elapsed().as_secs_f64());

let serverless_drain_grace_period = if let RunnerConfigKind::Serverless {
drain_grace_period,
..
} = &pool.config.kind
{
Some(*drain_grace_period as i64)
} else {
None
};

let udb = ctx.udb()?;
let (_, mut missed_commands) = tokio::try_join!(
// Send init packet as soon as possible
Expand All @@ -103,7 +94,6 @@ pub async fn init_conn(
metadata: protocol::ProtocolMetadata {
envoy_lost_threshold: pb.envoy_lost_threshold(),
actor_stop_threshold: pb.actor_stop_threshold(),
serverless_drain_grace_period,
max_response_payload_size: pb.envoy_max_response_payload_size() as u64,
},
},
Expand Down Expand Up @@ -317,8 +307,6 @@ pub async fn init_conn(
.await?;
}

let is_serverless = serverless_drain_grace_period.is_some();

if is_serverless {
report_success(ctx, namespace.namespace_id, &pool_name).await;
}
Expand Down
1 change: 1 addition & 0 deletions engine/packages/pegboard/src/ops/envoy/drain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub struct Input {
pub version: u32,
}

// NOTE: Only applies to serverless
#[operation]
pub async fn pegboard_envoy_drain_older_versions(ctx: &OperationCtx, input: &Input) -> Result<()> {
let pool_res = ctx
Expand Down
Loading
Loading