From 7c4f5bc8a25c80c96d6860e516f84e49d869a22a Mon Sep 17 00:00:00 2001 From: John Vandenberg Date: Sun, 10 May 2026 17:19:16 +0800 Subject: [PATCH 1/2] Switch to actix-ws --- Cargo.lock | 79 +----- services/ws-server/Cargo.toml | 1 - services/ws-server/src/lib.rs | 4 +- services/ws/Cargo.toml | 5 +- services/ws/src/lib.rs | 470 +++++++++++++++++----------------- 5 files changed, 249 insertions(+), 310 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 04f3c1c..33f4555 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,31 +2,6 @@ # It is not intended for manual editing. version = 4 -[[package]] -name = "actix" -version = "0.13.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "de7fa236829ba0841304542f7614c42b80fca007455315c45c785ccfa873a85b" -dependencies = [ - "actix-macros", - "actix-rt", - "actix_derive", - "bitflags", - "bytes", - "crossbeam-channel", - "futures-core", - "futures-sink", - "futures-task", - "futures-util", - "log", - "once_cell", - "parking_lot", - "pin-project-lite", - "smallvec", - "tokio", - "tokio-util", -] - [[package]] name = "actix-codec" version = "0.5.2" @@ -243,24 +218,6 @@ dependencies = [ "url", ] -[[package]] -name = "actix-web-actors" -version = "4.3.1+deprecated" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f98c5300b38fd004fe7d2a964f9a90813fdbe8a81fed500587e78b1b71c6f980" -dependencies = [ - "actix", - "actix-codec", - "actix-http", - "actix-web", - "bytes", - "bytestring", - "futures-core", - "pin-project-lite", - "tokio", - "tokio-util", -] - [[package]] name = "actix-web-codegen" version = "4.3.0" @@ -274,14 +231,17 @@ dependencies = [ ] [[package]] -name = "actix_derive" -version = "0.6.2" +name = "actix-ws" +version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6ac1e58cded18cb28ddc17143c4dea5345b3ad575e14f32f66e4054a56eb271" +checksum = "12d4f2fbee3ef7a22fa6cb0e416b962237a167ed0419f22d4e451da2d7f082f8" dependencies = [ - "proc-macro2", - "quote", - "syn", + "actix-codec", + "actix-http", + "actix-web", + "bytestring", + "futures-core", + "tokio", ] [[package]] @@ -707,21 +667,6 @@ dependencies = [ "cfg-if", ] -[[package]] -name = "crossbeam-channel" -version = "0.5.15" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "82b8f8f868b36967f9606790d1903570de9ceaf870a7bf9fbbd3016d636a2cb2" -dependencies = [ - "crossbeam-utils", -] - -[[package]] -name = "crossbeam-utils" -version = "0.8.21" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" - [[package]] name = "crossterm" version = "0.28.1" @@ -1189,7 +1134,6 @@ dependencies = [ name = "et-ws-server" version = "0.1.0" dependencies = [ - "actix", "actix-rt", "actix-web", "chrono", @@ -1230,15 +1174,16 @@ dependencies = [ name = "et-ws-service" version = "0.1.0" dependencies = [ - "actix", "actix-web", - "actix-web-actors", + "actix-ws", "chrono", "edge-toolkit", + "futures-util", "opentelemetry", "serde", "serde_json", "serde_yaml", + "tokio", "tracing", "uuid", ] diff --git a/services/ws-server/Cargo.toml b/services/ws-server/Cargo.toml index 10fd6dd..69b0a6e 100644 --- a/services/ws-server/Cargo.toml +++ b/services/ws-server/Cargo.toml @@ -6,7 +6,6 @@ license.workspace = true repository.workspace = true [dependencies] -actix = "0.13" actix-rt = "2" actix-web = { version = "4", features = ["rustls-0_23"] } chrono.workspace = true diff --git a/services/ws-server/src/lib.rs b/services/ws-server/src/lib.rs index 789dd84..2e8b9e4 100644 --- a/services/ws-server/src/lib.rs +++ b/services/ws-server/src/lib.rs @@ -1,5 +1,5 @@ use actix_web::{HttpResponse, web}; -pub use et_ws_service::{WebSocketActor, WsAgentRegistry}; +pub use et_ws_service::{AgentSession, WsAgentRegistry}; pub mod config; @@ -25,7 +25,7 @@ pub fn configure_app(cfg: &mut web::ServiceConfig, agent_registry: web::Data>(cfg, &config.storage); + et_storage_service::configure::(cfg, &config.storage); // Must be last: registers a catch-all Files::new("/", ...) for the root module. et_modules_service::configure(cfg, &config.modules); } diff --git a/services/ws/Cargo.toml b/services/ws/Cargo.toml index fa65fa7..7ff1ef6 100644 --- a/services/ws/Cargo.toml +++ b/services/ws/Cargo.toml @@ -6,14 +6,15 @@ license.workspace = true repository.workspace = true [dependencies] -actix = "0.13" actix-web = "4" -actix-web-actors = "4" +actix-ws = "0.3" chrono = { version = "0.4", features = ["serde"] } edge-toolkit = { path = "../../libs/edge-toolkit" } +futures-util = "0.3" opentelemetry = "0.31" serde.workspace = true serde_json.workspace = true serde_yaml = "0.9" +tokio = { version = "1", features = ["macros", "rt", "sync", "time"] } tracing.workspace = true uuid.workspace = true diff --git a/services/ws/src/lib.rs b/services/ws/src/lib.rs index 9fae7ce..3f5d783 100644 --- a/services/ws/src/lib.rs +++ b/services/ws/src/lib.rs @@ -2,23 +2,25 @@ use std::collections::BTreeMap; use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; -use actix::{Actor, ActorContext, Addr, AsyncContext, Handler, Message, StreamHandler}; use actix_web::{Error, HttpRequest, HttpResponse, web}; -use actix_web_actors::ws; +use actix_ws::{AggregatedMessage, AggregatedMessageStream, CloseCode, CloseReason, Session}; use chrono::Utc; use edge_toolkit::ws::{ConnectStatus, MessageDeliveryStatus, MessageScope, WsMessage}; use edge_toolkit::ws_server::{AgentRecord, AgentRegistry, PendingDirectMessage}; +use futures_util::StreamExt as _; use opentelemetry::{ global, trace::{Span, Tracer}, }; +use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender}; use tracing::{error, info, warn}; use uuid::Uuid; pub const CONNECTION_TIMEOUT: Duration = Duration::from_secs(15); pub const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(1); -pub type WsAgentRegistry = AgentRegistry>; +pub type AgentSession = UnboundedSender; +pub type WsAgentRegistry = AgentRegistry; /// Load a registry from disk. Sessions are not persisted, so they are initialised to `None`. pub fn load_registry(path: &std::path::Path) -> Result { @@ -57,83 +59,59 @@ pub fn load_registry(path: &std::path::Path) -> Result, + last_activity: Instant, + client_ip: String, + registry: WsAgentRegistry, + session: Session, + outbox: AgentSession, } -pub struct WebSocketActor { - pub agent_id: Option, - pub last_activity: Instant, - pub client_ip: String, - pub registry: WsAgentRegistry, -} - -impl WebSocketActor { - pub fn new(registry: WsAgentRegistry, client_ip: String) -> Self { - info!("New WebSocket actor created for client IP {}", client_ip); +impl Connection { + fn new(registry: WsAgentRegistry, client_ip: String, session: Session, outbox: AgentSession) -> Self { + info!("New WebSocket connection for client IP {}", client_ip); Self { agent_id: None, last_activity: Instant::now(), client_ip, registry, + session, + outbox, } } - pub fn current_agent_id(&self) -> &str { + fn current_agent_id(&self) -> &str { self.agent_id.as_deref().unwrap_or("unassigned") } - pub fn assigned_agent_id(&self) -> Option<&str> { + fn assigned_agent_id(&self) -> Option<&str> { self.agent_id.as_deref() } - pub fn mark_activity(&mut self) { + fn mark_activity(&mut self) { self.last_activity = Instant::now(); } - pub fn start_heartbeat(&self, ctx: &mut ws::WebsocketContext) { - ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| { - let idle_for = Instant::now().saturating_duration_since(act.last_activity); - if idle_for > CONNECTION_TIMEOUT { - warn!( - "WebSocket connection timed out for client {} after {:?} of inactivity", - act.current_agent_id(), - idle_for - ); - ctx.close(Some(ws::CloseReason { - code: ws::CloseCode::Policy, - description: Some(format!( - "connection timed out after {:?} of inactivity", - CONNECTION_TIMEOUT - )), - })); - ctx.stop(); - } - }); - } - - fn assign_or_reconnect_agent( - &mut self, - requested_id: Option, - session: Addr, - ) -> (String, ConnectStatus) { + fn assign_or_reconnect_agent(&mut self, requested_id: Option) -> (String, ConnectStatus) { let new_id = Uuid::now_v7().to_string(); - let (assigned_id, status) = self - .registry - .connect_agent(requested_id, new_id, &self.client_ip, session); + let (assigned_id, status) = + self.registry + .connect_agent(requested_id, new_id, &self.client_ip, self.outbox.clone()); self.agent_id = Some(assigned_id.clone()); (assigned_id, status) } - fn send_json(ctx: &mut ws::WebsocketContext, response: &WsMessage) { + async fn send_json(&mut self, response: &WsMessage) { match serde_json::to_string(response) { Ok(json) => { - ctx.text(json); - let tracer = global::tracer("ws-server"); - let mut sent_span = tracer.start("ws.message.sent"); - sent_span.end(); + if let Err(err) = self.session.text(json).await { + warn!("Failed to send message to {}: {:?}", self.current_agent_id(), err); + } else { + let tracer = global::tracer("ws-server"); + let mut sent_span = tracer.start("ws.message.sent"); + sent_span.end(); + } } Err(error) => { error!("Failed to serialize websocket response: {}", error); @@ -141,58 +119,51 @@ impl WebSocketActor { } } - fn send_status( - ctx: &mut ws::WebsocketContext, + async fn send_status( + &mut self, message_id: Option, status: MessageDeliveryStatus, detail: impl Into, ) { - Self::send_json( - ctx, - &WsMessage::MessageStatus { - message_id, - status, - detail: detail.into(), - }, - ); + self.send_json(&WsMessage::MessageStatus { + message_id, + status, + detail: detail.into(), + }) + .await; } - fn send_invalid(ctx: &mut ws::WebsocketContext, message_id: Option, detail: impl Into) { - Self::send_json( - ctx, - &WsMessage::Invalid { - message_id, - detail: detail.into(), - }, - ); + async fn send_invalid(&mut self, message_id: Option, detail: impl Into) { + self.send_json(&WsMessage::Invalid { + message_id, + detail: detail.into(), + }) + .await; } - fn deliver_pending_messages(&self, ctx: &mut ws::WebsocketContext) { - let Some(agent_id) = self.assigned_agent_id() else { + async fn deliver_pending_messages(&mut self) { + let Some(agent_id) = self.assigned_agent_id().map(str::to_string) else { return; }; - for pending in self.registry.pending_messages_for(agent_id) { + for pending in self.registry.pending_messages_for(&agent_id) { info!( "Delivering pending message {} to agent {} from {}", pending.message_id, agent_id, pending.from_agent_id ); - Self::send_json( - ctx, - &WsMessage::AgentMessage { - message_id: pending.message_id, - from_agent_id: pending.from_agent_id, - scope: MessageScope::Direct, - server_received_at: pending.server_received_at, - message: pending.message, - }, - ); + self.send_json(&WsMessage::AgentMessage { + message_id: pending.message_id, + from_agent_id: pending.from_agent_id, + scope: MessageScope::Direct, + server_received_at: pending.server_received_at, + message: pending.message, + }) + .await; } } - fn handle_send_direct( - &self, - ctx: &mut ws::WebsocketContext, - span: &mut impl opentelemetry::trace::Span, + async fn handle_send_direct( + &mut self, + span: &mut impl Span, from_agent_id: String, to_agent_id: String, message: serde_json::Value, @@ -207,89 +178,66 @@ impl WebSocketActor { ); let message_id = pending.message_id.clone(); - if let Some(recipient_addr) = recipient_session { + if let Some(recipient) = recipient_session { info!( "Direct message {} delivered from {} to {}", message_id, from_agent_id, to_agent_id ); - recipient_addr.do_send(ServerEnvelope { - message: WsMessage::AgentMessage { - message_id: message_id.clone(), - from_agent_id, - scope: MessageScope::Direct, - server_received_at: pending.server_received_at, - message: pending.message, - }, + let _ = recipient.send(WsMessage::AgentMessage { + message_id: message_id.clone(), + from_agent_id, + scope: MessageScope::Direct, + server_received_at: pending.server_received_at, + message: pending.message, }); - Self::send_status( - ctx, + self.send_status( Some(message_id), MessageDeliveryStatus::Delivered, format!("message delivered to agent {}", to_agent_id), - ); + ) + .await; } else { info!( "Direct message {} queued from {} to disconnected agent {}", message_id, from_agent_id, to_agent_id ); - Self::send_status( - ctx, + self.send_status( Some(message_id), MessageDeliveryStatus::Queued, format!("message queued for agent {}", to_agent_id), - ); + ) + .await; } span.end(); } -} -impl Actor for WebSocketActor { - type Context = ws::WebsocketContext; - - fn started(&mut self, ctx: &mut Self::Context) { - self.start_heartbeat(ctx); - info!( - "WebSocket connection established for client IP {} with agent {}", - self.client_ip, - self.current_agent_id() - ); - let tracer = global::tracer("ws-server"); - let mut span = tracer.start("ws.connect"); - span.end(); - } - - fn stopped(&mut self, _ctx: &mut Self::Context) { - if let Some(agent_id) = self.agent_id.as_deref() { - self.registry.mark_disconnected(agent_id); - info!("Agent {} disconnected; last known IP {}", agent_id, self.client_ip); - } else { - info!( - "WebSocket connection closed before agent assignment for client IP {}", - self.client_ip - ); - } - } -} - -impl Handler for WebSocketActor { - type Result = (); - - fn handle(&mut self, msg: ServerEnvelope, ctx: &mut Self::Context) -> Self::Result { - Self::send_json(ctx, &msg.message); - } -} - -impl StreamHandler> for WebSocketActor { - fn handle(&mut self, msg: Result, ctx: &mut Self::Context) { + /// Returns `false` when the connection should terminate. + async fn handle_inbound(&mut self, msg: AggregatedMessage) -> bool { match msg { - Ok(ws::Message::Ping(ping)) => { + AggregatedMessage::Ping(ping) => { self.mark_activity(); - ctx.pong(&ping); + let _ = self.session.pong(&ping).await; } - Ok(ws::Message::Pong(_)) => { + AggregatedMessage::Pong(_) => { self.mark_activity(); } - Ok(ws::Message::Text(text)) => { + AggregatedMessage::Binary(_) => { + self.mark_activity(); + } + AggregatedMessage::Close(reason) => { + self.mark_activity(); + info!( + "WebSocket close request from client: {} reason: {:?}", + self.current_agent_id(), + reason + ); + let tracer = global::tracer("ws-server"); + let mut span = tracer.start("ws.disconnect"); + span.end(); + let _ = self.session.clone().close(reason).await; + return false; + } + AggregatedMessage::Text(text) => { self.mark_activity(); let tracer = global::tracer("ws-server"); let mut span = tracer.start("ws.message.received"); @@ -303,33 +251,29 @@ impl StreamHandler> for WebSocketActor { "Connect message: requested_agent_id={:?} client_ip={}", requested_id, self.client_ip ); - let (assigned_id, status) = self.assign_or_reconnect_agent(agent_id, ctx.address()); + let (assigned_id, status) = self.assign_or_reconnect_agent(agent_id); info!( "Agent {} status {:?}connected from IP {}", assigned_id, status, self.client_ip ); - Self::send_json( - ctx, - &WsMessage::ConnectAck { - agent_id: assigned_id, - status: status.clone(), - }, - ); + self.send_json(&WsMessage::ConnectAck { + agent_id: assigned_id, + status: status.clone(), + }) + .await; info!( "WebSocket connection ready for client {} with status {:?}", self.current_agent_id(), status ); - self.deliver_pending_messages(ctx); + self.deliver_pending_messages().await; } WsMessage::Alive { timestamp } => { info!("Alive message from client {} at {}", self.current_agent_id(), timestamp); - Self::send_json( - ctx, - &WsMessage::Response { - message: format!("Alive message received at {}", Utc::now().to_rfc3339()), - }, - ); + self.send_json(&WsMessage::Response { + message: format!("Alive message received at {}", Utc::now().to_rfc3339()), + }) + .await; } WsMessage::ListAgents => { let agents = self.registry.list_agents(); @@ -338,67 +282,71 @@ impl StreamHandler> for WebSocketActor { self.current_agent_id(), agents.len() ); - Self::send_json(ctx, &WsMessage::ListAgentsResponse { agents }); + self.send_json(&WsMessage::ListAgentsResponse { agents }).await; } WsMessage::SendAgentMessage { to_agent_id, message } => { let Some(from_agent_id) = self.assigned_agent_id().map(str::to_string) else { - Self::send_invalid(ctx, None, "agent must connect before sending messages"); + self.send_invalid(None, "agent must connect before sending messages") + .await; span.end(); - return; + return true; }; if from_agent_id == to_agent_id { - Self::send_invalid(ctx, None, "agent cannot send a direct message to itself"); + self.send_invalid(None, "agent cannot send a direct message to itself") + .await; span.end(); - return; + return true; } if !self.registry.list_agents().iter().any(|a| a.agent_id == to_agent_id) { - Self::send_invalid(ctx, None, format!("unknown target agent {}", to_agent_id)); + self.send_invalid(None, format!("unknown target agent {}", to_agent_id)) + .await; span.end(); - return; + return true; } - self.handle_send_direct(ctx, &mut span, from_agent_id, to_agent_id, message); - return; + self.handle_send_direct(&mut span, from_agent_id, to_agent_id, message) + .await; + return true; } WsMessage::BroadcastMessage { message } => { let Some(from_agent_id) = self.assigned_agent_id().map(str::to_string) else { - Self::send_invalid(ctx, None, "agent must connect before broadcasting messages"); + self.send_invalid(None, "agent must connect before broadcasting messages") + .await; span.end(); - return; + return true; }; let recipients = self.registry.connected_sessions(&from_agent_id); let message_id = Uuid::now_v7().to_string(); let server_received_at = Utc::now().to_rfc3339(); - for (recipient_id, recipient_addr) in &recipients { + for (recipient_id, recipient) in &recipients { info!( "Broadcast message {} from {} to {}", message_id, from_agent_id, recipient_id ); - recipient_addr.do_send(ServerEnvelope { - message: WsMessage::AgentMessage { - message_id: message_id.clone(), - from_agent_id: from_agent_id.clone(), - scope: MessageScope::Broadcast, - server_received_at: server_received_at.clone(), - message: message.clone(), - }, + let _ = recipient.send(WsMessage::AgentMessage { + message_id: message_id.clone(), + from_agent_id: from_agent_id.clone(), + scope: MessageScope::Broadcast, + server_received_at: server_received_at.clone(), + message: message.clone(), }); } - Self::send_status( - ctx, + self.send_status( Some(message_id), MessageDeliveryStatus::Broadcast, format!("broadcast sent to {} connected agents", recipients.len()), - ); + ) + .await; } WsMessage::MessageAck { message_id } => { let Some(recipient_agent_id) = self.assigned_agent_id().map(str::to_string) else { - Self::send_invalid(ctx, None, "agent must connect before acknowledging messages"); + self.send_invalid(None, "agent must connect before acknowledging messages") + .await; span.end(); - return; + return true; }; match self.registry.acknowledge_message(&recipient_agent_id, &message_id) { @@ -407,25 +355,23 @@ impl StreamHandler> for WebSocketActor { "Agent {} acknowledged direct message {} from {}", recipient_agent_id, message_id, sender_agent_id ); - Self::send_status( - ctx, + self.send_status( Some(message_id.clone()), MessageDeliveryStatus::Acknowledged, "message acknowledged", - ); - if let Some(sender_addr) = sender_session { - sender_addr.do_send(ServerEnvelope { - message: WsMessage::MessageStatus { - message_id: Some(message_id), - status: MessageDeliveryStatus::Acknowledged, - detail: format!("agent {} acknowledged receipt", recipient_agent_id), - }, + ) + .await; + if let Some(sender) = sender_session { + let _ = sender.send(WsMessage::MessageStatus { + message_id: Some(message_id), + status: MessageDeliveryStatus::Acknowledged, + detail: format!("agent {} acknowledged receipt", recipient_agent_id), }); } } Err(detail) => { warn!("Invalid ack from {} for {}: {}", recipient_agent_id, message_id, detail); - Self::send_invalid(ctx, Some(message_id), detail); + self.send_invalid(Some(message_id), detail).await; } } } @@ -461,19 +407,17 @@ impl StreamHandler> for WebSocketActor { ); } WsMessage::StoreFile { filename } => { - let Some(agent_id) = self.assigned_agent_id() else { - Self::send_invalid(ctx, None, "agent must connect before storing files"); + let Some(agent_id) = self.assigned_agent_id().map(str::to_string) else { + self.send_invalid(None, "agent must connect before storing files").await; span.end(); - return; + return true; }; let url = format!("/storage/{}/{}", agent_id, filename); info!("Agent {} requested storage URL for {}: {}", agent_id, filename, url); - Self::send_json( - ctx, - &WsMessage::Response { - message: format!("PUT to {}", url), - }, - ); + self.send_json(&WsMessage::Response { + message: format!("PUT to {}", url), + }) + .await; } WsMessage::FetchFile { agent_id, filename } => { let url = format!("/storage/{}/{}", agent_id, filename); @@ -483,12 +427,10 @@ impl StreamHandler> for WebSocketActor { agent_id, filename ); - Self::send_json( - ctx, - &WsMessage::Response { - message: format!("GET from {}", url), - }, - ); + self.send_json(&WsMessage::Response { + message: format!("GET from {}", url), + }) + .await; } WsMessage::ConnectAck { .. } | WsMessage::ListAgentsResponse { .. } @@ -511,35 +453,80 @@ impl StreamHandler> for WebSocketActor { } span.end(); } - Ok(ws::Message::Close(reason)) => { - self.mark_activity(); - info!( - "WebSocket close request from client: {} reason: {:?}", - self.current_agent_id(), - reason - ); - let tracer = global::tracer("ws-server"); - let mut span = tracer.start("ws.disconnect"); - span.end(); - ctx.close(reason); - ctx.stop(); - } - Ok(ws::Message::Binary(_)) | Ok(ws::Message::Continuation(_)) | Ok(ws::Message::Nop) => { - self.mark_activity(); - } - Err(e) => { - error!("WebSocket error for client {}: {:?}", self.current_agent_id(), e); - let tracer = global::tracer("ws-server"); - let mut span = tracer.start("ws.error"); - span.end(); + } + true + } + + async fn run(mut self, mut stream: AggregatedMessageStream, mut outbound: UnboundedReceiver) { + let tracer = global::tracer("ws-server"); + let mut connect_span = tracer.start("ws.connect"); + info!( + "WebSocket connection established for client IP {} with agent {}", + self.client_ip, + self.current_agent_id() + ); + connect_span.end(); + + let mut heartbeat = tokio::time::interval(HEARTBEAT_INTERVAL); + heartbeat.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); + + loop { + tokio::select! { + msg = stream.next() => { + match msg { + Some(Ok(msg)) => { + if !self.handle_inbound(msg).await { + break; + } + } + Some(Err(e)) => { + error!("WebSocket error for client {}: {:?}", self.current_agent_id(), e); + let mut err_span = tracer.start("ws.error"); + err_span.end(); + break; + } + None => break, + } + } + Some(envelope) = outbound.recv() => { + self.send_json(&envelope).await; + } + _ = heartbeat.tick() => { + let idle_for = Instant::now().saturating_duration_since(self.last_activity); + if idle_for > CONNECTION_TIMEOUT { + warn!( + "WebSocket connection timed out for client {} after {:?} of inactivity", + self.current_agent_id(), + idle_for + ); + let _ = self.session.clone().close(Some(CloseReason { + code: CloseCode::Policy, + description: Some(format!( + "connection timed out after {:?} of inactivity", + CONNECTION_TIMEOUT + )), + })).await; + break; + } + } } } + + if let Some(agent_id) = self.agent_id.as_deref() { + self.registry.mark_disconnected(agent_id); + info!("Agent {} disconnected; last known IP {}", agent_id, self.client_ip); + } else { + info!( + "WebSocket connection closed before agent assignment for client IP {}", + self.client_ip + ); + } } } pub async fn ws_handler( req: HttpRequest, - stream: web::Payload, + body: web::Payload, registry: web::Data, ) -> Result { let tracer = global::tracer("ws-server"); @@ -555,11 +542,18 @@ pub async fn ws_handler( }) .unwrap_or_else(|| "unknown".to_string()); - let actor = WebSocketActor::new(registry.get_ref().clone(), client_ip); - let result = ws::start(actor, &req, stream); + let (response, session, msg_stream) = actix_ws::handle(&req, body)?; + let stream = msg_stream.max_frame_size(64 * 1024).aggregate_continuations(); + + let (tx, rx) = mpsc::unbounded_channel::(); + let conn = Connection::new(registry.get_ref().clone(), client_ip, session, tx); + + actix_web::rt::spawn(async move { + conn.run(stream, rx).await; + }); span.end(); - result + Ok(response) } pub fn configure(cfg: &mut web::ServiceConfig) { From 944508455bfb0b7c9f654a569c3a8b57e8179538 Mon Sep 17 00:00:00 2001 From: John Vandenberg Date: Mon, 11 May 2026 09:42:19 +0800 Subject: [PATCH 2/2] cargo update --- Cargo.lock | 114 ++++++++++++++++++++++++----------------------------- 1 file changed, 52 insertions(+), 62 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 33f4555..5fab8ad 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -485,9 +485,9 @@ checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33" [[package]] name = "bytestring" -version = "1.5.0" +version = "1.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "113b4343b5f6617e7ad401ced8de3cc8b012e73a594347c307b90db3e9271289" +checksum = "86566c496f2f47d9b8147a4c8b02ffdb69c919fe0c2b2e7195d22cbba0e635c9" dependencies = [ "bytes", ] @@ -500,9 +500,9 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" [[package]] name = "cc" -version = "1.2.61" +version = "1.2.62" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d16d90359e986641506914ba71350897565610e87ce0ad9e6f28569db3dd5c6d" +checksum = "a1dce859f0832a7d088c4f1119888ab94ef4b5d6795d1ce05afb7fe159d79f98" dependencies = [ "find-msvc-tools", "jobserver", @@ -598,9 +598,9 @@ checksum = "1d07550c9036bf2ae0c684c4297d503f838287c83c53686d05370d0e139ae570" [[package]] name = "const-hex" -version = "1.18.1" +version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "531185e432bb31db1ecda541e9e7ab21468d4d844ad7505e0546a49b4945d49b" +checksum = "20d9a563d167a9cce0f94153382b33cb6eded6dfabff03c69ad65a28ea1514e0" dependencies = [ "cfg-if", "cpufeatures 0.2.17", @@ -818,9 +818,9 @@ dependencies = [ [[package]] name = "digest" -version = "0.11.2" +version = "0.11.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4850db49bf08e663084f7fb5c87d202ef91a3907271aff24a94eb97ff039153c" +checksum = "f1dd6dbb5841937940781866fa1281a1ff7bd3bf827091440879f9994983d5c2" dependencies = [ "block-buffer", "const-oid", @@ -1457,9 +1457,9 @@ dependencies = [ [[package]] name = "hashbrown" -version = "0.17.0" +version = "0.17.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4f467dd6dccf739c208452f8014c75c18bb8301b050ad1cfb27153803edb0f51" +checksum = "ed5909b6e89a2db4456e54cd5f673791d7eca6732202bbf2a9cc504fe2f9b84a" [[package]] name = "heck" @@ -1542,9 +1542,9 @@ checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" [[package]] name = "hybrid-array" -version = "0.4.11" +version = "0.4.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "08d46837a0ed51fe95bd3b05de33cd64a1ee88fc797477ca48446872504507c5" +checksum = "9155a582abd142abc056962c29e3ce5ff2ad5469f4246b537ed42c5deba857da" dependencies = [ "typenum", ] @@ -1723,9 +1723,9 @@ dependencies = [ [[package]] name = "idna_adapter" -version = "1.2.1" +version = "1.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3acae9609540aa318d1bc588455225fb2085b9ed0c4f6bd0d9d5bcd86f1a0344" +checksum = "cb68373c0d6620ef8105e855e7745e18b0d00d3bdb07fb532e434244cdb9a714" dependencies = [ "icu_normalizer", "icu_properties", @@ -1744,7 +1744,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d466e9454f08e4a911e14806c24e16fba1b4c121d1ea474396f396069cf949d9" dependencies = [ "equivalent", - "hashbrown 0.17.0", + "hashbrown 0.17.1", "serde", "serde_core", ] @@ -1755,16 +1755,6 @@ version = "2.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d98f6fed1fde3f8c21bc40a1abb88dd75e67924f9cffc3ef95607bad8017f8e2" -[[package]] -name = "iri-string" -version = "0.7.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "25e659a4bb38e810ebc252e53b5814ff908a8c58c2a9ce2fae1bbec24cbf4e20" -dependencies = [ - "memchr", - "serde", -] - [[package]] name = "is_terminal_polyfill" version = "1.70.2" @@ -1798,9 +1788,9 @@ dependencies = [ [[package]] name = "js-sys" -version = "0.3.95" +version = "0.3.98" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2964e92d1d9dc3364cae4d718d93f227e3abb088e747d92e0395bfdedf1c12ca" +checksum = "67df7112613f8bfd9150013a0314e196f4800d3201ae742489d999db2f979f08" dependencies = [ "cfg-if", "futures-util", @@ -2101,9 +2091,9 @@ checksum = "384b8ab6d37215f3c5301a95a4accb5d64aa607f1fcb26a11b5303878451b4fe" [[package]] name = "onnx-extractor" -version = "0.3.5" +version = "0.3.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2dcef162d82101e4b8a54631b983d5aee84f7657a113c62a322b4e5de46a417c" +checksum = "7f8a7b1752ce77203505f0939b9861ec10905100d017720bc66400502ce1e531" dependencies = [ "memmap2", "prost", @@ -2256,18 +2246,18 @@ dependencies = [ [[package]] name = "pin-project" -version = "1.1.11" +version = "1.1.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f1749c7ed4bcaf4c3d0a3efc28538844fb29bcdd7d2b67b2be7e20ba861ff517" +checksum = "cbf0d9e68100b3a7989b4901972f265cd542e560a3a8a724e1e20322f4d06ce9" dependencies = [ "pin-project-internal", ] [[package]] name = "pin-project-internal" -version = "1.1.11" +version = "1.1.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d9b20ed30f105399776b9c883e68e536ef602a16ae6f596d2c473591d6ad64c6" +checksum = "a990e22f43e84855daf260dded30524ef4a9021cc7541c26540500a50b624389" dependencies = [ "proc-macro2", "quote", @@ -2705,9 +2695,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.23.39" +version = "0.23.40" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7c2c118cb077cca2822033836dfb1b975355dfb784b5e8da48f7b6c5db74e60e" +checksum = "ef86cd5876211988985292b91c96a8f2d298df24e75989a43a3c73f2d4d8168b" dependencies = [ "aws-lc-rs", "log", @@ -3110,9 +3100,9 @@ dependencies = [ [[package]] name = "tokio" -version = "1.52.1" +version = "1.52.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b67dee974fe86fd92cc45b7a95fdd2f99a36a6d7b0d431a231178d3d670bbcc6" +checksum = "8fc7f01b389ac15039e4dc9531aa973a135d7a4135281b12d7c1bc79fd57fffe" dependencies = [ "bytes", "libc", @@ -3243,9 +3233,9 @@ checksum = "5d99f8c9a7727884afe522e9bd5edbfc91a3312b36a77b5fb8926e4c31a41801" [[package]] name = "tonic" -version = "0.14.5" +version = "0.14.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fec7c61a0695dc1887c1b53952990f3ad2e3a31453e1f49f10e75424943a93ec" +checksum = "ac2a5518c70fa84342385732db33fb3f44bc4cc748936eb5833d2df34d6445ef" dependencies = [ "async-trait", "base64", @@ -3264,9 +3254,9 @@ dependencies = [ [[package]] name = "tonic-prost" -version = "0.14.5" +version = "0.14.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a55376a0bbaa4975a3f10d009ad763d8f4108f067c7c2e74f3001fb49778d309" +checksum = "50849f68853be452acf590cde0b146665b8d507b3b8af17261df47e02c209ea0" dependencies = [ "bytes", "prost", @@ -3290,20 +3280,20 @@ dependencies = [ [[package]] name = "tower-http" -version = "0.6.8" +version = "0.6.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d4e6559d53cc268e5031cd8429d05415bc4cb4aefc4aa5d6cc35fbf5b924a1f8" +checksum = "68d6fdd9f81c2819c9a8b0e0cd91660e7746a8e6ea2ba7c6b2b057985f6bcb51" dependencies = [ "bitflags", "bytes", "futures-util", "http 1.4.0", "http-body", - "iri-string", "pin-project-lite", "tower", "tower-layer", "tower-service", + "url", ] [[package]] @@ -3575,9 +3565,9 @@ dependencies = [ [[package]] name = "wasm-bindgen" -version = "0.2.118" +version = "0.2.121" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0bf938a0bacb0469e83c1e148908bd7d5a6010354cf4fb73279b7447422e3a89" +checksum = "49ace1d07c165b0864824eee619580c4689389afa9dc9ed3a4c75040d82e6790" dependencies = [ "cfg-if", "once_cell", @@ -3588,9 +3578,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-futures" -version = "0.4.68" +version = "0.4.71" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f371d383f2fb139252e0bfac3b81b265689bf45b6874af544ffa4c975ac1ebf8" +checksum = "96492d0d3ffba25305a7dc88720d250b1401d7edca02cc3bcd50633b424673b8" dependencies = [ "js-sys", "wasm-bindgen", @@ -3598,9 +3588,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.118" +version = "0.2.121" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eeff24f84126c0ec2db7a449f0c2ec963c6a49efe0698c4242929da037ca28ed" +checksum = "8e68e6f4afd367a562002c05637acb8578ff2dea1943df76afb9e83d177c8578" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -3608,9 +3598,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.118" +version = "0.2.121" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d08065faf983b2b80a79fd87d8254c409281cf7de75fc4b773019824196c904" +checksum = "d95a9ec35c64b2a7cb35d3fead40c4238d0940c86d107136999567a4703259f2" dependencies = [ "bumpalo", "proc-macro2", @@ -3621,18 +3611,18 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.118" +version = "0.2.121" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5fd04d9e306f1907bd13c6361b5c6bfc7b3b3c095ed3f8a9246390f8dbdee129" +checksum = "c4e0100b01e9f0d03189a92b96772a1fb998639d981193d7dbab487302513441" dependencies = [ "unicode-ident", ] [[package]] name = "wasm-bindgen-test" -version = "0.3.68" +version = "0.3.71" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6bb55e2540ad1c56eec35fd63e2aea15f83b11ce487fd2de9ad11578dfc047ea" +checksum = "af5ec93229ad9ccd0a545a516dec76dc276613f278f6a91aa6b463d5b33d42d0" dependencies = [ "async-trait", "cast", @@ -3652,9 +3642,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-test-macro" -version = "0.3.68" +version = "0.3.71" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "caf0ca1bd612b988616bac1ab34c4e4290ef18f7148a1d8b7f31c150080e9295" +checksum = "3c81b9fef827e575e0e54431736d1baa0d700315d8c62cfef1f61fa3aad0cbeb" dependencies = [ "proc-macro2", "quote", @@ -3663,9 +3653,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-test-shared" -version = "0.2.118" +version = "0.2.121" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "23cda5ecc67248c48d3e705d3e03e00af905769b78b9d2a1678b663b8b9d4472" +checksum = "4f4d8ae7ad5440360e9799dfd42857d126454a88441ddf72d288ef83fa47f527" [[package]] name = "wasm-encoder" @@ -3703,9 +3693,9 @@ dependencies = [ [[package]] name = "web-sys" -version = "0.3.95" +version = "0.3.98" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4f2dfbb17949fa2088e5d39408c48368947b86f7834484e87b73de55bc14d97d" +checksum = "4b572dff8bcf38bad0fa19729c89bb5748b2b9b1d8be70cf90df697e3a8f32aa" dependencies = [ "js-sys", "wasm-bindgen",