From e17239dcf5fac6db54c2d0118802bdd030f4c652 Mon Sep 17 00:00:00 2001 From: Nathan Flurry Date: Sun, 12 Apr 2026 22:54:08 -0700 Subject: [PATCH] feat: import/export actors --- .gitattributes | 1 + Cargo.lock | 126 +++ Cargo.toml | 7 +- engine/artifacts/openapi.json | 208 +++++ engine/packages/api-peer/src/actors/create.rs | 2 + .../api-peer/src/actors/get_or_create.rs | 2 + .../api-peer/src/actors/import_create.rs | 61 ++ engine/packages/api-peer/src/actors/mod.rs | 1 + engine/packages/api-peer/src/router.rs | 1 + engine/packages/api-public/Cargo.toml | 2 + .../api-public/src/actors/import_export.rs | 788 ++++++++++++++++++ engine/packages/api-public/src/actors/list.rs | 2 +- .../api-public/src/actors/list_names.rs | 2 +- engine/packages/api-public/src/actors/mod.rs | 1 + engine/packages/api-public/src/router.rs | 10 + .../api-types/src/actors/import_export.rs | 55 ++ .../packages/api-types/src/actors/kv_get.rs | 2 +- engine/packages/api-types/src/actors/mod.rs | 1 + .../engine/tests/actor_import_export_e2e.rs | 338 ++++++++ .../packages/engine/tests/common/api/peer.rs | 2 +- .../engine/tests/common/api/public.rs | 65 +- .../pegboard_gateway/resolve_actor_query.rs | 2 + .../packages/pegboard/src/ops/actor/create.rs | 6 + .../pegboard/src/workflows/actor/mod.rs | 37 +- .../pegboard/src/workflows/actor2/mod.rs | 40 +- .../scenarios/pb_actor_v1_pre_migration.rs | 2 + pnpm-lock.yaml | 6 - .../packages/rivetkit-native/index.d.ts | 1 + .../packages/rivetkit-native/wrapper.js | 2 +- .../driver-test-suite/registry-static.ts | 3 + .../d1h30qzt2m6xdtsefpuvllbjubbl00/kv.bin | 3 + .../metadata.json | 8 + .../sqlite-counter-v2_1_x/manifest.json | 15 + .../sqlite-import-snapshot.ts | 46 + .../rivetkit/src/driver-test-suite/mod.ts | 5 + .../tests/actor-import-snapshot-db.ts | 50 ++ .../rivetkit/src/driver-test-suite/utils.ts | 43 +- .../rivetkit/tests/driver-engine.test.ts | 2 +- 38 files changed, 1918 insertions(+), 30 deletions(-) create mode 100644 engine/packages/api-peer/src/actors/import_create.rs create mode 100644 engine/packages/api-public/src/actors/import_export.rs create mode 100644 engine/packages/api-types/src/actors/import_export.rs create mode 100644 engine/packages/engine/tests/actor_import_export_e2e.rs create mode 100644 rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/snapshots/sqlite-counter-v2_1_x/actors/d1h30qzt2m6xdtsefpuvllbjubbl00/kv.bin create mode 100644 rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/snapshots/sqlite-counter-v2_1_x/actors/d1h30qzt2m6xdtsefpuvllbjubbl00/metadata.json create mode 100644 rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/snapshots/sqlite-counter-v2_1_x/manifest.json create mode 100644 rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/sqlite-import-snapshot.ts create mode 100644 rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/actor-import-snapshot-db.ts diff --git a/.gitattributes b/.gitattributes index 975db5a272..0771fcaf27 100644 --- a/.gitattributes +++ b/.gitattributes @@ -7,6 +7,7 @@ *.tar.gz filter=lfs diff=lfs merge=lfs -text *.tgz filter=lfs diff=lfs merge=lfs -text engine/packages/test-snapshot-gen/snapshots/** filter=lfs diff=lfs merge=lfs -text +rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/snapshots/**/kv.bin filter=lfs diff=lfs merge=lfs -text **/Cargo.lock linguist-generated=true diff --git a/Cargo.lock b/Cargo.lock index e4a1905a4b..c984ad0421 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -938,6 +938,15 @@ version = "0.9.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2459377285ad874054d797f3ccebf984978aa39129f6eafde5cdc8315b612f8" +[[package]] +name = "convert_case" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec182b0ca2f35d8fc196cf3404988fd8b8c739a4d270ff118a398feb0cbec1ca" +dependencies = [ + "unicode-segmentation", +] + [[package]] name = "cookie" version = "0.18.1" @@ -1041,6 +1050,16 @@ dependencies = [ "typenum", ] +[[package]] +name = "ctor" +version = "0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32a2785755761f3ddc1492979ce1e48d2c00d09311c39e4466429188f3dd6501" +dependencies = [ + "quote", + "syn 2.0.104", +] + [[package]] name = "curve25519-dalek" version = "4.1.3" @@ -2605,6 +2624,17 @@ dependencies = [ "zstd-sys", ] +[[package]] +name = "libsqlite3-sys" +version = "0.30.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e99fb7a497b1e3339bc746195567ed8d3e24945ecd636e3619d20b9de9e9149" +dependencies = [ + "cc", + "pkg-config", + "vcpkg", +] + [[package]] name = "libz-sys" version = "1.1.22" @@ -2861,6 +2891,66 @@ dependencies = [ "vbare", ] +[[package]] +name = "napi" +version = "2.16.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55740c4ae1d8696773c78fdafd5d0e5fe9bc9f1b071c7ba493ba5c413a9184f3" +dependencies = [ + "bitflags", + "ctor", + "napi-derive", + "napi-sys", + "once_cell", + "serde", + "serde_json", + "tokio", +] + +[[package]] +name = "napi-build" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d376940fd5b723c6893cd1ee3f33abbfd86acb1cd1ec079f3ab04a2a3bc4d3b1" + +[[package]] +name = "napi-derive" +version = "2.16.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7cbe2585d8ac223f7d34f13701434b9d5f4eb9c332cccce8dee57ea18ab8ab0c" +dependencies = [ + "cfg-if", + "convert_case", + "napi-derive-backend", + "proc-macro2", + "quote", + "syn 2.0.104", +] + +[[package]] +name = "napi-derive-backend" +version = "1.0.75" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1639aaa9eeb76e91c6ae66da8ce3e89e921cd3885e99ec85f4abacae72fc91bf" +dependencies = [ + "convert_case", + "once_cell", + "proc-macro2", + "quote", + "regex", + "semver", + "syn 2.0.104", +] + +[[package]] +name = "napi-sys" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "427802e8ec3a734331fec1035594a210ce1ff4dc5bc1950530920ab717964ea3" +dependencies = [ + "libloading", +] + [[package]] name = "native-tls" version = "0.2.14" @@ -4353,11 +4443,13 @@ dependencies = [ "rivet-api-util", "rivet-config", "rivet-data", + "rivet-envoy-protocol", "rivet-error", "rivet-pools", "rivet-types", "rivet-util", "serde", + "serde_bare", "serde_json", "subtle", "tokio", @@ -5096,6 +5188,40 @@ dependencies = [ "tracing", ] +[[package]] +name = "rivetkit-native" +version = "2.2.1" +dependencies = [ + "anyhow", + "async-trait", + "base64 0.22.1", + "hex", + "libsqlite3-sys", + "napi", + "napi-build", + "napi-derive", + "rivet-envoy-client", + "rivet-envoy-protocol", + "rivetkit-sqlite-native", + "serde", + "serde_json", + "tokio", + "tracing", + "tracing-subscriber", + "uuid", +] + +[[package]] +name = "rivetkit-sqlite-native" +version = "2.1.6" +dependencies = [ + "async-trait", + "getrandom 0.2.16", + "libsqlite3-sys", + "tokio", + "tracing", +] + [[package]] name = "rocksdb" version = "0.24.0" diff --git a/Cargo.toml b/Cargo.toml index feed1aeaf1..e3aa932971 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -56,7 +56,9 @@ members = [ "engine/sdks/rust/envoy-protocol", "engine/sdks/rust/epoxy-protocol", "engine/sdks/rust/test-envoy", - "engine/sdks/rust/ups-protocol" + "engine/sdks/rust/ups-protocol", + "rivetkit-typescript/packages/rivetkit-native", + "rivetkit-typescript/packages/sqlite-native" ] [workspace.package] @@ -514,6 +516,9 @@ members = [ [workspace.dependencies.rivet-envoy-protocol] path = "engine/sdks/rust/envoy-protocol" + [workspace.dependencies.rivetkit-sqlite-native] + path = "rivetkit-typescript/packages/sqlite-native" + [workspace.dependencies.epoxy-protocol] path = "engine/sdks/rust/epoxy-protocol" diff --git a/engine/artifacts/openapi.json b/engine/artifacts/openapi.json index 1ddab693cc..26b6c69fc3 100644 --- a/engine/artifacts/openapi.json +++ b/engine/artifacts/openapi.json @@ -443,6 +443,78 @@ ] } }, + "/admin/actors/export": { + "post": { + "tags": [ + "actors::import_export" + ], + "summary": "Dangerous and intended for operational use.", + "operationId": "admin_actors_export", + "requestBody": { + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ExportRequest" + } + } + }, + "required": true + }, + "responses": { + "200": { + "description": "", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ExportResponse" + } + } + } + } + }, + "security": [ + { + "bearer_auth": [] + } + ] + } + }, + "/admin/actors/import": { + "post": { + "tags": [ + "actors::import_export" + ], + "summary": "Dangerous and intended for operational use.", + "operationId": "admin_actors_import", + "requestBody": { + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ImportRequest" + } + } + }, + "required": true + }, + "responses": { + "200": { + "description": "", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ImportResponse" + } + } + } + } + }, + "security": [ + { + "bearer_auth": [] + } + ] + } + }, "/datacenters": { "get": { "tags": [ @@ -1588,6 +1660,101 @@ }, "additionalProperties": false }, + "ExportActorIdsSelector": { + "type": "object", + "required": [ + "ids" + ], + "properties": { + "ids": { + "type": "array", + "items": { + "$ref": "#/components/schemas/RivetId" + } + } + }, + "additionalProperties": false + }, + "ExportActorNamesSelector": { + "type": "object", + "required": [ + "names" + ], + "properties": { + "names": { + "type": "array", + "items": { + "type": "string" + } + } + }, + "additionalProperties": false + }, + "ExportRequest": { + "type": "object", + "required": [ + "namespace", + "selector" + ], + "properties": { + "namespace": { + "type": "string" + }, + "selector": { + "$ref": "#/components/schemas/ExportSelector" + } + }, + "additionalProperties": false + }, + "ExportResponse": { + "type": "object", + "required": [ + "archive_path", + "actor_count" + ], + "properties": { + "actor_count": { + "type": "integer", + "minimum": 0 + }, + "archive_path": { + "type": "string" + } + }, + "additionalProperties": false + }, + "ExportSelector": { + "type": "object", + "properties": { + "actor_ids": { + "oneOf": [ + { + "type": "null" + }, + { + "$ref": "#/components/schemas/ExportActorIdsSelector" + } + ] + }, + "actor_names": { + "oneOf": [ + { + "type": "null" + }, + { + "$ref": "#/components/schemas/ExportActorNamesSelector" + } + ] + }, + "all": { + "type": [ + "boolean", + "null" + ] + } + }, + "additionalProperties": false + }, "HealthFanoutResponse": { "type": "object", "required": [ @@ -1628,6 +1795,47 @@ "error" ] }, + "ImportRequest": { + "type": "object", + "required": [ + "target_namespace", + "archive_path" + ], + "properties": { + "archive_path": { + "type": "string" + }, + "target_namespace": { + "type": "string" + } + }, + "additionalProperties": false + }, + "ImportResponse": { + "type": "object", + "required": [ + "imported_actors", + "skipped_actors", + "warnings" + ], + "properties": { + "imported_actors": { + "type": "integer", + "minimum": 0 + }, + "skipped_actors": { + "type": "integer", + "minimum": 0 + }, + "warnings": { + "type": "array", + "items": { + "type": "string" + } + } + }, + "additionalProperties": false + }, "MetadataGetResponse": { "type": "object", "required": [ diff --git a/engine/packages/api-peer/src/actors/create.rs b/engine/packages/api-peer/src/actors/create.rs index 5f632ba7d3..d2006451ac 100644 --- a/engine/packages/api-peer/src/actors/create.rs +++ b/engine/packages/api-peer/src/actors/create.rs @@ -28,6 +28,8 @@ pub async fn create( runner_name_selector: body.runner_name_selector, input: body.input.clone(), crash_policy: body.crash_policy, + start_immediately: true, + create_ts: None, // NOTE: This can forward if the user attempts to create an actor with a target dc and this dc // ends up forwarding to another. forward_request: true, diff --git a/engine/packages/api-peer/src/actors/get_or_create.rs b/engine/packages/api-peer/src/actors/get_or_create.rs index 40e261520e..9840dc34d1 100644 --- a/engine/packages/api-peer/src/actors/get_or_create.rs +++ b/engine/packages/api-peer/src/actors/get_or_create.rs @@ -48,6 +48,8 @@ pub async fn get_or_create( runner_name_selector: body.runner_name_selector, input: body.input.clone(), crash_policy: body.crash_policy, + start_immediately: true, + create_ts: None, // NOTE: This can forward if the user attempts to create an actor with a target dc and this dc // ends up forwarding to another. forward_request: true, diff --git a/engine/packages/api-peer/src/actors/import_create.rs b/engine/packages/api-peer/src/actors/import_create.rs new file mode 100644 index 0000000000..d88a4fd4cb --- /dev/null +++ b/engine/packages/api-peer/src/actors/import_create.rs @@ -0,0 +1,61 @@ +use anyhow::Result; +use rivet_api_builder::ApiCtx; +use rivet_types::actors::CrashPolicy; +use rivet_util::Id; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(deny_unknown_fields)] +pub struct ImportCreateQuery { + pub namespace: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(deny_unknown_fields)] +pub struct ImportCreateRequest { + pub actor_id: Id, + pub name: String, + pub key: Option, + pub runner_name_selector: String, + pub crash_policy: CrashPolicy, + pub create_ts: i64, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(deny_unknown_fields)] +pub struct ImportCreateResponse { + pub actor: rivet_types::actors::Actor, +} + +#[tracing::instrument(skip_all)] +pub async fn create( + ctx: ApiCtx, + _path: (), + query: ImportCreateQuery, + body: ImportCreateRequest, +) -> Result { + let namespace = ctx + .op(namespace::ops::resolve_for_name_global::Input { + name: query.namespace, + }) + .await? + .ok_or_else(|| namespace::errors::Namespace::NotFound.build())?; + + let res = ctx + .op(pegboard::ops::actor::create::Input { + actor_id: body.actor_id, + namespace_id: namespace.namespace_id, + name: body.name, + key: body.key, + runner_name_selector: body.runner_name_selector, + crash_policy: body.crash_policy, + input: None, + forward_request: false, + datacenter_name: None, + start_immediately: false, + create_ts: Some(body.create_ts), + }) + .await?; + + Ok(ImportCreateResponse { actor: res.actor }) +} diff --git a/engine/packages/api-peer/src/actors/mod.rs b/engine/packages/api-peer/src/actors/mod.rs index 8b76651d01..77f6e341c5 100644 --- a/engine/packages/api-peer/src/actors/mod.rs +++ b/engine/packages/api-peer/src/actors/mod.rs @@ -1,6 +1,7 @@ pub mod create; pub mod delete; pub mod get_or_create; +pub mod import_create; pub mod kv_get; pub mod list; pub mod list_names; diff --git a/engine/packages/api-peer/src/router.rs b/engine/packages/api-peer/src/router.rs index c4facfee74..20d9234d02 100644 --- a/engine/packages/api-peer/src/router.rs +++ b/engine/packages/api-peer/src/router.rs @@ -24,6 +24,7 @@ pub async fn router( .route("/actors", get(actors::list::list)) .route("/actors", post(actors::create::create)) .route("/actors", put(actors::get_or_create::get_or_create)) + .route("/actors/import-create", post(actors::import_create::create)) .route("/actors/{actor_id}", delete(actors::delete::delete)) .route("/actors/names", get(actors::list_names::list_names)) .route( diff --git a/engine/packages/api-public/Cargo.toml b/engine/packages/api-public/Cargo.toml index 22f8f20b91..c758c12bd4 100644 --- a/engine/packages/api-public/Cargo.toml +++ b/engine/packages/api-public/Cargo.toml @@ -22,12 +22,14 @@ rivet-api-types.workspace = true rivet-api-util.workspace = true rivet-config.workspace = true rivet-data.workspace = true +rivet-envoy-protocol.workspace = true rivet-error.workspace = true rivet-pools.workspace = true rivet-types.workspace = true rivet-util.workspace = true serde_json.workspace = true serde.workspace = true +serde_bare.workspace = true subtle.workspace = true tokio.workspace = true tower-http.workspace = true diff --git a/engine/packages/api-public/src/actors/import_export.rs b/engine/packages/api-public/src/actors/import_export.rs new file mode 100644 index 0000000000..b9567a890d --- /dev/null +++ b/engine/packages/api-public/src/actors/import_export.rs @@ -0,0 +1,788 @@ +use std::{ + collections::HashSet, + path::{Path, PathBuf}, +}; + +use anyhow::{Context, Result}; +use axum::response::{IntoResponse, Response}; +use rivet_api_builder::{ + ApiError, + extract::{Extension, Json}, +}; +use rivet_api_types::actors::{ + delete, + import_export::{ + ExportActorIdsSelector, ExportActorNamesSelector, ExportRequest, ExportResponse, + ExportSelector, ImportRequest, ImportResponse, + }, + list as list_types, + list_names as list_names_types, +}; +use rivet_api_util::{Method, request_remote_datacenter}; +use rivet_envoy_protocol as ep; +use rivet_types::actors::{Actor, CrashPolicy}; +use rivet_util::Id; +use serde::{Deserialize, Serialize}; +use tokio::{ + fs, + io::{AsyncReadExt, AsyncWriteExt, BufReader, BufWriter}, +}; + +use crate::{ + actors::{list as list_routes, list_names as list_names_routes, utils}, + ctx::ApiCtx, + errors, +}; + +const ARCHIVE_VERSION: u32 = 1; +const ACTOR_LIST_PAGE_SIZE: usize = 100; +const KV_BATCH_SIZE: usize = 64; + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(deny_unknown_fields)] +struct ArchiveManifestV1 { + version: u32, + generated_at: i64, + source_cluster: Option, + source_namespace_id: Id, + source_namespace_name: Option, + selector: ExportSelector, + actor_count: usize, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(deny_unknown_fields)] +struct ActorMetadataV1 { + source_actor_id: Id, + name: String, + key: Option, + runner_name_selector: String, + crash_policy: CrashPolicy, + create_ts: i64, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(deny_unknown_fields)] +struct KvArchiveEntry { + key: Vec, + value: Vec, +} + +#[derive(Debug)] +enum SelectorVariant { + All, + ActorNames(Vec), + ActorIds(Vec), +} + +enum ImportActorOutcome { + Imported, + Skipped(String), +} + +/// Dangerous and intended for operational use. +#[utoipa::path( + post, + operation_id = "admin_actors_export", + path = "/admin/actors/export", + request_body(content = ExportRequest, content_type = "application/json"), + responses( + (status = 200, body = ExportResponse), + ), + security(("bearer_auth" = [])), +)] +pub async fn export(Extension(ctx): Extension, Json(body): Json) -> Response { + match export_inner(ctx, body).await { + Ok(response) => Json(response).into_response(), + Err(err) => ApiError::from(err).into_response(), + } +} + +/// Dangerous and intended for operational use. +#[utoipa::path( + post, + operation_id = "admin_actors_import", + path = "/admin/actors/import", + request_body(content = ImportRequest, content_type = "application/json"), + responses( + (status = 200, body = ImportResponse), + ), + security(("bearer_auth" = [])), +)] +pub async fn import(Extension(ctx): Extension, Json(body): Json) -> Response { + match import_inner(ctx, body).await { + Ok(response) => Json(response).into_response(), + Err(err) => ApiError::from(err).into_response(), + } +} + +#[tracing::instrument(skip_all)] +async fn export_inner(ctx: ApiCtx, body: ExportRequest) -> Result { + ctx.auth().await?; + + let namespace = ctx + .op(namespace::ops::resolve_for_name_global::Input { + name: body.namespace.clone(), + }) + .await? + .ok_or_else(|| namespace::errors::Namespace::NotFound.build())?; + + let actors = resolve_selected_actors(&ctx, &body.namespace, &body.selector).await?; + let export_id = format!("rivet-actor-export-{}", Id::new_v1(ctx.config().dc_label())); + let temp_path = std::env::temp_dir().join(format!("{export_id}.tmp")); + let final_path = std::env::temp_dir().join(&export_id); + + fs::create_dir_all(temp_path.join("actors")).await?; + + let export_res = async { + write_json( + &temp_path.join("manifest.json"), + &ArchiveManifestV1 { + version: ARCHIVE_VERSION, + generated_at: rivet_util::timestamp::now(), + source_cluster: None, + source_namespace_id: namespace.namespace_id, + source_namespace_name: Some(namespace.name.clone()), + selector: body.selector.clone(), + actor_count: 0, + }, + ) + .await?; + + for actor in &actors { + let actor_dir = temp_path.join("actors").join(actor.actor_id.to_string()); + fs::create_dir_all(&actor_dir).await?; + + write_json( + &actor_dir.join("metadata.json"), + &ActorMetadataV1 { + source_actor_id: actor.actor_id, + name: actor.name.clone(), + key: actor.key.clone(), + runner_name_selector: actor.runner_name_selector.clone(), + crash_policy: actor.crash_policy, + create_ts: actor.create_ts, + }, + ) + .await?; + + export_actor_kv(&ctx, actor, &actor_dir.join("kv.bin")).await?; + } + + write_json( + &temp_path.join("manifest.json"), + &ArchiveManifestV1 { + version: ARCHIVE_VERSION, + generated_at: rivet_util::timestamp::now(), + source_cluster: None, + source_namespace_id: namespace.namespace_id, + source_namespace_name: Some(namespace.name), + selector: body.selector, + actor_count: actors.len(), + }, + ) + .await?; + + Ok::<(), anyhow::Error>(()) + } + .await; + + if let Err(err) = export_res { + let _ = fs::remove_dir_all(&temp_path).await; + return Err(err); + } + + fs::rename(&temp_path, &final_path).await.with_context(|| { + format!( + "failed to finalize actor export archive at {}", + final_path.display() + ) + })?; + + Ok(ExportResponse { + archive_path: final_path.to_string_lossy().into_owned(), + actor_count: actors.len(), + }) +} + +#[tracing::instrument(skip_all)] +async fn import_inner(ctx: ApiCtx, body: ImportRequest) -> Result { + ctx.auth().await?; + + let target_namespace = ctx + .op(namespace::ops::resolve_for_name_global::Input { + name: body.target_namespace.clone(), + }) + .await? + .ok_or_else(|| namespace::errors::Namespace::NotFound.build())?; + + let archive_path = PathBuf::from(&body.archive_path); + let manifest: ArchiveManifestV1 = read_json(&archive_path.join("manifest.json")).await?; + if manifest.version != ARCHIVE_VERSION { + return Err(errors::Validation::InvalidInput { + message: format!( + "unsupported actor archive version {}, expected {}", + manifest.version, ARCHIVE_VERSION + ), + } + .build()); + } + + let actors_dir = archive_path.join("actors"); + if !fs::try_exists(&actors_dir).await? { + return Err(errors::Validation::InvalidInput { + message: format!("archive is missing actors directory at {}", actors_dir.display()), + } + .build()); + } + + let mut imported_actors = 0; + let mut skipped_actors = 0; + let mut warnings = Vec::new(); + let mut dir_entries = fs::read_dir(&actors_dir).await?; + + while let Some(entry) = dir_entries.next_entry().await? { + if !entry.file_type().await?.is_dir() { + continue; + } + + match import_actor_dir( + &ctx, + &body.target_namespace, + target_namespace.namespace_id, + entry.path(), + ) + .await? + { + ImportActorOutcome::Imported => imported_actors += 1, + ImportActorOutcome::Skipped(warning) => { + tracing::warn!(warning = %warning, target_namespace = %body.target_namespace, "skipping imported actor"); + skipped_actors += 1; + warnings.push(warning); + } + } + } + + Ok(ImportResponse { + imported_actors, + skipped_actors, + warnings, + }) +} + +async fn import_actor_dir( + ctx: &ApiCtx, + target_namespace: &str, + target_namespace_id: Id, + actor_dir: PathBuf, +) -> Result { + let actor_folder = actor_dir + .file_name() + .map(|name| name.to_string_lossy().into_owned()) + .unwrap_or_else(|| actor_dir.display().to_string()); + let metadata_path = actor_dir.join("metadata.json"); + let kv_path = actor_dir.join("kv.bin"); + + if !fs::try_exists(&metadata_path).await? { + return Ok(ImportActorOutcome::Skipped(format!( + "skipped malformed archive entry {actor_folder}: missing metadata.json" + ))); + } + if !fs::try_exists(&kv_path).await? { + return Ok(ImportActorOutcome::Skipped(format!( + "skipped malformed archive entry {actor_folder}: missing kv.bin" + ))); + } + + let metadata: ActorMetadataV1 = match read_json(&metadata_path).await { + Ok(metadata) => metadata, + Err(err) => { + return Ok(ImportActorOutcome::Skipped(format!( + "skipped malformed archive entry {actor_folder}: failed to parse metadata.json: {err:#}" + ))); + } + }; + + if actor_exists_with_name_and_key(ctx, target_namespace, &metadata.name, metadata.key.as_deref()).await? { + return Ok(ImportActorOutcome::Skipped(format!( + "skipped archive actor {} (name={}, key={:?}) because target namespace {} already has the same (name, key)", + metadata.source_actor_id, + metadata.name, + metadata.key, + target_namespace, + ))); + } + + // Source actor IDs are retained in archive paths for provenance only. + // Import must always generate new actor IDs because the target may be another namespace in the same cluster. + let created_actor = create_imported_actor( + ctx, + target_namespace, + target_namespace_id, + &metadata, + ) + .await?; + + match replay_actor_kv(ctx, &created_actor, &kv_path).await { + Ok(()) => Ok(ImportActorOutcome::Imported), + Err(err) => match rollback_imported_actor(ctx, target_namespace, created_actor.actor_id).await { + Ok(()) => Ok(ImportActorOutcome::Skipped(format!( + "rolled back partial import for archive actor {} (name={}, key={:?}) in namespace {} after error: {err:#}", + metadata.source_actor_id, + metadata.name, + metadata.key, + target_namespace, + ))), + Err(rollback_err) => Err(rollback_err).context(format!( + "failed to roll back partial import for archive actor {} after import error: {err:#}", + metadata.source_actor_id, + )), + }, + } +} + +async fn resolve_selected_actors( + ctx: &ApiCtx, + namespace: &str, + selector: &ExportSelector, +) -> Result> { + match parse_selector(selector)? { + SelectorVariant::All => collect_all_actors(ctx, namespace).await, + SelectorVariant::ActorNames(names) => { + let mut actors = Vec::new(); + let mut seen = HashSet::new(); + for name in names { + for actor in collect_actors_for_name(ctx, namespace, &name).await? { + if seen.insert(actor.actor_id) { + actors.push(actor); + } + } + } + Ok(actors) + } + SelectorVariant::ActorIds(ids) => { + let inner_ctx: rivet_api_builder::ApiCtx = ctx.clone().into(); + utils::fetch_actors_by_ids(&inner_ctx, ids, namespace.to_string(), Some(false), None) + .await + } + } +} + +fn parse_selector(selector: &ExportSelector) -> Result { + let variant_count = usize::from(selector.all.unwrap_or(false)) + + usize::from(selector.actor_names.is_some()) + + usize::from(selector.actor_ids.is_some()); + if variant_count != 1 { + return Err(errors::Validation::InvalidInput { + message: "export selector must set exactly one of `all`, `actor_names`, or `actor_ids`".to_string(), + } + .build()); + } + + if selector.all == Some(true) { + return Ok(SelectorVariant::All); + } + + if let Some(ExportActorNamesSelector { names }) = &selector.actor_names { + if names.is_empty() { + return Err(errors::Validation::InvalidInput { + message: "`actor_names.names` must not be empty".to_string(), + } + .build()); + } + + let mut deduped = Vec::new(); + let mut seen = HashSet::new(); + for name in names { + if seen.insert(name.clone()) { + deduped.push(name.clone()); + } + } + return Ok(SelectorVariant::ActorNames(deduped)); + } + + if let Some(ExportActorIdsSelector { ids }) = &selector.actor_ids { + if ids.is_empty() { + return Err(errors::Validation::InvalidInput { + message: "`actor_ids.ids` must not be empty".to_string(), + } + .build()); + } + + let mut deduped = Vec::new(); + let mut seen = HashSet::new(); + for actor_id in ids { + if seen.insert(*actor_id) { + deduped.push(*actor_id); + } + } + return Ok(SelectorVariant::ActorIds(deduped)); + } + + Err(errors::Validation::InvalidInput { + message: "`all` must be true when used".to_string(), + } + .build()) +} + +async fn collect_all_actors(ctx: &ApiCtx, namespace: &str) -> Result> { + let mut actors = Vec::new(); + let mut names_cursor = None; + + loop { + let names_res = list_names_routes::list_names_inner( + // list_names_inner handles fanout and pagination for actor names across datacenters. + ctx.clone(), + list_names_types::ListNamesQuery { + namespace: namespace.to_string(), + limit: Some(ACTOR_LIST_PAGE_SIZE), + cursor: names_cursor.clone(), + }, + ) + .await?; + + let mut names = names_res.names.into_keys().collect::>(); + names.sort(); + + for name in names { + actors.extend(collect_actors_for_name(ctx, namespace, &name).await?); + } + + if names_res.pagination.cursor.is_none() { + break; + } + names_cursor = names_res.pagination.cursor; + } + + Ok(actors) +} + +async fn collect_actors_for_name(ctx: &ApiCtx, namespace: &str, name: &str) -> Result> { + let mut actors = Vec::new(); + let mut cursor = None; + + loop { + let res = list_routes::list_inner( + // list_inner handles the cross-datacenter actor fanout for a specific actor name. + ctx.clone(), + list_types::ListQuery { + namespace: namespace.to_string(), + name: Some(name.to_string()), + key: None, + actor_ids: None, + actor_id: Vec::new(), + include_destroyed: Some(false), + limit: Some(ACTOR_LIST_PAGE_SIZE), + cursor: cursor.clone(), + }, + ) + .await?; + + actors.extend(res.actors); + + if res.pagination.cursor.is_none() { + break; + } + cursor = res.pagination.cursor; + } + + Ok(actors) +} + +async fn export_actor_kv(ctx: &ApiCtx, actor: &Actor, path: &Path) -> Result<()> { + let file = fs::File::create(path).await?; + let mut writer = BufWriter::new(file); + let recipient = pegboard::actor_kv::Recipient { + actor_id: actor.actor_id, + namespace_id: actor.namespace_id, + name: actor.name.clone(), + }; + // KV keys are tuple-encoded with two wrapper bytes, so the largest legal raw key is + // `MAX_KEY_SIZE - 2` bytes long. + let max_end_key = vec![0xFF; pegboard::actor_kv::MAX_KEY_SIZE - 2]; + let mut after_key: Option> = None; + + loop { + let previous_key = after_key.clone(); + // TODO: v1 does not quiesce actors before export. A future workflow should freeze or otherwise + // quiesce actors before export to improve consistency. + let query = if let Some(start) = previous_key.clone() { + ep::KvListQuery::KvListRangeQuery(ep::KvListRangeQuery { + start, + end: max_end_key.clone(), + exclusive: true, + }) + } else { + ep::KvListQuery::KvListAllQuery + }; + let (keys, values, _) = + pegboard::actor_kv::list(&*ctx.udb()?, &recipient, query, false, Some(KV_BATCH_SIZE)) + .await?; + + if keys.is_empty() { + break; + } + + let mut wrote_any = false; + for (key, value) in keys.into_iter().zip(values.into_iter()).filter(|(key, _)| { + previous_key + .as_ref() + .map(|prev| key != prev) + .unwrap_or(true) + }) { + let payload = encode_kv_entry(&KvArchiveEntry { + key: key.clone(), + value, + })?; + writer.write_u32(payload.len().try_into()?).await?; + writer.write_all(&payload).await?; + after_key = Some(key); + wrote_any = true; + } + + if !wrote_any { + break; + } + } + + writer.flush().await?; + Ok(()) +} + +async fn create_imported_actor( + ctx: &ApiCtx, + target_namespace: &str, + target_namespace_id: Id, + metadata: &ActorMetadataV1, +) -> Result { + let inner_ctx: rivet_api_builder::ApiCtx = ctx.clone().into(); + let target_dc_label = utils::find_dc_for_actor_creation( + &inner_ctx, + target_namespace_id, + target_namespace, + &metadata.runner_name_selector, + None, + ) + .await?; + let actor_id = Id::new_v1(target_dc_label); + let query = rivet_api_peer::actors::import_create::ImportCreateQuery { + namespace: target_namespace.to_string(), + }; + let request = rivet_api_peer::actors::import_create::ImportCreateRequest { + actor_id, + name: metadata.name.clone(), + key: metadata.key.clone(), + runner_name_selector: metadata.runner_name_selector.clone(), + crash_policy: metadata.crash_policy, + create_ts: metadata.create_ts, + }; + + let response = if target_dc_label == ctx.config().dc_label() { + rivet_api_peer::actors::import_create::create(ctx.clone().into(), (), query, request).await? + } else { + request_remote_datacenter::( + ctx.config(), + target_dc_label, + "/actors/import-create", + Method::POST, + Some(&query), + Some(&request), + ) + .await? + }; + + Ok(response.actor) +} + +async fn replay_actor_kv(ctx: &ApiCtx, actor: &Actor, kv_path: &Path) -> Result<()> { + let file = fs::File::open(kv_path).await?; + let mut reader = BufReader::new(file); + let recipient = pegboard::actor_kv::Recipient { + actor_id: actor.actor_id, + namespace_id: actor.namespace_id, + name: actor.name.clone(), + }; + let mut keys = Vec::new(); + let mut values = Vec::new(); + + loop { + let entry_len = match reader.read_u32().await { + Ok(len) => usize::try_from(len)?, + Err(err) if err.kind() == std::io::ErrorKind::UnexpectedEof => break, + Err(err) => return Err(err.into()), + }; + + let mut payload = vec![0; entry_len]; + reader.read_exact(&mut payload).await?; + let entry = decode_kv_entry(&payload)?; + keys.push(entry.key); + values.push(entry.value); + + if keys.len() >= KV_BATCH_SIZE { + pegboard::actor_kv::put(&*ctx.udb()?, &recipient, keys, values).await?; + keys = Vec::new(); + values = Vec::new(); + } + } + + if !keys.is_empty() { + pegboard::actor_kv::put(&*ctx.udb()?, &recipient, keys, values).await?; + } + + Ok(()) +} + +async fn rollback_imported_actor( + ctx: &ApiCtx, + target_namespace: &str, + actor_id: Id, +) -> Result<()> { + if actor_id.label() == ctx.config().dc_label() { + rivet_api_peer::actors::delete::delete( + ctx.clone().into(), + delete::DeletePath { actor_id }, + delete::DeleteQuery { + namespace: target_namespace.to_string(), + }, + ) + .await?; + } else { + request_remote_datacenter::( + ctx.config(), + actor_id.label(), + &format!("/actors/{actor_id}"), + Method::DELETE, + Some(&delete::DeleteQuery { + namespace: target_namespace.to_string(), + }), + Option::<&()>::None, + ) + .await?; + } + + Ok(()) +} + +async fn actor_exists_with_name_and_key( + ctx: &ApiCtx, + namespace: &str, + name: &str, + key: Option<&str>, +) -> Result { + if let Some(key) = key { + let res = list_routes::list_inner( + ctx.clone(), + list_types::ListQuery { + namespace: namespace.to_string(), + name: Some(name.to_string()), + key: Some(key.to_string()), + actor_ids: None, + actor_id: Vec::new(), + include_destroyed: Some(false), + limit: Some(1), + cursor: None, + }, + ) + .await?; + + return Ok(!res.actors.is_empty()); + } + + let mut cursor = None; + loop { + let res = list_routes::list_inner( + ctx.clone(), + list_types::ListQuery { + namespace: namespace.to_string(), + name: Some(name.to_string()), + key: None, + actor_ids: None, + actor_id: Vec::new(), + include_destroyed: Some(false), + limit: Some(ACTOR_LIST_PAGE_SIZE), + cursor: cursor.clone(), + }, + ) + .await?; + + if res.actors.iter().any(|actor| actor.key.is_none()) { + return Ok(true); + } + + if res.pagination.cursor.is_none() { + return Ok(false); + } + cursor = res.pagination.cursor; + } +} + +async fn write_json(path: &Path, value: &T) -> Result<()> { + let bytes = serde_json::to_vec_pretty(value)?; + fs::write(path, bytes).await?; + Ok(()) +} + +async fn read_json Deserialize<'de>>(path: &Path) -> Result { + let bytes = fs::read(path).await?; + Ok(serde_json::from_slice(&bytes)?) +} + +fn encode_kv_entry(entry: &KvArchiveEntry) -> Result> { + Ok(serde_bare::to_vec(entry)?) +} + +fn decode_kv_entry(payload: &[u8]) -> Result { + Ok(serde_bare::from_slice(payload)?) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn selector_requires_exactly_one_variant() { + let err = parse_selector(&ExportSelector { + all: Some(true), + actor_names: Some(ExportActorNamesSelector { + names: vec!["foo".to_string()], + }), + actor_ids: None, + }) + .expect_err("selector with multiple variants should fail"); + + assert!( + err.to_string().contains("exactly one"), + "unexpected selector validation error: {err:#}" + ); + } + + #[test] + fn selector_accepts_actor_ids() { + let selector = parse_selector(&ExportSelector { + all: None, + actor_names: None, + actor_ids: Some(ExportActorIdsSelector { + ids: vec![Id::new_v1(1), Id::new_v1(1)], + }), + }) + .expect("selector with actor ids should be valid"); + + match selector { + SelectorVariant::ActorIds(ids) => assert_eq!(ids.len(), 2), + _ => panic!("expected actor id selector"), + } + } + + #[test] + fn kv_entry_round_trip() { + let encoded = encode_kv_entry(&KvArchiveEntry { + key: b"hello".to_vec(), + value: b"world".to_vec(), + }) + .expect("failed to encode kv entry"); + let decoded = decode_kv_entry(&encoded).expect("failed to decode kv entry"); + + assert_eq!(decoded.key, b"hello"); + assert_eq!(decoded.value, b"world"); + } +} diff --git a/engine/packages/api-public/src/actors/list.rs b/engine/packages/api-public/src/actors/list.rs index 766dc3e26c..1069c40c43 100644 --- a/engine/packages/api-public/src/actors/list.rs +++ b/engine/packages/api-public/src/actors/list.rs @@ -47,7 +47,7 @@ pub async fn list(Extension(ctx): Extension, Query(query): Query Result { +pub(crate) async fn list_inner(ctx: ApiCtx, query: ListQuery) -> Result { // Reading is allowed, list requires auth if query.actor_ids.is_none() && query.actor_id.is_empty() && query.key.is_none() { ctx.auth().await?; diff --git a/engine/packages/api-public/src/actors/list_names.rs b/engine/packages/api-public/src/actors/list_names.rs index 9815d39a63..743bfa4895 100644 --- a/engine/packages/api-public/src/actors/list_names.rs +++ b/engine/packages/api-public/src/actors/list_names.rs @@ -38,7 +38,7 @@ pub async fn list_names( } #[tracing::instrument(skip_all)] -async fn list_names_inner(ctx: ApiCtx, query: ListNamesQuery) -> Result { +pub(crate) async fn list_names_inner(ctx: ApiCtx, query: ListNamesQuery) -> Result { ctx.auth().await?; // Prepare peer query for local handler diff --git a/engine/packages/api-public/src/actors/mod.rs b/engine/packages/api-public/src/actors/mod.rs index 710e3e4f96..05a21fedcb 100644 --- a/engine/packages/api-public/src/actors/mod.rs +++ b/engine/packages/api-public/src/actors/mod.rs @@ -1,6 +1,7 @@ pub mod create; pub mod delete; pub mod get_or_create; +pub mod import_export; pub mod kv_get; pub mod list; pub mod list_names; diff --git a/engine/packages/api-public/src/router.rs b/engine/packages/api-public/src/router.rs index 942c7b48c2..8265a15012 100644 --- a/engine/packages/api-public/src/router.rs +++ b/engine/packages/api-public/src/router.rs @@ -20,6 +20,8 @@ use crate::{ actors::delete::delete, actors::list_names::list_names, actors::get_or_create::get_or_create, + actors::import_export::export, + actors::import_export::import, actors::kv_get::kv_get, actors::sleep::sleep, actors::reschedule::reschedule, @@ -88,6 +90,14 @@ pub async fn router( "/actors", axum::routing::put(actors::get_or_create::get_or_create), ) + .route( + "/admin/actors/export", + axum::routing::post(actors::import_export::export), + ) + .route( + "/admin/actors/import", + axum::routing::post(actors::import_export::import), + ) .route( "/actors/{actor_id}", axum::routing::delete(actors::delete::delete), diff --git a/engine/packages/api-types/src/actors/import_export.rs b/engine/packages/api-types/src/actors/import_export.rs new file mode 100644 index 0000000000..8865068dc1 --- /dev/null +++ b/engine/packages/api-types/src/actors/import_export.rs @@ -0,0 +1,55 @@ +use gas::prelude::*; +use serde::{Deserialize, Serialize}; +use utoipa::ToSchema; + +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] +#[serde(deny_unknown_fields)] +pub struct ExportRequest { + pub namespace: String, + pub selector: ExportSelector, +} + +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] +#[serde(deny_unknown_fields)] +pub struct ExportSelector { + #[serde(default, skip_serializing_if = "Option::is_none")] + pub all: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub actor_names: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub actor_ids: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] +#[serde(deny_unknown_fields)] +pub struct ExportActorNamesSelector { + pub names: Vec, +} + +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] +#[serde(deny_unknown_fields)] +pub struct ExportActorIdsSelector { + pub ids: Vec, +} + +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] +#[serde(deny_unknown_fields)] +pub struct ExportResponse { + pub archive_path: String, + pub actor_count: usize, +} + +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] +#[serde(deny_unknown_fields)] +pub struct ImportRequest { + pub target_namespace: String, + pub archive_path: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] +#[serde(deny_unknown_fields)] +pub struct ImportResponse { + pub imported_actors: usize, + pub skipped_actors: usize, + pub warnings: Vec, +} diff --git a/engine/packages/api-types/src/actors/kv_get.rs b/engine/packages/api-types/src/actors/kv_get.rs index 7d5167cfcb..25fc6fd27b 100644 --- a/engine/packages/api-types/src/actors/kv_get.rs +++ b/engine/packages/api-types/src/actors/kv_get.rs @@ -16,7 +16,7 @@ pub struct KvGetPath { pub key: String, } -#[derive(Serialize, ToSchema)] +#[derive(Debug, Deserialize, Serialize, ToSchema)] #[schema(as = ActorsKvGetResponse)] #[serde(deny_unknown_fields)] pub struct KvGetResponse { diff --git a/engine/packages/api-types/src/actors/mod.rs b/engine/packages/api-types/src/actors/mod.rs index 8b76651d01..081991964a 100644 --- a/engine/packages/api-types/src/actors/mod.rs +++ b/engine/packages/api-types/src/actors/mod.rs @@ -1,6 +1,7 @@ pub mod create; pub mod delete; pub mod get_or_create; +pub mod import_export; pub mod kv_get; pub mod list; pub mod list_names; diff --git a/engine/packages/engine/tests/actor_import_export_e2e.rs b/engine/packages/engine/tests/actor_import_export_e2e.rs new file mode 100644 index 0000000000..331ff06b89 --- /dev/null +++ b/engine/packages/engine/tests/actor_import_export_e2e.rs @@ -0,0 +1,338 @@ +#[path = "common/api/mod.rs"] +mod api; +#[path = "common/ctx.rs"] +mod ctx; + +use std::{collections::HashMap, future::Future, time::Duration}; + +use anyhow::{Context, Result}; +use base64::Engine; +use gas::prelude::*; +use rivet_api_types::{ + actors::{ + import_export::{ExportActorIdsSelector, ExportRequest, ExportSelector, ImportRequest}, + kv_get, list, + }, + namespaces::runner_configs::{RunnerConfig, RunnerConfigKind}, +}; + +const RUNNER_NAME: &str = "import-export-runner"; +const ACTOR_NAME: &str = "import-export-actor"; +const KV_KEY: &[u8] = b"test-key"; +const KV_VALUE: &[u8] = b"test-value"; + +#[test] +fn actor_import_export_round_trip_e2e() { + run_test(30, |ctx| async move { + let source = create_namespace(&ctx, "source").await?; + let target = create_namespace(&ctx, "target").await?; + + upsert_normal_runner_config(ctx.leader_dc().guard_port(), &source.name, RUNNER_NAME).await?; + upsert_normal_runner_config(ctx.leader_dc().guard_port(), &target.name, RUNNER_NAME).await?; + + let source_actor = create_sleeping_actor_with_kv( + ctx.leader_dc(), + &source, + ACTOR_NAME, + Some("round-trip-key".to_string()), + ) + .await?; + + wait_for_actor( + ctx.leader_dc().guard_port(), + &source.name, + ACTOR_NAME, + source_actor.key.clone(), + ) + .await?; + + let export = api::public::admin_actors_export( + ctx.leader_dc().guard_port(), + ExportRequest { + namespace: source.name.clone(), + selector: ExportSelector { + all: None, + actor_names: None, + actor_ids: Some(ExportActorIdsSelector { + ids: vec![source_actor.actor_id], + }), + }, + }, + ) + .await?; + + assert_eq!(export.actor_count, 1); + + let import = api::public::admin_actors_import( + ctx.leader_dc().guard_port(), + ImportRequest { + target_namespace: target.name.clone(), + archive_path: export.archive_path.clone(), + }, + ) + .await?; + + assert_eq!(import.imported_actors, 1); + assert_eq!(import.skipped_actors, 0); + assert!(import.warnings.is_empty()); + + let imported_actor = wait_for_actor( + ctx.leader_dc().guard_port(), + &target.name, + ACTOR_NAME, + source_actor.key.clone(), + ) + .await?; + + assert_ne!(imported_actor.actor_id, source_actor.actor_id); + assert_eq!(imported_actor.create_ts, source_actor.create_ts); + assert!(imported_actor.start_ts.is_none()); + assert!(imported_actor.sleep_ts.is_some()); + + let kv = api::public::actors_kv_get( + ctx.leader_dc().guard_port(), + kv_get::KvGetPath { + actor_id: imported_actor.actor_id, + key: base64::engine::general_purpose::STANDARD.encode(KV_KEY), + }, + kv_get::KvGetQuery { + namespace: target.name.clone(), + }, + ) + .await?; + + assert_eq!( + base64::engine::general_purpose::STANDARD + .decode(kv.value) + .context("decode imported kv value")?, + KV_VALUE + ); + + tokio::fs::remove_dir_all(&export.archive_path) + .await + .with_context(|| format!("remove archive {}", export.archive_path))?; + + Ok(()) + }); +} + +#[test] +fn actor_import_export_skips_name_key_collisions_e2e() { + run_test(30, |ctx| async move { + let source = create_namespace(&ctx, "collision-source").await?; + let target = create_namespace(&ctx, "collision-target").await?; + + upsert_normal_runner_config(ctx.leader_dc().guard_port(), &source.name, RUNNER_NAME).await?; + upsert_normal_runner_config(ctx.leader_dc().guard_port(), &target.name, RUNNER_NAME).await?; + + let actor_key = Some("collision-key".to_string()); + let source_actor = + create_sleeping_actor_with_kv(ctx.leader_dc(), &source, ACTOR_NAME, actor_key.clone()) + .await?; + let existing_target_actor = + create_sleeping_actor_with_kv(ctx.leader_dc(), &target, ACTOR_NAME, actor_key.clone()) + .await?; + + let export = api::public::admin_actors_export( + ctx.leader_dc().guard_port(), + ExportRequest { + namespace: source.name.clone(), + selector: ExportSelector { + all: None, + actor_names: None, + actor_ids: Some(ExportActorIdsSelector { + ids: vec![source_actor.actor_id], + }), + }, + }, + ) + .await?; + + let import = api::public::admin_actors_import( + ctx.leader_dc().guard_port(), + ImportRequest { + target_namespace: target.name.clone(), + archive_path: export.archive_path.clone(), + }, + ) + .await?; + + assert_eq!(import.imported_actors, 0); + assert_eq!(import.skipped_actors, 1); + assert_eq!(import.warnings.len(), 1); + + let actors = list_matching_actors( + ctx.leader_dc().guard_port(), + &target.name, + ACTOR_NAME, + actor_key.clone(), + ) + .await?; + + assert_eq!(actors.len(), 1); + assert_eq!(actors[0].actor_id, existing_target_actor.actor_id); + + tokio::fs::remove_dir_all(&export.archive_path) + .await + .with_context(|| format!("remove archive {}", export.archive_path))?; + + Ok(()) + }); +} + +fn run_test(timeout_secs: u64, test_fn: F) +where + F: FnOnce(ctx::TestCtx) -> Fut, + Fut: Future>, +{ + let runtime = tokio::runtime::Runtime::new().expect("build tokio runtime"); + runtime.block_on(async move { + let ctx = ctx::TestCtx::new_with_opts(ctx::TestOpts::new(1).with_timeout(timeout_secs)) + .await + .expect("build test ctx"); + tokio::time::timeout(Duration::from_secs(timeout_secs), test_fn(ctx)) + .await + .expect("test timed out") + .expect("test failed"); + }); +} + +struct TestNamespace { + name: String, + id: rivet_util::Id, +} + +async fn create_namespace(ctx: &ctx::TestCtx, prefix: &str) -> Result { + let namespace_name = format!("{prefix}-{:04x}", rand::random::()); + let response = api::public::namespaces_create( + ctx.leader_dc().guard_port(), + rivet_api_peer::namespaces::CreateRequest { + name: namespace_name, + display_name: "Test Namespace".to_string(), + }, + ) + .await?; + + Ok(TestNamespace { + name: response.namespace.name, + id: response.namespace.namespace_id, + }) +} + +async fn upsert_normal_runner_config(port: u16, namespace: &str, runner_name: &str) -> Result<()> { + let mut datacenters = HashMap::new(); + datacenters.insert( + "dc-1".to_string(), + RunnerConfig { + kind: RunnerConfigKind::Normal {}, + metadata: None, + drain_on_version_upgrade: true, + }, + ); + + api::public::runner_configs_upsert( + port, + rivet_api_peer::runner_configs::UpsertPath { + runner_name: runner_name.to_string(), + }, + rivet_api_peer::runner_configs::UpsertQuery { + namespace: namespace.to_string(), + }, + rivet_api_public::runner_configs::upsert::UpsertRequest { datacenters }, + ) + .await?; + + Ok(()) +} + +async fn create_sleeping_actor_with_kv( + dc: &ctx::TestDatacenter, + namespace: &TestNamespace, + name: &str, + key: Option, +) -> Result { + let actor_id = rivet_util::Id::new_v1(dc.config.dc_label()); + let actor = dc + .workflow_ctx + .op(pegboard::ops::actor::create::Input { + actor_id, + namespace_id: namespace.id, + name: name.to_string(), + key, + runner_name_selector: RUNNER_NAME.to_string(), + crash_policy: rivet_types::actors::CrashPolicy::Destroy, + input: None, + start_immediately: false, + create_ts: None, + forward_request: false, + datacenter_name: Some( + dc.config + .dc_name() + .context("test dc missing name")? + .to_string(), + ), + }) + .await? + .actor; + + let recipient = pegboard::actor_kv::Recipient { + actor_id: actor.actor_id, + namespace_id: namespace.id, + name: actor.name.clone(), + }; + pegboard::actor_kv::put( + &*dc.workflow_ctx.udb().context("missing workflow db")?, + &recipient, + vec![KV_KEY.to_vec()], + vec![KV_VALUE.to_vec()], + ) + .await?; + + Ok(actor) +} + +async fn wait_for_actor( + port: u16, + namespace: &str, + name: &str, + key: Option, +) -> Result { + let start = std::time::Instant::now(); + let timeout = Duration::from_secs(10); + + loop { + let actors = list_matching_actors(port, namespace, name, key.clone()).await?; + if let Some(actor) = actors.into_iter().next() { + return Ok(actor); + } + + if start.elapsed() >= timeout { + anyhow::bail!("timed out waiting for actor {name} in namespace {namespace}"); + } + + tokio::time::sleep(Duration::from_millis(100)).await; + } +} + +async fn list_matching_actors( + port: u16, + namespace: &str, + name: &str, + key: Option, +) -> Result> { + Ok(api::public::actors_list( + port, + list::ListQuery { + namespace: namespace.to_string(), + name: Some(name.to_string()), + key, + actor_ids: None, + actor_id: Vec::new(), + include_destroyed: Some(false), + limit: Some(10), + cursor: None, + }, + ) + .await? + .actors) +} diff --git a/engine/packages/engine/tests/common/api/peer.rs b/engine/packages/engine/tests/common/api/peer.rs index 99b0519c4f..7256395382 100644 --- a/engine/packages/engine/tests/common/api/peer.rs +++ b/engine/packages/engine/tests/common/api/peer.rs @@ -1,7 +1,7 @@ #![allow(dead_code, unused_variables)] use anyhow::*; -use rivet_api_types::{actors, namespaces, pagination, runner_configs, runners}; +use rivet_api_types::{actors, namespaces, runner_configs, runners}; use super::get_endpoint; diff --git a/engine/packages/engine/tests/common/api/public.rs b/engine/packages/engine/tests/common/api/public.rs index 302de68861..ce125ff942 100644 --- a/engine/packages/engine/tests/common/api/public.rs +++ b/engine/packages/engine/tests/common/api/public.rs @@ -1,7 +1,7 @@ #![allow(dead_code, unused_variables)] use anyhow::*; -use rivet_api_types::{actors, datacenters, namespaces, pagination, runner_configs, runners}; +use rivet_api_types::{actors, datacenters, namespaces, runner_configs, runners}; use serde::{Deserialize, Serialize}; use super::get_endpoint; @@ -299,6 +299,44 @@ pub async fn actors_create( parse_response(response).await } +pub async fn build_admin_actors_export_request( + port: u16, + request: actors::import_export::ExportRequest, +) -> Result { + let client = rivet_pools::reqwest::client().await?; + Ok(client + .post(format!("{}/admin/actors/export", get_endpoint(port))) + .json(&request)) +} + +pub async fn admin_actors_export( + port: u16, + request: actors::import_export::ExportRequest, +) -> Result { + let req = build_admin_actors_export_request(port, request).await?; + let response = req.send().await?; + parse_response(response).await +} + +pub async fn build_admin_actors_import_request( + port: u16, + request: actors::import_export::ImportRequest, +) -> Result { + let client = rivet_pools::reqwest::client().await?; + Ok(client + .post(format!("{}/admin/actors/import", get_endpoint(port))) + .json(&request)) +} + +pub async fn admin_actors_import( + port: u16, + request: actors::import_export::ImportRequest, +) -> Result { + let req = build_admin_actors_import_request(port, request).await?; + let response = req.send().await?; + parse_response(response).await +} + #[derive(Debug, Serialize, Deserialize)] pub struct GetOrCreateQuery { pub namespace: String, @@ -369,6 +407,31 @@ pub async fn actors_delete( parse_response(response).await } +pub async fn build_actors_kv_get_request( + port: u16, + path: actors::kv_get::KvGetPath, + query: actors::kv_get::KvGetQuery, +) -> Result { + let client = rivet_pools::reqwest::client().await?; + Ok(client.get(format!( + "{}/actors/{}/kv/keys/{}?{}", + get_endpoint(port), + path.actor_id, + urlencoding::encode(&path.key), + serde_html_form::to_string(&query)?, + ))) +} + +pub async fn actors_kv_get( + port: u16, + path: actors::kv_get::KvGetPath, + query: actors::kv_get::KvGetQuery, +) -> Result { + let request = build_actors_kv_get_request(port, path, query).await?; + let response = request.send().await?; + parse_response(response).await +} + pub async fn build_actors_list_names_request( port: u16, query: actors::list_names::ListNamesQuery, diff --git a/engine/packages/guard/src/routing/pegboard_gateway/resolve_actor_query.rs b/engine/packages/guard/src/routing/pegboard_gateway/resolve_actor_query.rs index 61b2d3def0..bc76d512a3 100644 --- a/engine/packages/guard/src/routing/pegboard_gateway/resolve_actor_query.rs +++ b/engine/packages/guard/src/routing/pegboard_gateway/resolve_actor_query.rs @@ -128,6 +128,8 @@ async fn resolve_query_get_or_create_actor_id( runner_name_selector: runner_name.to_string(), crash_policy, input: encoded_input, + start_immediately: true, + create_ts: None, forward_request: true, datacenter_name: None, }) diff --git a/engine/packages/pegboard/src/ops/actor/create.rs b/engine/packages/pegboard/src/ops/actor/create.rs index 74cbe08898..47b4d44d2b 100644 --- a/engine/packages/pegboard/src/ops/actor/create.rs +++ b/engine/packages/pegboard/src/ops/actor/create.rs @@ -12,6 +12,8 @@ pub struct Input { pub runner_name_selector: String, pub crash_policy: CrashPolicy, pub input: Option, + pub start_immediately: bool, + pub create_ts: Option, /// If true, will handle ForwardToDatacenter errors by forwarding the request to the correct datacenter. /// Used by api-public. api-peer should set this to false. pub forward_request: bool, @@ -67,6 +69,8 @@ pub async fn pegboard_actor_create(ctx: &OperationCtx, input: &Input) -> Result< namespace_id: input.namespace_id, input: input.input.clone(), from_v1: false, + start_immediately: input.start_immediately, + create_ts: input.create_ts, }) .tag("actor_id", input.actor_id) .dispatch() @@ -117,6 +121,8 @@ pub async fn pegboard_actor_create(ctx: &OperationCtx, input: &Input) -> Result< namespace_id: input.namespace_id, crash_policy: input.crash_policy, input: input.input.clone(), + start_immediately: input.start_immediately, + create_ts: input.create_ts, }) .tag("actor_id", input.actor_id) .dispatch() diff --git a/engine/packages/pegboard/src/workflows/actor/mod.rs b/engine/packages/pegboard/src/workflows/actor/mod.rs index d0ea680041..f4b9c49b91 100644 --- a/engine/packages/pegboard/src/workflows/actor/mod.rs +++ b/engine/packages/pegboard/src/workflows/actor/mod.rs @@ -30,6 +30,14 @@ pub struct Input { /// Arbitrary user-provided binary encoded in base64. We assume this is valid base64. pub input: Option, + #[serde(default = "default_true")] + pub start_immediately: bool, + #[serde(default)] + pub create_ts: Option, +} + +fn default_true() -> bool { + true } #[workflow] @@ -73,7 +81,7 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> namespace_id: input.namespace_id, runner_name_selector: input.runner_name_selector.clone(), crash_policy: input.crash_policy, - create_ts: ctx.create_ts(), + create_ts: input.create_ts.unwrap_or_else(|| ctx.create_ts()), }) .await?; @@ -170,12 +178,24 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> }) .await?; - ctx.msg(CreateComplete {}) - .topic(("actor_id", input.actor_id)) - .send() + let lifecycle_state = if !input.start_immediately { + ctx.activity(runtime::SetSleepingInput { + actor_id: input.actor_id, + }) .await?; - let lifecycle_state = + ctx.msg(CreateComplete {}) + .topic(("actor_id", input.actor_id)) + .send() + .await?; + + runtime::LifecycleState::new_sleeping() + } else { + ctx.msg(CreateComplete {}) + .topic(("actor_id", input.actor_id)) + .send() + .await?; + match runtime::spawn_actor(ctx, input, 0, AllocationOverride::None).await? { runtime::SpawnActorOutput::Allocated { runner_id, @@ -233,6 +253,8 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> namespace_id: input.namespace_id, input: input.input.clone(), from_v1: true, + start_immediately: input.start_immediately, + create_ts: input.create_ts, }) .tag("actor_id", input.actor_id) .dispatch() @@ -245,7 +267,8 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> return Ok(()); } - }; + } + }; let lifecycle_res = ctx .loope(lifecycle_state, |ctx, state| { @@ -868,6 +891,8 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> namespace_id: input.namespace_id, input: input.input.clone(), from_v1: true, + start_immediately: input.start_immediately, + create_ts: input.create_ts, }) .tag("actor_id", input.actor_id) .dispatch() diff --git a/engine/packages/pegboard/src/workflows/actor2/mod.rs b/engine/packages/pegboard/src/workflows/actor2/mod.rs index 4f4e8c96d6..d643ab4995 100644 --- a/engine/packages/pegboard/src/workflows/actor2/mod.rs +++ b/engine/packages/pegboard/src/workflows/actor2/mod.rs @@ -28,6 +28,14 @@ pub struct Input { /// Arbitrary user-provided binary data encoded in base64. pub input: Option, pub from_v1: bool, + #[serde(default = "default_true")] + pub start_immediately: bool, + #[serde(default)] + pub create_ts: Option, +} + +fn default_true() -> bool { + true } #[derive(Deserialize, Serialize)] @@ -117,7 +125,7 @@ pub async fn pegboard_actor2(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> pool_name: input.pool_name.clone(), key: input.key.clone(), namespace_id: input.namespace_id, - create_ts: ctx.create_ts(), + create_ts: input.create_ts.unwrap_or_else(|| ctx.create_ts()), from_v1: input.from_v1, }) .await?; @@ -170,11 +178,6 @@ pub async fn pegboard_actor2(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> } ctx.activity(PopulateIndexesInput {}).await?; - - ctx.msg(CreateComplete {}) - .topic(("actor_id", input.actor_id)) - .send() - .await?; } // Spawn adjacent workflows @@ -188,10 +191,29 @@ pub async fn pegboard_actor2(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> .dispatch() .await?; - let mut lifecycle_state = runtime::LifecycleState::new(); + let mut lifecycle_state = if input.start_immediately { + runtime::LifecycleState::new() + } else { + ctx.activity(runtime::SetSleepingInput {}).await?; + runtime::LifecycleState { + generation: 0, + transition: runtime::Transition::Sleeping, + alarm_ts: None, + retry_backoff_state: runtime::RetryBackoffState::default(), + } + }; - // Attempt initial allocation - runtime::reschedule_actor(ctx, input, &mut lifecycle_state, metrics_workflow_id).await?; + if !input.from_v1 { + ctx.msg(CreateComplete {}) + .topic(("actor_id", input.actor_id)) + .send() + .await?; + } + + if input.start_immediately { + // Attempt initial allocation + runtime::reschedule_actor(ctx, input, &mut lifecycle_state, metrics_workflow_id).await?; + } ctx.loope(lifecycle_state, |ctx, state| { let input = input.clone(); diff --git a/engine/packages/test-snapshot-gen/src/scenarios/pb_actor_v1_pre_migration.rs b/engine/packages/test-snapshot-gen/src/scenarios/pb_actor_v1_pre_migration.rs index 6f512b42f9..7dd29e66e1 100644 --- a/engine/packages/test-snapshot-gen/src/scenarios/pb_actor_v1_pre_migration.rs +++ b/engine/packages/test-snapshot-gen/src/scenarios/pb_actor_v1_pre_migration.rs @@ -43,6 +43,8 @@ impl Scenario for PbActorV1PreMigration { runner_name_selector: "default".to_string(), input: None, crash_policy: CrashPolicy::Sleep, + start_immediately: true, + create_ts: None, forward_request: false, datacenter_name: None, }) diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index be21a76d89..704b57621d 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -914,15 +914,9 @@ importers: examples/cursors: dependencies: - '@hono/node-server': - specifier: ^1.19.13 - version: 1.19.13(hono@4.11.9) '@rivetkit/react': specifier: workspace:* version: link:../../rivetkit-typescript/packages/react - hono: - specifier: ^4.7.0 - version: 4.11.9 react: specifier: 19.1.0 version: 19.1.0 diff --git a/rivetkit-typescript/packages/rivetkit-native/index.d.ts b/rivetkit-typescript/packages/rivetkit-native/index.d.ts index 2153132bcc..8dd3ad8de6 100644 --- a/rivetkit-typescript/packages/rivetkit-native/index.d.ts +++ b/rivetkit-typescript/packages/rivetkit-native/index.d.ts @@ -27,6 +27,7 @@ export interface JsEnvoyConfig { poolName: string version: number metadata?: any + notGlobal: boolean /** * Log level for the Rust tracing subscriber (e.g. "trace", "debug", "info", "warn", "error"). * Falls back to RIVET_LOG_LEVEL, then LOG_LEVEL, then RUST_LOG env vars. Defaults to "warn". diff --git a/rivetkit-typescript/packages/rivetkit-native/wrapper.js b/rivetkit-typescript/packages/rivetkit-native/wrapper.js index b65c635ef2..05361772ae 100644 --- a/rivetkit-typescript/packages/rivetkit-native/wrapper.js +++ b/rivetkit-typescript/packages/rivetkit-native/wrapper.js @@ -134,7 +134,7 @@ function startEnvoySync(config) { poolName: config.poolName, version: config.version, metadata: config.metadata || null, - notGlobal: config.notGlobal, + notGlobal: config.notGlobal ?? false, }, (event) => { handleEvent(event, config, wrappedHandle); diff --git a/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/registry-static.ts b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/registry-static.ts index d43ced6f96..4371843147 100644 --- a/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/registry-static.ts +++ b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/registry-static.ts @@ -80,6 +80,7 @@ import { dockerSandboxActor, dockerSandboxControlActor } from "./sandbox"; import { scheduled } from "./scheduled"; import { dbStressActor } from "./db-stress"; import { scheduledDb } from "./scheduled-db"; +import { sqliteCounter } from "./sqlite-import-snapshot"; import { sleep, sleepRawWsAddEventListenerClose, @@ -175,6 +176,8 @@ export const registry = setup({ dbStressActor, // From scheduled-db.ts scheduledDb, + // From sqlite-import-snapshot.ts + sqliteCounter, // From sandbox.ts dockerSandboxControlActor, dockerSandboxActor, diff --git a/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/snapshots/sqlite-counter-v2_1_x/actors/d1h30qzt2m6xdtsefpuvllbjubbl00/kv.bin b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/snapshots/sqlite-counter-v2_1_x/actors/d1h30qzt2m6xdtsefpuvllbjubbl00/kv.bin new file mode 100644 index 0000000000..4050c988e7 --- /dev/null +++ b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/snapshots/sqlite-counter-v2_1_x/actors/d1h30qzt2m6xdtsefpuvllbjubbl00/kv.bin @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:de46ee62727d96caa061750ae7622ed835c62727a886d81976f3c0edb1962c10 +size 8329 diff --git a/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/snapshots/sqlite-counter-v2_1_x/actors/d1h30qzt2m6xdtsefpuvllbjubbl00/metadata.json b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/snapshots/sqlite-counter-v2_1_x/actors/d1h30qzt2m6xdtsefpuvllbjubbl00/metadata.json new file mode 100644 index 0000000000..4442dfc66e --- /dev/null +++ b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/snapshots/sqlite-counter-v2_1_x/actors/d1h30qzt2m6xdtsefpuvllbjubbl00/metadata.json @@ -0,0 +1,8 @@ +{ + "source_actor_id": "d1h30qzt2m6xdtsefpuvllbjubbl00", + "name": "sqliteCounter", + "key": "sqlite-import-snapshot-v2-1-x", + "runner_name_selector": "test-driver", + "crash_policy": "sleep", + "create_ts": 1776064053262 +} \ No newline at end of file diff --git a/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/snapshots/sqlite-counter-v2_1_x/manifest.json b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/snapshots/sqlite-counter-v2_1_x/manifest.json new file mode 100644 index 0000000000..bf65f839f1 --- /dev/null +++ b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/snapshots/sqlite-counter-v2_1_x/manifest.json @@ -0,0 +1,15 @@ +{ + "version": 1, + "generated_at": 1776064053803, + "source_cluster": null, + "source_namespace_id": "57gkbnf8rnr9jobipp4xq85jlrcl00", + "source_namespace_name": "sqlite-snapshot-8688608a", + "selector": { + "actor_names": { + "names": [ + "sqliteCounter" + ] + } + }, + "actor_count": 1 +} \ No newline at end of file diff --git a/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/sqlite-import-snapshot.ts b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/sqlite-import-snapshot.ts new file mode 100644 index 0000000000..dc4c541270 --- /dev/null +++ b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/sqlite-import-snapshot.ts @@ -0,0 +1,46 @@ +import { actor } from "rivetkit"; +import { db } from "rivetkit/db"; + +const COUNTER_ROW_ID = 1; + +export const sqliteCounter = actor({ + db: db({ + onMigrate: async (database) => { + await database.execute(` + CREATE TABLE IF NOT EXISTS counter_state ( + id INTEGER PRIMARY KEY CHECK (id = 1), + count INTEGER NOT NULL + ) + `); + await database.execute( + "INSERT OR IGNORE INTO counter_state (id, count) VALUES (1, 0)", + ); + }, + }), + actions: { + increment: async (c, amount: number) => { + if (!Number.isFinite(amount)) { + throw new Error("increment value must be a finite number"); + } + + const delta = Math.trunc(amount); + await c.db.execute( + "UPDATE counter_state SET count = count + ? WHERE id = ?", + delta, + COUNTER_ROW_ID, + ); + const rows = await c.db.execute<{ count: number }>( + "SELECT count FROM counter_state WHERE id = ?", + COUNTER_ROW_ID, + ); + return Number(rows[0]?.count ?? 0); + }, + getCount: async (c) => { + const rows = await c.db.execute<{ count: number }>( + "SELECT count FROM counter_state WHERE id = ?", + COUNTER_ROW_ID, + ); + return Number(rows[0]?.count ?? 0); + }, + }, +}); diff --git a/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/mod.ts b/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/mod.ts index 665b4e23fa..2dd9f50fe1 100644 --- a/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/mod.ts +++ b/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/mod.ts @@ -13,6 +13,7 @@ import { runActorConnTests } from "./tests/actor-conn"; import { runActorConnHibernationTests } from "./tests/actor-conn-hibernation"; import { runActorConnStateTests } from "./tests/actor-conn-state"; import { runActorDbTests } from "./tests/actor-db"; +import { runActorImportSnapshotDbTests } from "./tests/actor-import-snapshot-db"; import { runActorDbRawTests } from "./tests/actor-db-raw"; import { runActorDbStressTests } from "./tests/actor-db-stress"; import { runConnErrorSerializationTests } from "./tests/conn-error-serialization"; @@ -108,6 +109,7 @@ export interface DriverDeployOutput { endpoint: string; namespace: string; runnerName: string; + token?: string; hardCrashActor?: (actorId: string) => Promise; hardCrashPreservesData?: boolean; @@ -152,6 +154,7 @@ export function runDriverTests( runConnErrorSerializationTests(driverTestConfig); runActorDbTests(driverTestConfig); + runActorImportSnapshotDbTests(driverTestConfig); runActorDestroyTests(driverTestConfig); @@ -304,6 +307,7 @@ export async function createTestRuntime( endpoint: rivetEngine.endpoint, namespace: rivetEngine.namespace, runnerName: rivetEngine.runnerName, + token: rivetEngine.token, hardCrashActor, hardCrashPreservesData, cleanup, @@ -389,6 +393,7 @@ export async function createTestRuntime( endpoint: serverEndpoint, namespace: "default", runnerName: "default", + token: undefined, hardCrashActor: managerDriver.hardCrashActor?.bind(managerDriver), hardCrashPreservesData: true, cleanup, diff --git a/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/actor-import-snapshot-db.ts b/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/actor-import-snapshot-db.ts new file mode 100644 index 0000000000..154457b36b --- /dev/null +++ b/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/actor-import-snapshot-db.ts @@ -0,0 +1,50 @@ +import { fileURLToPath } from "node:url"; +import { describe, expect, test } from "vitest"; +import type { DriverTestConfig } from "../mod"; +import { importActorSnapshot, setupDriverTest } from "../utils"; + +const SQLITE_SNAPSHOT_DIR = fileURLToPath( + new URL( + "../../../fixtures/driver-test-suite/snapshots/sqlite-counter-v2_1_x", + import.meta.url, + ), +); +const SQLITE_SNAPSHOT_KEY = "sqlite-import-snapshot-v2-1-x"; + +export function runActorImportSnapshotDbTests( + driverTestConfig: DriverTestConfig, +) { + describe.skipIf(driverTestConfig.clientType !== "http")( + "Actor Import Snapshot Database Tests", + () => { + test( + "imports a v2.1.x sqlite snapshot and keeps the database readable", + async (c) => { + const { client, endpoint, namespace, token } = + await setupDriverTest(c, driverTestConfig); + if (!token) { + throw new Error( + "actor snapshot import tests require an admin token", + ); + } + + const importResponse = await importActorSnapshot({ + endpoint, + namespace, + token, + archivePath: SQLITE_SNAPSHOT_DIR, + }); + expect(importResponse.imported_actors).toBe(1); + expect(importResponse.skipped_actors).toBe(0); + expect(importResponse.warnings).toEqual([]); + + const actor = client.sqliteCounter.get([SQLITE_SNAPSHOT_KEY]); + expect(await actor.getCount()).toBe(5); + expect(await actor.increment(1)).toBe(6); + expect(await actor.getCount()).toBe(6); + }, + 60_000, + ); + }, + ); +} diff --git a/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/utils.ts b/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/utils.ts index c38ac3767f..11004321f6 100644 --- a/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/utils.ts +++ b/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/utils.ts @@ -2,7 +2,7 @@ import { type TestContext, vi } from "vitest"; import { assertUnreachable } from "@/actor/utils"; import { type Client, createClient } from "@/client/mod"; import { createClientWithDriver } from "@/mod"; -import type { registry } from "../../fixtures/driver-test-suite/registry"; +import type { registry } from "../../fixtures/driver-test-suite/registry-static"; import { logger } from "./log"; import type { DriverTestConfig } from "./mod"; import { createTestInlineClientDriver } from "./test-inline-client-driver"; @@ -17,6 +17,9 @@ export async function setupDriverTest( ): Promise<{ client: Client; endpoint: string; + namespace: string; + runnerName: string; + token?: string; hardCrashActor?: (actorId: string) => Promise; hardCrashPreservesData: boolean; }> { @@ -30,6 +33,7 @@ export async function setupDriverTest( endpoint, namespace, runnerName, + token, hardCrashActor, hardCrashPreservesData, cleanup, @@ -73,11 +77,48 @@ export async function setupDriverTest( return { client, endpoint, + namespace, + runnerName, + token, hardCrashActor, hardCrashPreservesData: hardCrashPreservesData ?? false, }; } +export interface ImportActorSnapshotResponse { + imported_actors: number; + skipped_actors: number; + warnings: string[]; +} + +export async function importActorSnapshot(input: { + endpoint: string; + namespace: string; + token: string; + archivePath: string; +}): Promise { + const response = await fetch( + `${input.endpoint.replace(/\/$/, "")}/admin/actors/import`, + { + method: "POST", + headers: { + "Content-Type": "application/json", + Authorization: `Bearer ${input.token}`, + }, + body: JSON.stringify({ + target_namespace: input.namespace, + archive_path: input.archivePath, + }), + }, + ); + if (!response.ok) { + throw new Error( + `import actor snapshot failed: ${response.status} ${response.statusText} ${await response.text()}`, + ); + } + return (await response.json()) as ImportActorSnapshotResponse; +} + export async function waitFor( driverTestConfig: DriverTestConfig, ms: number, diff --git a/rivetkit-typescript/packages/rivetkit/tests/driver-engine.test.ts b/rivetkit-typescript/packages/rivetkit/tests/driver-engine.test.ts index 9eb6d6772d..6a4b21533c 100644 --- a/rivetkit-typescript/packages/rivetkit/tests/driver-engine.test.ts +++ b/rivetkit-typescript/packages/rivetkit/tests/driver-engine.test.ts @@ -63,7 +63,7 @@ for (const registryVariant of getDriverRegistryVariants(__dirname)) { const namespace = `test-${crypto.randomUUID().slice(0, 8)}`; const poolName = process.env.RIVET_POOL_NAME || - `test-driver-${crypto.randomUUID().slice(0, 8)}`; + "test-driver"; const token = process.env.RIVET_TOKEN || "dev";