feat: add incremental checkpointing for nat eval (#1631) #1652
Akshat8510 wants to merge 4 commits into NVIDIA:develop from
Conversation
Signed-off-by: Akshat Kumar <akshat230405@gmail.com>
Walkthrough: Adds per-item incremental checkpointing to evaluation, with a new helper to write JSONL checkpoint entries as each dataset item completes.
Actionable comments posted: 3
Inline comments:
In `@packages/nvidia_nat_eval/src/nat/plugins/eval/runtime/evaluate.py`:
- Around line 323-337: The remote runner currently waits for asyncio.gather in
EvaluationRemoteWorkflowHandler.run_workflow_remote and so never writes per-item
checkpoints; change the implementation to persist each item as it completes
(e.g., use asyncio.as_completed or attach per-task callbacks) so you call
dataset_handler.publish_eval_input and append the resulting item_dict to the
checkpoint_file ("workflow_output.jsonl") immediately when each task finishes;
ensure you reuse the existing logic that builds temp_input from
eval_input.eval_input_items and the step_filter from
eval_config.general.output.workflow_output_step_filter, open checkpoint_file
with append and flush on each write to provide incremental persistence and
recovery.
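The streaming pattern the reviewer asks for can be sketched in isolation. This is a minimal, hypothetical stand-in (the `process_item` coroutine and item shape are illustrative, not the NAT API): tasks run concurrently, but each result is appended to the JSONL checkpoint the moment its task finishes, rather than after a full `asyncio.gather`.

```python
import asyncio
import json


async def run_items_with_checkpoints(items, process_item, checkpoint_path):
    """Run items concurrently but persist each result as soon as it finishes.

    `process_item` is a hypothetical stand-in for the per-item workflow call.
    Results are appended to a JSONL file and flushed immediately, so a crash
    mid-run loses at most the still-in-flight items.
    """
    tasks = [asyncio.create_task(process_item(item)) for item in items]
    with open(checkpoint_path, "a", encoding="utf-8") as f:
        # as_completed yields tasks in completion order, not submission order.
        for finished in asyncio.as_completed(tasks):
            result = await finished
            f.write(json.dumps(result) + "\n")
            f.flush()  # persist now, not at interpreter exit


async def main():
    async def process_item(item):
        await asyncio.sleep(0.01 * item)  # simulate remote work of varying length
        return {"id": item, "output": item * 2}

    await run_items_with_checkpoints([3, 1, 2], process_item, "checkpoint_demo.jsonl")


asyncio.run(main())
```

Because the file is written inside the completion loop, interrupting the process after the first item still leaves one recoverable JSONL line on disk.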
- Line 175: The public async methods run_workflow_local and run_workflow_remote
currently lack explicit return type hints; update their function signatures to
include the Python 3.11+ return annotation "-> None" (e.g., change "async def
run_workflow_local(self, session_manager: SessionManager, dataset_handler:
DatasetHandler):" to "async def run_workflow_local(self, session_manager:
SessionManager, dataset_handler: DatasetHandler) -> None:") and do the same for
run_workflow_remote so both public APIs have explicit return type hints; ensure
any related type stubs or usages remain compatible.
- Around line 276-297: The synchronous checkpoint write block (the code that
builds EvalInput, calls dataset_handler.publish_eval_input, json.loads and opens
checkpoint_file for append) must be moved off the event loop: create a
synchronous helper method named _append_checkpoint_line(self, checkpoint_file,
line) that opens the file, writes the line and flushes, then invoke that helper
via asyncio.to_thread(...) from the async context (e.g., inside run_one where
self.config.write_output is checked) and wrap the entire checkpoint sequence
(building temp_input with EvalInput, calling publish_eval_input, extracting
item_dict, and the to_thread call) in a try/except so any serialization or I/O
error is caught and logged but does not propagate to fail the task; keep
existing symbols dataset_handler.publish_eval_input, EvalInput, checkpoint_file,
and step_filter to locate the code to change.
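The comment above combines two ideas: keep the file I/O synchronous in a small helper, call it via `asyncio.to_thread`, and never let a checkpoint failure propagate into the eval task. A minimal sketch of that shape follows; the class and method names mirror the review's suggestion but are illustrative, not the actual NAT code.

```python
import asyncio
import json
import logging

logger = logging.getLogger(__name__)


class CheckpointWriter:
    """Sketch of the suggested pattern: synchronous write helper plus an
    async wrapper that hops off the event loop with asyncio.to_thread."""

    def __init__(self, checkpoint_file):
        self.checkpoint_file = checkpoint_file

    def _append_checkpoint_line(self, line: str) -> None:
        # Synchronous open/write/flush; safe to run in a worker thread.
        with open(self.checkpoint_file, "a", encoding="utf-8") as f:
            f.write(line + "\n")
            f.flush()

    async def checkpoint(self, item_dict: dict) -> None:
        try:
            line = json.dumps(item_dict)
            await asyncio.to_thread(self._append_checkpoint_line, line)
        except Exception:
            # Log (with traceback) but never fail the eval task itself.
            logger.exception("Failed to write incremental checkpoint")


async def main():
    writer = CheckpointWriter("demo_checkpoint.jsonl")
    await writer.checkpoint({"id": "item-1", "output": "ok"})


asyncio.run(main())
```

The `try/except` sits around both serialization and the threaded write, matching the review's request that a single bad item is logged rather than aborting the run.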
Signed-off-by: Akshat Kumar <akshat230405@gmail.com>
Actionable comments posted: 3
🧹 Nitpick comments (2)
packages/nvidia_nat_eval/src/nat/plugins/eval/runtime/evaluate.py (2)
292-293: Remove redundant local import of `EvalInput`.
`EvalInput` is already imported at module level (line 42). The local re-import here, and the one at line 336 (inside a loop), is unnecessary clutter. Proposed fix: delete the line `from nat.data_models.evaluator import EvalInput` immediately before `temp_input = EvalInput(eval_input_items=[item])`, and do the same at line 336.
282-301: Extract duplicated checkpoint logic into a shared async helper.
The checkpoint block here (lines 282-301) and the one in `run_workflow_remote` (lines 327-343) share nearly identical logic: compute `step_filter`, wrap the item in `EvalInput`, serialize via `publish_eval_input`, parse the JSON, and write via `_write_checkpoint_item`. Consider extracting an async method like `_checkpoint_item(self, item, dataset_handler, checkpoint_file, step_filter)` to DRY this up.
Additionally, `output_dir.mkdir(parents=True, exist_ok=True)` is called on every concurrent item in the local path (line 286). Move this to a one-time setup before the `asyncio.gather` to avoid redundant syscalls under concurrency.
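Both suggestions in this nitpick can be illustrated together: create the output directory once before launching concurrent work, and route every item through one shared checkpoint helper. The `publish` callable below is a hypothetical stand-in for `dataset_handler.publish_eval_input`; nothing here is the real NAT API.

```python
import asyncio
from pathlib import Path


def append_line(path: Path, line: str) -> None:
    # Synchronous append + flush, intended to run via asyncio.to_thread.
    with path.open("a", encoding="utf-8") as f:
        f.write(line + "\n")
        f.flush()


async def checkpoint_item(item, publish, checkpoint_file: Path) -> None:
    """Shared helper: serialize one item and persist it off the event loop."""
    line = publish(item)  # stand-in for publish_eval_input serialization
    await asyncio.to_thread(append_line, checkpoint_file, line)


async def run_local(items, publish, output_dir: Path) -> None:
    output_dir.mkdir(parents=True, exist_ok=True)  # once, not once per item
    checkpoint_file = output_dir / "workflow_output.jsonl"

    async def run_one(item):
        await asyncio.sleep(0)  # the real per-item workflow call would go here
        await checkpoint_item(item, publish, checkpoint_file)

    await asyncio.gather(*(run_one(i) for i in items))


asyncio.run(run_local(["a", "b"], lambda i: f'{{"id": "{i}"}}', Path("out_demo")))
```

A remote runner could call the same `checkpoint_item` helper from its completion callback, which is exactly the deduplication the review proposes.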
Inline comments:
In `@packages/nvidia_nat_eval/src/nat/plugins/eval/runtime/evaluate.py`:
- Around line 322-343: The current remote run writes checkpoints only after
handler.run_workflow_remote(self.eval_input) completes; change
run_workflow_remote in evaluate.py to stream per-item checkpoint writes during
the remote execution by coordinating with EvaluationRemoteWorkflowHandler so
each finished item triggers publishing and writing immediately (use
asyncio.as_completed or per-task callbacks inside/around
handler.run_workflow_remote rather than waiting for the full gather); for each
completed task call dataset_handler.publish_eval_input(...) and await
asyncio.to_thread(self._write_checkpoint_item, checkpoint_file, item_dict) to
ensure incremental JSONL writes; update any associated
handler.run_workflow_remote signatures or add a streaming iterator/callback hook
so you can process individual results as they finish without blocking on the
entire gather.
- Around line 299-300: Replace the two exception logging calls that currently
use logger.warning(...) inside the incremental checkpoint write handlers with
logger.exception(...), so the full traceback is recorded; specifically update
the except blocks that catch Exception as e around the incremental checkpoint
write for item (the handler using "Failed to write incremental checkpoint for
item %s: %s", item.id, e) and the other similar except block further down
(around the code referenced at the second exception handler) to call
logger.exception with the same message format and arguments.
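The difference the reviewer is after is that `Logger.exception(...)`, called from inside an `except` block, records the full traceback in addition to the formatted message, while `Logger.warning(...)` records only the message. A small self-contained demonstration (the item id and message are illustrative):

```python
import io
import logging

# Capture log output in a string so the traceback behavior is observable.
stream = io.StringIO()
logger = logging.getLogger("checkpoint_demo")
logger.addHandler(logging.StreamHandler(stream))
logger.setLevel(logging.INFO)
logger.propagate = False

try:
    raise OSError("disk full")
except Exception:
    # Same message format as the review's except block, but with the
    # traceback attached automatically by logger.exception.
    logger.exception("Failed to write incremental checkpoint for item %s", "item-42")

log_text = stream.getvalue()
```

With `logger.warning("... %s: %s", item_id, e)` the `OSError` line would appear, but the `Traceback (most recent call last):` section would be lost, which makes I/O failures during checkpointing much harder to diagnose.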
- Around line 149-155: The helper method _write_checkpoint_item was inserted
such that the subsequent block that computes llm_latencies and assigns to
self.usage_stats.usage_stats_items[item.id] is accidentally nested inside it;
move the entire _write_checkpoint_item method so it sits at class level (either
before or after _compute_usage_stats) and then dedent the lines that compute
llm_latencies and set self.usage_stats.usage_stats_items[item.id] (the block
that references steps and item.id) so that they are part of _compute_usage_stats
again, restoring proper scope for variables like steps and item.id.
Signed-off-by: Akshat Kumar <akshat230405@gmail.com>
♻️ Duplicate comments (1)
packages/nvidia_nat_eval/src/nat/plugins/eval/runtime/evaluate.py (1)
322-343: ⚠️ Potential issue | 🟠 Major: Remote checkpointing is still post-run, not incremental during execution.
At line 325, the method waits for `handler.run_workflow_remote(...)` to fully finish before any checkpoint write at line 335+, so a hang/interruption during remote execution still yields no partial JSONL progress. Also, the outer `try` (lines 329-343) means one item serialization/write failure aborts checkpointing for all remaining items in that loop.
Suggested direction:

```diff
 async def run_workflow_remote(self, dataset_handler: DatasetHandler) -> None:
     from nat.plugins.eval.runtime.remote_workflow import EvaluationRemoteWorkflowHandler
     handler = EvaluationRemoteWorkflowHandler(self.config, self.eval_config.general.max_concurrency)
-    await handler.run_workflow_remote(self.eval_input)
-
-    if self.config.write_output:
-        try:
-            output_dir = self.eval_config.general.output_dir
-            output_dir.mkdir(parents=True, exist_ok=True)
-            checkpoint_file = output_dir / "workflow_output.jsonl"
-            step_filter = self.eval_config.general.output.workflow_output_step_filter if self.eval_config.general.output else None
-
-            for item in self.eval_input.eval_input_items:
-                from nat.data_models.evaluator import EvalInput
-                temp_input = EvalInput(eval_input_items=[item])
-                item_dict = json.loads(dataset_handler.publish_eval_input(temp_input, step_filter))[0]
-                await asyncio.to_thread(self._write_checkpoint_item, checkpoint_file, item_dict)
-        except Exception:
-            logger.exception("Failed to write remote checkpoint items")
+    if not self.config.write_output:
+        await handler.run_workflow_remote(self.eval_input)
+        return
+
+    output_dir = self.eval_config.general.output_dir
+    output_dir.mkdir(parents=True, exist_ok=True)
+    checkpoint_file = output_dir / "workflow_output.jsonl"
+    step_filter = self.eval_config.general.output.workflow_output_step_filter if self.eval_config.general.output else None
+
+    async def on_item_complete(item: EvalInputItem) -> None:
+        try:
+            temp_input = EvalInput(eval_input_items=[item])
+            item_dict = json.loads(dataset_handler.publish_eval_input(temp_input, step_filter))[0]
+            await asyncio.to_thread(self._write_checkpoint_item, checkpoint_file, item_dict)
+        except Exception:
+            logger.exception("Failed to write remote checkpoint for item %s", item.id)
+
+    await handler.run_workflow_remote(self.eval_input, on_item_complete=on_item_complete)
```
Hi @AnuradhaKaruppiah, I have addressed all the feedback provided by the CodeRabbit reviewer.
The code has been verified via
Description
Addresses #1631. Currently, `nat eval` only writes the final `workflow_output.json` after the entire dataset has finished processing. If the evaluation hangs or is interrupted, all progress is lost. This PR introduces incremental checkpointing by appending results to a `workflow_output.jsonl` file in real time as each dataset item completes. This ensures that partial results are always recoverable from disk.
Changes:
- Updated `run_workflow_local` and `run_workflow_remote` in `evaluate.py` to accept `dataset_handler`.
- Appended each completed `EvalInputItem` to `workflow_output.jsonl`.
- Added `f.flush()` to ensure immediate disk persistence, preventing data loss during process interruptions.
- The final `workflow_output.json` is still generated at the end of successful runs.
Closes #1631
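The recovery story the description promises depends on being able to read the checkpoint back after an interruption. A hypothetical resume helper (not part of this PR) might look like the sketch below; since each JSONL line is one finished item, a restarted run can skip every id already on disk, and a torn trailing line from a crash mid-write is skipped rather than crashing the resume.

```python
import json
from pathlib import Path


def load_checkpoint(checkpoint_file: Path) -> dict:
    """Recover completed items from an incremental JSONL checkpoint.

    Illustrative helper: returns {item_id: record} for every fully written
    line, silently dropping a partial last line left by an interrupted write.
    """
    done = {}
    if not checkpoint_file.exists():
        return done
    for line in checkpoint_file.read_text(encoding="utf-8").splitlines():
        try:
            record = json.loads(line)
        except json.JSONDecodeError:
            continue  # partial line from a crash mid-write
        done[record["id"]] = record
    return done


# Simulate an interrupted run: two complete records, then a torn write.
p = Path("resume_demo.jsonl")
p.write_text('{"id": 1, "output": "a"}\n{"id": 2, "output": "b"}\n{"id": 3, "out',
             encoding="utf-8")
recovered = load_checkpoint(p)
```

This is also why the per-item `f.flush()` in the PR matters: without it, buffered lines could vanish on a hard kill and the resume set would under-count finished work.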
By submitting this PR I confirm:
Summary by CodeRabbit
New Features
Reliability