Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,11 @@ def _fan_out_with_async(self, generator: ColumnGeneratorWithModelRegistry, max_w
progress_tracker, executor_kwargs = self._setup_fan_out(generator, max_workers)
executor = AsyncConcurrentExecutor(max_workers=max_workers, **executor_kwargs)
work_items = [
(generator.agenerate(record), {"index": i}) for i, record in self.batch_manager.iter_current_batch()
(
generator.agenerate(record),
{"index": i, "column_name": generator.config.name},
)
for i, record in self.batch_manager.iter_current_batch()
]
executor.run(work_items)
self._finalize_fan_out(progress_tracker)
Expand All @@ -397,7 +401,11 @@ def _fan_out_with_threads(self, generator: ColumnGeneratorWithModelRegistry, max
progress_tracker, executor_kwargs = self._setup_fan_out(generator, max_workers)
with ConcurrentThreadExecutor(max_workers=max_workers, **executor_kwargs) as executor:
for i, record in self.batch_manager.iter_current_batch():
executor.submit(lambda record: generator.generate(record), record, context={"index": i})
executor.submit(
lambda record: generator.generate(record),
record,
context={"index": i, "column_name": generator.config.name},
)
self._finalize_fan_out(progress_tracker)

def _make_result_callback(self, progress_tracker: ProgressTracker) -> Callable[[dict], None]:
Expand Down Expand Up @@ -484,12 +492,61 @@ def _cleanup_dropped_record_images(self, dropped_indices: set[int]) -> None:
for path in [paths] if isinstance(paths, str) else paths:
media_storage.delete_image(path)

@staticmethod
def _extract_failure_detail(exc: Exception) -> str:
detail = getattr(exc, "detail", None)
if isinstance(detail, str):
normalized_detail = " ".join(detail.split()).strip()
if normalized_detail:
return normalized_detail
exc_str = str(exc).strip()
for line in exc_str.splitlines():
if "Cause:" in line:
return " ".join(line.split("Cause:", maxsplit=1)[1].split()).strip()
return " ".join(exc_str.split()).strip() or type(exc).__name__

@classmethod
def _classify_worker_failure(cls, exc: Exception) -> str:
    """Map *exc* to a short human-readable failure category for log messages.

    A structured ``failure_kind`` attribute on the exception wins outright
    (underscores converted to spaces); otherwise we fall back to keyword
    heuristics over the exception's class name and its normalized detail
    string. Returns ``"generation error"`` when nothing more specific matches.
    """
    failure_kind = getattr(exc, "failure_kind", None)
    if isinstance(failure_kind, str) and failure_kind.strip():
        return failure_kind.replace("_", " ")

    detail = cls._extract_failure_detail(exc).lower()
    exc_name = type(exc).__name__.lower()

    if "timeout" in exc_name or "timed out" in detail:
        return "timeout"
    if "rate" in exc_name and "limit" in exc_name:
        return "rate limit"
    if "authentication" in exc_name:
        return "authentication"
    if "permission" in exc_name:
        return "permission denied"
    if "contextwindow" in exc_name or "context width" in detail:
        return "context window"
    # Keep these markers narrow: a bare `"schema" in detail` fallback was
    # previously used here and misclassified unrelated errors whose cause text
    # merely mentioned the word "schema" (e.g. a bad request about an "API
    # schema"). Only keywords specific to structured-output validation remain.
    if "response_schema" in detail or "model_validate" in detail or "doesn't match requested" in detail:
        return "schema validation"
    if "validation" in exc_name or "validation" in detail:
        return "validation"
    return "generation error"

@classmethod
def _format_worker_failure_warning(cls, exc: Exception, *, context: dict | None = None) -> str:
    """Build the warning line logged when a generation worker raises *exc*.

    Pulls the record index and column name from *context* when available,
    and appends a classified failure kind plus the normalized detail text.
    """
    ctx = context or {}
    record_index = ctx.get("index", "unknown")
    column_name = ctx.get("column_name")
    column_label = f" in column {column_name!r}" if column_name else ""
    kind = cls._classify_worker_failure(exc)
    detail = cls._extract_failure_detail(exc)
    return (
        f"⚠️ Generation for record at index {record_index} failed{column_label} ({kind}). "
        f"Will omit this record from the dataset. Detail: {detail}"
    )

def _worker_error_callback(self, exc: Exception, *, context: dict | None = None) -> None:
    """If a worker fails, log a classified warning and mark the record for removal.

    Raising here would be ineffective: both executor paths invoke this
    callback from machinery that catches and swallows exceptions (the thread
    executor via a ``Future`` done-callback, the async executor via an
    explicit ``try/except`` around the callback). A missing context is
    therefore reported with ``logger.error`` and the callback returns early,
    instead of raising an unreachable ``RuntimeError``. Returning normally
    also keeps the caller's ``progress_tracker.record_failure()`` accounting
    intact.
    """
    logger.warning(self._format_worker_failure_warning(exc, context=context))
    if context is None or "index" not in context:
        # Without an index we cannot drop the failed record; surface this
        # loudly in the logs since an exception raised here would be
        # silently swallowed by both executor implementations.
        logger.error(
            "Worker error callback called without a valid context index; "
            "the failed record cannot be dropped from the dataset."
        )
        return
    self._records_to_drop.add(context["index"])
Comment on lines 545 to 550
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RuntimeError silently swallowed by both executor paths

The RuntimeError raised here is intended to provide fail-loud behaviour when context is missing, but both executor implementations prevent it from ever reaching the main thread:

  • Thread path (ConcurrentThreadExecutor): _error_callback is called at line 188 of concurrency.py, outside any try/except inside _handle_future. _handle_future is registered as a done_callback via future.add_done_callback(_handle_future). Python's concurrent.futures catches and only logs (via LOGGER.exception) any exception raised from a done-callback — it does not propagate to the caller of _fan_out_with_threads.
  • Async path (AsyncConcurrentExecutor): The callback is already wrapped in an explicit try/except Exception at lines 215–219 of async_concurrency.py, which catches the RuntimeError and logs "error_callback raised an exception".

In both production paths the RuntimeError is silently swallowed. The test test_worker_error_callback_requires_context_index exercises the direct method call — not the integrated path — so this gap is not caught by the test suite.

Because the _records_to_drop.add(context["index"]) line is never reached when context is missing, the failed record is also not dropped from the dataset (the exact silent-data-corruption risk flagged in a previous review comment).

Consider moving the context validation to the outermost fan-out site (before the executor loop) where the exception can propagate freely, or guard the record-drop with a logged error and a bare return so the callback's side-effect is clearly skipped without the false-safe of an unreachable raise.

Prompt To Fix With AI
This is a comment left during a code review.
Path: packages/data-designer-engine/src/data_designer/engine/dataset_builders/column_wise_builder.py
Line: 545-550

Comment:
**`RuntimeError` silently swallowed by both executor paths**

The `RuntimeError` raised here is intended to provide fail-loud behaviour when context is missing, but both executor implementations prevent it from ever reaching the main thread:

- **Thread path** (`ConcurrentThreadExecutor`): `_error_callback` is called at line 188 of `concurrency.py`, outside any try/except inside `_handle_future`. `_handle_future` is registered as a `done_callback` via `future.add_done_callback(_handle_future)`. Python's `concurrent.futures` catches and only logs (via `LOGGER.exception`) any exception raised from a done-callback — it does **not** propagate to the caller of `_fan_out_with_threads`.
- **Async path** (`AsyncConcurrentExecutor`): The callback is already wrapped in an explicit `try/except Exception` at lines 215–219 of `async_concurrency.py`, which catches the `RuntimeError` and logs `"error_callback raised an exception"`.

In both production paths the `RuntimeError` is silently swallowed. The test `test_worker_error_callback_requires_context_index` exercises the direct method call — not the integrated path — so this gap is not caught by the test suite.

Because the `_records_to_drop.add(context["index"])` line is never reached when context is missing, the failed record is also not dropped from the dataset (the exact silent-data-corruption risk flagged in a previous review comment).

Consider moving the context validation to the outermost fan-out site (before the executor loop) where the exception can propagate freely, or guard the record-drop with a logged error and a bare `return` so the callback's side-effect is clearly skipped without the false-safe of an unreachable `raise`.

How can I resolve this? If you propose a fix, please make it concise.

Comment on lines 545 to 550
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

progress_tracker.record_failure() skipped when RuntimeError is raised

_make_error_callback calls progress_tracker.record_failure() only if _worker_error_callback returns normally. When context is None or missing "index", _worker_error_callback raises RuntimeError before returning, so record_failure() is never reached. This means the progress tracker will undercount failures for that execution slot.

In the thread executor path the RuntimeError is caught by Python's done-callback machinery and swallowed (as already noted). In the async path it is caught by the explicit try/except Exception in _run_task. In both cases record_failure() is never called, leaving the progress bar one count short.

A minimal fix is to call record_failure() before the guard raises:

def _worker_error_callback(self, exc: Exception, *, context: dict | None = None) -> None:
    """If a worker fails, we can handle the exception here."""
    logger.warning(self._format_worker_failure_warning(exc, context=context))
    if context is None or "index" not in context:
        raise RuntimeError("Worker error callback called without a valid context index.")
    self._records_to_drop.add(context["index"])

Alternatively, move progress_tracker.record_failure() into a finally block inside the wrapper in _make_error_callback so it is always called regardless of whether _worker_error_callback raises.

Prompt To Fix With AI
This is a comment left during a code review.
Path: packages/data-designer-engine/src/data_designer/engine/dataset_builders/column_wise_builder.py
Line: 545-550

Comment:
**`progress_tracker.record_failure()` skipped when `RuntimeError` is raised**

`_make_error_callback` calls `progress_tracker.record_failure()` only if `_worker_error_callback` returns normally. When `context` is `None` or missing `"index"`, `_worker_error_callback` raises `RuntimeError` before returning, so `record_failure()` is never reached. This means the progress tracker will undercount failures for that execution slot.

In the thread executor path the `RuntimeError` is caught by Python's done-callback machinery and swallowed (as already noted). In the async path it is caught by the explicit `try/except Exception` in `_run_task`. In both cases `record_failure()` is never called, leaving the progress bar one count short.

A minimal fix is to call `record_failure()` before the guard raises:

```python
def _worker_error_callback(self, exc: Exception, *, context: dict | None = None) -> None:
    """If a worker fails, we can handle the exception here."""
    logger.warning(self._format_worker_failure_warning(exc, context=context))
    if context is None or "index" not in context:
        raise RuntimeError("Worker error callback called without a valid context index.")
    self._records_to_drop.add(context["index"])
```

Alternatively, move `progress_tracker.record_failure()` into a `finally` block inside the wrapper in `_make_error_callback` so it is always called regardless of whether `_worker_error_callback` raises.

How can I resolve this? If you propose a fix, please make it concise.


def _worker_result_callback(self, result: dict | list[dict], *, context: dict | None = None) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@
logger = logging.getLogger(__name__)


def _normalize_error_detail(detail: str | None) -> str | None:
if detail is None:
return None
normalized = " ".join(detail.split()).strip()
return normalized or None


def get_exception_primary_cause(exception: BaseException) -> BaseException:
"""Returns the primary cause of an exception by walking backwards.

Expand All @@ -38,7 +45,27 @@ def get_exception_primary_cause(exception: BaseException) -> BaseException:
return get_exception_primary_cause(exception.__cause__)


class GenerationValidationFailureError(Exception):
    """Raised when model output repeatedly fails parsing/validation.

    Carries a whitespace-normalized ``detail`` string and a machine-readable
    ``failure_kind`` so downstream error handling can classify the failure
    without re-parsing the message text.
    """

    summary: str
    detail: str | None
    failure_kind: str

    def __init__(
        self,
        summary: str,
        *,
        detail: str | None = None,
        failure_kind: str = "validation_error",
    ) -> None:
        self.summary = summary.strip()
        self.detail = _normalize_error_detail(detail)
        self.failure_kind = failure_kind

        # Only append the detail sentence when a non-empty detail survives
        # normalization.
        if self.detail is None:
            message = self.summary
        else:
            message = f"{self.summary} Validation detail: {self.detail}"

        super().__init__(message)


class ModelRateLimitError(DataDesignerError): ...
Expand Down Expand Up @@ -80,7 +107,23 @@ class ModelAPIConnectionError(DataDesignerError): ...
class ModelStructuredOutputError(DataDesignerError): ...


class ModelGenerationValidationFailureError(DataDesignerError):
    """DataDesigner-facing wrapper for generation validation failures.

    Preserves the structured ``detail`` and ``failure_kind`` metadata from the
    underlying failure so callers can classify the error programmatically.
    """

    detail: str | None
    failure_kind: str | None

    def __init__(
        self,
        message: object | None = None,
        *,
        detail: str | None = None,
        failure_kind: str | None = None,
    ) -> None:
        # Mirror Exception's zero-arg form when no message is supplied.
        args = () if message is None else (message,)
        super().__init__(*args)
        self.detail = _normalize_error_detail(detail)
        self.failure_kind = failure_kind


class ImageGenerationError(DataDesignerError): ...
Expand Down Expand Up @@ -214,11 +257,18 @@ def handle_llm_exceptions(

# Parsing and validation errors
case GenerationValidationFailureError():
detail_text = exception.detail.rstrip(".") if exception.detail is not None else None
validation_detail = f" Validation detail: {detail_text}." if detail_text is not None else ""
raise ModelGenerationValidationFailureError(
FormattedLLMErrorMessage(
cause=f"The provided output schema was unable to be parsed from model {model_name!r} responses while {purpose}.",
cause=(
f"The model output from {model_name!r} could not be parsed into the requested format "
f"while {purpose}.{validation_detail}"
),
solution="This is most likely temporary as we make additional attempts. If you continue to see more of this, simplify or modify the output schema for structured output and try again. If you are attempting token-intensive tasks like generations with high-reasoning effort, ensure that max_tokens in the model config is high enough to reach completion.",
)
),
detail=exception.detail,
failure_kind=exception.failure_kind,
) from None

case DataDesignerError():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,23 @@ def _identity(x: Any) -> Any:
logger = logging.getLogger(__name__)


def _classify_generation_failure_kind(exc: ParserException) -> str:
    """Classify a parser failure as ``"schema_validation"`` or ``"parse_error"``.

    The decision is keyword-based: the primary cause's message is
    whitespace-normalized and lower-cased, then scanned for markers that
    indicate structured-output/schema validation produced the failure.
    """
    cause_text = str(get_exception_primary_cause(exc))
    detail = " ".join(cause_text.split()).lower()
    schema_markers = (
        "response_schema",
        "model_validate",
        "validation error",
        "doesn't match requested",
    )
    if any(marker in detail for marker in schema_markers):
        return "schema_validation"
    return "parse_error"


def _build_generation_validation_error(summary: str, exc: ParserException) -> GenerationValidationFailureError:
    """Wrap *exc* in a ``GenerationValidationFailureError`` with classified metadata."""
    primary_cause = get_exception_primary_cause(exc)
    kind = _classify_generation_failure_kind(exc)
    return GenerationValidationFailureError(summary, detail=str(primary_cause), failure_kind=kind)


# Known keyword arguments extracted into request fields for each modality.
# Note: `extra_body` and `extra_headers` appear in every set but receive special
# treatment in `consolidate_kwargs` (merged with provider-level overrides) and in
Expand Down Expand Up @@ -326,8 +343,9 @@ def generate(
break
except ParserException as exc:
if max_correction_steps == 0 and max_conversation_restarts == 0:
raise GenerationValidationFailureError(
"Unsuccessful generation attempt. No retries were attempted."
raise _build_generation_validation_error(
"Unsuccessful generation attempt. No retries were attempted.",
exc,
) from exc

if curr_num_correction_steps <= max_correction_steps:
Expand All @@ -341,9 +359,12 @@ def generate(
tool_call_turns = checkpoint_tool_call_turns

else:
raise GenerationValidationFailureError(
f"Unsuccessful generation despite {max_correction_steps} correction steps "
f"and {max_conversation_restarts} conversation restarts."
raise _build_generation_validation_error(
(
f"Unsuccessful generation despite {max_correction_steps} correction steps "
f"and {max_conversation_restarts} conversation restarts."
),
exc,
) from exc

if not skip_usage_tracking and mcp_facade is not None:
Expand Down Expand Up @@ -424,8 +445,9 @@ async def agenerate(
break
except ParserException as exc:
if max_correction_steps == 0 and max_conversation_restarts == 0:
raise GenerationValidationFailureError(
"Unsuccessful generation attempt. No retries were attempted."
raise _build_generation_validation_error(
"Unsuccessful generation attempt. No retries were attempted.",
exc,
) from exc

if curr_num_correction_steps <= max_correction_steps:
Expand All @@ -438,9 +460,12 @@ async def agenerate(
tool_call_turns = checkpoint_tool_call_turns

else:
raise GenerationValidationFailureError(
f"Unsuccessful generation despite {max_correction_steps} correction steps "
f"and {max_conversation_restarts} conversation restarts."
raise _build_generation_validation_error(
(
f"Unsuccessful generation despite {max_correction_steps} correction steps "
f"and {max_conversation_restarts} conversation restarts."
),
exc,
) from exc

if not skip_usage_tracking and mcp_facade is not None:
Expand Down
Loading
Loading