Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions engine/packages/api-peer/src/internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,7 @@ pub async fn get_epoxy_kv_optimistic(
replica_id,
key: key_bytes,
caching_behavior: epoxy::protocol::CachingBehavior::Optimistic,
target_replicas: None,
save_empty: false,
})
.await?;
Expand Down
14 changes: 13 additions & 1 deletion engine/packages/epoxy/src/ops/kv/get_optimistic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@ pub struct Input {
pub replica_id: ReplicaId,
pub key: Vec<u8>,
pub caching_behavior: protocol::CachingBehavior,
/// Optional active-replica scope for this proposal.
///
/// Epoxy only validates that the supplied replicas are active and include the local
/// replica. Callers are responsible for ensuring a given key stays on a stable scope over
/// time, or that any scope change is handled as an explicit reconfiguration at a higher
/// layer.
pub target_replicas: Option<Vec<ReplicaId>>,
// Whether or not to write an empty value into cache if it did not exist on read.
pub save_empty: bool,
}
Expand Down Expand Up @@ -73,7 +80,12 @@ pub async fn epoxy_kv_get_optimistic(ctx: &OperationCtx, input: &Input) -> Resul
.await?
.config;

let quorum_members: Vec<ReplicaId> = utils::get_quorum_members(&config);
let replica_id = ctx.config().epoxy_replica_id();
let quorum_members = utils::resolve_active_quorum_members(
&config,
replica_id,
input.target_replicas.as_deref(),
)?;

if quorum_members.len() == 1 {
return Ok(Output { value: None });
Expand Down
119 changes: 5 additions & 114 deletions engine/packages/epoxy/src/ops/propose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use gas::prelude::*;
use rand::Rng;
use rivet_api_builder::ApiCtx;
use serde::{Deserialize, Serialize};
use std::collections::{BTreeSet, HashSet};
use std::time::{Duration, Instant};

use crate::{
Expand Down Expand Up @@ -218,8 +217,11 @@ pub async fn epoxy_propose(ctx: &OperationCtx, input: &Input) -> Result<Proposal
.await
.context("failed reading config")?;

let quorum_members =
resolve_quorum_members(&config, replica_id, input.target_replicas.as_deref())?;
let quorum_members = utils::resolve_active_quorum_members(
&config,
replica_id,
input.target_replicas.as_deref(),
)?;
tracing::debug!(
?quorum_members,
quorum_size = quorum_members.len(),
Expand Down Expand Up @@ -1005,123 +1007,12 @@ fn prepare_retry_base_delay_ms(retry_count: usize) -> u64 {
.min(PREPARE_RETRY_MAX_DELAY_MS)
}

/// Returns the quorum members to use for this proposal. This supports scoped proposals because
/// runner configs are often only enabled in a couple of explicitly coupled regions. If
/// `target_replicas` is provided, it validates that the scope is non-empty, includes the local
/// replica, and only contains active replicas. Otherwise it falls back to the full active quorum
/// from the cluster config.
///
/// Scoped proposals rely on a caller-side invariant: the same key must continue using the same
/// replica scope unless some higher-level reconfiguration step coordinates the membership change.
/// This function does not persist or enforce that per-key scope stability.
fn resolve_quorum_members(
    config: &protocol::ClusterConfig,
    replica_id: ReplicaId,
    target_replicas: Option<&[ReplicaId]>,
) -> Result<Vec<ReplicaId>> {
    let Some(scope) = target_replicas else {
        // Unscoped: propose across every active replica, provided we are one of them.
        let members = utils::get_quorum_members(config);
        if !members.contains(&replica_id) {
            bail!("local replica is not active in the current epoxy config");
        }
        return Ok(members);
    };

    if scope.is_empty() {
        bail!("target_replicas cannot be empty");
    }

    // BTreeSet both dedupes and yields a deterministic (sorted) member order.
    let scoped: BTreeSet<ReplicaId> = scope.iter().copied().collect();
    if !scoped.contains(&replica_id) {
        bail!("target_replicas must include the local replica");
    }

    let active: HashSet<ReplicaId> = utils::get_quorum_members(config).into_iter().collect();
    if scoped.iter().any(|replica| !active.contains(replica)) {
        bail!("target_replicas contains an inactive or unknown replica");
    }

    Ok(scoped.into_iter().collect())
}

#[cfg(test)]
mod tests {
use super::*;
use epoxy_protocol::protocol::{ClusterConfig, ReplicaConfig, ReplicaStatus};
use rand::{SeedableRng, rngs::StdRng};

fn make_config(replicas: &[(ReplicaId, ReplicaStatus)]) -> ClusterConfig {
ClusterConfig {
coordinator_replica_id: replicas[0].0,
epoch: 1,
replicas: replicas
.iter()
.map(|(replica_id, status)| ReplicaConfig {
replica_id: *replica_id,
status: status.clone(),
api_peer_url: String::new(),
guard_url: String::new(),
})
.collect(),
}
}

#[test]
fn resolve_quorum_members_none_uses_all_active() {
    // Joining replicas are not part of the active quorum.
    let config = make_config(&[
        (1, ReplicaStatus::Active),
        (2, ReplicaStatus::Active),
        (3, ReplicaStatus::Joining),
    ]);
    let members = resolve_quorum_members(&config, 1, None).expect("should resolve");
    assert_eq!(members, vec![1, 2]);
}

#[test]
fn resolve_quorum_members_requires_local_replica_to_be_active() {
    // The local replica (1) is only Learning, so resolution must fail.
    let config = make_config(&[(1, ReplicaStatus::Learning), (2, ReplicaStatus::Active)]);
    assert!(resolve_quorum_members(&config, 1, None).is_err());
}

#[test]
fn resolve_quorum_members_scoped_subset() {
    // A scope covering a strict subset of active replicas is accepted as-is.
    let config = make_config(&[
        (1, ReplicaStatus::Active),
        (2, ReplicaStatus::Active),
        (3, ReplicaStatus::Active),
    ]);
    let members = resolve_quorum_members(&config, 1, Some(&[1, 2])).expect("should resolve");
    assert_eq!(members, vec![1, 2]);
}

#[test]
fn resolve_quorum_members_empty_target_errors() {
    // An explicitly empty scope is rejected rather than falling back to all-active.
    let config = make_config(&[(1, ReplicaStatus::Active)]);
    assert!(resolve_quorum_members(&config, 1, Some(&[])).is_err());
}

#[test]
fn resolve_quorum_members_missing_local_errors() {
    // Scope [2] omits the local replica (1) and therefore must be rejected.
    let config = make_config(&[(1, ReplicaStatus::Active), (2, ReplicaStatus::Active)]);
    assert!(resolve_quorum_members(&config, 1, Some(&[2])).is_err());
}

#[test]
fn resolve_quorum_members_inactive_replica_errors() {
    // Replica 2 is only Learning, so a scope that includes it must fail.
    let config = make_config(&[(1, ReplicaStatus::Active), (2, ReplicaStatus::Learning)]);
    assert!(resolve_quorum_members(&config, 1, Some(&[1, 2])).is_err());
}

#[test]
fn parses_set_command_as_set_proposal() {
let proposal = Proposal {
Expand Down
112 changes: 112 additions & 0 deletions engine/packages/epoxy/src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use anyhow::{Result, bail};
use epoxy_protocol::protocol::{self, ReplicaId};
use std::collections::{BTreeSet, HashSet};
use std::fmt;
use universaldb::{Transaction, utils::IsolationLevel::*};

Expand Down Expand Up @@ -32,6 +33,51 @@ pub fn get_quorum_members(config: &protocol::ClusterConfig) -> Vec<ReplicaId> {
.collect()
}

/// Returns the quorum members to use for a given kv operation. This supports scoped operations
/// because, for example, runner configs are often only enabled in a couple of explicitly coupled
/// regions. If `target_replicas` is provided, it validates that the scope is non-empty, includes
/// the local replica, and only contains active replicas. Otherwise it falls back to the full
/// active quorum from the cluster config.
///
/// Scoped proposals rely on a caller-side invariant: the same key must continue using the same
/// replica scope unless some higher-level reconfiguration step coordinates the membership change.
/// This function does not persist or enforce that per-key scope stability.
pub fn resolve_active_quorum_members(
    config: &protocol::ClusterConfig,
    replica_id: ReplicaId,
    target_replicas: Option<&[ReplicaId]>,
) -> Result<Vec<ReplicaId>> {
    let active_members = get_quorum_members(config);

    if let Some(scope) = target_replicas {
        if scope.is_empty() {
            bail!("target_replicas cannot be empty");
        }

        // Dedupe and order deterministically; BTreeSet iterates in sorted order.
        let scoped = scope.iter().copied().collect::<BTreeSet<_>>();
        if !scoped.contains(&replica_id) {
            bail!("target_replicas must include the local replica");
        }

        let active = active_members.into_iter().collect::<HashSet<_>>();
        if !scoped.iter().all(|replica| active.contains(replica)) {
            bail!("target_replicas contains an inactive or unknown replica");
        }

        Ok(scoped.into_iter().collect())
    } else {
        // Unscoped: use every active replica, provided we are one of them.
        if !active_members.contains(&replica_id) {
            bail!("local replica is not active in the current epoxy config");
        }
        Ok(active_members)
    }
}

/// Use this replica list for any action that should still be sent to joining replicas.
pub fn get_all_replicas(config: &protocol::ClusterConfig) -> Vec<ReplicaId> {
config.replicas.iter().map(|r| r.replica_id).collect()
Expand Down Expand Up @@ -195,4 +241,70 @@ mod tests {
}
}
}

/// Builds a minimal cluster config for tests. The first entry doubles as the
/// coordinator; URLs are irrelevant to quorum resolution and stay empty.
fn make_config(replicas: &[(ReplicaId, ReplicaStatus)]) -> ClusterConfig {
    let mut replica_configs = Vec::with_capacity(replicas.len());
    for (replica_id, status) in replicas {
        replica_configs.push(ReplicaConfig {
            replica_id: *replica_id,
            status: status.clone(),
            api_peer_url: String::new(),
            guard_url: String::new(),
        });
    }
    ClusterConfig {
        coordinator_replica_id: replicas[0].0,
        epoch: 1,
        replicas: replica_configs,
    }
}

#[test]
fn resolve_active_quorum_members_none_uses_all_active() {
    // Joining replicas are not part of the active quorum.
    let config = make_config(&[
        (1, ReplicaStatus::Active),
        (2, ReplicaStatus::Active),
        (3, ReplicaStatus::Joining),
    ]);
    let members = resolve_active_quorum_members(&config, 1, None).expect("should resolve");
    assert_eq!(members, vec![1, 2]);
}

#[test]
fn resolve_active_quorum_members_requires_local_replica_to_be_active() {
    // The local replica (1) is only Learning, so resolution must fail.
    let config = make_config(&[(1, ReplicaStatus::Learning), (2, ReplicaStatus::Active)]);
    assert!(resolve_active_quorum_members(&config, 1, None).is_err());
}

#[test]
fn resolve_active_quorum_members_scoped_subset() {
    // A scope covering a strict subset of active replicas is accepted as-is.
    let config = make_config(&[
        (1, ReplicaStatus::Active),
        (2, ReplicaStatus::Active),
        (3, ReplicaStatus::Active),
    ]);
    let members =
        resolve_active_quorum_members(&config, 1, Some(&[1, 2])).expect("should resolve");
    assert_eq!(members, vec![1, 2]);
}

#[test]
fn resolve_active_quorum_members_empty_target_errors() {
    // An explicitly empty scope is rejected rather than falling back to all-active.
    let config = make_config(&[(1, ReplicaStatus::Active)]);
    assert!(resolve_active_quorum_members(&config, 1, Some(&[])).is_err());
}

#[test]
fn resolve_active_quorum_members_missing_local_errors() {
    // Scope [2] omits the local replica (1) and therefore must be rejected.
    let config = make_config(&[(1, ReplicaStatus::Active), (2, ReplicaStatus::Active)]);
    assert!(resolve_active_quorum_members(&config, 1, Some(&[2])).is_err());
}

#[test]
fn resolve_active_quorum_members_inactive_replica_errors() {
    // Replica 2 is only Learning, so a scope that includes it must fail.
    let config = make_config(&[(1, ReplicaStatus::Active), (2, ReplicaStatus::Learning)]);
    assert!(resolve_active_quorum_members(&config, 1, Some(&[1, 2])).is_err());
}
}
1 change: 1 addition & 0 deletions engine/packages/epoxy/tests/kv_get_optimistic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ async fn get_with_behavior(
replica_id,
key: key.to_vec(),
caching_behavior,
target_replicas: None,
save_empty: false,
})
.await
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pub async fn pegboard_actor_get_reservation_for_key(
replica_id: ctx.config().epoxy_replica_id(),
key: keys::subspace().pack(&reservation_key),
caching_behavior: epoxy::protocol::CachingBehavior::Optimistic,
target_replicas: None,
save_empty: false,
})
.await?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ async fn list_runner_config_enabled_dcs_inner(
replica_id: ctx.config().epoxy_replica_id(),
key: namespace::keys::subspace().pack(&runner_config_key),
caching_behavior: CachingBehavior::Optimistic,
target_replicas: None,
save_empty: true,
})
.await?;
Expand Down
1 change: 1 addition & 0 deletions engine/packages/pegboard/src/workflows/actor/keys.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ pub async fn lookup_key_optimistic(
replica_id: ctx.config().epoxy_replica_id(),
key: keys::subspace().pack(&reservation_key),
caching_behavior: epoxy::protocol::CachingBehavior::Optimistic,
target_replicas: None,
save_empty: false,
})
.await?
Expand Down
Loading
Loading