Architecture, module reference, and step-by-step guides for adding model providers, analysis reports, benchmark loaders, and new pipeline phases.
- Repository Layout
- Architecture Overview
- Module Reference
  - 3.1 Code/runner/config.py
  - 3.2 Code/runner/storage.py
  - 3.3 Code/runner/prompts.py
  - 3.4 Code/runner/logger.py
  - 3.5 Code/runner/runner.py
  - 3.6 Code/runner/cli.py
  - 3.7 Code/runner/commands/
  - 3.8 Code/runner/label_eval.py
  - 3.9 Code/runner/interfaces/base.py
  - 3.10 Real-time interfaces
  - 3.11 Batch runners
  - 3.12 Code/runner/interfaces/probe.py
  - 3.13 Code/runner/interfaces/cost_estimator.py
  - 3.14 Code/runner/interfaces/pool.py
  - 3.15 Code/runner/interfaces/registry.py
  - 3.16 Code/runner/phases/utils.py
  - 3.17 Code/runner/phases/phase{1–5}.py
  - 3.18 Code/analyzer/main.py
  - 3.19 Code/analyzer/loader.py
  - 3.20 Code/analyzer/reports/
- Data Flow Walkthrough
- ID and File Naming Conventions
- Error Handling Strategy
- How To: Add a Real-Time Model Provider
- How To: Add a Batch Runner
- How To: Add an Analysis Report
- How To: Add a Benchmark Loader
- How To: Add a Pipeline Phase
- Testing
- Frequently Asked Questions
Code/runner/ ← pipeline package (runner.* namespace)
├── __init__.py
├── cli.py # CLI entry point — all coeval subcommands
├── config.py # Config dataclasses, YAML load, V-01..V-17 validation
├── logger.py # RunLogger: timestamped log to file + console
├── prompts.py # Canonical prompt templates + resolution logic
├── runner.py # Orchestrator: iterates phases, wires storage/logger/pool
├── storage.py # ExperimentStorage: all filesystem I/O
├── label_eval.py # LabelEvaluator: exact-match metrics for classification tasks
│
├── commands/ # Standalone CLI command implementations
│ ├── describe_cmd.py # coeval describe — HTML planning view
│ ├── generate_cmd.py # coeval generate — single-model generation
│ ├── ingest_cmd.py # coeval ingest — benchmark JSONL ingest
│ ├── models_cmd.py # coeval models — list provider models
│ ├── plan_cmd.py # coeval plan — cost/phase dry run
│ ├── probe_cmd.py # coeval probe — standalone availability probe
│ ├── repair_cmd.py # coeval repair — fix corrupted artifacts
│ ├── status_cmd.py # coeval status — experiment dashboard + batch fetching
│ └── wizard_cmd.py # coeval wizard — interactive config setup
│
├── interfaces/
│ ├── __init__.py # Re-exports + create_batch_runner() factory
│ ├── base.py # Abstract ModelInterface (generate method)
│ ├── openai_iface.py # OpenAI Chat Completions
│ ├── anthropic_iface.py # Anthropic Messages API
│ ├── gemini_iface.py # Google Gemini (generativeai)
│ ├── huggingface_iface.py # Local HuggingFace transformers.pipeline (GPU)
│ ├── openai_compat_iface.py # OpenAI-compatible: groq, deepseek, cohere, etc.
│ ├── azure_openai_iface.py # Azure OpenAI
│ ├── azure_ai_iface.py # Azure AI Inference (models.ai.azure.com)
│ ├── bedrock_iface.py # AWS Bedrock real-time (native API key)
│ ├── vertex_iface.py # Google Vertex AI real-time
│ ├── openai_batch.py # OpenAI Batch API runner
│ ├── anthropic_batch.py # Anthropic Message Batches runner
│ ├── gemini_batch.py # Gemini concurrent pseudo-batch runner
│ ├── azure_batch.py # Azure OpenAI Batch API runner
│ ├── bedrock_batch.py # AWS Bedrock Model Invocation Jobs runner
│ ├── vertex_batch.py # Google Vertex AI Batch Prediction runner
│ ├── mistral_batch.py # Mistral Batch API runner
│ ├── probe.py # run_probe(): lightweight availability check
│ ├── cost_estimator.py # estimate_experiment_cost(), PRICE_TABLE
│ ├── pool.py # ModelPool: lazy-load + cache interface instances
│ └── registry.py # Credential resolution, auto-routing, model listing
│
├── phases/
│ ├── __init__.py
│ ├── utils.py # _extract_json, call_llm_json/word, mergers, QuotaTracker
│ ├── phase1.py # Attribute Mapping
│ ├── phase2.py # Rubric Mapping
│ ├── phase3.py # Data Generation
│ ├── phase4.py # Response Collection
│ └── phase5.py # Evaluation
│
└── benchmarks/ # Benchmark adapter helpers (not benchmark loaders)
Code/analyzer/ ← analysis & reporting package (analyzer.* namespace)
├── __init__.py
├── main.py # run_analyze() entry point for coeval analyze
├── loader.py # load_ees(): read phase 5 JSONL → EESDataModel
├── metrics.py # ICC, kappa, score normalisation
├── calibration.py # Judge calibration and drift analysis
├── paper_tables.py # LaTeX / CSV paper table generators
└── reports/
├── html_base.py # Shared Plotly HTML utilities, get_plotly_js()
├── index_page.py # Main dashboard HTML (links to all sub-reports)
├── summary_report.py # Run metadata summary
├── coverage.py # Attribute coverage stacked-bar charts
├── student_report.py # Per-student score breakdowns and heatmaps
├── teacher_report.py # Per-teacher source quality and coverage
├── judge_report.py # Judge bias, calibration, inter-rater reliability
├── consistency.py # Inter-judge ICC agreement and drift
├── robust.py # Robust summary: rankings + confidence bounds
├── score_dist.py # Score distribution histograms
├── interaction.py # Teacher × Student pair quality heatmap
└── excel.py # Complete Excel workbook export
Public/benchmark/ ← benchmark Python package (benchmark.* namespace)
├── __init__.py
├── setup_mixed.py # One-time setup: ingest all mixed-benchmark datasets
├── setup_education.py # One-time setup: ingest education datasets
├── loaders/
│ ├── __init__.py # Loader registry + load_benchmark() dispatcher
│ ├── base.py # BenchmarkLoader abstract base
│ ├── xsum.py # XSum summarisation (EdinburghNLP/xsum)
│ ├── aeslc.py # Email subject line (aeslc)
│ ├── arc_challenge.py # ARC Challenge QA
│ ├── race.py # RACE reading comprehension
│ ├── sciq.py # SciQ science QA
│ ├── codesearchnet.py # CodeSearchNet code search
│ └── wikitablequestions.py # WikiTableQuestions table QA
└── compute_scores.py # Populate benchmark_native_score in Phase 3 JSONL
Config/
└── provider_pricing.yaml # Pricing tables and auto_routing for all 18 interfaces
Runs/ # One subfolder per experiment config
├── mixed/mixed.yaml
├── education/education.yaml
├── medium-benchmark/medium_benchmark.yaml
└── ...
Tests/
├── runner/ # Unit tests for runner.* package
├── benchmark/ # Unit tests for benchmark.* package
├── analyzer/ # Unit + Playwright tests for analyzer.* package
└── test_structural_integrity.py # Layout, imports, path constant verification
docs/
├── concepts.md # Concept glossary
├── developer_guide.md # This file
├── cli_reference.md # CLI option reference
├── tutorial.md # End-to-end walkthrough
└── README/ # 13-section user guide (01-overview … 13-documentation)
CLI (cli.py)
├─► coeval run → load_config() → validate_config() → run_experiment()
├─► coeval probe → commands/probe_cmd.py → run_probe()
├─► coeval plan → commands/plan_cmd.py → estimate_experiment_cost()
├─► coeval status → commands/status_cmd.py → reads experiment folder
├─► coeval repair → commands/repair_cmd.py → validates + patches artifacts
├─► coeval describe → commands/describe_cmd.py → HTML planning view
├─► coeval wizard → commands/wizard_cmd.py → interactive config builder
└─► coeval analyze → Code/analyzer/main.py → run_analyze()
run_experiment() [runner.py]
│
├─ ExperimentStorage.initialize() [storage.py]
├─ RunLogger() [logger.py]
├─ ModelPool(provider_keys) [interfaces/pool.py]
├─ QuotaTracker() [phases/utils.py]
├─ run_probe() [interfaces/probe.py] pre-flight
├─ estimate_experiment_cost() [interfaces/cost_estimator.py] optional
│
└─ for phase_id in PHASE_IDS:
runner_fn(cfg, storage, logger, pool, quota, phase_mode)
│
├─ Phase 1: run_phase1() → phase1_attributes/{task}.{kind}_attrs.json
├─ Phase 2: run_phase2() → phase2_rubric/{task}.rubric.json
├─ Phase 3: run_phase3() → phase3_datapoints/{task}__{teacher}.datapoints.jsonl
├─ Phase 4: run_phase4() → phase4_responses/{task}__{teacher}__{student}.responses.jsonl
└─ Phase 5: run_phase5() → phase5_evaluations/{task}__{teacher}__{judge}.evaluations.jsonl
run_analyze() [Code/analyzer/main.py]
├─ load_ees(run_path) → EESDataModel
└─ report writers (one per subcommand):
all / dashboard / student-report / teacher-report / judge-report /
judge-consistency / coverage-summary / score-distribution /
interaction-matrix / robust-summary / complete-report
Every phase function has the same signature:
```python
def run_phaseN(
    cfg: CoEvalConfig,
    storage: ExperimentStorage,
    logger: RunLogger,
    pool: ModelPool,
    quota: QuotaTracker,
    phase_mode: str,
) -> None: ...
```

This uniformity lets `runner.py` iterate over phases in a loop and makes adding a new phase zero-cost to the orchestrator.
Purpose: Parse the YAML experiment config into typed dataclasses and enforce all 17 validation rules.
CoEvalConfig
├── models: list[ModelConfig]
├── tasks: list[TaskConfig]
└── experiment: ExperimentConfig
ModelConfig
name, interface, parameters, roles, access_key, role_parameters, batch_enabled
TaskConfig
name, description, output_description, target_attributes, nuanced_attributes,
sampling (SamplingConfig), rubric, evaluation_mode, prompt_library, label_attributes
SamplingConfig
target ([min,max] | "all"), nuance ([min,max]), total (int)
ExperimentConfig
id, storage_folder, resume_from, phases, log_level, quota,
probe_mode, probe_on_fail, estimate_cost, estimate_samples, batch
load_config(path) → CoEvalConfig
Opens the YAML, calls _parse_config(), stores the raw dict on cfg._raw, returns the config.
validate_config(cfg, continue_in_place, _skip_folder_validation) → list[str]
Applies V-01 through V-17. Returns error strings; never raises.
- `continue_in_place=True` — suppresses V-11 (folder must not exist), activates V-14 (meta.json must exist).
- `_skip_folder_validation=True` — suppresses V-11 and V-14; used by standalone commands.
CoEvalConfig.get_models_by_role(role) → list[ModelConfig]
Returns all models carrying the given role. Used by every phase.
ModelConfig.get_parameters_for_role(role) → dict
Returns base parameters deep-merged with role_parameters[role].
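The deep-merge semantics can be sketched as follows. This is a minimal illustration; the helper name `merge_role_parameters` is invented here, not the real implementation:

```python
def merge_role_parameters(base: dict, overrides: dict) -> dict:
    """Recursively merge role overrides onto base parameters (illustrative sketch)."""
    merged = dict(base)
    for key, value in overrides.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            # Nested dicts are merged key-by-key rather than replaced wholesale.
            merged[key] = merge_role_parameters(merged[key], value)
        else:
            merged[key] = value
    return merged
```

A role override like `{'temperature': 0.0}` thus replaces only that key, leaving the rest of the base parameters intact.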
VALID_INTERFACES # all 18 supported interface names
BATCH_CAPABLE_INTERFACES # openai, anthropic, gemini, azure_openai, bedrock, vertex, mistral
PHASE_IDS # ['attribute_mapping', 'rubric_mapping', 'data_generation',
# 'response_collection', 'evaluation']
VALID_PHASE_MODES # New | Keep | Extend | Model

Purpose: All filesystem I/O for one experiment. Phases never touch the disk directly.
Sets self.run_path = Path(storage_folder) / experiment_id and pre-computes all sub-paths.
initialize(config_raw, resume_from_id, source_storage_folder, continue_in_place)
Creates the folder tree (phase1_attributes/ … phase5_evaluations/), writes config.yaml and meta.json.
- `resume_from_id` — copies Phase 1 and Phase 2 artifacts from a source experiment.
- `continue_in_place=True` — `exist_ok=True` on all `mkdir` calls; skips overwriting existing `config.yaml` / `meta.json`.
| Phase | Write | Read | Exists? |
|---|---|---|---|
| 1 | `write_target_attrs(task, attrs)` | `read_target_attrs(task)` | `target_attrs_exist(task)` |
| 1 | `write_nuanced_attrs(task, attrs)` | `read_nuanced_attrs(task)` | `nuanced_attrs_exist(task)` |
| 2 | `write_rubric(task, rubric)` | `read_rubric(task)` | `rubric_exists(task)` |
| 3 | `append_datapoint(task, teacher, record)` | `read_datapoints(task, teacher)` | `count_datapoints(task, teacher)` |
| 4 | `append_response(task, teacher, student, record)` | `read_responses(...)` | `response_file_exists(...)` |
| 5 | `append_evaluation(task, teacher, judge, record)` | `read_evaluations(...)` | `evaluation_file_exists(...)` |
update_meta(phase_started, phase_completed, status) — Reads, patches, and rewrites meta.json.
read_meta() → dict — Used by --continue and coeval status.
Batch tracking: add_pending_batch(), read_pending_batches(), remove_pending_batch(), update_pending_batch_status().
Purpose: Canonical prompt templates and resolution logic.
| ID | Used in phase | Key slots |
|---|---|---|
| `map_target_attrs` | 1 | `{task_description}` |
| `map_nuanced_attrs` | 1 | `{task_description}` |
| `autorubric` | 2 | `{task_description}`, `{output_description}` |
| `sample` | 3 | `{task_description}`, `{output_description}`, `{target_attributes}`, `{nuanced_attributes}` |
| `test` | 4 | `{input}`, `{task_description}`, `{output_description}` |
| `evaluate_single` | 5 | `{task_description}`, `{output_description}`, `{input}`, `{target_attributes}`, `{reference_response}`, `{response}`, `{rubric}` |
| `evaluate_per_factor` | 5 | same minus `{rubric}`, plus `{rubric_factor_name}`, `{rubric_factor_description}` |
Resolution order:
1. `task_prompt_library[f"{prompt_id}.{model_name}"]` — model-specific override
2. `task_prompt_library[prompt_id]` — task-level override
3. `TEMPLATES[prompt_id]` — canonical fallback
Then calls template.format(**variables). Literal { / } in YAML overrides must be doubled: {{ / }}.
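The resolution order reduces to a few lines. A minimal sketch, with an invented fallback template; the real `TEMPLATES` contents differ:

```python
# Canonical fallback table; the template text here is invented for illustration.
TEMPLATES = {'sample': 'Generate an item for: {task_description}'}

def resolve_prompt(prompt_id: str, model_name: str, library: dict) -> str:
    """Try the model-specific override, then the task-level override, then the canonical template."""
    for key in (f'{prompt_id}.{model_name}', prompt_id):
        if key in library:
            return library[key]
    return TEMPLATES[prompt_id]
```

Note the brace-doubling rule in action: `'Return {{"score": 1}}'.format(...)` yields the literal `Return {"score": 1}`.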
Purpose: Timestamped logger writing to run.log and optionally to the console.
Pass os.devnull as log_path for commands that have no run folder.
Lines are formatted as {ISO-UTC} [{LEVEL}] {message}.
WARNING and ERROR go to sys.stderr; all others to sys.stdout.
On Windows, UnicodeEncodeError on console output is caught and re-emitted with errors='replace'.
Purpose: Top-level experiment orchestrator.
run_experiment(cfg, dry_run, continue_in_place, only_models, skip_probe, probe_mode, probe_on_fail, estimate_only, estimate_samples) → int
- Creates `ExperimentStorage` and calls `initialize()`.
- Creates `RunLogger`, `ModelPool(cfg._provider_keys)`, `QuotaTracker(cfg.experiment.quota)`.
- Runs the pre-flight probe unless disabled.
- Optionally runs cost estimation.
- Iterates `PHASE_IDS`; for each phase calls `_PHASE_RUNNERS[phase_id](cfg, storage, logger, pool, quota, mode)`.
- Updates `meta.json` phase-by-phase; writes `status: completed` on success.
_PHASE_RUNNERS maps each phase ID to its run_phaseN() function.
_CONTINUE_MODE maps phase IDs to the mode used by --continue (Keep or Extend).
Prints a formatted summary of models, tasks, phases, and estimated call counts — used by coeval plan before cost estimation.
Purpose: argparse-based entry point for all coeval subcommands.
The main() function:
- Resolves the provider key file path (flag → env var → project root → home dir).
- Dispatches to the appropriate command handler.
Subcommands and their handlers:
| Subcommand | Handler module |
|---|---|
| `run` | `runner.py::run_experiment` |
| `probe` | `commands/probe_cmd.py::cmd_probe` |
| `plan` | `commands/plan_cmd.py::cmd_plan` |
| `status` | `commands/status_cmd.py::cmd_status` |
| `repair` | `commands/repair_cmd.py::cmd_repair` |
| `describe` | `commands/describe_cmd.py::cmd_describe` |
| `wizard` | `commands/wizard_cmd.py::cmd_wizard` |
| `generate` | `commands/generate_cmd.py::cmd_generate` |
| `ingest` | `commands/ingest_cmd.py::cmd_ingest` |
| `models` | `commands/models_cmd.py::cmd_models` |
| `analyze` | `Code/analyzer/main.py::run_analyze` |
Each file in this directory implements one cmd_* function.
| File | Key function | What it does |
|---|---|---|
| `probe_cmd.py` | `cmd_probe` | Runs `run_probe()` against a config; prints per-model pass/fail |
| `plan_cmd.py` | `cmd_plan` | Runs `estimate_experiment_cost()`; prints phase plan + cost table |
| `status_cmd.py` | `cmd_status` | Reads `meta.json` and JSONL counts; shows a live experiment dashboard |
| `repair_cmd.py` | `cmd_repair` | Scans experiment artifacts; patches truncated JSONL, missing phase dirs |
| `describe_cmd.py` | `cmd_describe` | Generates a standalone HTML planning document from a config |
| `wizard_cmd.py` | `cmd_wizard` | Interactive questionnaire that writes a ready-to-run YAML config |
| `generate_cmd.py` | `cmd_generate` | Single-model generation utility for prompt testing |
| `ingest_cmd.py` | `cmd_ingest` | Converts a benchmark dataset into Phase 3 JSONL format |
| `models_cmd.py` | `cmd_models` | Lists available models for a provider, optionally filtered |
Purpose: Exact-match evaluation for classification and information-extraction tasks. Used after Phase 5 when the teacher's sampled_target_attributes represent ground-truth labels.
evaluate(predictions, ground_truth) → dict
Multiclass accuracy, per-label precision/recall/F1. Accepts both string and single-element-list predictions.
evaluate_multilabel(predictions, ground_truth) → dict
Hamming accuracy plus per-attribute metrics for multi-label tasks.
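A simplified sketch of the exact-match accuracy computation, including the single-element-list unwrapping the evaluator accepts. The real `evaluate()` also returns per-label precision, recall, and F1; the helper name here is invented:

```python
def exact_match_accuracy(predictions: dict, ground_truth: dict) -> float:
    """Multiclass accuracy over ground-truth IDs; single-element list predictions are unwrapped."""
    correct = total = 0
    for item_id, truth in ground_truth.items():
        pred = predictions.get(item_id)
        if isinstance(pred, list) and len(pred) == 1:
            pred = pred[0]  # accept ['neg'] as equivalent to 'neg'
        total += 1
        correct += int(pred == truth)
    return correct / total if total else 0.0
```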
```python
class ModelInterface(ABC):
    @abstractmethod
    def generate(self, prompt: str, parameters: dict) -> str:
        """Call the model and return the text response."""
```

This is the only contract that all 18 provider adapters must fulfil. Role-specific parameter overrides are already merged into `parameters` before `generate()` is called — the interface does not need to know about roles.
Each file in Code/runner/interfaces/ that ends in _iface.py is a real-time provider adapter.
| File | Class | Auth | Notes |
|---|---|---|---|
| `openai_iface.py` | `OpenAIInterface` | `OPENAI_API_KEY` | Chat Completions; retries on transient errors |
| `anthropic_iface.py` | `AnthropicInterface` | `ANTHROPIC_API_KEY` | Messages API; requires `max_tokens` |
| `gemini_iface.py` | `GeminiInterface` | `GEMINI_API_KEY` | google-generativeai SDK |
| `huggingface_iface.py` | `HuggingFaceInterface` | `HF_TOKEN` | `transformers.pipeline`; requires CUDA GPU |
| `openai_compat_iface.py` | `OpenAICompatInterface` | provider-specific | Covers groq, deepseek, mistral, deepinfra, cerebras, cohere, ollama, huggingface_api, openrouter |
| `azure_openai_iface.py` | `AzureOpenAIInterface` | `AZURE_OPENAI_API_KEY` + endpoint | Azure OpenAI resource |
| `azure_ai_iface.py` | `AzureAIInterface` | `AZURE_AI_API_KEY` + endpoint | Azure AI Inference (models.ai.azure.com) |
| `bedrock_iface.py` | `BedrockInterface` | native Bedrock API key OR IAM | Converse API |
| `vertex_iface.py` | `VertexInterface` | ADC / service account | Vertex AI Gemini real-time |
All real-time interfaces implement the same retry pattern:
- Call the provider API.
- On transient errors (rate limit, 502/503/504, timeout) — exponential backoff, up to 3 retries.
- On fatal errors (invalid API key, model not found) — re-raise immediately.
Batch runners handle the asynchronous submit → poll → download cycle for each provider's batch API.
| File | Class | Provider | Discount |
|---|---|---|---|
| `openai_batch.py` | `OpenAIBatchRunner` | OpenAI Batch API | ~50% |
| `anthropic_batch.py` | `AnthropicBatchRunner` | Message Batches API | ~50% |
| `gemini_batch.py` | `GeminiBatchRunner` | Concurrent thread pool | simulated |
| `azure_batch.py` | `AzureBatchRunner` | Azure OpenAI Batch | ~50% |
| `bedrock_batch.py` | `BedrockBatchRunner` | Model Invocation Jobs | ~50% |
| `vertex_batch.py` | `VertexBatchRunner` | Batch Prediction Jobs | ~50% |
| `mistral_batch.py` | `MistralBatchRunner` | Mistral Batch API | ~50% |
All batch runners expose the same interface:
```
runner.add(key: str, prompt: str, params: dict) → None
runner.run(description, logger, storage, phase) → dict[str, str]
len(runner)       # number of pending requests
runner.clear()    # discard without submitting
```

`run()` returns `{user_key: response_text}`. Failed individual records map to `''`.
The factory create_batch_runner(interface, access_key, **kwargs) in interfaces/__init__.py returns the correct runner for any batch-capable interface name.
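To make the contract concrete, here is a hypothetical in-memory runner that satisfies it with no real provider involved. `EchoBatchRunner` is invented purely for illustration:

```python
class EchoBatchRunner:
    """Toy batch runner obeying the add()/run()/__len__()/clear() contract."""

    def __init__(self):
        self._pending: dict[str, str] = {}

    def add(self, key: str, prompt: str, params: dict) -> None:
        self._pending[key] = prompt

    def run(self, description='', logger=None, storage=None, phase='') -> dict[str, str]:
        # A real runner would submit, poll, and download here.
        results = {key: f'echo: {prompt}' for key, prompt in self._pending.items()}
        self.clear()
        return results

    def __len__(self) -> int:
        return len(self._pending)

    def clear(self) -> None:
        self._pending.clear()
```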
Purpose: Verify model reachability before committing to a full run.
run_probe(cfg, logger, mode, on_fail, phases_completed, only_models, probe_results_path) → (dict, set)
- `mode='full'` — probe every model in the config.
- `mode='resume'` — probe only models needed for incomplete phases.
- `mode='disable'` — skip probing entirely.
Returns (results_dict, set_of_needed_models).
results_dict maps model_name → {'ok': bool, 'latency_ms': int, 'error': str|None}.
Written to {run_path}/probe_results.json.
For network interfaces, probes are lightweight single-token calls. For HuggingFace, probes query Hub metadata without loading weights.
Purpose: Pre-run cost and time estimation.
`_PRICING_YAML_PATH = Path(__file__).parent.parent.parent.parent / 'Config' / 'provider_pricing.yaml'` (4 parent levels: interfaces/ → runner/ → Code/ → project root, then down into Config/)
PRICE_TABLE: dict — {fragment: {input_per_1M, output_per_1M}} built at import time from the YAML.
BATCH_DISCOUNT: dict — {interface: discount_factor} (0.50 = 50% off).
get_prices(model_id) → (input_price, output_price) — Longest-fragment match in PRICE_TABLE.
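Longest-fragment matching means a more specific fragment beats a shorter one that also matches. A sketch with hypothetical fragments and prices (not the real table):

```python
# Hypothetical fragments/prices for illustration only.
PRICE_TABLE = {
    'gpt-4o': {'input_per_1M': 2.50, 'output_per_1M': 10.00},
    'gpt-4o-mini': {'input_per_1M': 0.15, 'output_per_1M': 0.60},
}

def get_prices(model_id: str):
    """Return (input_price, output_price) for the longest fragment contained in model_id."""
    best = None
    for fragment in PRICE_TABLE:
        if fragment in model_id and (best is None or len(fragment) > len(best)):
            best = fragment
    if best is None:
        return None
    prices = PRICE_TABLE[best]
    return prices['input_per_1M'], prices['output_per_1M']
```

So `'gpt-4o-mini-2024-07-18'` matches the `gpt-4o-mini` fragment, not the shorter `gpt-4o`.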
estimate_experiment_cost(cfg, storage, logger) → dict — Runs estimate_samples live sample calls per model, then extrapolates to the full experiment. Writes result to cost_estimate.json.
Purpose: Lazy-load and cache ModelInterface instances.
get(model_cfg: ModelConfig) → ModelInterface
Returns the cached instance for model_cfg.name, or creates one on the first call.
HuggingFace instances are cached in their own slot so GPU weights are loaded exactly once.
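The caching behaviour, reduced to its core. The real `get()` takes a `ModelConfig`; a plain name-keyed factory is used here for brevity:

```python
class ModelPool:
    """Lazy-load and cache one interface instance per model name (sketch)."""

    def __init__(self, factory):
        self._factory = factory   # callable: model_name -> interface instance
        self._cache = {}

    def get(self, model_name: str):
        # Create on first request, return the cached instance afterwards.
        if model_name not in self._cache:
            self._cache[model_name] = self._factory(model_name)
        return self._cache[model_name]
```

This is what guarantees GPU-heavy interfaces are constructed exactly once per run.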
Purpose: Credential resolution, key file loading, auto-routing, and model listing.
`_PROJECT_KEYS_FILE = Path(__file__).parent.parent.parent.parent / 'keys.yaml'`

load_keys_file(path=None) → dict
Lookup order: explicit path → COEVAL_KEYS_FILE env var → project root keys.yaml → ~/.coeval/keys.yaml.
resolve_provider_keys(keys_path=None) → dict
Merges key file entries with environment variables into a flat {provider: key} dict.
load_provider_pricing(path=None) → dict
Loads Config/provider_pricing.yaml; falls back to {} on missing file.
resolve_auto_interface(model_id, provider_keys) → str | None
Scans auto_routing in the pricing YAML; returns the first interface whose fragment matches the model ID and whose credentials are present.
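A sketch of that scan. The shape of the `auto_routing` entries (a list of fragment/interface pairs) is an assumption made for this example and may differ from the real YAML:

```python
def resolve_auto_interface(model_id: str, provider_keys: dict, auto_routing: list):
    """Return the first interface whose fragment matches model_id and whose key is present."""
    for rule in auto_routing:  # assumed shape: {'fragment': ..., 'interface': ...}
        if rule['fragment'] in model_id and provider_keys.get(rule['interface']):
            return rule['interface']
    return None
```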
Purpose: Shared helpers used by all five phases.
Three-strategy extraction:
- `json.loads(text)` directly.
- Strip leading prose — find the first `{` or `[`, parse from there.
- Bracket window — find the first `{` and last `}` (or `[` / `]`), parse that substring.

Single-element lists `[{...}]` are unwrapped to `{...}` after a successful parse.
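A self-contained sketch of the three strategies. Function and helper names are illustrative, not the real implementation:

```python
import json

def _unwrap(obj):
    """Unwrap a single-element list [{...}] to {...}."""
    if isinstance(obj, list) and len(obj) == 1 and isinstance(obj[0], dict):
        return obj[0]
    return obj

def extract_json(text: str):
    """Three-strategy JSON extraction from possibly chatty LLM output."""
    # 1. Direct parse.
    try:
        return _unwrap(json.loads(text))
    except json.JSONDecodeError:
        pass
    # 2. Strip leading prose: parse from the first { or [.
    starts = [i for i in (text.find('{'), text.find('[')) if i != -1]
    if starts:
        try:
            return _unwrap(json.loads(text[min(starts):]))
        except json.JSONDecodeError:
            pass
    # 3. Bracket window: first { to last } (or [ to ]).
    for open_ch, close_ch in (('{', '}'), ('[', ']')):
        lo, hi = text.find(open_ch), text.rfind(close_ch)
        if lo != -1 and hi > lo:
            try:
                return _unwrap(json.loads(text[lo:hi + 1]))
            except json.JSONDecodeError:
                continue
    raise json.JSONDecodeError('no JSON found', text, 0)
```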
`call_llm_json` — Calls `iface.generate()`, strips markdown code fences, then calls `_extract_json()`. Retries on `JSONDecodeError` with a doubling delay. Re-raises immediately on non-JSON errors.
`call_llm_word` — Expects a single word from `valid_words` (`{'High', 'Medium', 'Low'}` by default). Strips whitespace and punctuation. Used by Phase 5 `per_factor` mode.
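A sketch of the word-extraction step. This is simplified; the real helper may handle punctuation and casing differently:

```python
def extract_word(text: str, valid_words=('High', 'Medium', 'Low')) -> str:
    """Strip surrounding whitespace/punctuation and match case-insensitively (sketch)."""
    cleaned = text.strip().strip('.,;:!"\'`*')
    for word in valid_words:
        if cleaned.lower() == word.lower():
            return word  # return the canonical spelling
    raise ValueError(f"expected one of {valid_words}, got {text!r}")
```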
`extract_prompt_response` — Normalises teacher output, trying multiple key aliases for both the prompt and response fields. Raises `KeyError` with a descriptive message if either field cannot be resolved.
`merge_attr_maps` — Union of multiple attribute maps. New values per key are appended; existing values are not duplicated.
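The attribute-map merger can be sketched as follows, assuming each map is `{attribute: [values]}` (a simplified illustration):

```python
def merge_attr_maps(maps: list) -> dict:
    """Union of attribute maps: new values per key are appended, duplicates skipped."""
    merged: dict[str, list] = {}
    for attr_map in maps:
        for key, values in attr_map.items():
            bucket = merged.setdefault(key, [])
            for value in values:
                if value not in bucket:
                    bucket.append(value)
    return merged
```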
Union of rubric dicts. First occurrence wins; later rubrics can only add new factors.
`QuotaTracker` — Tracks the remaining API call budget per model via `is_exhausted(name)` / `consume(name)`.
run_phase1() calls _resolve_attrs() twice per task (for 'target' and 'nuanced' attribute types). Decision: static dict → write directly; auto / complete → call all teachers via call_llm_json, merge with merge_attr_maps. Prompt IDs: map_target_attrs, map_nuanced_attrs.
run_phase2() calls _resolve_rubric() per task. Decision: static dict → write directly; auto → call teachers, merge; extend → prepend existing rubric before merge (its factors take priority). Prompt ID: autorubric.
run_phase3() processes every (task, teacher) pair. For each pair:
- `Keep` → skip if the file exists.
- `Model` → skip if the file exists.
- `Extend` → generate only the missing items (`total - existing_count`).
- `New` → generate all `total` items.
For each datapoint: _sample_attrs() → get_prompt('sample') → call_llm_json() → extract_prompt_response() → storage.append_datapoint().
Benchmark interface teachers are skipped (data is pre-ingested).
_MAX_WORKERS = 10 for concurrent generation via ThreadPoolExecutor.
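The per-pair resume decision above can be condensed into a tiny helper. This is a sketch; the real phase code inlines this logic:

```python
def items_to_generate(mode: str, total: int, existing: int, file_exists: bool) -> int:
    """How many datapoints Phase 3 generates for one (task, teacher) pair (sketch)."""
    if mode in ('Keep', 'Model') and file_exists:
        return 0
    if mode == 'Extend':
        return max(0, total - existing)
    return total  # 'New': regenerate everything
```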
Processes every (task, teacher, student) triple. Checks get_responded_datapoint_ids() to skip already-written items on resume. Batch-capable interfaces use create_batch_runner() when batch.{interface}.response_collection is enabled. HuggingFace runs sequentially.
Processes every (task, teacher, judge) triple. Checks get_evaluated_response_ids() to skip on resume. Supports both evaluation_mode: single (one call per response, all rubric dimensions at once) and evaluation_mode: per_factor (one call per rubric dimension per response). Batch-capable interfaces use create_batch_runner() when batch.{interface}.evaluation is enabled.
Purpose: Entry point for coeval analyze. Orchestrates loading and report generation.
run_analyze(run_path, out_path, subcommand, judge_selection, agreement_metric, agreement_threshold, teacher_score_formula, benchmark_format, partial_ok, log_level) → int
- Validates that `run_path` has `meta.json` and `phase5_evaluations/`.
- Calls `load_ees(run_path)` → `EESDataModel`.
- Dispatches to the appropriate report writer based on `subcommand`.
- Returns 0 on success, 1 on error.
Subcommands: all, dashboard, student-report, teacher-report, judge-report, judge-consistency, coverage-summary, score-distribution, interaction-matrix, robust-summary, complete-report.
all runs every report writer in sequence.
Purpose: Read Phase 5 evaluation JSONL files and assemble an in-memory data model.
Reads all *.evaluations.jsonl files from phase5_evaluations/, parses each record into an EvalRecord, and produces an EESDataModel.
EvalRecord
One Phase 5 record: response_id, datapoint_id, task_id, teacher_model_id, student_model_id, judge_model_id, scores (dict), evaluated_at, valid, error_codes, is_self_judging, is_self_teaching.
AnalyticalUnit
The primary analytical unit: one (response, rubric_aspect) pair with its normalised score. All metrics operate on lists of AnalyticalUnit.
EESDataModel
Unified in-memory model: run_path, meta, tasks, teachers, students, judges, records, analytical_units, plus convenience accessors.
Each report module exports one top-level function:
`write_<report_name>(model: EESDataModel, out_dir: Path, shared_plotly: Path | None) → None`

`shared_plotly`, if given, copies the Plotly JS file to a shared location instead of embedding it inline (saves disk space when generating multiple reports).
| Module | Function | Output file |
|---|---|---|
| `index_page.py` | `write_index_page` | `index.html` (main dashboard) |
| `summary_report.py` | `write_run_summary` | `summary/index.html` |
| `student_report.py` | `write_student_report` | `student_report/index.html` |
| `teacher_report.py` | `write_teacher_report` | `teacher_report/index.html` |
| `judge_report.py` | `write_judge_report` | `judge_report/index.html` |
| `consistency.py` | `write_judge_consistency` | `judge_consistency/index.html` |
| `coverage.py` | `write_coverage_summary` | `coverage_summary/index.html` |
| `score_dist.py` | `write_score_distribution` | `score_distribution/index.html` |
| `interaction.py` | `write_interaction_matrix` | `interaction_matrix/index.html` |
| `robust.py` | `write_robust_summary` | `robust_summary/index.html` |
| `excel.py` | `write_complete_report` | `complete_report.xlsx` |
Shared HTML utilities live in html_base.py:
- `get_plotly_js(cache_dir)` — downloads or caches `plotly.min.js` (no CDN at render time).
- `build_html_page(title, body_html, plotly_js)` — wraps content in a standard HTML shell.
Single task, two models (teacher + student + judge)
YAML config
│
▼
load_config() ──► CoEvalConfig
│
▼
run_experiment()
│
├── Phase 1: Teacher calls map_target_attrs prompt
│ └── Writes: phase1_attributes/sentiment.target_attrs.json
│ phase1_attributes/sentiment.nuanced_attrs.json
│
├── Phase 2: Teacher calls autorubric prompt
│ └── Writes: phase2_rubric/sentiment.rubric.json
│
├── Phase 3: Teacher samples attributes → calls sample prompt per item
│ └── Writes: phase3_datapoints/sentiment__teacher1.datapoints.jsonl
│ (50 records, each: {id, prompt, reference_response, sampled_attrs})
│
├── Phase 4: Student receives each prompt → calls test prompt
│ └── Writes: phase4_responses/sentiment__teacher1__student1.responses.jsonl
│ (50 records, each: {response_id, datapoint_id, response, ...})
│
└── Phase 5: Judge scores each response against rubric
└── Writes: phase5_evaluations/sentiment__teacher1__judge1.evaluations.jsonl
(50 records × N rubric factors)
(each: {response_id, scores: {factor: High|Medium|Low}, ...})
coeval analyze all
│
▼
load_ees() → EESDataModel
│
├── write_index_page() → reports/index.html
├── write_student_report() → reports/student_report/index.html
├── write_teacher_report() → reports/teacher_report/index.html
├── write_judge_report() → reports/judge_report/index.html
├── ...
└── write_complete_report() → reports/complete_report.xlsx
- Alphanumeric, hyphens, underscores; no spaces. Validated by `_MODEL_NAME_RE` / `_TASK_NAME_RE`.
- Used verbatim in file names — keep them short and readable.
Phase 3 datapoint: {task_id}__{teacher_id}__{seq:05d} e.g. sentiment__gpt4o__00042
Phase 4 response: {datapoint_id}__s__{student_id} e.g. sentiment__gpt4o__00042__s__claude
Phase 5 evaluation: {response_id}__j__{judge_id} e.g. sentiment__gpt4o__00042__s__claude__j__gpt4o
The double-underscore __ separator is reserved; single underscores are safe within component names.
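The conventions above can be exercised with two tiny helpers (invented for illustration; the real code builds these IDs inline):

```python
def make_evaluation_id(task: str, teacher: str, seq: int, student: str, judge: str) -> str:
    """Compose a Phase 5 evaluation ID from its components (sketch)."""
    return f"{task}__{teacher}__{seq:05d}__s__{student}__j__{judge}"

def split_id(record_id: str) -> list[str]:
    """Split on the reserved '__' separator; single underscores inside names survive."""
    return record_id.split('__')
```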
phase3_datapoints/ {task}__{teacher}.datapoints.jsonl
phase4_responses/ {task}__{teacher}__{student}.responses.jsonl
phase5_evaluations/ {task}__{teacher}__{judge}.evaluations.jsonl
| Layer | Strategy |
|---|---|
| Validation (config) | Return list of error strings; caller decides |
| LLM calls (transient) | Exponential backoff, up to 3 retries, then re-raise |
| LLM calls (fatal) | Re-raise immediately (invalid key, model not found) |
| JSON parsing | call_llm_json retries up to 3 times on JSONDecodeError |
| Phase failures | Individual item failures are logged; phase continues; raises RuntimeError only if zero useful output was produced |
| Filesystem | ExperimentStorage raises FileExistsError on conflict without continue_in_place |
| Batch jobs | RuntimeError on non-terminal batch failure state; in-flight batch IDs tracked in pending_batches.json for recovery |
Phases never swallow exceptions silently — every error is logged with model name, task, and error text before any retry or skip.
Create Code/runner/interfaces/my_provider_iface.py:
"""MyProvider model interface."""
from __future__ import annotations
import os
from .base import ModelInterface
# Patterns that indicate a transient (retryable) error
_TRANSIENT = ('rate limit', 'timeout', 'connection', '502', '503')
# Patterns that indicate a fatal (non-retryable) error
_FATAL = ('invalid api key', 'authentication', 'model not found')
class MyProviderInterface(ModelInterface):
def __init__(self, access_key: str | None = None) -> None:
self._key = access_key or os.environ.get('MY_PROVIDER_API_KEY', '')
if not self._key:
raise ValueError("MY_PROVIDER_API_KEY is required")
# Import the SDK lazily to avoid hard dependency:
try:
from myprovider import Client
self._client = Client(api_key=self._key)
except ImportError:
raise ImportError("myprovider SDK is required: pip install myprovider")
def generate(self, prompt: str, parameters: dict) -> str:
import time
model = parameters.get('model', 'default-model')
temp = float(parameters.get('temperature', 0.7))
max_tok = int(parameters.get('max_tokens', 512))
sys_p = parameters.get('system_prompt')
for attempt in range(3):
try:
response = self._client.complete(
model=model, prompt=prompt,
temperature=temp, max_tokens=max_tok,
system=sys_p,
)
return response.text
except Exception as exc:
msg = str(exc).lower()
if any(s in msg for s in _FATAL):
raise
if any(s in msg for s in _TRANSIENT) and attempt < 2:
time.sleep(2 ** attempt)
continue
raise
raise RuntimeError("MyProvider: exhausted retries")In Code/runner/interfaces/pool.py, add a branch to _create():
```python
elif interface == 'my_provider':
    from .my_provider_iface import MyProviderInterface
    return MyProviderInterface(access_key=access_key)
```

Add `'my_provider'` to `VALID_INTERFACES`:
```python
VALID_INTERFACES = {
    ...,
    'my_provider',
}
```

Add the env var lookup in resolve_provider_keys():
```python
if not keys.get('my_provider'):
    v = os.environ.get('MY_PROVIDER_API_KEY')
    if v:
        keys['my_provider'] = v
```

Add a matching branch for the availability probe (run_probe() issues a lightweight single-token call):

```python
elif interface == 'my_provider':
    from .my_provider_iface import MyProviderInterface
    iface = MyProviderInterface(access_key=access_key)
    iface.generate("ping", {"model": model_cfg.parameters.get("model", ""), "max_tokens": 1})
```

Add pricing to Config/provider_pricing.yaml:

```yaml
providers:
  my_provider:
    interface: my_provider
    batch_discount: 1.0    # 1.0 = no batch discount
    models:
      my-model-v1:
        input: 0.50        # USD per million input tokens
        output: 1.50
```

If the provider SDK is optional:
```toml
[project.optional-dependencies]
my_provider = ["myprovider>=1.0"]
```

Create Tests/runner/test_my_provider.py following the pattern in test_new_providers.py: mock `sys.modules['myprovider']` with `patch.dict`, verify `generate()` returns a string.
Create Code/runner/interfaces/my_provider_batch.py. Implement add(), run(), __len__(), clear():
class MyProviderBatchRunner:
def __init__(self, access_key=None, poll_seconds=60, **kwargs):
self._key = access_key
self._poll = poll_seconds
self._requests: list[dict] = []
self._id_to_key: dict[str, str] = {}
def add(self, key: str, prompt: str, params: dict) -> None:
custom_id = f"r{len(self._requests)}"
self._id_to_key[custom_id] = key
self._requests.append({"id": custom_id, "prompt": prompt, **params})
def run(self, description="", logger=None, storage=None, phase="") -> dict[str, str]:
if not self._requests:
return {}
# 1. Submit batch to provider
# 2. Poll until terminal status
# 3. Download and parse results
# 4. self.clear()
# 5. Return {user_key: response_text}
...
def __len__(self): return len(self._requests)
def clear(self):
self._requests.clear()
self._id_to_key.clear()

Register the new runner in the batch-runner factory:

from .my_provider_batch import MyProviderBatchRunner
def create_batch_runner(interface, access_key=None, **kwargs):
...
elif interface == 'my_provider':
return MyProviderBatchRunner(access_key=access_key, **kwargs)

Add 'my_provider' to BATCH_CAPABLE_INTERFACES.
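The numbered steps inside run() can be sketched as below. The _submit(), _status(), and _results() helpers are hypothetical stand-ins for your provider's batch API (stubbed here so the sketch is self-contained); only the submit → poll → download → clear → map-back shape is the point.

```python
import time


class BatchRunSketch:
    """Illustrative run() body — provider calls are hypothetical stubs."""

    def __init__(self, poll_seconds: float = 60):
        self._poll = poll_seconds
        self._requests = [{"id": "r0", "prompt": "ping"}]
        self._id_to_key = {"r0": "user-key-0"}

    def run(self) -> dict[str, str]:
        if not self._requests:
            return {}
        batch_id = self._submit(self._requests)               # 1. submit batch
        while self._status(batch_id) not in ("completed", "failed"):
            time.sleep(self._poll)                            # 2. poll until terminal
        rows = self._results(batch_id)                        # 3. download + parse
        id_to_key = dict(self._id_to_key)
        self._requests.clear()                                # 4. clear state
        self._id_to_key.clear()
        # 5. map provider custom IDs back to the caller's keys
        return {id_to_key[r["id"]]: r["text"] for r in rows}

    # --- stubs standing in for real provider calls ---
    def _submit(self, reqs):
        return "batch-001"

    def _status(self, batch_id):
        return "completed"

    def _results(self, batch_id):
        return [{"id": "r0", "text": "pong"}]
```

A real implementation would also surface failed rows per key rather than raising for the whole batch, so one bad request does not discard the rest.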
Follow the pattern in Tests/runner/test_batch_runners.py: mock the provider SDK via sys.modules, test add() / run() / polling / error cases.
Create Code/analyzer/reports/my_report.py:
"""My Custom Report — REQ-A-X.X."""
from __future__ import annotations
from pathlib import Path
from ..loader import EESDataModel
from .html_base import build_html_page, get_plotly_js
def write_my_report(
model: EESDataModel,
out_dir: Path,
shared_plotly: Path | None = None,
) -> None:
out_dir.mkdir(parents=True, exist_ok=True)
# Build your HTML content (use Plotly figures for interactivity)
import plotly.graph_objects as go
fig = go.Figure(...)
plot_html = fig.to_html(full_html=False, include_plotlyjs=False)
# Get Plotly JS (cached; no network call if already downloaded)
plotly_js = get_plotly_js(shared_plotly or out_dir.parent)
body = f"<h1>My Report</h1>\n{plot_html}"
html = build_html_page(title="My Report", body_html=body, plotly_js=plotly_js)
(out_dir / 'index.html').write_text(html, encoding='utf-8')

Register the report in Code/analyzer/main.py:

from .reports.my_report import write_my_report
# Add to the dispatch table:
'my-report': lambda: write_my_report(data_model, out_dir / 'my_report', shared_plotly),

Also add 'my-report' to the all subcommand sequence.
Add 'my-report' to the choices list for the analyze subcommand.
In Code/analyzer/reports/index_page.py, add a card linking to my_report/index.html.
Add test class to Tests/analyzer/test_analyze_reports.py following the existing pattern: create minimal EESDataModel in a tmp_path, call write_my_report(), assert index.html was created and contains expected markers.
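The assertion pattern for such a test looks like this. A stub writer stands in for write_my_report() so the sketch runs standalone; the real test imports the actual writer, builds a minimal EESDataModel, and uses pytest's tmp_path instead of tempfile.

```python
from pathlib import Path
import tempfile


def write_stub_report(out_dir: Path) -> None:
    # Stand-in for write_my_report(); the real test calls the actual writer
    # with a minimal EESDataModel instead of this stub.
    out_dir.mkdir(parents=True, exist_ok=True)
    (out_dir / "index.html").write_text("<h1>My Report</h1>", encoding="utf-8")


def test_my_report_written():
    with tempfile.TemporaryDirectory() as tmp:  # pytest would inject tmp_path
        out = Path(tmp) / "my_report"
        write_stub_report(out)
        html = (out / "index.html").read_text(encoding="utf-8")
        assert "My Report" in html              # expected marker assertion
```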
Benchmark loaders convert a HuggingFace dataset into CoEval Phase 3 JSONL format.
Create Public/benchmark/loaders/my_dataset.py:
"""MyDataset benchmark loader."""
from __future__ import annotations
from pathlib import Path
from .base import BenchmarkLoader
class MyDatasetLoader(BenchmarkLoader):
"""Loads MyDataset for the <task_name> task."""
DATASET_NAME = "org/my-dataset" # HuggingFace dataset ID
DEFAULT_SPLIT = "validation"
def load(
self,
out_path: Path,
attribute_map_path: Path | None,
sample_size: int = 500,
split: str | None = None,
) -> int:
"""Download, sample, and write Phase 3 JSONL.
Returns the number of records written.
"""
from datasets import load_dataset
ds = load_dataset(self.DATASET_NAME, split=split or self.DEFAULT_SPLIT)
if sample_size and len(ds) > sample_size:
ds = ds.shuffle(seed=42).select(range(sample_size))
out_path.parent.mkdir(parents=True, exist_ok=True)
count = 0
with out_path.open('w', encoding='utf-8') as fh:
for i, example in enumerate(ds):
# Build the prompt and reference response from the dataset fields
prompt = example['input_text']
reference = example['target_text']
# Infer attributes from the example
attrs = self._infer_attributes(example, attribute_map_path)
record = {
"id": f"my_dataset__{i:05d}",
"prompt": prompt,
"reference_response": reference,
"sampled_target_attributes": attrs,
"source": "my_dataset",
"interface": "benchmark",
}
fh.write(__import__('json').dumps(record, ensure_ascii=False) + '\n')
count += 1
return count
def _infer_attributes(self, example, attribute_map_path) -> dict:
# Map dataset fields to CoEval attribute keys + values
return {"difficulty": "medium"}

Register the loader in Public/benchmark/loaders/__init__.py:

from .my_dataset import MyDatasetLoader
_REGISTRY = {
...,
'my_dataset': MyDatasetLoader,
}

Create Public/benchmark/configs/my_dataset_attribute_map.yaml:
difficulty: [easy, medium, hard]
domain: [science, history, general]

Pass this path as attribute_map_path when calling load_benchmark('my_dataset', ...).
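One hedged way for _infer_attributes() to honor the map: infer raw values from dataset fields, then keep only values the map allows. The 'level' source field is a hypothetical example, and the function here takes the already-loaded map dict; in real code that dict would come from yaml.safe_load(attribute_map_path.read_text()).

```python
def infer_attributes(example: dict, attribute_map: dict[str, list[str]]) -> dict:
    # Dataset-specific field mapping — 'level' is a hypothetical source field.
    raw = {"difficulty": example.get("level", "medium")}
    # Keep only values listed in the attribute map, so Phase 3 records stay
    # consistent with the configured attribute vocabulary.
    return {k: v for k, v in raw.items() if v in attribute_map.get(k, [])}


attr_map = {"difficulty": ["easy", "medium", "hard"],
            "domain": ["science", "history", "general"]}
```

Here infer_attributes({"level": "hard"}, attr_map) yields {"difficulty": "hard"}, while an unmapped value is dropped rather than written into the JSONL.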
In Public/benchmark/setup_mixed.py (or create a dedicated setup script):
from benchmark.loaders import load_benchmark
load_benchmark('my_dataset', out_path=Path('Runs/my-run/phase3_datapoints/...'), ...)

Add tests to Tests/benchmark/ mocking datasets.load_dataset and verifying the JSONL output format.
Create Code/runner/phases/phaseN.py following the standard signature:
def run_phaseN(cfg, storage, logger, pool, quota, phase_mode) -> None:
...

Write output via storage methods (add new ones to ExperimentStorage if needed).
In Code/runner/storage.py, add read/write/exists methods for the new phase's artifact type.
from .phases.phaseN import run_phaseN
_PHASE_RUNNERS = {
...,
'my_new_phase': run_phaseN,
}

Add 'my_new_phase' to PHASE_IDS in config.py.
In config.py, add the new phase ID to any validation rules that check phase names.
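Putting the steps together, a minimal phase body might look like the sketch below. The storage and logger method names used here (read_phase_input, phase_output_exists, write_phase_output, log) are assumptions for illustration — wire it to the actual ExperimentStorage and RunLogger APIs.

```python
def run_phaseN(cfg, storage, logger, pool, quota, phase_mode) -> None:
    # Hypothetical storage/logger method names — adapt to ExperimentStorage.
    items = storage.read_phase_input('my_new_phase')
    done = 0
    for item in items:
        if storage.phase_output_exists('my_new_phase', item['id']):
            continue  # supports --continue: skip already-written artifacts
        result = {'id': item['id'], 'value': item['value'] * 2}
        storage.write_phase_output('my_new_phase', result)  # persist incrementally
        done += 1
    logger.log(f"my_new_phase: wrote {done} of {len(items)} artifacts")
```

Writing each artifact as soon as it is produced (rather than batching at the end) is what makes the existence check on re-entry meaningful.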
# Full suite (excludes Playwright)
pytest Tests/ -q
# Runner unit tests only
pytest Tests/runner/ -v
# Benchmark tests only
pytest Tests/benchmark/ -v
# Analyzer tests only
pytest Tests/analyzer/ -v
# Structural integrity (layout, imports, path constants)
pytest Tests/test_structural_integrity.py -v
# Memory-capped run (kills if RSS > 3 GB)
python scripts/run_tests_safe.py Tests/runner Tests/benchmark -q
# Playwright visual tests (requires: playwright install chromium)
pytest Tests/analyzer/test_reports_playwright.py -v

Tests/
├── runner/
│ ├── test_config.py # Config parsing and validation rules V-01..V-17
│ ├── test_storage.py # ExperimentStorage filesystem methods
│ ├── test_storage_extended.py # Batch tracking, meta updates, error records
│ ├── test_prompts.py # Template resolution and slot filling
│ ├── test_utils.py # _extract_json, call_llm_json/word, merge helpers
│ ├── test_phase4_phase5.py # QuotaTracker, response/evaluation accumulation
│ ├── test_label_eval.py # LabelEvaluator metrics
│ ├── test_probe_and_estimator.py # Probe modes, PRICE_TABLE, cost estimation
│ ├── test_auto_interface_and_pricing.py # Auto-routing, pricing YAML, dual-track config
│ ├── test_new_providers.py # All 18 interface adapters (mocked)
│ ├── test_batch_runners.py # BedrockBatchRunner and VertexBatchRunner
│ ├── test_benchmarks.py # Benchmark interface adapter
│ ├── test_repair.py # Repair command logic
│ └── test_commands.py # CLI subcommand dispatch
├── benchmark/
│ └── test_compute_scores.py # Benchmark score computation
├── analyzer/
│ ├── test_loader.py # EESDataModel loading from JSONL
│ ├── test_metrics.py # ICC, kappa, score normalisation
│ └── test_analyze_reports.py # All 11 report writers (55 tests)
└── test_structural_integrity.py # Layout, imports, path constants, CLI smoke test
Mock optional SDKs via sys.modules to run tests without installing provider packages:
import sys
from unittest.mock import MagicMock, patch
def test_my_interface():
mock_sdk = MagicMock()
with patch.dict(sys.modules, {'myprovider': mock_sdk}):
from runner.interfaces.my_provider_iface import MyProviderInterface
iface = MyProviderInterface(access_key='test-key')
mock_sdk.Client.return_value.complete.return_value.text = "hello"
result = iface.generate("ping", {"model": "m", "max_tokens": 5})
assert result == "hello"

Delete MagicMock objects explicitly after the with block to avoid reference cycles:
del mock_sdk

Use tmp_path for filesystem tests — pytest injects a fresh temporary directory per test.
The root conftest.py runs gc.collect() after every test to reclaim mock cycles.
Q: Where is the pricing data for cost estimation?
Config/provider_pricing.yaml at the project root. The path is embedded in cost_estimator.py as Path(__file__).parent.parent.parent.parent / 'Config' / 'provider_pricing.yaml' (four .parent hops from cost_estimator.py, up through Code/runner/interfaces/, to the project root).
Q: Where are provider credentials looked up?
In order: --keys PATH flag → COEVAL_KEYS_FILE env var → {project_root}/keys.yaml → ~/.coeval/keys.yaml. The project root path is Path(__file__).parent.parent.parent.parent / 'keys.yaml' in registry.py.
Q: Why do test dirs have no __init__.py?
Presence of __init__.py in test directories causes pytest to import them as submodules of the runner package. That triggers MagicMock trees to be scanned on every import runner, causing memory explosions on Windows. The --import-mode=importlib flag in pyproject.toml lets pytest collect tests correctly without __init__.py.
Q: How do I add interface: auto routing for a new provider?
Add entries to the auto_routing section of Config/provider_pricing.yaml:
auto_routing:
my-model:
interface: my_provider
priority: 5

Model IDs are matched by substring (longest match wins).
Q: How does --continue avoid duplicate API calls?
Phase 4 calls storage.get_responded_datapoint_ids() before submitting each student. Phase 5 calls storage.get_evaluated_response_ids() before each judge. Both return set[str] of already-written IDs; the phase simply skips any item whose ID is in the set.
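The skip logic is essentially a set-membership filter over the phase's work items; a minimal sketch, where submit stands in for the real API call:

```python
def submit_pending(items: list[dict], responded_ids: set[str], submit) -> int:
    """Submit only items whose IDs are not already recorded; return count sent."""
    sent = 0
    for item in items:
        if item['id'] in responded_ids:
            continue  # already answered in a previous run — no duplicate API call
        submit(item)
        sent += 1
    return sent
```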
Q: Can the analyzer be run independently of the runner?
Yes. Code/analyzer/main.py::run_analyze(run_path, ...) needs only a completed run folder with meta.json and phase5_evaluations/. It has no dependency on the runner package at runtime.
See also: Concepts Glossary · CLI Reference · Configuration Guide · Architecture