diff --git a/crates/terraphim_orchestrator/src/lib.rs b/crates/terraphim_orchestrator/src/lib.rs index f5c67b5da..0b9c53a1b 100644 --- a/crates/terraphim_orchestrator/src/lib.rs +++ b/crates/terraphim_orchestrator/src/lib.rs @@ -65,10 +65,12 @@ use std::collections::HashMap; use std::path::Path; use std::time::{Duration, Instant}; +use std::sync::{Arc, Mutex}; + use terraphim_router::RoutingEngine; -use terraphim_spawner::health::HealthStatus; +use terraphim_spawner::health::{CircuitBreaker, HealthStatus}; use terraphim_spawner::output::OutputEvent; -use terraphim_spawner::{AgentHandle, AgentSpawner}; +use terraphim_spawner::{AgentHandle, AgentSpawner, SpawnRequest}; use tokio::sync::broadcast; use tracing::{error, info, warn}; @@ -123,6 +125,9 @@ pub struct AgentOrchestrator { persona_registry: PersonaRegistry, /// Renderer for persona metaprompts. metaprompt_renderer: MetapromptRenderer, + /// Circuit breakers for each provider to prevent cascading failures. + #[allow(dead_code)] + circuit_breakers: Arc>>, } /// Validate agent name for safe use in file paths. @@ -209,6 +214,7 @@ impl AgentOrchestrator { cost_tracker, persona_registry, metaprompt_renderer, + circuit_breakers: Arc::new(Mutex::new(HashMap::new())), }) } @@ -500,8 +506,8 @@ impl AgentOrchestrator { const STDIN_THRESHOLD: usize = 32_768; // 32 KB let use_stdin = persona_found || composed_task.len() > STDIN_THRESHOLD; - // Build a Provider from the agent definition for the spawner - let provider = terraphim_types::capability::Provider { + // Build primary Provider from the agent definition for the spawner + let primary_provider = terraphim_types::capability::Provider { id: def.name.clone(), name: def.name.clone(), provider_type: terraphim_types::capability::ProviderType::Agent { @@ -515,19 +521,46 @@ impl AgentOrchestrator { keywords: def.capabilities.clone(), }; - let handle = if use_stdin { - self.spawner - .spawn_with_model_stdin(&provider, &composed_task, model.as_deref()) - .await - } else { - self.spawner - .spawn_with_model(&provider, &composed_task, model.as_deref()) - .await + // Build fallback Provider if fallback_provider is configured + let fallback_provider = def.fallback_provider.as_ref().map(|fallback_cli| { + terraphim_types::capability::Provider { + id: format!("{}-fallback", def.name), + name: format!("{} (fallback)", def.name), + provider_type: terraphim_types::capability::ProviderType::Agent { + agent_id: format!("{}-fallback", def.name), + cli_command: fallback_cli.clone(), + working_dir: self.config.working_dir.clone(), + }, + capabilities: vec![], + cost_level: terraphim_types::capability::CostLevel::Cheap, + latency: terraphim_types::capability::Latency::Medium, + keywords: def.capabilities.clone(), + } + }); + + // Build the spawn request with primary and fallback + let mut request = SpawnRequest::new(primary_provider, &composed_task) + .with_primary_model(model.as_deref().unwrap_or("")); + + if let Some(fallback) = fallback_provider { + request = request.with_fallback_provider(fallback); + if let Some(fallback_model) = &def.fallback_model { + request = request.with_fallback_model(fallback_model); + } + } + + if use_stdin { + request = request.with_stdin(); } - .map_err(|e| OrchestratorError::SpawnFailed { - agent: def.name.clone(), - reason: e.to_string(), - })?; + + let handle = self + .spawner + .spawn_with_fallback(&request) + .await + .map_err(|e| OrchestratorError::SpawnFailed { + agent: def.name.clone(), + reason: e.to_string(), + })?; // Subscribe to the output broadcast for nightwatch drain let output_rx = handle.subscribe_output(); diff --git a/crates/terraphim_spawner/src/lib.rs b/crates/terraphim_spawner/src/lib.rs index f0b8b52a9..06b35525d 100644 --- a/crates/terraphim_spawner/src/lib.rs +++ b/crates/terraphim_spawner/src/lib.rs @@ -53,6 +53,64 @@ pub enum SpawnerError { ConfigValidation(#[from] ValidationError), } +/// Request to spawn an agent with primary and fallback configuration. +/// +/// If the primary provider fails to spawn, the spawner will automatically +/// retry with the fallback provider (if configured). +#[derive(Debug, Clone)] +pub struct SpawnRequest { + /// Primary provider configuration + pub primary_provider: Provider, + /// Primary model to use (if applicable) + pub primary_model: Option, + /// Fallback provider configuration (if primary fails) + pub fallback_provider: Option, + /// Fallback model to use (if applicable) + pub fallback_model: Option, + /// Task/prompt to give the agent + pub task: String, + /// Whether to deliver task via stdin (for large prompts) + pub use_stdin: bool, +} + +impl SpawnRequest { + /// Create a new spawn request with primary provider and task. + pub fn new(primary_provider: Provider, task: impl Into) -> Self { + Self { + primary_provider, + primary_model: None, + fallback_provider: None, + fallback_model: None, + task: task.into(), + use_stdin: false, + } + } + + /// Set the primary model. + pub fn with_primary_model(mut self, model: impl Into) -> Self { + self.primary_model = Some(model.into()); + self + } + + /// Set the fallback provider. + pub fn with_fallback_provider(mut self, provider: Provider) -> Self { + self.fallback_provider = Some(provider); + self + } + + /// Set the fallback model. + pub fn with_fallback_model(mut self, model: impl Into) -> Self { + self.fallback_model = Some(model.into()); + self + } + + /// Use stdin for task delivery (for large prompts). + pub fn with_stdin(mut self) -> Self { + self.use_stdin = true; + self + } +} + /// Handle to a spawned agent process #[derive(Debug)] pub struct AgentHandle { @@ -381,6 +439,90 @@ impl AgentSpawner { self.spawn_config(provider, &config, task, false).await } + /// Spawn an agent with primary and fallback configuration. + /// + /// Attempts to spawn with the primary provider first. If that fails, + /// falls back to the fallback provider (if configured). + pub async fn spawn_with_fallback( + &self, + request: &SpawnRequest, + ) -> Result { + // Try primary first + let primary_result = if request.use_stdin { + self.spawn_with_model_stdin( + &request.primary_provider, + &request.task, + request.primary_model.as_deref(), + ) + .await + } else { + self.spawn_with_model( + &request.primary_provider, + &request.task, + request.primary_model.as_deref(), + ) + .await + }; + + // If primary succeeds, return the handle + match primary_result { + Ok(handle) => Ok(handle), + Err(primary_err) => { + tracing::warn!( + primary_provider = %request.primary_provider.id, + error = %primary_err, + "Primary spawn failed, attempting fallback" + ); + + // Try fallback if configured + if let Some(ref fallback) = request.fallback_provider { + tracing::info!( + fallback_provider = %fallback.id, + "Attempting fallback spawn" + ); + + let fallback_result = if request.use_stdin { + self.spawn_with_model_stdin( + fallback, + &request.task, + request.fallback_model.as_deref(), + ) + .await + } else { + self.spawn_with_model( + fallback, + &request.task, + request.fallback_model.as_deref(), + ) + .await + }; + + match fallback_result { + Ok(handle) => { + tracing::info!( + fallback_provider = %fallback.id, + "Fallback spawn succeeded" + ); + Ok(handle) + } + Err(fallback_err) => { + tracing::error!( + fallback_provider = %fallback.id, + error = %fallback_err, + "Fallback spawn also failed" + ); + // Return the primary error since that's the original failure + Err(primary_err) + } + } + } else { + // No fallback configured, return primary error + Err(primary_err) + } + } + } + } + /// Internal spawn implementation shared by spawn() and spawn_with_model(). async fn spawn_config( &self,