Prodigy is a workflow orchestration tool that executes Claude commands through structured YAML workflows. It provides session management and state tracking, and supports parallel execution through MapReduce patterns.
Prodigy follows the "Functional Core, Imperative Shell" pattern to achieve:
- Testability: Pure functions are easy to test without mocks or complex setup
- Maintainability: Clear separation between business logic and I/O operations
- Predictability: Pure functions always produce the same output for the same input
- Composability: Small, focused functions that combine to create complex behavior
The functional core contains all business logic as pure functions:
- Pure Functions Only: No side effects, no I/O operations
- Data Transformations: Functions take inputs and return outputs
- Immutable Data Flow: Data is transformed, not mutated
- Easy Testing: No mocks required, just input/output assertions
- `src/core/config/`: Configuration validation and transformation logic
- `src/core/session/`: Session state calculations and transitions
- `src/core/workflow/`: Workflow parsing and validation logic
- `src/core/mapreduce/`: MapReduce work distribution calculations
- `src/core/validation/`: Data validation and constraint checking
The imperative shell handles all I/O and side effects, delegating business logic to the core:
- I/O Operations: File system, network, database access
- Side Effects: Logging, metrics, external system calls
- Thin Layer: Minimal logic, primarily orchestration
- Core Delegation: Business logic delegated to pure functions
- `src/storage/`: Persistence layer wrapping core data structures
- `src/worktree/`: Git operations wrapping core worktree logic
- `src/cook/execution/`: Command execution wrapping core workflow logic
- `src/cli/`: User interface wrapping core command processing
Look for functions that:
- Mix I/O with Logic: File operations interleaved with business rules
- Have Multiple Responsibilities: Both calculating and persisting data
- Are Hard to Test: Require mocks, stubs, or complex test setup
- Have Side Effects: Modify global state, write files, or make network calls
- Are Large: Functions over 20 lines often mix concerns
```rust
// BEFORE: Mixed function
fn process_workflow(path: &Path) -> Result<()> {
    let content = fs::read_to_string(path)?;        // I/O
    let workflow = serde_yaml::from_str(&content)?; // Logic
    // Validation logic mixed with I/O
    if workflow.steps.is_empty() {
        log::error!("Empty workflow");              // Side effect
        return Err(anyhow!("Invalid workflow"));
    }
    fs::write("output.json", serde_json::to_string(&workflow)?)?; // I/O
    Ok(())
}
```
```rust
// AFTER: Separated concerns

// Pure function in src/core/workflow/
pub fn validate_workflow(workflow: &Workflow) -> Result<ValidatedWorkflow> {
    if workflow.steps.is_empty() {
        return Err(ValidationError::EmptyWorkflow);
    }
    Ok(ValidatedWorkflow::from(workflow))
}

// I/O wrapper in src/cook/workflow/
fn process_workflow(path: &Path) -> Result<()> {
    let content = fs::read_to_string(path)?;        // I/O
    let workflow = serde_yaml::from_str(&content)?;
    let validated = core::workflow::validate_workflow(&workflow)?; // Pure logic
    log::info!("Workflow validated");               // Side effect
    fs::write("output.json", serde_json::to_string(&validated)?)?; // I/O
    Ok(())
}
```

```rust
// Pure transformation pipeline in core
pub fn transform_session(
    session: Session,
    event: Event,
) -> Result<Session> {
    session
        .apply_event(event)
        .and_then(validate_transition)
        .map(calculate_metrics)
        .map(update_timestamps)
}

// I/O shell uses the pipeline
async fn handle_event(event: Event) -> Result<()> {
    let session = storage.load_session()?;                           // I/O
    let updated = core::session::transform_session(session, event)?; // Pure
    storage.save_session(updated)?;                                  // I/O
    Ok(())
}
```

```rust
// BEFORE: Complex inline logic
if item.score > 5 && item.category == "critical"
    && (item.retry_count < 3 || item.override_retry)
{
    // process item
}

// AFTER: Named predicate functions
fn should_process_item(item: &Item) -> bool {
    is_high_priority(item) && has_retries_available(item)
}

fn is_high_priority(item: &Item) -> bool {
    item.score > 5 && item.category == "critical"
}

fn has_retries_available(item: &Item) -> bool {
    item.retry_count < 3 || item.override_retry
}
```

```rust
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_validate_workflow() {
        // Simple input/output test, no mocks needed
        let workflow = Workflow { steps: vec![] };
        let result = validate_workflow(&workflow);
        assert!(result.is_err());
    }
}
```

```rust
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    #[test]
    fn test_process_workflow() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("workflow.yaml");
        // Test only the I/O orchestration;
        // business logic is tested in core module tests.
        fs::write(&path, "steps: [test]").unwrap();
        let result = process_workflow(&path);
        assert!(result.is_ok());
    }
}
```

The unified session management system provides a single, consolidated approach to handling all session-related functionality across the application. It replaces the previous multi-module session approach with a centralized, consistent model.
- **SessionManager** (`manager.rs`): Central interface for all session operations (a usage sketch follows this list)
  - Creates, updates, and manages the session lifecycle
  - Direct storage integration (no abstract traits)
  - Handles session persistence and recovery
  - Provides filtering and listing capabilities
  - In-memory cache for active sessions with persistent backing
- **SessionState** (`state.rs`): Session state and metadata
  - `UnifiedSession`: Core session representation for all session types
  - `SessionStatus`: Running, Completed, Failed, Paused, Initializing
  - `SessionType`: Workflow or MapReduce
  - `WorkflowSession`: Workflow-specific data (iterations, files changed, steps)
  - `MapReduceSession`: MapReduce-specific data (items, agents, phases)
  - Support for checkpointing and resumption
  - Built-in timing tracking
- **CookSessionAdapter** (`cook_adapter.rs`): Transitional bridge for the cook module
  - Implements cook's existing SessionManager trait
  - Maps cook session operations to unified session operations
  - Handles special metadata keys for incremental updates
  - Maintains backward compatibility during migration
- **Migration** (`migration.rs`): Handles migration from legacy session formats
  - Auto-detects legacy session data
  - One-time migration to the new unified format
  - Preserves historical session data
  - Archives old data after successful migration
- **Timing** (`timing.rs`): Performance tracking utilities
  - `TimingTracker`: Tracks iteration and command durations
  - `format_duration`: Human-readable duration formatting
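A minimal usage sketch of the manager described above. The method names (`create_session`, `update_session`) and the `SessionUpdate` variants shown are illustrative assumptions, not the exact API:

```rust
use anyhow::Result;

// Hypothetical usage; the actual method names in manager.rs may differ.
async fn run_workflow_session(manager: &SessionManager) -> Result<()> {
    // Create a workflow session (cached in memory, persisted to storage).
    let id = manager.create_session(SessionType::Workflow).await?;

    // Apply incremental updates as execution progresses.
    manager
        .update_session(&id, SessionUpdate::Status(SessionStatus::Running))
        .await?;

    // Completion is persisted automatically by the manager.
    manager
        .update_session(&id, SessionUpdate::Status(SessionStatus::Completed))
        .await?;
    Ok(())
}
```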
The cook module handles workflow execution and orchestration.
- **Orchestrator** (`orchestrator.rs`): Main workflow execution engine
  - Manages the workflow lifecycle
  - Coordinates between different executors
  - Handles error recovery and retries
- **Workflow** (`workflow/`): Workflow definition and execution
  - `WorkflowExecutor`: Executes workflow steps sequentially
  - Checkpoint support for resumption
  - Validation and error handling
- **MapReduce** (`execution/mapreduce/`): Parallel execution framework
  - Distributes work across multiple agents
  - Handles the map phase, reduce phase, and aggregation
  - Dead Letter Queue (DLQ) for failed items
- **Session** (`session/`): Legacy session abstractions (transitional)
  - SessionManager trait definition (being phased out)
  - Session state and status tracking (migrating to unified)
  - Integration via the CookSessionAdapter bridge
Global storage architecture for persistent data.
- **GlobalStorage**: Centralized storage management
  - Events, state, and DLQ storage
  - Cross-worktree data sharing
  - Efficient deduplication
- **Event Logging**: Comprehensive event tracking (a sketch follows this list)
  - Structured event storage
  - Support for querying and filtering
  - Integration with Claude streaming
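As a rough illustration of structured event storage, a hedged sketch of appending one event record to a per-job JSONL log. The helper name `log_event`, the record fields, and the `dirs`/`chrono` crates are assumptions; the real layout (shown under the storage tree later in this section) uses timestamped file names:

```rust
use std::fs;
use std::io::Write;
use anyhow::{Context, Result};
use serde_json::json;

// Hypothetical helper: append one structured event to a per-job JSONL log.
// (Simplified to a single events.jsonl; the real layout timestamps the file.)
fn log_event(repo: &str, job_id: &str, event_type: &str, payload: serde_json::Value) -> Result<()> {
    let dir = dirs::home_dir()
        .context("no home directory")?
        .join(format!(".prodigy/events/{repo}/{job_id}"));
    fs::create_dir_all(&dir)?;

    let record = json!({
        "timestamp": chrono::Utc::now().to_rfc3339(),
        "event_type": event_type,
        "payload": payload,
    });
    let mut file = fs::OpenOptions::new()
        .create(true)
        .append(true)
        .open(dir.join("events.jsonl"))?;
    writeln!(file, "{record}")?; // one JSON object per line
    Ok(())
}
```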
Git worktree-based isolation for parallel execution.
- Creates isolated environments for each session
- Manages worktree lifecycle
- Handles merge operations back to main branch
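A sketch of the creation step, assuming worktrees are created on a dedicated session branch via the git CLI (the function name and branch-per-session scheme are illustrative):

```rust
use std::path::Path;
use std::process::Command;
use anyhow::{bail, Result};

// Illustrative sketch: create an isolated worktree on a new session branch.
fn create_worktree(repo: &Path, worktree_path: &Path, session_branch: &str) -> Result<()> {
    let status = Command::new("git")
        .current_dir(repo)
        .args(["worktree", "add", "-b", session_branch])
        .arg(worktree_path)
        .status()?;
    if !status.success() {
        bail!("git worktree add failed for {}", worktree_path.display());
    }
    Ok(())
}
```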
Simple JSON-based state persistence for project metadata.
- Human-readable JSON files
- Git-friendly text format
- Zero configuration required
- Atomic operations for concurrent access
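The atomic guarantee typically comes from a write-then-rename pattern; a minimal sketch (the helper name is illustrative, not the actual API):

```rust
use std::{fs, path::Path};
use anyhow::Result;
use serde::Serialize;

// Sketch of atomic persistence: write to a temp file in the same directory,
// then rename over the target. Rename is atomic on POSIX filesystems, so
// concurrent readers never observe a partially written file.
fn save_state_atomically<T: Serialize>(path: &Path, state: &T) -> Result<()> {
    let json = serde_json::to_string_pretty(state)?; // human-readable, git-friendly
    let tmp = path.with_extension("json.tmp");
    fs::write(&tmp, json)?;
    fs::rename(&tmp, path)?;
    Ok(())
}
```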
- Session Creation: User Command → Cook Module → CookSessionAdapter → UnifiedSessionManager → Storage
- Workflow Execution: Workflow Config → Orchestrator → WorkflowExecutor → Session Updates → Storage
- MapReduce Processing: MapReduce Config → MapReduceExecutor → Agent Spawning → Parallel Execution → Aggregation
The unified session system consolidates all session management into a single, consistent model that serves both workflow and MapReduce executions:
```rust
UnifiedSession {
    id: SessionId,
    session_type: SessionType, // Workflow | MapReduce
    status: SessionStatus,
    metadata: HashMap<String, Value>,
    started_at: DateTime<Utc>,
    updated_at: DateTime<Utc>,
    completed_at: Option<DateTime<Utc>>,
    checkpoints: Vec<Checkpoint>,
    timings: BTreeMap<String, Duration>,
    error: Option<String>,
    // Type-specific data
    workflow_data: Option<WorkflowSession>,
    mapreduce_data: Option<MapReduceSession>,
}
```

- Initialization: Session created with type-specific configuration
- Running: Active execution with real-time progress tracking
- Updates: Incremental updates via the SessionUpdate enum (sketched after this list)
- Checkpointing: State snapshots for fault tolerance
- Completion: Final state with success/failure status
- Persistence: Automatic save to GlobalStorage
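A hypothetical shape for the `SessionUpdate` enum referenced above; the actual variants may differ:

```rust
// Illustrative only - the real enum lives in the unified session module.
pub enum SessionUpdate {
    Status(SessionStatus),
    Metadata { key: String, value: serde_json::Value },
    Checkpoint(Checkpoint),
    Timing { label: String, duration: std::time::Duration },
    Error(String),
}
```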
The unified session system is now the primary session management layer:
- Direct Usage: New code uses `UnifiedSessionManager` directly
- No Abstract Traits: Removed the `SessionStorage` trait for simplicity
- Cook Compatibility: `CookSessionAdapter` provides a temporary bridge
- Single Source of Truth: All session data flows through the unified system
- Migration Path: Legacy sessions are auto-migrated on first access
Prodigy tracks the original branch when creating worktrees to enable intelligent merge behavior:
State Tracking:

```rust
WorktreeState {
    original_branch: String, // Branch at worktree creation time
    branch: String,          // Current worktree branch (prodigy-session-*)
    // ... other fields
}
```

Branch Resolution Logic (a sketch follows this list):
- Capture: `create_session()` captures the current branch via `git rev-parse --abbrev-ref HEAD`
- Storage: The original branch is stored in `WorktreeState` for the session lifetime
- Merge Target: `get_merge_target()` returns the original branch or falls back to the default
- Fallback: If the original branch was deleted, the default branch (main/master) is used
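A sketch of that resolution logic; the `branch_exists` check and `default_branch` parameter are assumptions standing in for the real git queries:

```rust
// Illustrative fallback logic; see get_merge_target() for the real version.
fn merge_target(
    state: &WorktreeState,
    branch_exists: impl Fn(&str) -> bool,
    default_branch: &str,
) -> String {
    if branch_exists(&state.original_branch) {
        state.original_branch.clone() // merge back to the source branch
    } else {
        default_branch.to_string() // original branch deleted: fall back to main/master
    }
}
```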
Design Rationale:
- Supports feature branch workflows where worktrees should merge back to source branch
- Provides safe fallback when original branch is deleted
- Enables flexible merge target selection based on workflow context
- Improves user experience by showing merge target in confirmation prompts
```text
~/.prodigy/
├── events/
│   └── {repo_name}/
│       └── {job_id}/
│           └── events-{timestamp}.jsonl
├── state/
│   └── {repo_name}/
│       └── sessions/
│           └── {session_id}.json
├── dlq/
│   └── {repo_name}/
│       └── {job_id}/
│           └── failed_items.json
└── worktrees/
    └── {repo_name}/
        └── {session_id}/
```
- Consistency: Single source of truth for all session data
- Reliability: Automatic persistence and recovery
- Scalability: Support for parallel execution and cross-worktree coordination
- Maintainability: Clear separation of concerns and modular design
- Migration: Seamless transition from legacy formats
- Observability: Comprehensive event logging and metrics
The checkpoint storage strategy system provides type-safe, deterministic path resolution for workflow checkpoints using a pure functional approach. This system replaces ad-hoc path logic with an explicit, composable design that supports multiple storage strategies.
The `CheckpointStorage` enum in `src/cook/workflow/checkpoint_path.rs` defines three explicit storage strategies (reconstructed as an enum after the descriptions below):

`CheckpointStorage::Local(PathBuf)`
- Purpose: Project-local checkpoint storage in `.prodigy/checkpoints/`
- Use Cases: Testing, backwards compatibility, isolated project workflows
- Path Resolution: Uses the provided path directly (pure function behavior)
- Example: Local checkpoints that stay within the project directory

`CheckpointStorage::Global { repo_name: String }`
- Purpose: Repository-scoped storage in `~/.prodigy/state/{repo}/checkpoints/`
- Use Cases: Repository-level metadata, shared across all sessions
- Path Resolution: `~/.prodigy/state/{repo_name}/checkpoints/`
- Example: Shared checkpoint data for all sessions of a project

`CheckpointStorage::Session { session_id: String }`
- Purpose: Session-scoped storage in `~/.prodigy/state/{session_id}/checkpoints/`
- Use Cases: Normal workflow checkpoints (recommended for most workflows)
- Path Resolution: `~/.prodigy/state/{session_id}/checkpoints/`
- Benefits:
  - Isolation between sessions
  - Survives worktree cleanup
  - Clean session-scoped organization
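Putting the three strategies together, the enum has the following shape, reconstructed from the variants documented above:

```rust
use std::path::PathBuf;

pub enum CheckpointStorage {
    Local(PathBuf),
    Global { repo_name: String },
    Session { session_id: String },
}
```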
All path resolution functions follow functional programming principles:

- `resolve_base_dir() -> Result<PathBuf>`
  - Pure function: the same inputs always produce the same output
  - No side effects (no I/O, no state mutation)
  - Returns the base directory for the storage strategy
- `checkpoint_file_path(checkpoint_id: &str) -> Result<PathBuf>`
  - Composes `resolve_base_dir()` with filename construction
  - Deterministic: same strategy + ID = same path
  - Pattern: `{base_dir}/{checkpoint_id}.checkpoint.json`
- `resolve_global_base_dir() -> Result<PathBuf>`
  - Helper function for global/session path resolution
  - Returns `~/.prodigy` from the system home directory
  - Pure derivation from the environment (home directory)

Design principles:
- Immutability: The `CheckpointStorage` enum is immutable once constructed
- Explicit Configuration: The storage strategy is always explicit, never inferred
- Errors as Values: Functions return `Result<T>` instead of panicking
- Composition: Small pure functions compose to build complex paths (sketched before the usage examples below)
- Determinism: Property-based tests verify invariants
Use `Local` storage for:
- Testing environments: Keep test artifacts in the project directory
- Backwards compatibility: Existing workflows expecting local paths
- Isolated workflows: Project-specific checkpoints that shouldn't be shared
- Development: Quick iteration without polluting global storage

Use `Global` storage for:
- Repository metadata: Data shared across all sessions
- Cross-session analysis: Aggregating data from multiple workflow runs
- Persistent state: Data that should survive project cleanup
- CI/CD environments: Shared state across pipeline runs

Use `Session` storage for:
- Normal workflows: Standard workflow execution (recommended)
- Parallel execution: Isolated checkpoints per session
- Fault tolerance: Session-specific recovery without conflicts
- Clean separation: Clear boundaries between workflow runs
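How a strategy choice turns into a concrete path: a sketch of the composition, reconstructed from the path patterns documented above (the actual implementation in `checkpoint_path.rs` may differ in detail):

```rust
use std::path::PathBuf;
use anyhow::Result;

impl CheckpointStorage {
    // Pure: derive the base directory from the strategy alone.
    pub fn resolve_base_dir(&self) -> Result<PathBuf> {
        match self {
            CheckpointStorage::Local(path) => Ok(path.clone()),
            CheckpointStorage::Global { repo_name } => Ok(resolve_global_base_dir()?
                .join("state")
                .join(repo_name)
                .join("checkpoints")),
            CheckpointStorage::Session { session_id } => Ok(resolve_global_base_dir()?
                .join("state")
                .join(session_id)
                .join("checkpoints")),
        }
    }

    // Composition: base directory + "{checkpoint_id}.checkpoint.json".
    pub fn checkpoint_file_path(&self, checkpoint_id: &str) -> Result<PathBuf> {
        Ok(self
            .resolve_base_dir()?
            .join(format!("{checkpoint_id}.checkpoint.json")))
    }
}
```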
```rust
// Local: direct path usage
let local = CheckpointStorage::Local(PathBuf::from("/tmp/checkpoints"));
let path = local.checkpoint_file_path("cp-1")?;
// Result: /tmp/checkpoints/cp-1.checkpoint.json

// Global: repository-scoped
let global = CheckpointStorage::Global {
    repo_name: "prodigy".to_string(),
};
let path = global.checkpoint_file_path("cp-1")?;
// Result: ~/.prodigy/state/prodigy/checkpoints/cp-1.checkpoint.json

// Session: session-scoped (recommended)
let session = CheckpointStorage::Session {
    session_id: "session-abc123".to_string(),
};
let path = session.checkpoint_file_path("cp-1")?;
// Result: ~/.prodigy/state/session-abc123/checkpoints/cp-1.checkpoint.json
```

The checkpoint path system uses both unit tests and property-based tests:
The system includes comprehensive property-based tests that verify invariants across arbitrary inputs:
- Determinism: Same strategy + ID always produces the same path
- Isolation: Different session IDs produce different paths
- Conventions: All paths end with `.checkpoint.json`
- ID Preservation: The checkpoint ID always appears in the filename
- Scoping: Storage paths always contain their scope identifier
- Pure Function Behavior: Local storage returns exactly the path provided
These tests run on randomly generated inputs to verify the system behaves correctly across all possible valid inputs, not just hand-picked test cases.
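A sketch of what such property tests look like with proptest; the concrete tests in the codebase may phrase the invariants differently:

```rust
use proptest::prelude::*;

proptest! {
    // Determinism: the same strategy + ID always resolves to the same path.
    #[test]
    fn checkpoint_path_is_deterministic(id in "[a-z0-9-]{1,32}") {
        let storage = CheckpointStorage::Session { session_id: "s1".into() };
        prop_assert_eq!(
            storage.checkpoint_file_path(&id).unwrap(),
            storage.checkpoint_file_path(&id).unwrap()
        );
    }

    // Isolation: different session IDs never share a checkpoint path.
    #[test]
    fn sessions_are_isolated(id in "[a-z]{1,8}") {
        let a = CheckpointStorage::Session { session_id: format!("a-{id}") };
        let b = CheckpointStorage::Session { session_id: format!("b-{id}") };
        prop_assert_ne!(
            a.checkpoint_file_path("cp").unwrap(),
            b.checkpoint_file_path("cp").unwrap()
        );
    }
}
```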
The checkpoint storage strategy integrates with the broader workflow system:
- Orchestrator: Selects storage strategy based on workflow configuration
- Checkpointer: Uses pure path functions to determine checkpoint locations
- Recovery: Reconstructs paths deterministically from session/workflow ID
- Migration: Legacy paths automatically detected and migrated
All path resolution functions:
- Return `Result<PathBuf>` for error propagation
- Provide context via `anyhow::Context`
- Never panic in production code
- Handle a missing home directory gracefully
Prodigy's validation system is built on the stillwater library's Validation type, which provides error accumulation and functional composition patterns. This architecture replaces traditional fail-fast validation with a comprehensive approach that reports all errors at once.
- Error Accumulation: Collect all validation errors before failing
- Pure Functions: Validation logic separated from I/O operations
- Composable Validators: Build complex validators from simple, reusable functions
- Error Classification: Distinguish between errors (blocking) and warnings (informational)
The validation module is located in `src/core/validation/` and contains pure validation functions:
```rust
// Core validation type from stillwater
use stillwater::Validation;

// All validators return Validation<T, Vec<ValidationError>>
pub fn validate_command(command: &str) -> ValidationResult {
    // Accumulates multiple errors in a single pass
}

pub fn validate_paths(
    paths: &[&Path],
    exists_check: FileExistsCheck,
) -> ValidationResult {
    // I/O injected as a parameter - pure function
}

pub fn validate_environment(
    required_vars: &[&str],
    env_vars: &HashMap<String, String>,
) -> ValidationResult {
    // Validates all variables, accumulates all errors
}
```

Traditional validation stops at the first error:
```rust
// ❌ Fail-fast approach (old pattern)
fn validate(items: &[Item]) -> Result<()> {
    for item in items {
        if item.is_invalid() {
            return Err(error); // Stops here - the user only sees the first error
        }
    }
    Ok(())
}
```

Stillwater validation accumulates all errors:
```rust
// ✅ Error accumulation (stillwater pattern)
fn validate(items: &[Item]) -> ValidationResult {
    let mut all_errors = Vec::new();
    for item in items {
        match validate_item(item).into_result() {
            Ok(_) => {}
            Err(errors) => all_errors.extend(errors), // Collect all errors
        }
    }
    ValidationResult::from_validation(if all_errors.is_empty() {
        Validation::success(())
    } else {
        Validation::failure(all_errors)
    })
}
```

All validation errors are defined in the ValidationError enum with severity classification:
```rust
#[derive(Debug, Clone, PartialEq)]
pub enum ValidationError {
    // Path validation
    PathNotFound(PathBuf),
    PathInParentDir(PathBuf), // Warning
    PathInTempDir(PathBuf),   // Warning
    // Environment validation
    EnvVarMissing(String),
    EnvVarEmpty(String), // Warning
    // Command validation
    CommandEmpty,
    CommandDangerous { cmd: String, pattern: String },
    CommandSuspicious { cmd: String, reason: String }, // Warning
    // Resource validation
    IterationCountZero,
    IterationCountHigh(usize), // Warning
    MemoryLimitZero,
    MemoryLimitLow(usize), // Warning
    TimeoutZero,
}

pub enum ErrorSeverity {
    Error,   // Blocks execution
    Warning, // Reported but non-blocking
}
```

Validators compose to build complex validation logic:
```rust
// Small, focused validators
fn check_dangerous_patterns(cmd: &str) -> Validation<(), Vec<ValidationError>> {
    // Returns errors for dangerous patterns
}

fn check_suspicious_patterns(cmd: &str) -> Validation<(), Vec<ValidationError>> {
    // Returns warnings for suspicious patterns
}

// Composed validator
pub fn validate_command(command: &str) -> ValidationResult {
    let mut all_errors = Vec::new();
    // Compose multiple validators
    if let Err(errors) = check_dangerous_patterns(command).into_result() {
        all_errors.extend(errors);
    }
    if let Err(errors) = check_suspicious_patterns(command).into_result() {
        all_errors.extend(errors);
    }
    ValidationResult::from_validation(if all_errors.is_empty() {
        Validation::success(())
    } else {
        Validation::failure(all_errors)
    })
}
```

Validators use dependency injection to remain pure:
```rust
// File existence check passed as a parameter
pub type FileExistsCheck = fn(&Path) -> bool;

pub fn validate_paths(
    paths: &[&Path],
    exists_check: FileExistsCheck, // I/O injected here
) -> ValidationResult {
    // Pure logic - no filesystem access
    for path in paths {
        if !exists_check(path) {
            errors.push(ValidationError::PathNotFound(path.to_path_buf()));
        }
    }
    // ...
}

// Shell code provides the I/O implementation
fn validate_workflow_paths(paths: &[&Path]) -> ValidationResult {
    validate_paths(paths, |p| p.exists()) // I/O at the edge
}

// Test code provides a mock implementation
#[test]
fn test_validate_paths() {
    fn mock_exists(path: &Path) -> bool {
        path.to_str().unwrap().contains("exists")
    }
    let result = validate_paths(&paths, mock_exists); // No filesystem access
    assert_eq!(result.errors.len(), 2);
}
```

The ValidationResult type provides backward compatibility with existing code:
```rust
#[derive(Debug, Clone, Default)]
pub struct ValidationResult {
    pub is_valid: bool,
    pub errors: Vec<String>,
    pub warnings: Vec<String>,
}

impl ValidationResult {
    /// Convert from a stillwater Validation
    pub fn from_validation<T>(v: Validation<T, Vec<ValidationError>>) -> Self {
        match v.into_result() {
            Ok(_) => Self::valid(),
            Err(errors) => {
                let mut result = Self::valid();
                for error in errors {
                    match error.severity() {
                        ErrorSeverity::Error => result.add_error(error.to_string()),
                        ErrorSeverity::Warning => result.add_warning(error.to_string()),
                    }
                }
                result
            }
        }
    }
}
```

- Better User Experience: Users see ALL validation errors at once
- Improved Testing: Pure functions with no I/O dependencies
- Composability: Build complex validators from simple building blocks
- Consistency: Uniform error types and handling across the codebase
- Performance: Single-pass validation with no additional overhead
The stillwater validation migration maintains zero performance regression:
- No allocation overhead: Error vectors are allocated once
- Single-pass validation: All errors found in one traversal
- Compiler optimization: Small validators inline effectively
- Benchmark verification: See `benches/execution_benchmarks.rs::bench_validation_performance`
Run the validation benchmarks:

```bash
cargo bench --bench execution_benchmarks -- validation_performance
```

For detailed migration examples and patterns, see:
- `docs/stillwater-validation-migration.md` - Before/after code examples
- `src/core/validation/mod.rs` - Reference implementation
- `specs/163-stillwater-validation-library.md` - Original specification
- Unit Tests: Each module has comprehensive unit tests
- Integration Tests: Test interaction between components
- Migration Tests: Verify legacy data migration
- Mock Implementations: Testing abstractions for isolated testing
- Property-Based Tests: Verify system invariants across arbitrary inputs using proptest
- Validation Tests: Error accumulation tests verify all errors are reported
As part of continuous improvement, Prodigy underwent systematic refactoring to reduce function complexity and improve maintainability following functional programming principles.
- Function Size: Reduce all functions to < 20 lines (prefer 5-10)
- Nesting Depth: Maximum 2 levels of nesting
- Single Responsibility: Each function does one thing well
- Pure Function Extraction: Separate business logic from I/O
- Composability: Build complex behavior from small, testable functions
- setup_environment: 98 lines → 27 lines + 3 helper functions
- cleanup: 71 lines → 8 lines + 4 helper functions
- execute_and_validate_command: 95 lines → 33 lines + 4 pure validators
- Extracted 13 pure functions to `cook/orchestrator/construction.rs`
- All pure functions have comprehensive unit tests
#### 2. `cook/execution/mapreduce/checkpoint_integration.rs` (2515 lines)
- initialize_checkpoint_state: 65 lines → 5 lines + 7 helper functions
- get_next_batch: 34 lines (3-level nesting) → 8 lines (1-level nesting)
- process_batch: 32 lines → 5 lines + 2 pure functions
- update_checkpoint_with_results: 51 lines → 16 lines + 3 handlers
- resume_from_checkpoint: 100 lines → 24 lines + 3 phase handlers
- Created 24 new focused functions, all < 20 lines
- determine_command_type: 77 lines → 7 lines + 5 pure helpers
- save_workflow_state: 22 lines → 14 lines (pure extraction)
- handle_no_commits_error: 55 lines → 8 lines + 4 message builders
- execute_internal: 293 lines → 3 orchestration functions
- Extracted 9 pure functions to `executor/pure.rs`
- extract_json_path: 37 lines, 4-level nesting → 4 lines, 1-level nesting
- resolve_by_type: 38 lines → 13 lines + 5 specialized resolvers
- resolve_json_variable: 38 lines, 6-level nesting → 16 lines, 2-level nesting
- aggregate functions (min, max, median, variance): 24-29 lines → 10-16 lines
- Created 38 new helper functions, all pure and focused
- Eliminated code duplication across aggregation functions
- update_agent_result: 57 lines → 6 lines + 6 helper functions + 3 pure extractors
- save_checkpoint: 80 lines → 14 lines + 4 I/O pipelines + 3 pure helpers
- load_checkpoint_by_version: 51 lines → 10 lines + 4 path resolution helpers
- list_checkpoints: 39 lines → 8 lines + 5 pure parsing functions
- Created a reusable `write_file_atomically` primitive
```rust
// BEFORE: Mixed logic and I/O
fn process_item(item: &Item) -> Result<()> {
    if item.validate() {                     // Pure logic
        fs::write("result.txt", "success")?; // I/O
    }
    Ok(())
}

// AFTER: Separated concerns
fn validate_item(item: &Item) -> bool { // Pure
    // Validation logic
}

fn write_result(path: &Path, content: &str) -> Result<()> { // I/O wrapper
    fs::write(path, content)
}

fn process_item(item: &Item) -> Result<()> { // Thin orchestration
    if validate_item(item) {
        write_result(Path::new("result.txt"), "success")?;
    }
    Ok(())
}
```

```rust
// BEFORE: Monolithic function
fn complex_operation(data: Data) -> Result<Output> {
    // 100 lines of sequential logic
}

// AFTER: Composed from small functions
fn complex_operation(data: Data) -> Result<Output> {
    step1(data)
        .and_then(step2)
        .and_then(step3)
        .map(finalize)
}
```

```rust
// BEFORE: Deep nesting
fn process(item: Option<Item>) -> Result<()> {
    if let Some(item) = item {
        if item.valid {
            if let Some(data) = item.data {
                // Process...
            }
        }
    }
    Ok(())
}

// AFTER: Early returns + functional chains
fn process(item: Option<Item>) -> Result<()> {
    let item = item.ok_or(Error::MissingItem)?;
    if !item.valid {
        return Ok(());
    }
    let data = item.data.ok_or(Error::MissingData)?;
    process_data(&data)
}
```

- Testability: Pure functions testable without I/O mocks
- Readability: Small functions self-document through clear names
- Maintainability: Easy to locate and modify specific behavior
- Reusability: Helper functions composable across modules
- Debugging: Reduced complexity simplifies troubleshooting
- Code Review: Smaller units easier to review thoroughly
- Functions Refactored: 14 major complex functions
- Helper Functions Created: 90+ new focused functions
- Average Function Length: Reduced from ~50 lines to ~12 lines
- Max Nesting Depth: Reduced from 4-6 levels to 1-2 levels
- Test Coverage: Pure functions have dedicated unit tests
Some refactored functions are still being integrated:
- Test failures in `cook/orchestrator/core.rs` (6 tests) due to behavior changes
- These will be addressed in follow-up work to ensure exact behavioral equivalence
When refactoring complex functions in Prodigy:
- Identify functions > 20 lines or complexity > 5
- Extract pure logic to helper functions (< 10 lines each)
- Separate I/O operations into thin wrappers
- Compose at higher level using functional patterns
- Test each pure function independently
- Verify original tests still pass
See spec 166 for detailed refactoring patterns and examples.
The MapReduce state management module (src/cook/execution/state_pure/) implements a pure functional approach to state transitions using the Stillwater Effect library. This design provides a clear separation between pure business logic and I/O operations, enabling comprehensive testing and improved maintainability.
```text
src/cook/execution/state_pure/
├── mod.rs      # Public API and module exports
├── pure.rs     # Pure state transition functions (40+ unit tests)
├── io.rs       # Effect-based I/O operations (20+ integration tests)
└── types.rs    # State types and data structures
```
All MapReduce state transitions are implemented as pure functions:
```rust
// Pure function: takes state, returns new state
pub fn apply_agent_result(
    state: MapReduceJobState,
    result: AgentResult,
) -> MapReduceJobState {
    // No I/O, no side effects.
    // Same inputs always produce the same outputs.
    // Easy to test with simple assertions.
}
```

Key Pure Functions:
- `apply_agent_result()` - Updates state with an agent completion
- `should_transition_to_reduce()` - Determines the phase transition (sketched below)
- `get_retriable_items()` - Calculates retry-eligible items
- `start_reduce_phase()` - Initializes reduce phase state
- `complete_reduce_phase()` - Finalizes the reduce phase
- `mark_complete()` - Marks the job as complete
- `update_variables()` - Updates workflow variables
- `set_parent_worktree()` - Sets the worktree reference
All pure functions:
- Return new state instead of mutating
- Contain zero I/O operations
- Are independently testable
- Have comprehensive unit test coverage
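As an illustration, the phase-transition decision reduces to a predicate over the job state. `pending_items` appears in the tests later in this section; `in_progress_items` is an assumed companion field, and the real condition may be richer:

```rust
// Sketch: the map phase is complete when no items remain to process.
pub fn should_transition_to_reduce(state: &MapReduceJobState) -> bool {
    state.pending_items.is_empty() && state.in_progress_items.is_empty()
}
```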
I/O operations are wrapped in Stillwater Effect types for lazy evaluation and composition:
```rust
// Effect type for state operations
pub type StateEffect<T> = Effect<T, anyhow::Error, StateEnv>;

// Storage backend trait
#[async_trait]
pub trait StorageBackend: Send + Sync {
    async fn write_checkpoint(&self, job_id: &str, data: &str) -> Result<()>;
    async fn read_checkpoint(&self, job_id: &str) -> Result<String>;
}

// Event log trait
#[async_trait]
pub trait EventLog: Send + Sync {
    async fn log_checkpoint_saved(&self, job_id: &str) -> Result<()>;
    async fn log_phase_transition(&self, job_id: &str, phase: &str) -> Result<()>;
}
```

Effect-Based Operations:
- `save_checkpoint()` - Persists state to storage
- `load_checkpoint()` - Retrieves state from storage
- `update_with_agent_result()` - Composes pure update + save
- `complete_batch()` - Processes a batch + checkpoint + transition
- `start_reduce_phase_with_save()` - Reduces boilerplate for a common operation
Effects enable lazy evaluation and composition of I/O operations:
```rust
// Effect: lazy computation that hasn't run yet
let effect = save_checkpoint(state);

// Compose effects
let composed = effect
    .and_then(|_| load_checkpoint(job_id))
    .map(|state| transform_state(state));

// Execute with an environment
let result = composed.run(&env).await?;
```

Benefits:
- Lazy evaluation: Effects don't run until explicitly invoked
- Composable: Chain operations using `map`, `and_then`, `or_else`
- Testable: Use mock environments for testing
- Type-safe: Compile-time guarantees about effect types
```rust
#[test]
fn test_apply_agent_result_success() {
    let state = test_state();
    let result = test_agent_result("item-0", AgentStatus::Success);
    let new_state = apply_agent_result(state, result);
    assert_eq!(new_state.successful_count, 1);
    assert!(new_state.pending_items.is_empty());
}
```

No mocks required - pure functions test input/output directly.
```rust
#[tokio::test]
async fn test_save_checkpoint() {
    let env = test_env(); // Mock storage backend
    let state = test_state();
    save_checkpoint(state).run(&env).await.unwrap();
    // Verify through the mock storage
}
```

Mock implementations provide testable I/O without actual file system access.
```rust
// Pure logic in pure.rs
pub fn calculate_next_state(state: State, event: Event) -> State {
    // Pure transformation
}

// I/O wrapper in io.rs
pub fn apply_event_with_save(state: State, event: Event) -> StateEffect<State> {
    let new_state = pure::calculate_next_state(state, event);
    save_checkpoint(new_state.clone()).map(|_| new_state)
}
```

```rust
pub fn complete_batch(
    state: MapReduceJobState,
    results: Vec<AgentResult>,
) -> StateEffect<MapReduceJobState> {
    // Pure: apply all results
    let mut new_state = state;
    for result in results {
        new_state = pure::apply_agent_result(new_state, result);
    }
    // I/O: save checkpoint
    save_checkpoint(new_state.clone()).and_then(move |_| {
        // Pure: check whether a transition is needed
        if pure::should_transition_to_reduce(&new_state) {
            transition_to_reduce(new_state)
        } else {
            Effect::pure(new_state)
        }
    })
}
```

```rust
pub struct StateEnv {
    pub storage: Arc<dyn StorageBackend>,
    pub event_log: Arc<dyn EventLog>,
}
```

The environment contains all external dependencies, injected at runtime.
- No overhead: Pure functions compile to native code with zero abstraction cost
- Lazy evaluation: Effects only run when executed
- Memory efficient: State updates use clone-on-write patterns
- <5% overhead: Compared to imperative state updates (target met)
Status: ✅ COMPLETE (2025-11-23)
The `state_pure` module is now fully integrated with the MapReduce executor. All state mutations in `src/cook/execution/state.rs` now use pure functions from the `state_pure::pure` module.
```rust
// Old imperative approach (deprecated):
state.update_agent_result(result); // mutates state in place

// New pure approach (now in use):
pub fn update_agent_result(&mut self, result: AgentResult) {
    let pure_state = to_pure_state(self);
    let new_pure_state = state_pure::apply_agent_result(pure_state, result);
    *self = from_pure_state(new_pure_state);
}
```

Integrated Methods:
- `update_agent_result()` → `state_pure::apply_agent_result()`
- `start_reduce_phase()` → `state_pure::start_reduce_phase()`
- `complete_reduce_phase()` → `state_pure::complete_reduce_phase()`
- `mark_complete()` → `state_pure::mark_complete()`
- `is_map_phase_complete()` → `state_pure::is_map_phase_complete()`
- `get_retriable_items()` → `state_pure::get_retriable_items()`
Type Conversion:
Since `state.rs` and `state_pure/types.rs` define identical structures, conversion uses serde serialization:
```rust
fn to_pure_state(state: &MapReduceJobState) -> state_pure::MapReduceJobState {
    let json = serde_json::to_string(state).expect("Failed to serialize state");
    serde_json::from_str(&json).expect("Failed to deserialize to pure state")
}
```

Test Coverage:
- Pure functions: 40/40 tests passing
- State integration: 81/81 tests passing
- MapReduce executor: 595/595 tests passing
- Total: 716 passing tests with zero failures
The migration from imperative to pure state transitions is complete. All MapReduce state updates now flow through tested pure functions, providing:
- Better testability (pure functions with no I/O)
- Clear separation of concerns (logic vs I/O)
- Compile-time guarantees (type-safe transformations)
- Comprehensive test coverage (no untested state mutations)
- Testability: 40+ pure function tests with no I/O mocking
- Separation of Concerns: Clear boundary between logic and I/O
- Composability: Effects combine using functional patterns
- Type Safety: Compile-time guarantees about state transitions
- Maintainability: Small, focused functions (all < 20 lines)
See spec 169 for complete implementation details and design rationale.
The orchestrator uses Stillwater's Effect pattern for composable, testable workflow execution.
```text
┌─────────────────────────────────────────────────────────────┐
│                  Effect-Based Orchestrator                  │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  Pure Core              Effects              Environment    │
│  ┌──────────┐          ┌──────────┐         ┌────────────┐  │
│  │ Classify │ ──────►  │ Validate │ ──────► │    I/O     │  │
│  │ Validate │          │ Setup    │         │ Operations │  │
│  │ Decide   │          │ Execute  │         └────────────┘  │
│  └──────────┘          └──────────┘                         │
│                                                             │
│  No I/O                Composed             Injected        │
│  Testable              Effects              Dependencies    │
└─────────────────────────────────────────────────────────────┘
```
Pure functions for workflow analysis with no I/O:
```rust
// Workflow classification
pub fn classify_workflow(config: &WorkflowConfig) -> WorkflowType

// Workflow validation
pub fn validate_workflow(config: &WorkflowConfig) -> Validation<(), Vec<WorkflowError>>

// Iteration decisions
pub fn should_continue_iteration(iteration: u32, max: u32, files_changed: usize) -> IterationDecision
```

Properties:
- Zero I/O operations
- Deterministic output
- Easily testable (25+ pure function tests)
- Composable with Effects
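A hypothetical shape for the `IterationDecision` type returned above; the variant names and exact stopping conditions are assumptions:

```rust
pub enum IterationDecision {
    Continue,
    Stop(StopReason),
}

pub enum StopReason {
    MaxIterationsReached,
    NoChangesDetected,
}

// Pure decision: no I/O, trivially unit-testable.
pub fn should_continue_iteration(iteration: u32, max: u32, files_changed: usize) -> IterationDecision {
    if iteration >= max {
        IterationDecision::Stop(StopReason::MaxIterationsReached)
    } else if files_changed == 0 {
        IterationDecision::Stop(StopReason::NoChangesDetected)
    } else {
        IterationDecision::Continue
    }
}
```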
Effects wrap I/O operations and compose workflow execution:

```rust
// Effect type for orchestrator operations
pub type OrchEffect<T> = Effect<T, anyhow::Error, OrchestratorEnv>;

// Workflow validation as an Effect
pub fn validate_workflow(config: WorkflowConfig) -> OrchEffect<WorkflowConfig>

// Set up the workflow environment
pub fn setup_workflow(config: WorkflowConfig) -> OrchEffect<WorkflowSession>

// Execute the complete workflow
pub fn execute_workflow(config: WorkflowConfig) -> OrchEffect<WorkflowResult>
```

Effect Composition:
```rust
// Effects compose using and_then_auto and map_auto
let workflow_effect = validate_workflow(config)
    .and_then_auto(|validated| setup_workflow(validated))
    .and_then_auto(|session| execute_steps(session))
    .map_auto(|result| result.summary());
```

Dependency injection for testability:
```rust
pub struct OrchestratorEnv {
    pub session_manager: Arc<dyn SessionManager>,
    pub command_executor: Arc<dyn CommandExecutor>,
    pub claude_executor: Arc<dyn ClaudeExecutor>,
    pub user_interaction: Arc<dyn UserInteraction>,
    pub git_operations: Arc<dyn GitOperations>,
    pub subprocess_manager: SubprocessManager,
}
```

Testing Support:
- Mock implementations for all traits
- `OrchestratorEnv::test()` constructor (partial)
- Pure logic tested without I/O
- Effect composition verified separately
- Separation of Concerns
  - Pure logic in `pure.rs` (no I/O)
  - I/O wrapped in Effects
  - Clear boundaries
- Testability
  - Pure functions: simple input/output tests
  - Effects: testable with mocks
  - No hidden dependencies
- Composability
  - Effects combine functionally
  - Reusable workflow components
  - Type-safe composition
- Explicit Dependencies
  - All I/O flows through `OrchestratorEnv`
  - No hidden global state
  - Easy to trace data flow
- Pure Functions: 25+ tests
- Data Structures: 10+ tests
- Effect Validation: Pure validation tests
- Total: 35+ orchestrator tests
- Pure Core: All business logic in pure functions
- Effect Composition: I/O operations as composable Effects
- Environment Injection: Dependencies passed explicitly
- Incremental Migration: Non-breaking additions to existing code
The Effect-based orchestrator coexists with the existing orchestrator:
- Pure functions extract validation/classification logic
- Effect wrappers provide composable I/O operations
- Environment injection enables testing
- Gradual adoption as features require it
No changes to existing workflow execution until Effects are fully proven.
See spec 170 for complete implementation details and rationale.
- Distributed Execution: Support for multi-machine orchestration
- Enhanced Monitoring: Real-time metrics and dashboards
- Plugin System: Extensible command and executor architecture
- Cloud Storage: Optional cloud-based storage backends
- Advanced Scheduling: Cron-based and event-driven workflows