Skip to content

Add structured telemetry with ClickHouse sink to flytekit task execution steps#34

Merged
ryanjwong merged 8 commits intomasterfrom
devin/1770858693-flytekit-step-telemetry
Feb 19, 2026
Merged

Add structured telemetry with ClickHouse sink to flytekit task execution steps#34
ryanjwong merged 8 commits intomasterfrom
devin/1770858693-flytekit-step-telemetry

Conversation

@ryanjwong
Copy link
Copy Markdown

@ryanjwong ryanjwong commented Feb 12, 2026

Why are the changes needed?

Flytekit's task execution pipeline has many distinct phases (task loading, input download, deserialization, user code execution, output conversion, upload, etc.) but only a few are instrumented with timing today. This makes it hard to diagnose where time is spent during task execution.

These changes emit structured telemetry at each step boundary. When FLYTE_TELEMETRY_CLICKHOUSE_URL is set, each event is immediately inserted as a row in ClickHouse via a background HTTP POST. Otherwise, structured JSON logs are emitted to stderr for Loki/LogQL ingestion as a fallback.

What changes were proposed in this pull request?

5 files changed:

  1. flytekit/loggers.py — New telemetry_logger (flytekit.telemetry), controlled by FLYTE_TELEMETRY_ENABLED env var (default: on). New ClickHouseTelemetrySink class that inserts each event as a single row via a fire-and-forget background thread HTTP POST (FORMAT JSONEachRow). No buffering, no batching — every step is immediately visible in ClickHouse. Zero new dependencies (stdlib urllib + threading only).

  2. flytekit/core/utils.py — Enhanced timeit context manager:

    • New _emit_telemetry() method emits structured data with event, step, wall_time_s, process_time_s, status, error_type, execution_id, task_name, project, domain.
    • Routes to ClickHouseTelemetrySink when enabled, falls back to telemetry_logger (stderr JSON) otherwise.
    • New **extras kwarg for passing additional metadata (e.g. input_size_bytes).
    • Entire _emit_telemetry body is wrapped in try/except Exception: pass so telemetry can never break execution or mask real exceptions from __exit__.
  3. flytekit/bin/entrypoint.py — Wrapped 7 previously uninstrumented steps in _dispatch_execute: load_task, download_inputs, deserialize_inputs, task_dispatch_execute, output_offloading, upload_outputs, output_deck.

  4. flytekit/core/base_task.py — Wrapped 3 previously uninstrumented steps in PythonTask.dispatch_execute: pre_execute, post_execute, write_decks.

  5. tests/flytekit/unit/core/test_utils.py — 17 new tests (30 total, all passing): 8 for stderr fallback path, 9 for ClickHouse sink (per-row POST, background thread firing, routing, error silencing, enable/disable).

ClickHouse env vars

Env var Default Purpose
FLYTE_TELEMETRY_ENABLED "1" Master toggle for all telemetry
FLYTE_TELEMETRY_CLICKHOUSE_URL (none) e.g. https://host:8443 — enables ClickHouse sink
FLYTE_TELEMETRY_CLICKHOUSE_USER "default" ClickHouse username
FLYTE_TELEMETRY_CLICKHOUSE_PASSWORD "" ClickHouse password
FLYTE_TELEMETRY_CLICKHOUSE_DATABASE "default" Target database
FLYTE_TELEMETRY_CLICKHOUSE_TABLE "flytekit_telemetry" Target table

Example telemetry event (ClickHouse row)

{"event": "flytekit_step", "step": "execute_user_code", "wall_time_s": 1.234, "process_time_s": 0.89, "status": "success", "task_name": "my_task", "execution_id": "f12abc", "project": "ml", "domain": "production", "timestamp": "2025-02-12 10:30:45.123"}

Fallback: LogQL queries (when ClickHouse not configured)

# p99 user code execution time by task
{app="flytekit"} | json | event="flytekit_step" | step="execute_user_code" | unwrap wall_time_s | quantile_over_time(0.99, [5m]) by (task_name)

# error rate per step
sum by (step) (rate({app="flytekit"} | json | event="flytekit_step" | status="error" [5m]))

How was this patch tested?

30 unit tests in tests/flytekit/unit/core/test_utils.py, all passing:

Test What it verifies
test_timeit_telemetry_success_fallback_to_logger Structured log emitted on success with correct fields
test_timeit_telemetry_error_fallback_to_logger status="error" and error_type set on exception
test_timeit_telemetry_extras Custom kv pairs via **extras pass through
test_timeit_telemetry_context_enrichment Context (execution_id, task_name, etc.) pulled from FlyteContext
test_telemetry_disabled FLYTE_TELEMETRY_ENABLED=0 silences telemetry
test_telemetry_enabled_by_default Telemetry enabled when env var unset
test_timeit_telemetry_json_format Output parses as valid JSON with expected fields
test_timeit_all_steps_in_task_execution Local task run emits pre_execute, Execute user level code, post_execute
test_clickhouse_sink_disabled_without_url Sink disabled when URL not set
test_clickhouse_sink_enabled_with_url Sink enabled when URL is set
test_clickhouse_sink_send_noop_when_disabled send() is no-op when disabled
test_clickhouse_sink_send_fires_background_thread send() spawns daemon thread targeting _post_row
test_clickhouse_sink_post_row_sends_json HTTP POST body is single-line JSON with correct URL
test_timeit_routes_to_clickhouse_sink timeit routes to sink when enabled
test_timeit_routes_to_clickhouse_on_error Error status/type captured on exception
test_clickhouse_sink_post_row_silences_errors Network errors don't propagate
test_default_clickhouse_sink_is_disabled Default sink is disabled (no URL configured)

⚠️ Human review checklist

  • Telemetry is ON by defaultFLYTE_TELEMETRY_ENABLED defaults to "1". This will add telemetry overhead for all existing deployments. Confirm this is acceptable or change default to off.
  • Silent exception swallowing_emit_telemetry and _post_row have bare except Exception: pass. Telemetry configuration issues and ClickHouse connectivity problems will be completely silent. No way to know if telemetry is actually reaching ClickHouse.
  • No ClickHouse table DDL provided — The table flytekit_telemetry must exist before telemetry works. Consider adding a migration script or documenting the required schema.
  • Thread-per-event overhead — Each step spawns a background thread + HTTP connection. For workflows with many tasks/steps, this could be significant. Consider if connection pooling or batching is needed at scale.

Check all the applicable boxes

  • I updated the documentation accordingly.
  • All new and existing tests passed.
  • All commits are signed-off.

Link to Devin run: https://app.devin.ai/sessions/c6fb4462bb7e4072b175e7fc664aad4b
Requested by: @ryanjwong

- Add telemetry_logger to loggers.py (always JSON, toggleable via FLYTE_TELEMETRY_ENABLED)
- Enhance timeit to emit structured JSON telemetry with step name, wall/process time, status, task context
- Wrap all uninstrumented steps in entrypoint._dispatch_execute: load_task, download_inputs, deserialize_inputs, task_dispatch_execute, output_offloading, upload_outputs, output_deck
- Wrap uninstrumented steps in PythonTask.dispatch_execute: pre_execute, post_execute, write_decks

Co-Authored-By: ryan@exa.ai <ryanjwong007@gmail.com>
@devin-ai-integration
Copy link
Copy Markdown

🤖 Devin AI Engineer

I'll be helping with this pull request! Here's what you should know:

✅ I will automatically:

  • Address comments on this PR. Add '(aside)' to your comment to have me ignore it.
  • Look at CI failures and help fix them

Note: I can only respond to comments from users who have write access to this repository.

⚙️ Control Options:

  • Disable automatic comment and CI monitoring

Copy link
Copy Markdown

@devin-ai-integration devin-ai-integration Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

✅ Devin Review: No Issues Found

Devin Review analyzed this PR and found no potential bugs to report.

View in Devin Review to see 6 additional findings.

Open in Devin Review

- test_timeit_telemetry_success: verify structured log on success
- test_timeit_telemetry_error: verify status=error and error_type on exception
- test_timeit_telemetry_extras: verify custom kv pairs pass through
- test_timeit_telemetry_context_enrichment: verify context is pulled from FlyteContext
- test_telemetry_disabled: verify FLYTE_TELEMETRY_ENABLED=0 silences logger
- test_telemetry_enabled_by_default: verify logger is INFO when env unset
- test_timeit_telemetry_json_format: verify output parses as valid JSON
- test_timeit_all_steps_in_task_execution: verify pre_execute, execute, post_execute all emit

Co-Authored-By: ryan@exa.ai <ryanjwong007@gmail.com>
Copy link
Copy Markdown

@devin-ai-integration devin-ai-integration Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Devin Review found 1 new potential issue.

View 10 additional findings in Devin Review.

Open in Devin Review

Comment thread flytekit/core/utils.py Outdated
devin-ai-integration Bot and others added 2 commits February 12, 2026 01:36
…l exceptions

Co-Authored-By: ryan@exa.ai <ryanjwong007@gmail.com>
Routes telemetry to ClickHouse via HTTP POST (FORMAT JSONEachRow) when
FLYTE_TELEMETRY_CLICKHOUSE_URL is set. Falls back to structured JSON
logs on stderr when ClickHouse is not configured.

- ClickHouseTelemetrySink: thread-safe buffer, background flush, atexit
- Zero new dependencies (stdlib urllib only)
- 10 new tests (31 total), all passing

Co-Authored-By: ryan@exa.ai <ryanjwong007@gmail.com>
@devin-ai-integration devin-ai-integration Bot changed the title Add structured telemetry logging to flytekit task execution steps Add structured telemetry with ClickHouse sink to flytekit task execution steps Feb 12, 2026
devin-ai-integration Bot and others added 2 commits February 12, 2026 22:44
Co-Authored-By: ryan@exa.ai <ryanjwong007@gmail.com>
…ow inserts

Remove buffering, locks, atexit, flush threshold. Each telemetry event
now fires a single background HTTP POST immediately — every step is
visible in ClickHouse as soon as it completes.

Co-Authored-By: ryan@exa.ai <ryanjwong007@gmail.com>
Copy link
Copy Markdown

@devin-ai-integration devin-ai-integration Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Devin Review found 1 new potential issue.

View 17 additional findings in Devin Review.

Open in Devin Review

Comment thread flytekit/loggers.py
devin-ai-integration Bot and others added 2 commits February 12, 2026 22:56
Co-Authored-By: ryan@exa.ai <ryanjwong007@gmail.com>
…ted JSON under params)

Co-Authored-By: ryan@exa.ai <ryanjwong007@gmail.com>
Comment thread flytekit/loggers.py
Comment on lines +26 to +31
LOGGING_TELEMETRY_ENV_VAR = "FLYTE_TELEMETRY_ENABLED"
CLICKHOUSE_URL_ENV_VAR = "FLYTE_TELEMETRY_CLICKHOUSE_URL"
CLICKHOUSE_USER_ENV_VAR = "FLYTE_TELEMETRY_CLICKHOUSE_USER"
CLICKHOUSE_PASSWORD_ENV_VAR = "FLYTE_TELEMETRY_CLICKHOUSE_PASSWORD"
CLICKHOUSE_DATABASE_ENV_VAR = "FLYTE_TELEMETRY_CLICKHOUSE_DATABASE"
CLICKHOUSE_TABLE_ENV_VAR = "FLYTE_TELEMETRY_CLICKHOUSE_TABLE"
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if these don't exist does it fail correctly by not emitting any metrics?

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes — if the env vars aren't set, it degrades gracefully:

  1. ClickHouseTelemetrySink.__init__ defaults self._url to "" (line 222)
  2. self._enabled = bool(self._url)False when URL is empty (line 227)
  3. send() returns immediately if not enabled (line 235-236)
  4. If FLYTE_TELEMETRY_ENABLED is explicitly set to "0"/"false", the sink is set to None entirely (line 271)

So no metrics emitted, no errors, no HTTP calls — completely silent when unconfigured.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants