From 2573bfc54f52a987c006dbfa34cd331d292e7989 Mon Sep 17 00:00:00 2001 From: Nathan Flurry Date: Sun, 12 Apr 2026 19:10:38 -0700 Subject: [PATCH] fix(pegboard): skip protocol version keys in runner pool backfill --- .../pegboard/src/keys/runner_config.rs | 5 +- .../src/workflows/runner_pool_backfill.rs | 8 +++ .../packages/rivetkit-native/src/database.rs | 54 ++++++------------- 3 files changed, 28 insertions(+), 39 deletions(-) diff --git a/engine/packages/pegboard/src/keys/runner_config.rs b/engine/packages/pegboard/src/keys/runner_config.rs index 7d3b64d944..a71a272d34 100644 --- a/engine/packages/pegboard/src/keys/runner_config.rs +++ b/engine/packages/pegboard/src/keys/runner_config.rs @@ -268,8 +268,11 @@ impl TuplePack for ProtocolVersionKey { impl<'de> TupleUnpack<'de> for ProtocolVersionKey { fn unpack(input: &[u8], tuple_depth: TupleDepth) -> PackResult<(&[u8], Self)> { - let (input, (_, _, _, namespace_id, name, _)) = + let (input, (_, _, _, namespace_id, name, data)) = <(usize, usize, usize, Id, String, usize)>::unpack(input, tuple_depth)?; + if data != PROTOCOL_VERSION { + return Err(PackError::Message("expected PROTOCOL_VERSION data".into())); + } let v = ProtocolVersionKey { namespace_id, name }; diff --git a/engine/packages/pegboard/src/workflows/runner_pool_backfill.rs b/engine/packages/pegboard/src/workflows/runner_pool_backfill.rs index 006bb24027..80067758fa 100644 --- a/engine/packages/pegboard/src/workflows/runner_pool_backfill.rs +++ b/engine/packages/pegboard/src/workflows/runner_pool_backfill.rs @@ -99,6 +99,14 @@ async fn backfill_chunk(ctx: &ActivityCtx, input: &BackfillChunkInput) -> Result }; new_last_key = [entry.key(), &[0xff]].concat(); + + if tx + .unpack::(entry.key()) + .is_ok() + { + continue; + } + entries.push(tx.read_entry::(&entry)?); } diff --git a/rivetkit-typescript/packages/rivetkit-native/src/database.rs b/rivetkit-typescript/packages/rivetkit-native/src/database.rs index e5fe5167ce..be98463611 100644 --- a/rivetkit-typescript/packages/rivetkit-native/src/database.rs +++ b/rivetkit-typescript/packages/rivetkit-native/src/database.rs @@ -1,15 +1,15 @@ -use std::ffi::{c_char, CStr, CString}; +use std::ffi::{CStr, CString, c_char}; use std::ptr; use std::sync::{Arc, Mutex}; use async_trait::async_trait; use libsqlite3_sys::{ - sqlite3, sqlite3_bind_blob, sqlite3_bind_double, sqlite3_bind_int64, sqlite3_bind_null, - sqlite3_bind_text, sqlite3_changes, sqlite3_column_blob, sqlite3_column_bytes, - sqlite3_column_count, sqlite3_column_double, sqlite3_column_int64, sqlite3_column_name, - sqlite3_column_text, sqlite3_column_type, sqlite3_errmsg, sqlite3_finalize, - sqlite3_prepare_v2, sqlite3_step, SQLITE_BLOB, SQLITE_DONE, SQLITE_FLOAT, SQLITE_INTEGER, - SQLITE_NULL, SQLITE_OK, SQLITE_ROW, SQLITE_TEXT, SQLITE_TRANSIENT, + SQLITE_BLOB, SQLITE_DONE, SQLITE_FLOAT, SQLITE_INTEGER, SQLITE_NULL, SQLITE_OK, SQLITE_ROW, + SQLITE_TEXT, SQLITE_TRANSIENT, sqlite3, sqlite3_bind_blob, sqlite3_bind_double, + sqlite3_bind_int64, sqlite3_bind_null, sqlite3_bind_text, sqlite3_changes, sqlite3_column_blob, + sqlite3_column_bytes, sqlite3_column_count, sqlite3_column_double, sqlite3_column_int64, + sqlite3_column_name, sqlite3_column_text, sqlite3_column_type, sqlite3_errmsg, + sqlite3_finalize, sqlite3_prepare_v2, sqlite3_step, }; use napi::bindgen_prelude::Buffer; use napi_derive::napi; @@ -109,8 +109,7 @@ pub struct JsNativeDatabase { impl JsNativeDatabase { pub fn as_ptr(&self) -> *mut libsqlite3_sys::sqlite3 { - self - .db + self.db .lock() .ok() .and_then(|guard| guard.as_ref().map(NativeDatabase::as_ptr)) @@ -243,13 +242,7 @@ fn bind_params( let text = CString::new(param.text_value.clone().unwrap_or_default()) .map_err(|err| napi::Error::from_reason(err.to_string()))?; unsafe { - sqlite3_bind_text( - stmt, - bind_index, - text.as_ptr(), - -1, - SQLITE_TRANSIENT(), - ) + sqlite3_bind_text(stmt, bind_index, text.as_ptr(), -1, SQLITE_TRANSIENT()) } } "blob" => { @@ -291,26 +284,17 @@ fn collect_columns(stmt: *mut libsqlite3_sys::sqlite3_stmt) -> Vec { if name_ptr.is_null() { String::new() } else { - CStr::from_ptr(name_ptr) - .to_string_lossy() - .into_owned() + CStr::from_ptr(name_ptr).to_string_lossy().into_owned() } }) .collect() } -fn column_value( - stmt: *mut libsqlite3_sys::sqlite3_stmt, - index: i32, -) -> serde_json::Value { +fn column_value(stmt: *mut libsqlite3_sys::sqlite3_stmt, index: i32) -> serde_json::Value { match unsafe { sqlite3_column_type(stmt, index) } { SQLITE_NULL => serde_json::Value::Null, - SQLITE_INTEGER => { - serde_json::Value::from(unsafe { sqlite3_column_int64(stmt, index) }) - } - SQLITE_FLOAT => { - serde_json::Value::from(unsafe { sqlite3_column_double(stmt, index) }) - } + SQLITE_INTEGER => serde_json::Value::from(unsafe { sqlite3_column_int64(stmt, index) }), + SQLITE_FLOAT => serde_json::Value::from(unsafe { sqlite3_column_double(stmt, index) }), SQLITE_TEXT => { let text_ptr = unsafe { sqlite3_column_text(stmt, index) }; if text_ptr.is_null() { @@ -328,9 +312,7 @@ fn column_value( serde_json::Value::Null } else { let blob_len = unsafe { sqlite3_column_bytes(stmt, index) } as usize; - let blob = unsafe { - std::slice::from_raw_parts(blob_ptr as *const u8, blob_len) - }; + let blob = unsafe { std::slice::from_raw_parts(blob_ptr as *const u8, blob_len) }; serde_json::Value::Array( blob.iter() .map(|byte| serde_json::Value::from(*byte)) @@ -349,9 +331,7 @@ fn execute_statement( ) -> napi::Result { let c_sql = CString::new(sql).map_err(|err| napi::Error::from_reason(err.to_string()))?; let mut stmt = ptr::null_mut(); - let rc = unsafe { - sqlite3_prepare_v2(db, c_sql.as_ptr(), -1, &mut stmt, ptr::null_mut()) - }; + let rc = unsafe { sqlite3_prepare_v2(db, c_sql.as_ptr(), -1, &mut stmt, ptr::null_mut()) }; if rc != SQLITE_OK { return Err(sqlite_error(db, "failed to prepare sqlite statement")); } @@ -393,9 +373,7 @@ fn query_statement( ) -> napi::Result { let c_sql = CString::new(sql).map_err(|err| napi::Error::from_reason(err.to_string()))?; let mut stmt = ptr::null_mut(); - let rc = unsafe { - sqlite3_prepare_v2(db, c_sql.as_ptr(), -1, &mut stmt, ptr::null_mut()) - }; + let rc = unsafe { sqlite3_prepare_v2(db, c_sql.as_ptr(), -1, &mut stmt, ptr::null_mut()) }; if rc != SQLITE_OK { return Err(sqlite_error(db, "failed to prepare sqlite query")); }