From 91c69ddd8bc2e5392815c37cdc0a01b61ee8512e Mon Sep 17 00:00:00 2001 From: Akshat Kumar Date: Wed, 25 Feb 2026 20:18:41 +0530 Subject: [PATCH 1/4] feat: implement incremental checkpointing for nat eval (#1631) Signed-off-by: Akshat Kumar --- .../src/nat/plugins/eval/runtime/evaluate.py | 52 ++++++++++++++++--- 1 file changed, 44 insertions(+), 8 deletions(-) diff --git a/packages/nvidia_nat_eval/src/nat/plugins/eval/runtime/evaluate.py b/packages/nvidia_nat_eval/src/nat/plugins/eval/runtime/evaluate.py index 42db91d993..2144cb9f3d 100644 --- a/packages/nvidia_nat_eval/src/nat/plugins/eval/runtime/evaluate.py +++ b/packages/nvidia_nat_eval/src/nat/plugins/eval/runtime/evaluate.py @@ -172,7 +172,7 @@ def _compute_usage_stats(self, item: EvalInputItem): llm_latency=llm_latency) return self.usage_stats.usage_stats_items[item.id] - async def run_workflow_local(self, session_manager: SessionManager): + async def run_workflow_local(self, session_manager: SessionManager, dataset_handler: DatasetHandler): ''' Launch the workflow with the specified questions and extract the output using the jsonpath ''' @@ -272,6 +272,30 @@ async def cancel_pending_tasks(): self.weave_eval.log_prediction(item, output) await self.weave_eval.log_usage_stats(item, usage_stats_item) + + # --- START INCREMENTAL CHECKPOINTING --- + if self.config.write_output: + output_dir = self.eval_config.general.output_dir + output_dir.mkdir(parents=True, exist_ok=True) + checkpoint_file = output_dir / "workflow_output.jsonl" + + # Use the same filtering logic as the final output + step_filter = self.eval_config.general.output.workflow_output_step_filter \ + if self.eval_config.general.output else None + + # Format just this one item + from nat.data_models.evaluator import EvalInput + temp_input = EvalInput(eval_input_items=[item]) + item_json_list = dataset_handler.publish_eval_input(temp_input, step_filter) + + # publish_eval_input returns a JSON string of a list. + # We extract the first object to write as a single line in JSONL. + item_dict = json.loads(item_json_list)[0] + + with open(checkpoint_file, "a", encoding="utf-8") as f: + f.write(json.dumps(item_dict) + "\n") + f.flush() # Ensure it's written to disk immediately + # --- END INCREMENTAL CHECKPOINTING --- finally: if root_span_token is not None: ctx_state._root_span_id.reset(root_span_token) @@ -292,14 +316,26 @@ async def wrapped_run(item: EvalInputItem) -> None: await asyncio.gather(*[wrapped_run(item) for item in eval_input_items]) pbar.close() - async def run_workflow_remote(self): + async def run_workflow_remote(self, dataset_handler: DatasetHandler): from nat.plugins.eval.runtime.remote_workflow import EvaluationRemoteWorkflowHandler handler = EvaluationRemoteWorkflowHandler(self.config, self.eval_config.general.max_concurrency) await handler.run_workflow_remote(self.eval_input) - for item in self.eval_input.eval_input_items: - usage_stats_item = self._compute_usage_stats(item) - self.weave_eval.log_prediction(item, item.output_obj) - await self.weave_eval.log_usage_stats(item, usage_stats_item) + + # Add the checkpointing here too for remote runs + if self.config.write_output: + output_dir = self.eval_config.general.output_dir + output_dir.mkdir(parents=True, exist_ok=True) + checkpoint_file = output_dir / "workflow_output.jsonl" + step_filter = self.eval_config.general.output.workflow_output_step_filter if self.eval_config.general.output else None + + # Since remote returns all at once, we write them all to the JSONL + for item in self.eval_input.eval_input_items: + from nat.data_models.evaluator import EvalInput + temp_input = EvalInput(eval_input_items=[item]) + item_dict = json.loads(dataset_handler.publish_eval_input(temp_input, step_filter))[0] + with open(checkpoint_file, "a", encoding="utf-8") as f: + f.write(json.dumps(item_dict) + "\n") + f.flush() async def profile_workflow(self) -> ProfilerResults: """ @@ -720,7 +756,7 @@ async def run_and_evaluate(self, local_session_manager: SessionManager | None = None try: if self.config.endpoint: - await self.run_workflow_remote() + await self.run_workflow_remote(dataset_handler) elif not self.config.skip_workflow: if session_manager is None: session_manager = await SessionManager.create( @@ -728,7 +764,7 @@ async def run_and_evaluate(self, shared_builder=eval_workflow, max_concurrency=self.eval_config.general.max_concurrency) local_session_manager = session_manager - await self.run_workflow_local(session_manager) + await self.run_workflow_local(session_manager, dataset_handler) # Pre-evaluation process the workflow output self.eval_input = dataset_handler.pre_eval_process_eval_input(self.eval_input) From b5a82701b5ca5ca57e62d5e3bac548c452daa8fd Mon Sep 17 00:00:00 2001 From: Akshat Kumar Date: Wed, 25 Feb 2026 20:22:08 +0530 Subject: [PATCH 2/4] feat: implement incremental checkpointing for nat eval Signed-off-by: Akshat Kumar --- .../nvidia_nat_eval/src/nat/plugins/eval/runtime/evaluate.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/packages/nvidia_nat_eval/src/nat/plugins/eval/runtime/evaluate.py b/packages/nvidia_nat_eval/src/nat/plugins/eval/runtime/evaluate.py index 2144cb9f3d..74c93d202e 100644 --- a/packages/nvidia_nat_eval/src/nat/plugins/eval/runtime/evaluate.py +++ b/packages/nvidia_nat_eval/src/nat/plugins/eval/runtime/evaluate.py @@ -273,7 +273,7 @@ async def cancel_pending_tasks(): self.weave_eval.log_prediction(item, output) await self.weave_eval.log_usage_stats(item, usage_stats_item) - # --- START INCREMENTAL CHECKPOINTING --- + # START INCREMENTAL CHECKPOINTING if self.config.write_output: output_dir = self.eval_config.general.output_dir output_dir.mkdir(parents=True, exist_ok=True) @@ -295,7 +295,6 @@ async def cancel_pending_tasks(): with open(checkpoint_file, "a", encoding="utf-8") as f: f.write(json.dumps(item_dict) + "\n") f.flush() # Ensure it's written to disk immediately - # --- END INCREMENTAL CHECKPOINTING --- finally: if root_span_token is not None: ctx_state._root_span_id.reset(root_span_token) From bb2de021f86c91f485f7268edd9100affc229be1 Mon Sep 17 00:00:00 2001 From: Akshat Kumar Date: Wed, 25 Feb 2026 21:04:00 +0530 Subject: [PATCH 3/4] chore: use asyncio.to_thread and add type hints per review Signed-off-by: Akshat Kumar --- .../src/nat/plugins/eval/runtime/evaluate.py | 76 ++++++++++--------- 1 file changed, 41 insertions(+), 35 deletions(-) diff --git a/packages/nvidia_nat_eval/src/nat/plugins/eval/runtime/evaluate.py b/packages/nvidia_nat_eval/src/nat/plugins/eval/runtime/evaluate.py index 74c93d202e..f10cbce491 100644 --- a/packages/nvidia_nat_eval/src/nat/plugins/eval/runtime/evaluate.py +++ b/packages/nvidia_nat_eval/src/nat/plugins/eval/runtime/evaluate.py @@ -146,6 +146,12 @@ def _compute_usage_stats(self, item: EvalInputItem): max_timestamp = 0.0 runtime = 0.0 + def _write_checkpoint_item(self, checkpoint_file: Path, item_dict: dict[str, Any]) -> None: + """Helper to write a single JSONL line to disk. Called via to_thread to avoid blocking.""" + with open(checkpoint_file, "a", encoding="utf-8") as f: + f.write(json.dumps(item_dict) + "\n") + f.flush() + # find llm latency by calculating p95 of all llm calls llm_latencies = [] previous_llm_start_time = None @@ -172,7 +178,7 @@ def _compute_usage_stats(self, item: EvalInputItem): llm_latency=llm_latency) return self.usage_stats.usage_stats_items[item.id] - async def run_workflow_local(self, session_manager: SessionManager, dataset_handler: DatasetHandler): + async def run_workflow_local(self, session_manager: SessionManager, dataset_handler: DatasetHandler) -> None: ''' Launch the workflow with the specified questions and extract the output using the jsonpath ''' @@ -275,26 +281,24 @@ async def cancel_pending_tasks(): # START INCREMENTAL CHECKPOINTING if self.config.write_output: - output_dir = self.eval_config.general.output_dir - output_dir.mkdir(parents=True, exist_ok=True) - checkpoint_file = output_dir / "workflow_output.jsonl" - - # Use the same filtering logic as the final output - step_filter = self.eval_config.general.output.workflow_output_step_filter \ - if self.eval_config.general.output else None - - # Format just this one item - from nat.data_models.evaluator import EvalInput - temp_input = EvalInput(eval_input_items=[item]) - item_json_list = dataset_handler.publish_eval_input(temp_input, step_filter) - - # publish_eval_input returns a JSON string of a list. - # We extract the first object to write as a single line in JSONL. - item_dict = json.loads(item_json_list)[0] - - with open(checkpoint_file, "a", encoding="utf-8") as f: - f.write(json.dumps(item_dict) + "\n") - f.flush() # Ensure it's written to disk immediately + try: + output_dir = self.eval_config.general.output_dir + output_dir.mkdir(parents=True, exist_ok=True) + checkpoint_file = output_dir / "workflow_output.jsonl" + + step_filter = self.eval_config.general.output.workflow_output_step_filter \ + if self.eval_config.general.output else None + + from nat.data_models.evaluator import EvalInput + temp_input = EvalInput(eval_input_items=[item]) + item_json_list = dataset_handler.publish_eval_input(temp_input, step_filter) + item_dict = json.loads(item_json_list)[0] + + # Use to_thread to prevent blocking the event loop + await asyncio.to_thread(self._write_checkpoint_item, checkpoint_file, item_dict) + except Exception as e: + logger.warning("Failed to write incremental checkpoint for item %s: %s", item.id, e) + # --- END INCREMENTAL CHECKPOINTING --- finally: if root_span_token is not None: ctx_state._root_span_id.reset(root_span_token) @@ -315,26 +319,28 @@ async def wrapped_run(item: EvalInputItem) -> None: await asyncio.gather(*[wrapped_run(item) for item in eval_input_items]) pbar.close() - async def run_workflow_remote(self, dataset_handler: DatasetHandler): + async def run_workflow_remote(self, dataset_handler: DatasetHandler) -> None: from nat.plugins.eval.runtime.remote_workflow import EvaluationRemoteWorkflowHandler handler = EvaluationRemoteWorkflowHandler(self.config, self.eval_config.general.max_concurrency) await handler.run_workflow_remote(self.eval_input) # Add the checkpointing here too for remote runs if self.config.write_output: - output_dir = self.eval_config.general.output_dir - output_dir.mkdir(parents=True, exist_ok=True) - checkpoint_file = output_dir / "workflow_output.jsonl" - step_filter = self.eval_config.general.output.workflow_output_step_filter if self.eval_config.general.output else None - - # Since remote returns all at once, we write them all to the JSONL - for item in self.eval_input.eval_input_items: - from nat.data_models.evaluator import EvalInput - temp_input = EvalInput(eval_input_items=[item]) - item_dict = json.loads(dataset_handler.publish_eval_input(temp_input, step_filter))[0] - with open(checkpoint_file, "a", encoding="utf-8") as f: - f.write(json.dumps(item_dict) + "\n") - f.flush() + try: + output_dir = self.eval_config.general.output_dir + output_dir.mkdir(parents=True, exist_ok=True) + checkpoint_file = output_dir / "workflow_output.jsonl" + step_filter = self.eval_config.general.output.workflow_output_step_filter if self.eval_config.general.output else None + + for item in self.eval_input.eval_input_items: + from nat.data_models.evaluator import EvalInput + temp_input = EvalInput(eval_input_items=[item]) + item_dict = json.loads(dataset_handler.publish_eval_input(temp_input, step_filter))[0] + + # Use to_thread here as well + await asyncio.to_thread(self._write_checkpoint_item, checkpoint_file, item_dict) + except Exception as e: + logger.warning("Failed to write remote checkpoint items: %s", e) async def profile_workflow(self) -> ProfilerResults: """ From 693a7709f304c922dd408ef49fc17d9566229f2a Mon Sep 17 00:00:00 2001 From: Akshat Kumar Date: Wed, 25 Feb 2026 21:17:21 +0530 Subject: [PATCH 4/4] update logging to exception and fix method indentation Signed-off-by: Akshat Kumar --- .../src/nat/plugins/eval/runtime/evaluate.py | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/packages/nvidia_nat_eval/src/nat/plugins/eval/runtime/evaluate.py b/packages/nvidia_nat_eval/src/nat/plugins/eval/runtime/evaluate.py index f10cbce491..f8f5e7473b 100644 --- a/packages/nvidia_nat_eval/src/nat/plugins/eval/runtime/evaluate.py +++ b/packages/nvidia_nat_eval/src/nat/plugins/eval/runtime/evaluate.py @@ -117,6 +117,12 @@ def __init__(self, config: EvaluationRunConfig, callback_manager: "EvalCallbackM # Pre-generated OTEL root span_ids for eager trace linking (item_id -> span_id) self._item_span_ids: dict[str, int] = {} + def _write_checkpoint_item(self, checkpoint_file: Path, item_dict: dict[str, Any]) -> None: + """Helper to write a single JSONL line to disk. Called via to_thread to avoid blocking.""" + with open(checkpoint_file, "a", encoding="utf-8") as f: + f.write(json.dumps(item_dict) + "\n") + f.flush() + def _compute_usage_stats(self, item: EvalInputItem): """Compute usage stats for a single item using the intermediate steps""" # get the prompt and completion tokens from the intermediate steps @@ -146,12 +152,6 @@ def _compute_usage_stats(self, item: EvalInputItem): max_timestamp = 0.0 runtime = 0.0 - def _write_checkpoint_item(self, checkpoint_file: Path, item_dict: dict[str, Any]) -> None: - """Helper to write a single JSONL line to disk. Called via to_thread to avoid blocking.""" - with open(checkpoint_file, "a", encoding="utf-8") as f: - f.write(json.dumps(item_dict) + "\n") - f.flush() - # find llm latency by calculating p95 of all llm calls llm_latencies = [] previous_llm_start_time = None @@ -296,9 +296,9 @@ async def cancel_pending_tasks(): # Use to_thread to prevent blocking the event loop await asyncio.to_thread(self._write_checkpoint_item, checkpoint_file, item_dict) - except Exception as e: - logger.warning("Failed to write incremental checkpoint for item %s: %s", item.id, e) - # --- END INCREMENTAL CHECKPOINTING --- + except Exception: + logger.exception("Failed to write incremental checkpoint for item %s", item.id) + # END INCREMENTAL CHECKPOINTING finally: if root_span_token is not None: ctx_state._root_span_id.reset(root_span_token) @@ -339,8 +339,8 @@ async def run_workflow_remote(self, dataset_handler: DatasetHandler) -> None: # Use to_thread here as well await asyncio.to_thread(self._write_checkpoint_item, checkpoint_file, item_dict) - except Exception as e: - logger.warning("Failed to write remote checkpoint items: %s", e) + except Exception: + logger.exception("Failed to write remote checkpoint items") async def profile_workflow(self) -> ProfilerResults: """