Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 49 additions & 16 deletions crates/terraphim_orchestrator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,12 @@ use std::collections::HashMap;
use std::path::Path;
use std::time::{Duration, Instant};

use std::sync::{Arc, Mutex};

use terraphim_router::RoutingEngine;
use terraphim_spawner::health::HealthStatus;
use terraphim_spawner::health::{CircuitBreaker, HealthStatus};
use terraphim_spawner::output::OutputEvent;
use terraphim_spawner::{AgentHandle, AgentSpawner};
use terraphim_spawner::{AgentHandle, AgentSpawner, SpawnRequest};
use tokio::sync::broadcast;
use tracing::{error, info, warn};

Expand Down Expand Up @@ -123,6 +125,9 @@ pub struct AgentOrchestrator {
persona_registry: PersonaRegistry,
/// Renderer for persona metaprompts.
metaprompt_renderer: MetapromptRenderer,
/// Circuit breakers for each provider to prevent cascading failures.
#[allow(dead_code)]
circuit_breakers: Arc<Mutex<HashMap<String, CircuitBreaker>>>,
}

/// Validate agent name for safe use in file paths.
Expand Down Expand Up @@ -209,6 +214,7 @@ impl AgentOrchestrator {
cost_tracker,
persona_registry,
metaprompt_renderer,
circuit_breakers: Arc::new(Mutex::new(HashMap::new())),
})
}

Expand Down Expand Up @@ -500,8 +506,8 @@ impl AgentOrchestrator {
const STDIN_THRESHOLD: usize = 32_768; // 32 KB
let use_stdin = persona_found || composed_task.len() > STDIN_THRESHOLD;

// Build a Provider from the agent definition for the spawner
let provider = terraphim_types::capability::Provider {
// Build primary Provider from the agent definition for the spawner
let primary_provider = terraphim_types::capability::Provider {
id: def.name.clone(),
name: def.name.clone(),
provider_type: terraphim_types::capability::ProviderType::Agent {
Expand All @@ -515,19 +521,46 @@ impl AgentOrchestrator {
keywords: def.capabilities.clone(),
};

let handle = if use_stdin {
self.spawner
.spawn_with_model_stdin(&provider, &composed_task, model.as_deref())
.await
} else {
self.spawner
.spawn_with_model(&provider, &composed_task, model.as_deref())
.await
// Build fallback Provider if fallback_provider is configured
let fallback_provider = def.fallback_provider.as_ref().map(|fallback_cli| {
terraphim_types::capability::Provider {
id: format!("{}-fallback", def.name),
name: format!("{} (fallback)", def.name),
provider_type: terraphim_types::capability::ProviderType::Agent {
agent_id: format!("{}-fallback", def.name),
cli_command: fallback_cli.clone(),
working_dir: self.config.working_dir.clone(),
},
capabilities: vec![],
cost_level: terraphim_types::capability::CostLevel::Cheap,
latency: terraphim_types::capability::Latency::Medium,
keywords: def.capabilities.clone(),
}
});

// Build the spawn request with primary and fallback
let mut request = SpawnRequest::new(primary_provider, &composed_task)
.with_primary_model(model.as_deref().unwrap_or(""));

if let Some(fallback) = fallback_provider {
request = request.with_fallback_provider(fallback);
if let Some(fallback_model) = &def.fallback_model {
request = request.with_fallback_model(fallback_model);
}
}

if use_stdin {
request = request.with_stdin();
}
.map_err(|e| OrchestratorError::SpawnFailed {
agent: def.name.clone(),
reason: e.to_string(),
})?;

let handle = self
.spawner
.spawn_with_fallback(&request)
.await
.map_err(|e| OrchestratorError::SpawnFailed {
agent: def.name.clone(),
reason: e.to_string(),
})?;

// Subscribe to the output broadcast for nightwatch drain
let output_rx = handle.subscribe_output();
Expand Down
142 changes: 142 additions & 0 deletions crates/terraphim_spawner/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,64 @@ pub enum SpawnerError {
ConfigValidation(#[from] ValidationError),
}

/// Request to spawn an agent with primary and fallback configuration.
///
/// If the primary provider fails to spawn, the spawner will automatically
/// retry with the fallback provider (if configured). Built incrementally
/// via [`SpawnRequest::new`] and the `with_*` builder methods, then
/// executed with `AgentSpawner::spawn_with_fallback`.
#[derive(Debug, Clone)]
pub struct SpawnRequest {
    /// Primary provider configuration
    pub primary_provider: Provider,
    /// Primary model to use (if applicable)
    pub primary_model: Option<String>,
    /// Fallback provider configuration (if primary fails)
    pub fallback_provider: Option<Provider>,
    /// Fallback model to use (if applicable). Only consulted when
    /// `fallback_provider` is set; ignored otherwise.
    pub fallback_model: Option<String>,
    /// Task/prompt to give the agent
    pub task: String,
    /// Whether to deliver task via stdin (for large prompts). Applies to
    /// both the primary and the fallback attempt.
    pub use_stdin: bool,
}

impl SpawnRequest {
    /// Create a new spawn request with primary provider and task.
    ///
    /// All optional settings (models, fallback provider, stdin delivery)
    /// start unset and can be layered on with the `with_*` builders.
    #[must_use]
    pub fn new(primary_provider: Provider, task: impl Into<String>) -> Self {
        Self {
            primary_provider,
            primary_model: None,
            fallback_provider: None,
            fallback_model: None,
            task: task.into(),
            use_stdin: false,
        }
    }

    /// Set the primary model.
    // `#[must_use]`: the builder consumes `self` and returns the updated
    // request; dropping the return value silently discards the setting.
    #[must_use]
    pub fn with_primary_model(mut self, model: impl Into<String>) -> Self {
        self.primary_model = Some(model.into());
        self
    }

    /// Set the fallback provider to try if the primary spawn fails.
    #[must_use]
    pub fn with_fallback_provider(mut self, provider: Provider) -> Self {
        self.fallback_provider = Some(provider);
        self
    }

    /// Set the fallback model (used only when a fallback provider is set).
    #[must_use]
    pub fn with_fallback_model(mut self, model: impl Into<String>) -> Self {
        self.fallback_model = Some(model.into());
        self
    }

    /// Use stdin for task delivery (for large prompts).
    #[must_use]
    pub fn with_stdin(mut self) -> Self {
        self.use_stdin = true;
        self
    }
}

/// Handle to a spawned agent process
#[derive(Debug)]
pub struct AgentHandle {
Expand Down Expand Up @@ -381,6 +439,90 @@ impl AgentSpawner {
self.spawn_config(provider, &config, task, false).await
}

/// Spawn an agent with primary and fallback configuration.
///
/// Attempts to spawn with the primary provider first. If that fails,
/// falls back to the fallback provider (if configured).
pub async fn spawn_with_fallback(
&self,
request: &SpawnRequest,
) -> Result<AgentHandle, SpawnerError> {
// Try primary first
let primary_result = if request.use_stdin {
self.spawn_with_model_stdin(
&request.primary_provider,
&request.task,
request.primary_model.as_deref(),
)
.await
} else {
self.spawn_with_model(
&request.primary_provider,
&request.task,
request.primary_model.as_deref(),
)
.await
};

// If primary succeeds, return the handle
match primary_result {
Ok(handle) => Ok(handle),
Err(primary_err) => {
tracing::warn!(
primary_provider = %request.primary_provider.id,
error = %primary_err,
"Primary spawn failed, attempting fallback"
);

// Try fallback if configured
if let Some(ref fallback) = request.fallback_provider {
tracing::info!(
fallback_provider = %fallback.id,
"Attempting fallback spawn"
);

let fallback_result = if request.use_stdin {
self.spawn_with_model_stdin(
fallback,
&request.task,
request.fallback_model.as_deref(),
)
.await
} else {
self.spawn_with_model(
fallback,
&request.task,
request.fallback_model.as_deref(),
)
.await
};

match fallback_result {
Ok(handle) => {
tracing::info!(
fallback_provider = %fallback.id,
"Fallback spawn succeeded"
);
Ok(handle)
}
Err(fallback_err) => {
tracing::error!(
fallback_provider = %fallback.id,
error = %fallback_err,
"Fallback spawn also failed"
);
// Return the primary error since that's the original failure
Err(primary_err)
}
}
} else {
// No fallback configured, return primary error
Err(primary_err)
}
}
}
}

/// Internal spawn implementation shared by spawn() and spawn_with_model().
async fn spawn_config(
&self,
Expand Down
Loading