Skip to content

Commit 5bdaebb

Browse files
committed
BREAK THIS UP: WIP patches
1 parent 4170ac2 commit 5bdaebb

66 files changed

Lines changed: 3781 additions & 1497 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,8 @@ members = [
5757
"engine/sdks/rust/epoxy-protocol",
5858
"engine/sdks/rust/test-envoy",
5959
"engine/sdks/rust/ups-protocol",
60-
"rivetkit-typescript/packages/rivetkit-native"
60+
"rivetkit-typescript/packages/rivetkit-native",
61+
"rivetkit-typescript/packages/sqlite-native"
6162
]
6263

6364
[workspace.package]

engine/packages/api-peer/src/namespaces.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -97,15 +97,6 @@ pub async fn create(
9797
) -> Result<CreateResponse> {
9898
let namespace_id = Id::new_v1(ctx.config().dc_label());
9999

100-
ctx.workflow(namespace::workflows::namespace::Input {
101-
namespace_id,
102-
name: body.name.clone(),
103-
display_name: body.display_name.clone(),
104-
})
105-
.tag("namespace_id", namespace_id)
106-
.dispatch()
107-
.await?;
108-
109100
let mut create_sub = ctx
110101
.subscribe::<namespace::workflows::namespace::CreateComplete>((
111102
"namespace_id",
@@ -116,6 +107,15 @@ pub async fn create(
116107
.subscribe::<namespace::workflows::namespace::Failed>(("namespace_id", namespace_id))
117108
.await?;
118109

110+
ctx.workflow(namespace::workflows::namespace::Input {
111+
namespace_id,
112+
name: body.name.clone(),
113+
display_name: body.display_name.clone(),
114+
})
115+
.tag("namespace_id", namespace_id)
116+
.dispatch()
117+
.await?;
118+
119119
tokio::select! {
120120
res = create_sub.next() => { res?; },
121121
res = fail_sub.next() => {

engine/packages/guard/src/routing/pegboard_gateway/resolve_actor_query.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,8 @@ async fn resolve_query_target_dc_label(
208208

209209
fn serialize_actor_key(key: &[String]) -> Result<String> {
210210
const EMPTY_KEY: &str = "/";
211-
const KEY_SEPARATOR: char = '/';
211+
const KEY_SEPARATOR: &str = "/";
212+
const KEY_SEPARATOR_CHAR: char = '/';
212213

213214
if key.is_empty() {
214215
return Ok(EMPTY_KEY.to_string());
@@ -221,11 +222,13 @@ fn serialize_actor_key(key: &[String]) -> Result<String> {
221222
continue;
222223
}
223224

224-
let escaped = part.replace('\\', "\\\\").replace(KEY_SEPARATOR, "\\/");
225+
let escaped = part
226+
.replace('\\', "\\\\")
227+
.replace(KEY_SEPARATOR_CHAR, "\\/");
225228
escaped_parts.push(escaped);
226229
}
227230

228-
Ok(escaped_parts.join(EMPTY_KEY))
231+
Ok(escaped_parts.join(KEY_SEPARATOR))
229232
}
230233

231234
fn is_duplicate_key_error(err: &anyhow::Error) -> bool {

engine/packages/pegboard-gateway/src/keepalive_task.rs

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,22 @@ pub async fn task(
2020
request_id: protocol::RequestId,
2121
mut keepalive_abort_rx: watch::Receiver<()>,
2222
) -> Result<LifecycleResult> {
23-
let mut ping_interval = tokio::time::interval(Duration::from_millis(
24-
(ctx.config()
25-
.pegboard()
26-
.hibernating_request_eligible_threshold()
27-
/ 2)
28-
.try_into()?,
29-
));
23+
ctx.op(pegboard::ops::actor::hibernating_request::upsert::Input {
24+
actor_id,
25+
gateway_id,
26+
request_id,
27+
})
28+
.await?;
29+
shared_state.keepalive_hws(request_id).await?;
30+
31+
let ping_interval_ms = (ctx
32+
.config()
33+
.pegboard()
34+
.hibernating_request_eligible_threshold()
35+
/ 2)
36+
.max(1);
37+
let mut ping_interval =
38+
tokio::time::interval(Duration::from_millis(ping_interval_ms.try_into()?));
3039
ping_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
3140

3241
loop {

engine/packages/pegboard-gateway/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -826,7 +826,7 @@ impl PegboardGateway {
826826
})
827827
.await?
828828
{
829-
if actor.runner_id.is_some() {
829+
if !actor.sleeping && actor.runner_id.is_some() {
830830
tracing::debug!("actor became ready during hibernation");
831831

832832
return Ok(HibernationResult::Continue);

engine/packages/pegboard-gateway/src/shared_state.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ impl SharedState {
151151
gateway_id,
152152
receiver_subject,
153153
in_flight_requests: HashMap::new(),
154-
hibernation_timeout: pegboard_config.hibernating_request_eligible_threshold(),
154+
hibernation_timeout: pegboard_config.hibernating_request_eligible_threshold().max(1),
155155
gc_interval: Duration::from_millis(pegboard_config.gateway_gc_interval_ms()),
156156
tunnel_ping_timeout: pegboard_config.gateway_tunnel_ping_timeout_ms(),
157157
hws_message_ack_timeout: Duration::from_millis(

engine/packages/pegboard-gateway2/src/keepalive_task.rs

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,22 @@ pub async fn task(
2020
request_id: protocol::RequestId,
2121
mut keepalive_abort_rx: watch::Receiver<()>,
2222
) -> Result<LifecycleResult> {
23-
let mut ping_interval = tokio::time::interval(Duration::from_millis(
24-
(ctx.config()
25-
.pegboard()
26-
.hibernating_request_eligible_threshold()
27-
/ 2)
28-
.try_into()?,
29-
));
23+
ctx.op(pegboard::ops::actor::hibernating_request::upsert::Input {
24+
actor_id,
25+
gateway_id,
26+
request_id,
27+
})
28+
.await?;
29+
shared_state.keepalive_hws(request_id).await?;
30+
31+
let ping_interval_ms = (ctx
32+
.config()
33+
.pegboard()
34+
.hibernating_request_eligible_threshold()
35+
/ 2)
36+
.max(1);
37+
let mut ping_interval =
38+
tokio::time::interval(Duration::from_millis(ping_interval_ms.try_into()?));
3039
ping_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
3140

3241
loop {

engine/packages/pegboard-gateway2/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -832,7 +832,7 @@ impl PegboardGateway2 {
832832
})
833833
.await?
834834
{
835-
if actor.envoy_key.is_some() {
835+
if !actor.sleeping && actor.envoy_key.is_some() {
836836
tracing::debug!("actor became ready during hibernation");
837837

838838
return Ok(HibernationResult::Continue);

engine/packages/pegboard-gateway2/src/shared_state.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ impl SharedState {
9898
gateway_id,
9999
receiver_subject,
100100
in_flight_requests: HashMap::new(),
101-
hibernation_timeout: pegboard_config.hibernating_request_eligible_threshold(),
101+
hibernation_timeout: pegboard_config.hibernating_request_eligible_threshold().max(1),
102102
gc_interval: Duration::from_millis(pegboard_config.gateway_gc_interval_ms()),
103103
tunnel_ping_timeout: pegboard_config.gateway_tunnel_ping_timeout_ms(),
104104
hws_message_ack_timeout: Duration::from_millis(

engine/packages/pegboard/src/ops/actor/hibernating_request/delete.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,13 @@ pub async fn pegboard_actor_hibernating_request_delete(
1616
ctx: &OperationCtx,
1717
input: &Input,
1818
) -> Result<()> {
19+
tracing::info!(
20+
actor_id=%input.actor_id,
21+
gateway_id=%protocol::util::id_to_string(&input.gateway_id),
22+
request_id=%protocol::util::id_to_string(&input.request_id),
23+
"deleting hibernating request"
24+
);
25+
1926
ctx.udb()?
2027
.run(|tx| async move {
2128
let tx = tx.with_subspace(keys::subspace());

0 commit comments

Comments
 (0)