Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ members = [
"engine/sdks/rust/epoxy-protocol",
"engine/sdks/rust/test-envoy",
"engine/sdks/rust/ups-protocol",
"rivetkit-typescript/packages/rivetkit-native"
"rivetkit-typescript/packages/rivetkit-native",
"rivetkit-typescript/packages/sqlite-native"
]

[workspace.package]
Expand Down
18 changes: 9 additions & 9 deletions engine/packages/api-peer/src/namespaces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,15 +97,6 @@ pub async fn create(
) -> Result<CreateResponse> {
let namespace_id = Id::new_v1(ctx.config().dc_label());

ctx.workflow(namespace::workflows::namespace::Input {
namespace_id,
name: body.name.clone(),
display_name: body.display_name.clone(),
})
.tag("namespace_id", namespace_id)
.dispatch()
.await?;

let mut create_sub = ctx
.subscribe::<namespace::workflows::namespace::CreateComplete>((
"namespace_id",
Expand All @@ -116,6 +107,15 @@ pub async fn create(
.subscribe::<namespace::workflows::namespace::Failed>(("namespace_id", namespace_id))
.await?;

ctx.workflow(namespace::workflows::namespace::Input {
namespace_id,
name: body.name.clone(),
display_name: body.display_name.clone(),
})
.tag("namespace_id", namespace_id)
.dispatch()
.await?;

tokio::select! {
res = create_sub.next() => { res?; },
res = fail_sub.next() => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,8 @@ async fn resolve_query_target_dc_label(

fn serialize_actor_key(key: &[String]) -> Result<String> {
const EMPTY_KEY: &str = "/";
const KEY_SEPARATOR: char = '/';
const KEY_SEPARATOR: &str = "/";
const KEY_SEPARATOR_CHAR: char = '/';

if key.is_empty() {
return Ok(EMPTY_KEY.to_string());
Expand All @@ -221,11 +222,13 @@ fn serialize_actor_key(key: &[String]) -> Result<String> {
continue;
}

let escaped = part.replace('\\', "\\\\").replace(KEY_SEPARATOR, "\\/");
let escaped = part
.replace('\\', "\\\\")
.replace(KEY_SEPARATOR_CHAR, "\\/");
escaped_parts.push(escaped);
}

Ok(escaped_parts.join(EMPTY_KEY))
Ok(escaped_parts.join(KEY_SEPARATOR))
}

fn is_duplicate_key_error(err: &anyhow::Error) -> bool {
Expand Down
23 changes: 16 additions & 7 deletions engine/packages/pegboard-gateway/src/keepalive_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,22 @@ pub async fn task(
request_id: protocol::RequestId,
mut keepalive_abort_rx: watch::Receiver<()>,
) -> Result<LifecycleResult> {
let mut ping_interval = tokio::time::interval(Duration::from_millis(
(ctx.config()
.pegboard()
.hibernating_request_eligible_threshold()
/ 2)
.try_into()?,
));
ctx.op(pegboard::ops::actor::hibernating_request::upsert::Input {
actor_id,
gateway_id,
request_id,
})
.await?;
shared_state.keepalive_hws(request_id).await?;

let ping_interval_ms = (ctx
.config()
.pegboard()
.hibernating_request_eligible_threshold()
/ 2)
.max(1);
let mut ping_interval =
tokio::time::interval(Duration::from_millis(ping_interval_ms.try_into()?));
ping_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

loop {
Expand Down
2 changes: 1 addition & 1 deletion engine/packages/pegboard-gateway/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -826,7 +826,7 @@ impl PegboardGateway {
})
.await?
{
if actor.runner_id.is_some() {
if !actor.sleeping && actor.runner_id.is_some() {
tracing::debug!("actor became ready during hibernation");

return Ok(HibernationResult::Continue);
Expand Down
2 changes: 1 addition & 1 deletion engine/packages/pegboard-gateway/src/shared_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,10 @@
let pegboard_config = config.pegboard();
Self(Arc::new(SharedStateInner {
ups,
gateway_id,

Check warning on line 151 in engine/packages/pegboard-gateway/src/shared_state.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/rivet/rivet/engine/packages/pegboard-gateway/src/shared_state.rs
receiver_subject,
in_flight_requests: HashMap::new(),
hibernation_timeout: pegboard_config.hibernating_request_eligible_threshold(),
hibernation_timeout: pegboard_config.hibernating_request_eligible_threshold().max(1),
gc_interval: Duration::from_millis(pegboard_config.gateway_gc_interval_ms()),
tunnel_ping_timeout: pegboard_config.gateway_tunnel_ping_timeout_ms(),
hws_message_ack_timeout: Duration::from_millis(
Expand Down
23 changes: 16 additions & 7 deletions engine/packages/pegboard-gateway2/src/keepalive_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,22 @@ pub async fn task(
request_id: protocol::RequestId,
mut keepalive_abort_rx: watch::Receiver<()>,
) -> Result<LifecycleResult> {
let mut ping_interval = tokio::time::interval(Duration::from_millis(
(ctx.config()
.pegboard()
.hibernating_request_eligible_threshold()
/ 2)
.try_into()?,
));
ctx.op(pegboard::ops::actor::hibernating_request::upsert::Input {
actor_id,
gateway_id,
request_id,
})
.await?;
shared_state.keepalive_hws(request_id).await?;

let ping_interval_ms = (ctx
.config()
.pegboard()
.hibernating_request_eligible_threshold()
/ 2)
.max(1);
let mut ping_interval =
tokio::time::interval(Duration::from_millis(ping_interval_ms.try_into()?));
ping_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

loop {
Expand Down
2 changes: 1 addition & 1 deletion engine/packages/pegboard-gateway2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -832,7 +832,7 @@ impl PegboardGateway2 {
})
.await?
{
if actor.envoy_key.is_some() {
if !actor.sleeping && actor.envoy_key.is_some() {
tracing::debug!("actor became ready during hibernation");

return Ok(HibernationResult::Continue);
Expand Down
2 changes: 1 addition & 1 deletion engine/packages/pegboard-gateway2/src/shared_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,10 @@
let pegboard_config = config.pegboard();
Self(Arc::new(SharedStateInner {
ups,
gateway_id,

Check warning on line 98 in engine/packages/pegboard-gateway2/src/shared_state.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/rivet/rivet/engine/packages/pegboard-gateway2/src/shared_state.rs
receiver_subject,
in_flight_requests: HashMap::new(),
hibernation_timeout: pegboard_config.hibernating_request_eligible_threshold(),
hibernation_timeout: pegboard_config.hibernating_request_eligible_threshold().max(1),
gc_interval: Duration::from_millis(pegboard_config.gateway_gc_interval_ms()),
tunnel_ping_timeout: pegboard_config.gateway_tunnel_ping_timeout_ms(),
hws_message_ack_timeout: Duration::from_millis(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@ pub async fn pegboard_actor_hibernating_request_delete(
ctx: &OperationCtx,
input: &Input,
) -> Result<()> {
tracing::info!(
actor_id=%input.actor_id,
gateway_id=%protocol::util::id_to_string(&input.gateway_id),
request_id=%protocol::util::id_to_string(&input.request_id),
"deleting hibernating request"
);

ctx.udb()?
.run(|tx| async move {
let tx = tx.with_subspace(keys::subspace());
Expand Down
13 changes: 11 additions & 2 deletions engine/packages/pegboard/src/ops/actor/hibernating_request/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ pub async fn pegboard_actor_hibernating_request_list(
.pegboard()
.hibernating_request_eligible_threshold();

ctx.udb()?
let res = ctx
.udb()?
.run(|tx| async move {
let tx = tx.with_subspace(keys::subspace());

Expand Down Expand Up @@ -61,5 +62,13 @@ pub async fn pegboard_actor_hibernating_request_list(
.await
})
.custom_instrument(tracing::info_span!("hibernating_request_list_tx"))
.await
.await?;

tracing::info!(
actor_id=%input.actor_id,
count=res.len(),
"listed hibernating requests"
);

Ok(res)
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@ pub async fn pegboard_actor_hibernating_request_upsert(
ctx: &OperationCtx,
input: &Input,
) -> Result<()> {
tracing::info!(
actor_id=%input.actor_id,
gateway_id=%protocol::util::id_to_string(&input.gateway_id),
request_id=%protocol::util::id_to_string(&input.request_id),
"upserting hibernating request"
);

ctx.udb()?
.run(|tx| async move {
let tx = tx.with_subspace(keys::subspace());
Expand Down
73 changes: 47 additions & 26 deletions engine/packages/pegboard/src/workflows/actor/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1319,46 +1319,68 @@ pub async fn insert_and_send_commands(
input: &InsertAndSendCommandsInput,
) -> Result<()> {
let mut state = ctx.state::<State>()?;
let mut commands = input.commands.clone();

for command in &mut commands {
if let protocol::mk2::Command::CommandStartActor(start) = command {
start.hibernating_requests = ctx
.op(crate::ops::actor::hibernating_request::list::Input {
actor_id: input.actor_id,
})
.await?
.into_iter()
.map(|req| protocol::mk2::HibernatingRequest {
gateway_id: req.gateway_id,
request_id: req.request_id,
})
.collect();
}
}

let runner_state = state.runner_state.get_or_insert_default();
let old_last_command_idx = runner_state.last_command_idx;
runner_state.last_command_idx += input.commands.len() as i64;
runner_state.last_command_idx += commands.len() as i64;

// This does not have to be part of its own activity because the txn is idempotent
let last_command_idx = runner_state.last_command_idx;
let commands_for_tx = commands.clone();
ctx.udb()?
.run(|tx| async move {
let tx = tx.with_subspace(keys::subspace());
.run(|tx| {
let commands_for_tx = commands_for_tx.clone();

tx.write(
&keys::runner::ActorLastCommandIdxKey::new(
input.runner_id,
input.actor_id,
input.generation,
),
last_command_idx,
)?;
async move {
let tx = tx.with_subspace(keys::subspace());

for (i, command) in input.commands.iter().enumerate() {
tx.write(
&keys::runner::ActorCommandKey::new(
&keys::runner::ActorLastCommandIdxKey::new(
input.runner_id,
input.actor_id,
input.generation,
old_last_command_idx + i as i64 + 1,
),
match command {
protocol::mk2::Command::CommandStartActor(x) => {
protocol::mk2::ActorCommandKeyData::CommandStartActor(x.clone())
}
protocol::mk2::Command::CommandStopActor => {
protocol::mk2::ActorCommandKeyData::CommandStopActor
}
},
last_command_idx,
)?;
}

Ok(())
for (i, command) in commands_for_tx.iter().enumerate() {
tx.write(
&keys::runner::ActorCommandKey::new(
input.runner_id,
input.actor_id,
input.generation,
old_last_command_idx + i as i64 + 1,
),
match command {
protocol::mk2::Command::CommandStartActor(x) => {
protocol::mk2::ActorCommandKeyData::CommandStartActor(x.clone())
}
protocol::mk2::Command::CommandStopActor => {
protocol::mk2::ActorCommandKeyData::CommandStopActor
}
},
)?;
}

Ok(())
}
})
.await?;

Expand All @@ -1367,8 +1389,7 @@ pub async fn insert_and_send_commands(

let message_serialized =
versioned::ToRunnerMk2::wrap_latest(protocol::mk2::ToRunner::ToClientCommands(
input
.commands
commands
.iter()
.enumerate()
.map(|(i, command)| protocol::mk2::CommandWrapper {
Expand Down
Loading
Loading