API Reference

Type Aliases

Messages

Messages = str | list[ChatMessage]

The primary message type. Either a plain string (completion mode) or a list of chat messages (chat mode).
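
For illustration, both forms as plain Python values:

prompt_completion: Messages = "Complete this sentence: the sky is"   # completion mode
prompt_chat: Messages = [{"role": "user", "content": "Hi there"}]    # chat mode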

ChatMessage

ChatMessage = ChatCompletionMessageParam  # from openai.types.chat

OpenAI's chat message type with role, content, and optional tool_calls / tool_call_id fields.

SystemMessage

class SystemMessage:
    role: Literal["system"] = "system"
    content: MessageContent

    @classmethod
    def from_path(cls, path: str | Path) -> "SystemMessage": ...

Provider-agnostic system message type. Use vf.SystemMessage.from_path(...) to load a system prompt from a UTF-8 text file while preserving the file contents verbatim.
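
For example (file path illustrative):

import verifiers as vf

system_msg = vf.SystemMessage.from_path("prompts/system.txt")
assert system_msg.role == "system"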

Info

Info = dict[str, Any]

Arbitrary metadata dictionary from dataset rows.

SamplingArgs

SamplingArgs = dict[str, Any]

Generation parameters passed to the inference server (e.g., temperature, top_p, max_tokens).

RewardFunc

IndividualRewardFunc = Callable[..., float | Awaitable[float]]
GroupRewardFunc = Callable[..., list[float] | Awaitable[list[float]]]
RewardFunc = IndividualRewardFunc | GroupRewardFunc

Individual reward functions operate on single rollouts. Group reward functions operate on all rollouts for an example together (useful for relative scoring).
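
A minimal sketch of both shapes (function bodies illustrative):

def exact_match(completion, answer, **kwargs) -> float:
    # Individual: scores a single rollout.
    return 1.0 if str(completion).strip() == answer else 0.0

def centered_scores(completions, answers, **kwargs) -> list[float]:
    # Group: scores all rollouts for one example together (relative scoring).
    raw = [1.0 if str(c).strip() == a else 0.0 for c, a in zip(completions, answers)]
    mean = sum(raw) / len(raw)
    return [r - mean for r in raw]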

ClientType

ClientType = Literal[
    "openai_completions",
    "openai_chat_completions",
    "openai_chat_completions_token",
    "openai_responses",
    "renderer",
    "anthropic_messages",
    "nemorl_chat_completions",
]

Selects which Client implementation to use. Set via ClientConfig.client_type.


Data Types

State

class State(dict):
    INPUT_FIELDS = ["prompt", "answer", "info", "example_id"]

A dict subclass that tracks rollout information. Accessing keys in INPUT_FIELDS automatically forwards to the nested input object.
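
The forwarding described above means these two reads are equivalent (illustrative):

state["prompt"]           # forwarded to the nested input
state["input"]["prompt"]  # explicit nested read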

Fields set during initialization:

| Field | Type | Description |
| --- | --- | --- |
| input | RolloutInput | Nested input data |
| client | Client | Client instance |
| model | str | Model name |
| sampling_args | SamplingArgs \| None | Generation parameters |
| is_completed | bool | Whether rollout has ended |
| is_truncated | bool | Whether generation was truncated |
| tool_defs | list[Tool] \| None | Available tool definitions |
| trajectory | list[TrajectoryStep] | Multi-turn trajectory |
| trajectory_id | str | UUID for this rollout |
| timing | RolloutTiming | Timing information |

Fields set after scoring:

| Field | Type | Description |
| --- | --- | --- |
| completion | Messages \| None | Final completion |
| reward | float \| None | Final reward |
| advantage | float \| None | Advantage over group mean |
| metrics | dict[str, float] \| None | Per-function metrics |
| stop_condition | str \| None | Name of triggered stop condition |
| error | Error \| None | Error if rollout failed |

RolloutInput

class RolloutInput(TypedDict):
    prompt: Messages        # Required
    example_id: int         # Required
    answer: str             # Optional
    info: Info              # Optional

RolloutOutput

class RolloutOutput(dict):
    # Required fields
    example_id: int
    prompt: Messages | None
    completion: Messages | None
    reward: float
    timing: RolloutTiming
    is_completed: bool
    is_truncated: bool
    metrics: dict[str, float]
    # Optional fields
    answer: str
    info: Info
    error: str | None
    stop_condition: str | None
    token_usage: TokenUsage
    trajectory: list[TrajectoryStep]
    tool_defs: list[Tool] | None

Serialized output from a rollout. This is a dict subclass that provides typed access to known fields while supporting arbitrary additional fields from state_columns. All values must be JSON-serializable. Used in GenerateOutputs and for saving results to disk.

TrajectoryStep

class TrajectoryStep(TypedDict):
    prompt: Messages
    completion: Messages
    response: Response
    tokens: TrajectoryStepTokens | None
    reward: float | None
    advantage: float | None
    is_truncated: bool
    trajectory_id: str
    extras: dict[str, Any]

A single turn in a multi-turn rollout.

TrajectoryStepTokens

class TrajectoryStepTokens(TypedDict):
    prompt_ids: list[int]
    prompt_mask: list[int]
    completion_ids: list[int]
    completion_mask: list[int]
    completion_logprobs: list[float]
    overlong_prompt: bool
    is_truncated: bool
    routed_experts: list[list[list[int]]] | None  # [seq_len, layers, topk] to enable router replay
    multi_modal_data: NotRequired[Any]  # renderers.MultiModalData sidecar (pixel_values, placeholder ranges) — set only on multimodal rollouts

Token-level data for training.

TimeSpan

class TimeSpan(CustomBaseModel):
    """A timed span. duration = end - start."""
    start: float = 0.0   # Unix timestamp (seconds since epoch)
    end: float = 0.0     # Unix timestamp (seconds since epoch)
    # duration: float    (computed_field)

TimeSpans

class TimeSpans(CustomBaseModel):
    """A list of TimeSpan with aggregate duration (sum)."""
    spans: list[TimeSpan] = []
    # duration: float    (computed_field)

RolloutTiming

class RolloutTiming(CustomBaseModel):
    """Rollout-level timing. All values in seconds."""
    start_time: float                       # wall-clock at rollout start
    setup: TimeSpan = TimeSpan()            # setup_state() span
    generation: TimeSpan = TimeSpan()       # full generation phase
    scoring: TimeSpan = TimeSpan()          # rubric.score_*() span
    model: TimeSpans = TimeSpans()          # all model-call spans
    env: TimeSpans = TimeSpans()            # all env-response spans
    # total, overhead: float                (computed_fields)

Derivations:

  • total = scoring.end - generation.start
  • overhead = total - setup.duration - model.duration - env.duration - scoring.duration

generation.start is stamped at the top of the rollout (before setup_state), so total covers the entire rollout including setup, generation loop, finalize, and scoring. overhead captures any time not attributed to the named phases.
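
For example, with hypothetical timestamps: if generation.start = 100.0 s and scoring.end = 110.0 s, then total = 10.0 s; if setup.duration = 0.5 s, model.duration = 7.0 s, env.duration = 1.5 s, and scoring.duration = 0.6 s, then overhead = 10.0 - 0.5 - 7.0 - 1.5 - 0.6 = 0.4 s.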

TokenUsage

class TokenUsage(TypedDict, total=False):
    input_tokens: float
    output_tokens: float
    final_input_tokens: float
    final_output_tokens: float

| Field | Description |
| --- | --- |
| input_tokens | Sum of prompt tokens across all turns. Shared context is counted each time it appears in a prompt. |
| output_tokens | Sum of completion tokens across all turns. |
| final_input_tokens | Non-completion tokens in the final turn's context (system prompts, user messages, tool results, etc.). |
| final_output_tokens | Completion tokens in the final turn's context. Equals output_tokens for single-turn rollouts. |

In a single-turn rollout, input_tokens == final_input_tokens and output_tokens == final_output_tokens. In a multi-turn rollout, input_tokens > final_input_tokens because earlier turns' prompts are counted again.

The final_* metrics assume a single, continuously extended trajectory. Non-linear trajectories (multi-agent, context summarization, history rewriting) are not accounted for.
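
A worked example with hypothetical counts, assuming a linear two-turn trajectory: turn 1 has a 100-token prompt and a 20-token completion; turn 2 re-sends that context plus a 30-token tool result (150 prompt tokens) and produces a 40-token completion. Then input_tokens = 100 + 150 = 250, output_tokens = 20 + 40 = 60, final_input_tokens = 150 - 20 = 130 (the final context minus the earlier completion tokens it contains), and final_output_tokens = 20 + 40 = 60 (the earlier completion in context plus the final completion), matching output_tokens.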

GenerateOutputs

class GenerateOutputs(TypedDict):
    outputs: list[RolloutOutput]
    metadata: GenerateMetadata

Output from Environment.generate(). Contains a list of RolloutOutput objects (one per rollout) and generation metadata. Each RolloutOutput is a serialized, JSON-compatible dict containing the rollout's prompt, completion, answer, reward, metrics, timing, and other per-rollout data.
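
A consumption sketch (client and model placeholders illustrative):

results = env.evaluate_sync(client, model="my-model")
for out in results["outputs"]:
    print(out["example_id"], out["reward"], out["is_truncated"])
print(results["metadata"]["avg_reward"])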

GenerateMetadata

class VersionInfo(TypedDict):
    vf_version: str
    vf_commit: str | None
    env_version: str | None
    env_commit: str | None

class GenerateMetadata(TypedDict):
    env_id: str
    name: NotRequired[str]
    env_args: dict
    model: str
    base_url: str
    num_examples: int
    rollouts_per_example: int
    sampling_args: SamplingArgs
    date: str
    time_ms: float
    avg_reward: float
    avg_metrics: dict[str, float]
    avg_error: float
    pass_at_k: dict[str, float]
    pass_all_k: dict[str, float]
    pass_threshold: float
    usage: TokenUsage | None
    version_info: VersionInfo
    state_columns: list[str]
    path_to_save: Path
    tools: list[Tool] | None

base_url is always serialized as a string. For multi-endpoint runs (e.g., using ClientConfig.endpoint_configs), it is stored as a comma-separated list of URLs.

version_info captures the verifiers framework version/commit and the environment package version/commit at generation time. Populated automatically by GenerateOutputsBuilder.

RolloutScore / RolloutScores

class RolloutScore(TypedDict):
    reward: float
    metrics: dict[str, float]

class RolloutScores(TypedDict):
    reward: list[float]
    metrics: dict[str, list[float]]

Classes

Environment Classes

Environment

class Environment(ABC):
    def __init__(
        self,
        dataset: Dataset | None = None,
        eval_dataset: Dataset | None = None,
        system_prompt: str | None = None,
        few_shot: list[ChatMessage] | None = None,
        parser: Parser | None = None,
        rubric: Rubric | None = None,
        sampling_args: SamplingArgs | None = None,
        message_type: MessageType = "chat",
        max_workers: int = 512,
        env_id: str | None = None,
        env_args: dict | None = None,
        max_seq_len: int | None = None,
        score_rollouts: bool = True,
        pass_threshold: float = 0.5,
        **kwargs,
    ): ...

Abstract base class for all environments.

Generation methods:

| Method | Returns | Description |
| --- | --- | --- |
| generate(inputs, client, model, ...) | GenerateOutputs | Run rollouts asynchronously. client accepts Client \| ClientConfig. |
| generate_sync(inputs, client, ...) | GenerateOutputs | Synchronous wrapper |
| evaluate(client, model, ...) | GenerateOutputs | Evaluate on eval_dataset |
| evaluate_sync(client, model, ...) | GenerateOutputs | Synchronous evaluation |

Dataset methods:

| Method | Returns | Description |
| --- | --- | --- |
| get_dataset(n=-1, seed=None) | Dataset | Get training dataset (optionally first n, shuffled) |
| get_eval_dataset(n=-1, seed=None) | Dataset | Get evaluation dataset |
| make_dataset(...) | Dataset | Static method to create dataset from inputs |

Rollout methods (used internally or by subclasses):

| Method | Returns | Description |
| --- | --- | --- |
| rollout(input, client, model, sampling_args) | State | Abstract: run single rollout |
| init_state(input, client, model, sampling_args) | State | Create initial state from input |
| get_model_response(state, prompt, ...) | Response | Get model response for prompt |
| is_completed(state) | bool | Check all stop conditions |
| run_rollout(sem, input, client, model, sampling_args) | State | Run rollout with semaphore |
| run_group(group_inputs, client, model, ...) | list[State] | Generate and score one group |

Configuration methods:

| Method | Description |
| --- | --- |
| set_kwargs(**kwargs) | Set attributes using setter methods when available |
| set_concurrency(concurrency) | Set concurrency and scale all registered thread-pool executors to match |
| add_rubric(rubric) | Add or merge rubric |
| set_max_seq_len(max_seq_len) | Set maximum sequence length |
| set_score_rollouts(bool) | Enable/disable scoring |

SingleTurnEnv

Single-response Q&A tasks. Inherits from Environment.
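
A minimal construction sketch (dataset row shape follows RolloutInput; reward function illustrative):

import verifiers as vf
from datasets import Dataset

dataset = Dataset.from_list([
    {"prompt": [{"role": "user", "content": "What is 2 + 2?"}], "answer": "4"},
])

def exact_match(completion, answer, **kwargs) -> float:
    return 1.0 if answer in str(completion) else 0.0

env = vf.SingleTurnEnv(dataset=dataset, rubric=vf.Rubric(funcs=[exact_match]))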

MultiTurnEnv

class MultiTurnEnv(Environment):
    def __init__(
        self,
        max_turns: int = -1,
        timeout_seconds: float | None = None,
        **kwargs,
    ): ...

Multi-turn interactions. Subclasses must implement env_response.

Abstract method:

async def env_response(self, messages: Messages, state: State, **kwargs) -> Messages:
    """Generate environment feedback after model turn."""

Built-in stop conditions: has_error, prompt_too_long, max_turns_reached, timeout_reached, max_total_completion_tokens_reached, has_final_env_response
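
A minimal subclass sketch (response logic illustrative):

import verifiers as vf

class EchoEnv(vf.MultiTurnEnv):
    async def env_response(self, messages, state, **kwargs):
        # Reply to the model's last message with a user turn.
        last = messages[-1]["content"]
        return [{"role": "user", "content": f"Environment saw: {last}"}]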

Hooks:

| Method | Description |
| --- | --- |
| setup_state(state) | Initialize per-rollout state |
| get_prompt_messages(state) | Customize prompt construction |
| render_completion(state) | Customize completion rendering |
| add_trajectory_step(state, step) | Customize trajectory handling |
| set_max_total_completion_tokens(int) | Set maximum total completion tokens |

ToolEnv

class ToolEnv(MultiTurnEnv):
    def __init__(
        self,
        tools: list[Callable] | None = None,
        max_turns: int = 10,
        error_formatter: Callable[[Exception], str] = lambda e: f"{e}",
        stop_errors: list[type[Exception]] | None = None,
        **kwargs,
    ): ...

Tool calling with stateless Python functions. Automatically converts functions to OpenAI tool format.

Built-in stop condition: no_tools_called (ends when model responds without tool calls)

Methods:

| Method | Description |
| --- | --- |
| add_tool(tool) | Add a tool at runtime |
| remove_tool(tool) | Remove a tool at runtime |
| call_tool(name, args, id) | Override to customize tool execution |
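
A minimal sketch (tool body illustrative):

import verifiers as vf

def add(a: float, b: float) -> float:
    """Add two numbers."""
    return a + b

env = vf.ToolEnv(tools=[add], max_turns=10)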

StatefulToolEnv

Tools requiring per-rollout state. Override setup_state and update_tool_args to inject state.
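
A sketch, assuming update_tool_args receives the parsed tool arguments plus the rollout state (exact signature may differ):

import verifiers as vf

class SessionToolEnv(vf.StatefulToolEnv):
    async def setup_state(self, state):
        state["session_id"] = "hypothetical-handle"  # per-rollout state
        return state

    def update_tool_args(self, tool_args, messages, state, **kwargs):
        # Inject per-rollout state into each tool call.
        return {**tool_args, "session_id": state["session_id"]}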

SandboxEnv

class SandboxEnv(StatefulToolEnv):
    def __init__(
        self,
        sandbox_name: str = "sandbox-env",
        docker_image: str = "python:3.11-slim",
        start_command: str = "tail -f /dev/null",
        cpu_cores: int = 1,
        memory_gb: int = 2,
        disk_size_gb: int = 5,
        gpu_count: int = 0,
        timeout_minutes: int = 60,
        timeout_per_command_seconds: int = 30,
        environment_vars: dict[str, str] | None = None,
        team_id: str | None = None,
        advanced_configs: AdvancedConfigs | None = None,
        labels: list[str] | None = None,
        **kwargs,
    ): ...

Sandboxed container execution using prime sandboxes.

Key parameters:

| Parameter | Type | Description |
| --- | --- | --- |
| sandbox_name | str | Name prefix for sandbox instances |
| docker_image | str | Docker image to use for the sandbox |
| cpu_cores | int | Number of CPU cores |
| memory_gb | int | Memory allocation in GB |
| disk_size_gb | int | Disk size in GB |
| gpu_count | int | Number of GPUs |
| timeout_minutes | int | Sandbox timeout in minutes |
| timeout_per_command_seconds | int | Per-command execution timeout |
| environment_vars | dict[str, str] \| None | Environment variables to set in sandbox |
| labels | list[str] \| None | Labels for sandbox categorization and filtering |

PythonEnv

Persistent Python REPL in sandbox. Extends SandboxEnv.

OpenEnvEnv

class OpenEnvEnv(MultiTurnEnv):
    def __init__(
        self,
        openenv_project: str | Path | None = None,
        num_train_examples: int = 100,
        num_eval_examples: int = 50,
        seed: int = 0,
        prompt_renderer: Callable[..., Messages] | None = None,
        max_turns: int = -1,
        rubric: Rubric | None = None,
        **kwargs,
    ): ...

OpenEnv integration that runs OpenEnv projects in Prime Sandboxes using a prebuilt image manifest (.build.json), supports both gym and MCP contracts, and requires a prompt_renderer to convert observations into chat messages.

SWEDebugEnv

class SWEDebugEnv(SandboxMixin, MultiTurnEnv):
    def __init__(
        self,
        taskset: SandboxTaskSet,
        dataset: Any = None,
        *,
        run_setup: bool = True,
        debug_step: Literal["none", "gold_patch", "command", "script"] = "gold_patch",
        run_tests: bool = True,
        debug_command: str | None = None,
        debug_script: str | None = None,
        debug_script_path: str | None = None,
        debug_timeout: int | None = None,
        test_timeout: int = 900,
        cpu_cores: int | None = None,
        memory_gb: int | None = None,
        disk_size_gb: int | None = None,
        labels: list[str] | None = None,
        timeout_seconds: float = 1800.0,
        output_tail_chars: int = 2000,
        **sandbox_kwargs,
    ): ...

No-agent debugger for SWE-style SandboxTaskSet instances. It creates the task sandbox, optionally runs task setup, runs one debug step (none, gold_patch, command, or script), and optionally runs tests and scores the result.

EnvGroup

env_group = vf.EnvGroup(
    envs=[env1, env2, env3],
    env_names=["math", "code", "qa"]  # optional
)

Combines multiple environments for mixed-task training. Combined datasets use info["env_id"] as internal routing metadata; it is not a top-level input, state, or output field.


v1 Taskset/Harness Classes

The v1 API is exposed as verifiers.v1 and documented in BYO Harness. Its core unit is:

state = await harness.run(task, state=None)

Taskset and Env wrap that runner for datasets, evals, and training.

Task

class Task(dict):
    def freeze(self) -> Task: ...

Immutable, JSON-serializable input data. A task is usually created by a Taskset, but can be run directly through a standalone Harness.

Common top-level fields:

| Field | Description |
| --- | --- |
| prompt | User/developer/tool messages for the rollout. Must not contain system messages. |
| system_prompt | Per-task system messages or string. |
| answer | Reference answer or target data. Stays on task, not state. |
| info | Serializable metadata. |
| max_turns | Per-task base-loop turn limit. |
| tools | Tool visibility: {"show": [...]} or {"hide": [...]}. |
| toolsets | Toolset visibility or rollout-local toolset config. |
| sandbox | Per-task sandbox overrides for sandboxed programs. |
| program | Task-owned files, dirs, env, setup, artifacts, bindings, and command args. |

task.runtime is not public schema. Runtime metadata belongs on State.
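
A minimal task sketch using the fields above (values illustrative):

task = taskset.to_task({
    "prompt": [{"role": "user", "content": "List the repo's top-level files."}],
    "system_prompt": "You are a careful shell user.",
    "info": {"difficulty": "easy"},
    "max_turns": 8,
})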

State

class State(dict):
    @classmethod
    def for_task(task: Mapping[str, Any], ...) -> State: ...
    def stop(self, condition: str = "state_done") -> None: ...
    def get_model(self) -> str: ...
    def get_client(api: str = "chat_completions", *, sync: bool = False) -> object: ...
    def get_endpoint_config(api: str = "chat_completions") -> dict[str, str]: ...
    def get_tools() -> dict[str, Callable[..., Awaitable[object]]]: ...
    def get_max_turns(default: int) -> int: ...
    def finalize() -> State: ...

Mutable rollout output. State starts from a task and accumulates trajectory, completion, metrics, reward, timing, artifacts, errors, and user-defined serializable fields.

Framework-managed fields such as is_completed, stop_condition, is_truncated, and error cannot be written directly. Use state.stop(...) or raise vf.Error subclasses.

State.for_task(...) can borrow selected active runtime handles from another state:

child_state = state.for_task(child_task, borrow=["model", "sandbox"], tools="bash")
child_state = await child_harness.run(child_task, child_state)

Borrowed handles are process-local and stripped before state crosses the serialization boundary.

Taskset

class Taskset:
    def __init__(
        source=None,
        eval_source=None,
        taskset_id: str | None = None,
        system_prompt=None,
        user=None,
        toolsets=(),
        stops=(),
        setups=(),
        updates=(),
        metrics=(),
        rewards=(),
        advantages=(),
        cleanups=(),
        config: TasksetConfig | Mapping[str, object] | None = None,
    ): ...

    def rows() -> list[dict[str, Any]]: ...
    def eval_rows() -> list[dict[str, Any]]: ...
    def task(row: Mapping[str, Any]) -> Task: ...
    def to_task(value: Mapping[str, Any] | Task | str) -> Task: ...
    async def init_group(task: Task, num_rollouts: int) -> tuple[list[Task], list[State]]: ...
    def get_dataset() -> Dataset: ...
    def get_eval_dataset() -> Dataset: ...

Packages task rows and task-owned behavior. source and eval_source may be iterables or zero-argument loaders. Loaders should close over resolved config instead of accepting runtime kwargs.
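
A minimal construction sketch (rows and loader illustrative; assumes Taskset is importable from verifiers.v1):

from verifiers.v1 import Taskset

def load_rows():
    # Zero-argument loader closing over any resolved config.
    return [{"prompt": [{"role": "user", "content": "2 + 2 = ?"}], "answer": "4"}]

taskset = Taskset(source=load_rows, taskset_id="toy-math")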

Harness

class Harness:
    def __init__(
        program=None,
        system_prompt=None,
        user=None,
        sandbox=None,
        client=None,
        model: str | None = None,
        sampling_args: SamplingArgs | None = None,
        max_turns: int | None = None,
        toolsets=None,
        stops=None,
        setups=None,
        updates=None,
        metrics=None,
        rewards=None,
        advantages=None,
        cleanups=None,
        config: HarnessConfig | Mapping[str, object] | None = None,
    ): ...

    async def run(task: Task | Mapping[str, Any], state: State | None = None) -> State: ...
    async def score_group(tasks: list[Task], states: list[State]) -> list[State]: ...
    async def cleanup_group(tasks: list[Task], states: list[State]) -> None: ...
    async def teardown() -> None: ...

Runs one task. All model calls go through the v1 interception endpoint so trajectory capture, sampling args, tool forwarding, and protocol translation use one path across local Python, sandboxed Python, command programs, and the base tool loop.

program forms:

| Form | Meaning |
| --- | --- |
| None | Default endpoint-backed tool loop. |
| callable | In-process Python program with task, state. |
| {"base": true, ...} | Explicit default loop, usually with sandbox options. |
| {"fn": "pkg.module:run", ...} | Importable Python program. |
| {"command": ["cmd", "arg"], ...} | Local or sandboxed command. |

Sandboxed program.fn refs resolve their owning local package from the resolved module root: single-file modules use pyproject.toml in the same directory as the module file, and package modules use pyproject.toml inside the package directory. v1 uploads and installs that package in the program sandbox. Package dependencies come from normal [project.dependencies].
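
A sketch of the callable program form (program body illustrative; assumes Harness is importable from verifiers.v1):

from verifiers.v1 import Harness

async def my_program(task, state):
    # In-process Python program: drive the rollout via state handles.
    model = state.get_model()
    client = state.get_client()
    ...
    state.stop("program_done")  # hypothetical stop condition name

harness = Harness(program=my_program, model="my-model")
# state = await harness.run(task)   # run inside an async context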

Env

class Env(vf.Environment):
    def __init__(taskset: Taskset, harness: Harness | None = None): ...

Adapter that makes a v1 taskset/harness pair usable by eval and training workers. If harness is omitted, Env uses the base Harness.

Toolset And MCPTool

class Toolset:
    def __init__(
        tools=(),
        show=None,
        hide=None,
        bindings=None,
        objects=None,
        write: bool = False,
        scope: Literal["rollout", "group", "global"] | None = None,
        sandbox=None,
        stops=(),
        setups=(),
        updates=(),
        cleanups=(),
        teardowns=(),
        config: ToolsetConfig | Mapping[str, object] | None = None,
    ): ...

class MCPTool:
    def __init__(command: str, args=None, env=None, cwd: str | None = None): ...

Toolsets package callable tools, MCP servers, private dependency factories, hidden bindings, and tool-owned lifecycle handlers. objects.* bindings are private to the owning toolset/user and are not directly accessible from state. String binding sources are framework paths; literal strings should be bound via callable sources.
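
A minimal sketch (tool and server names illustrative; assumes Toolset and MCPTool are importable from verifiers.v1):

from verifiers.v1 import Toolset, MCPTool

async def read_file(path: str) -> str:
    # A plain callable tool.
    with open(path) as f:
        return f.read()

files = Toolset(tools=[read_file])
mcp = Toolset(tools=[MCPTool(command="my-mcp-server")])  # hypothetical server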

v1 Config Models

TasksetConfig.from_toml(path, section=None)
HarnessConfig.from_toml(path, section=None)
ToolsetConfig(...)
SandboxConfig(...)
UserConfig(...)
MCPToolConfig(...)

v1 config models are Pydantic models. Constructors accept config objects or plain mappings; TOML config uses "module:object" refs for Python callables and loaders. Unknown fields fail validation.
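
A loading sketch (paths and section names illustrative):

from verifiers.v1 import TasksetConfig, HarnessConfig

taskset_cfg = TasksetConfig.from_toml("env.toml", section="env.taskset")
harness_cfg = HarnessConfig.from_toml("env.toml", section="env.harness")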


Parser Classes

Parser

class Parser:
    def __init__(self, extract_fn: Callable[[str], str] = lambda x: x): ...
    
    def parse(self, text: str) -> Any: ...
    def parse_answer(self, completion: Messages) -> str | None: ...
    def get_format_reward_func(self) -> Callable: ...

Base parser. Default behavior returns text as-is.
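
A customization sketch (extract_fn illustrative):

import verifiers as vf

# Keep only the last line of the model's output.
parser = vf.Parser(extract_fn=lambda text: text.strip().splitlines()[-1])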

XMLParser

class XMLParser(Parser):
    def __init__(
        self,
        fields: list[str | tuple[str, ...]],
        answer_field: str = "answer",
        extract_fn: Callable[[str], str] = lambda x: x,
    ): ...

Extracts structured fields from XML-tagged output.

parser = vf.XMLParser(fields=["reasoning", "answer"])
# Parses: <reasoning>...</reasoning><answer>...</answer>

# With alternatives:
parser = vf.XMLParser(fields=["reasoning", ("code", "answer")])
# Accepts either <code> or <answer> for second field

Methods:

| Method | Returns | Description |
| --- | --- | --- |
| parse(text) | SimpleNamespace | Parse XML into object with field attributes |
| parse_answer(completion) | str \| None | Extract answer field from completion |
| get_format_str() | str | Get format description string |
| get_fields() | list[str] | Get canonical field names |
| format(**kwargs) | str | Format kwargs into XML string |

ThinkParser

class ThinkParser(Parser):
    def __init__(self, extract_fn: Callable[[str], str] = lambda x: x): ...

Extracts the content after the </think> tag. Intended for models that always emit <think> blocks when the serving stack does not strip or parse them automatically.

MaybeThinkParser

Handles optional <think> tags (for models that may or may not think).


Rubric Classes

Rubric

class Rubric:
    def __init__(
        self,
        funcs: list[RewardFunc] | None = None,
        weights: list[float] | None = None,
        parser: Parser | None = None,
    ): ...

Combines multiple reward functions with weights. Default weight is 1.0. Functions with weight=0.0 are tracked as metrics only.

Methods:

| Method | Description |
| --- | --- |
| add_reward_func(func, weight=1.0) | Add a reward function |
| add_metric(func, weight=0.0) | Add a metric (no reward contribution) |
| add_class_object(name, obj) | Add object accessible in reward functions |

Reward function signature:

def my_reward(
    completion: Messages,
    answer: str = "",
    prompt: Messages | None = None,
    state: State | None = None,
    parser: Parser | None = None,  # if rubric has parser
    info: Info | None = None,
    **kwargs
) -> float:
    ...

Group reward function signature:

def my_group_reward(
    completions: list[Messages],
    answers: list[str],
    states: list[State],
    # ... plural versions of individual args
    **kwargs
) -> list[float]:
    ...
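
A construction sketch combining the above (reward functions illustrative):

import verifiers as vf

def correctness(completion, answer, **kwargs) -> float:
    return 1.0 if answer in str(completion) else 0.0

def brevity(completion, **kwargs) -> float:
    return 1.0 / (1.0 + len(str(completion)))

def completion_length(completion, **kwargs) -> float:
    return float(len(str(completion)))

rubric = vf.Rubric(funcs=[correctness, brevity], weights=[1.0, 0.2])
rubric.add_metric(completion_length)  # tracked only, weight 0.0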

JudgeRubric

LLM-as-judge evaluation.

MathRubric

Math-specific evaluation using math-verify.

RubricGroup

Combines rubrics for EnvGroup.


Client Classes

Client

class Client(ABC, Generic[ClientT, MessagesT, ResponseT, ToolT]):
    def __init__(self, client_or_config: ClientT | ClientConfig) -> None: ...

    @property
    def client(self) -> ClientT: ...

    async def get_response(
        self,
        prompt: Messages,
        model: str,
        sampling_args: SamplingArgs,
        tools: list[Tool] | None = None,
        **kwargs,
    ) -> Response: ...

    async def close(self) -> None: ...

Abstract base class for all model clients. Wraps a provider-specific SDK client and translates between provider-agnostic vf types (Messages, Tool, Response) and provider-native formats. The client property exposes the underlying SDK client (e.g., AsyncOpenAI, AsyncAnthropic).

get_response() is the main public method — it converts the prompt and tools to the native format, calls the provider API, validates the response, and converts it back to a vf.Response. Errors are wrapped in vf.ModelError unless they are already vf.Error or authentication errors.
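
A usage sketch (model name illustrative):

import asyncio
import verifiers as vf

async def main() -> None:
    client = vf.OpenAIChatCompletionsClient(vf.ClientConfig())
    try:
        response = await client.get_response(
            prompt=[{"role": "user", "content": "Hello"}],
            model="my-model",
            sampling_args={"temperature": 0.7},
        )
        print(response.message.content)
    finally:
        await client.close()

asyncio.run(main())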

Abstract methods (for subclass implementors):

| Method | Description |
| --- | --- |
| setup_client(config) | Create the native SDK client from ClientConfig |
| to_native_prompt(messages) | Convert Messages → native prompt format + extra kwargs |
| to_native_tool(tool) | Convert Tool → native tool format |
| get_native_response(prompt, model, ...) | Call the provider API |
| raise_from_native_response(response) | Raise ModelError for invalid responses |
| from_native_response(response) | Convert native response → vf.Response |
| close() | Close the underlying SDK client |

Built-in Client Implementations

| Class | client_type | SDK Client | Description |
| --- | --- | --- | --- |
| OpenAIChatCompletionsClient | "openai_chat_completions" | AsyncOpenAI | Chat Completions API (default) |
| OpenAICompletionsClient | "openai_completions" | AsyncOpenAI | Legacy Completions API |
| OpenAIChatCompletionsTokenClient | "openai_chat_completions_token" | AsyncOpenAI | Custom vLLM token route (/v1/chat/completions/tokens) — server-side templating + token IDs returned alongside content |
| OpenAIResponsesClient | "openai_responses" | AsyncOpenAI | OpenAI Responses API |
| RendererClient | "renderer" | AsyncOpenAI | Renderer-backed token-in generate client (client-side tokenization via the renderers package) |
| AnthropicMessagesClient | "anthropic_messages" | AsyncAnthropic | Anthropic Messages API |
| NeMoRLChatCompletionsClient | "nemorl_chat_completions" | AsyncOpenAI | NeMo-RL Chat Completions variant |

All built-in clients are available as vf.OpenAIChatCompletionsClient, vf.AnthropicMessagesClient, etc. RendererClient requires the optional renderer package; install it with uv add "verifiers[renderers]" before importing vf.RendererClient or using client_type="renderer".

Response

class Response(BaseModel):
    id: str
    created: int
    model: str
    usage: Usage | None
    message: ResponseMessage

class ResponseMessage(BaseModel):
    content: str | None
    reasoning_content: str | None
    finish_reason: Literal["stop", "length", "tool_calls"] | None
    is_truncated: bool | None
    tokens: ResponseTokens | None
    tool_calls: list[ToolCall] | None

Provider-agnostic model response. All Client implementations return Response from get_response().

Tool

class Tool(BaseModel):
    name: str
    description: str
    parameters: dict[str, object]
    strict: bool | None = None

Provider-agnostic tool definition. Environments define tools using this type; each Client converts them to its native format via to_native_tool().


Configuration Types

v1 Config

class Config(BaseModel):
    def __init__(self, config: object | None = None, /, **data: object): ...

    @classmethod
    def from_config(cls, config: object | None = None, /, **data: object) -> Self: ...

    @classmethod
    def from_toml(
        cls, path: str | Path, section: str | Iterable[str] | None = None
    ) -> Self: ...

class EnvConfig(Config):
    taskset: TasksetConfig
    harness: HarnessConfig

class TasksetConfig(Config):
    taskset_id: str | None = None
    system_prompt: object | None = None
    source: object | None = None
    eval_source: object | None = None
    user: object | None = None

class HarnessConfig(Config):
    program: object | None = None
    system_prompt: object | None = None
    sandbox: SandboxConfig | None = None
    model: str | None = None
    sampling_args: dict[str, object] = {}
    max_turns: int = 10

EnvConfig is the typed v1 loader envelope. TOML [env.taskset] and [env.harness] sections populate EnvConfig.taskset and EnvConfig.harness. Environment-specific fields belong on the taskset or harness config that owns them; EnvConfig subclasses only bind concrete child config types. taskset must be typed as a TasksetConfig subclass, and harness must be typed as a HarnessConfig subclass. Annotation-only Config fields on Config subclasses default to their config class, so nested config objects do not need Field(default_factory=...).

Config subclasses accept a positional source config plus direct keyword overrides. The source object is positional-only so subclasses can define a real field named config.

ClientConfig

class ClientConfig(BaseModel):
    client_idx: int = 0
    client_type: ClientType = "openai_chat_completions"
    preserve_all_thinking: bool = False
    preserve_thinking_between_tool_calls: bool = False
    api_key_var: str = "PRIME_API_KEY"
    api_base_url: str = "https://api.pinference.ai/api/v1"
    endpoint_configs: list[EndpointClientConfig] = []
    timeout: float = 3600.0
    connect_timeout: float = 5.0
    max_connections: int = 28000
    max_keepalive_connections: int = 28000
    max_retries: int = 10
    extra_headers: dict[str, str] = {}
    extra_headers_from_state: dict[str, str] = {}

extra_headers_from_state maps HTTP header names to state field names. For each inference request, the header value is dynamically read from the rollout state dict. For example, {"X-Session-ID": "example_id"} adds an X-Session-ID header with the value of state["example_id"], enabling sticky routing at the inference router level.

client_type selects which Client implementation to instantiate (see Client Classes). Use endpoint_configs for multi-endpoint round-robin. In grouped scoring mode, groups are distributed round-robin across endpoint configs.

preserve_all_thinking and preserve_thinking_between_tool_calls are forwarded to the underlying renderer when client_type == "renderer". They control whether past-assistant reasoning_content is re-emitted on subsequent renders — preserve_all_thinking keeps every past-assistant turn's thinking, and preserve_thinking_between_tool_calls keeps thinking only inside the in-flight assistant→tool→…→assistant block after the most recent user turn (when that block contains at least one tool response). Both default to False (template default applies).

When api_key_var is "PRIME_API_KEY" (the default), credentials are loaded with the following precedence:

  • API key: PRIME_API_KEY env var > ~/.prime/config.json > "EMPTY"
  • Team ID: PRIME_TEAM_ID env var > ~/.prime/config.json > not set

This allows seamless use after running prime login.
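
A configuration sketch (endpoint values illustrative):

import verifiers as vf

config = vf.ClientConfig(
    client_type="openai_chat_completions",
    api_key_var="OPENAI_API_KEY",
    api_base_url="https://api.openai.com/v1",
    extra_headers_from_state={"X-Session-ID": "example_id"},  # sticky routing
)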

EndpointClientConfig

class EndpointClientConfig(BaseModel):
    client_idx: int = 0
    api_key_var: str = "PRIME_API_KEY"
    api_base_url: str = "https://api.pinference.ai/api/v1"
    timeout: float = 3600.0
    max_connections: int = 28000
    max_keepalive_connections: int = 28000
    max_retries: int = 10
    extra_headers: dict[str, str] = {}

Leaf endpoint configuration used inside ClientConfig.endpoint_configs. Has the same fields as ClientConfig except endpoint_configs itself, preventing recursive nesting.

EvalConfig

class EvalConfig(BaseModel):
    env_id: str
    name: str | None = None
    env_args: dict
    env_dir_path: str
    endpoint_id: str | None = None
    model: str
    client_config: ClientConfig
    sampling_args: SamplingArgs
    num_examples: int
    rollouts_per_example: int
    max_concurrent: int
    independent_scoring: bool = False
    extra_env_kwargs: dict = {}
    max_retries: int = 0
    verbose: bool = False
    state_columns: list[str] | None = None
    save_results: bool = False
    resume_path: Path | None = None
    save_to_hf_hub: bool = False
    hf_hub_dataset_name: str | None = None

Endpoint

Endpoint = TypedDict(
    "Endpoint",
    {
        "key": str,
        "url": str,
        "model": str,
        "api_client_type": NotRequired[ClientType],
        "extra_headers": NotRequired[dict[str, str]],
    },
)
Endpoints = dict[str, list[Endpoint]]

Endpoints maps an endpoint id to one or more endpoint variants. A single variant is represented as a one-item list.
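
A registry sketch (values illustrative):

endpoints: Endpoints = {
    "my-endpoint": [
        {"key": "OPENAI_API_KEY", "url": "https://api.openai.com/v1", "model": "gpt-4o-mini"},
    ],
}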


Prime CLI Plugin

Verifiers exposes a plugin contract consumed by prime for command execution.

PRIME_PLUGIN_API_VERSION

PRIME_PLUGIN_API_VERSION = 1

API version for compatibility checks between prime and verifiers.

PrimeCLIPlugin

@dataclass(frozen=True)
class PrimeCLIPlugin:
    api_version: int = PRIME_PLUGIN_API_VERSION
    eval_module: str = "verifiers.cli.commands.eval"
    gepa_module: str = "verifiers.cli.commands.gepa"
    install_module: str = "verifiers.cli.commands.install"
    init_module: str = "verifiers.cli.commands.init"
    setup_module: str = "verifiers.cli.commands.setup"
    build_module: str = "verifiers.cli.commands.build"

    def build_module_command(
        self, module_name: str, args: Sequence[str] | None = None
    ) -> list[str]:
        ...

build_module_command returns a subprocess command list for python -m <module> ....
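
An illustrative call (arguments hypothetical):

plugin = get_plugin()
cmd = plugin.build_module_command(plugin.eval_module, ["--help"])
# e.g. ["<python>", "-m", "verifiers.cli.commands.eval", "--help"]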

get_plugin

def get_plugin() -> PrimeCLIPlugin:
    ...

Returns the plugin instance consumed by prime.


Decorators

@vf.stop

@vf.stop
async def my_condition(self, state: State) -> bool:
    """Return True to end the rollout."""
    ...

@vf.stop(priority=10)  # Higher priority runs first
async def early_check(self, state: State) -> bool:
    ...

Mark a method as a stop condition. All stop conditions are checked by is_completed().

@vf.cleanup

@vf.cleanup
async def my_cleanup(self, state: State) -> None:
    """Called after each rollout completes."""
    ...

@vf.cleanup(priority=10)
async def early_cleanup(self, state: State) -> None:
    ...

Mark a method as a rollout cleanup handler. Cleanup methods should be idempotent—safe to call multiple times—and handle errors gracefully to ensure cleanup completes even when resources are in unexpected states.

@vf.teardown

@vf.teardown
async def my_teardown(self) -> None:
    """Called when environment is destroyed."""
    ...

@vf.teardown(priority=10)
async def early_teardown(self) -> None:
    ...

Mark a method as an environment teardown handler.


Utility Functions

Data Utilities

vf.load_example_dataset(name: str) -> Dataset

Load a built-in example dataset.

vf.extract_boxed_answer(text: str, strict: bool = False) -> str

Extract answer from LaTeX \boxed{} format. When strict=True, returns "" if no \boxed{} is found (used by MathRubric to avoid scoring unformatted responses). When strict=False (default), returns the original text as a passthrough.

vf.extract_hash_answer(text: str) -> str | None

Extract answer after #### marker (GSM8K format).
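
Illustrative calls:

vf.extract_boxed_answer(r"The answer is \boxed{42}.")   # -> "42"
vf.extract_boxed_answer("no box here", strict=True)     # -> ""
vf.extract_hash_answer("Reasoning... #### 72")          # -> "72"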

Environment Utilities

vf.load_environment(env_id: str, **kwargs) -> Environment

Load an environment by ID (e.g., "primeintellect/gsm8k").

Configuration Utilities

vf.ensure_keys(keys: list[str]) -> None

Validate that required environment variables are set. Raises MissingKeyError (a ValueError subclass) with a clear message listing all missing keys and instructions for setting them.

class MissingKeyError(ValueError):
    keys: list[str]  # list of missing key names

Example:

def load_environment(api_key_var: str = "OPENAI_API_KEY") -> vf.Environment:
    vf.ensure_keys([api_key_var])
    # now safe to use os.environ[api_key_var]
    ...

Logging Utilities

vf.print_prompt_completions_sample(outputs: GenerateOutputs, n: int = 3)

Pretty-print sample rollouts.

vf.setup_logging(level: str = "INFO")

Configure verifiers logging. Set VF_LOG_LEVEL env var to change default.

vf.log_level(level: str | int)

Context manager to temporarily set the verifiers logger to a new log level. Useful for temporarily adjusting verbosity during specific operations.

with vf.log_level("DEBUG"):
    # verifiers logs at DEBUG level here
    ...
# reverts to previous level

vf.quiet_verifiers()

Context manager to temporarily silence verifiers logging by setting WARNING level. Shorthand for vf.log_level("WARNING").

with vf.quiet_verifiers():
    # verifiers logging is quieted here
    outputs = env.generate(...)
# logging restored