DigestObserver — Integration Guide

A custom tf::ObserverInterface implementation that produces LLM-friendly profiling digests in O(1) memory. Replaces 300+ MB raw JSON traces with ~2KB structured diagnostics.

Current version: all key thresholds are measurement-driven (calibrated per-run from observed data). No hardcoded magic numbers in hot diagnostics — see Measurement-driven calibration below.


File

  • digest_observer.hpp — single, self-contained header (~670 lines, C++17)

Prerequisites

  • Taskflow v3.x (header-only, no build needed)
  • C++17 compiler (g++ 10+, clang 12+, MSVC 19.28+)

Option A: Standalone Usage (No Taskflow Modification)

Drop the header anywhere and include it after Taskflow's headers:

#include <taskflow/taskflow.hpp>
#include "digest_observer.hpp"   // <-- your copy

int main() {
  tf::Executor executor(8);
  tf::Taskflow taskflow;

  // ... build your graph ...

  auto obs = executor.make_observer<tf::DigestObserver>();
  executor.run(taskflow).wait();
  obs->digest(std::cerr);       // print to stderr
}

Fix the include path inside digest_observer.hpp near the top:

// Change:
#include "../core/observer.hpp"
// To:
#include <taskflow/core/observer.hpp>

That's it. No other files need to change. Compile as usual:

g++ -std=c++17 -O2 -pthread -I/path/to/taskflow my_program.cpp -o my_program

Option B: Integrate into Taskflow Source Tree

Place digest_observer.hpp alongside Taskflow's built-in algorithms:

taskflow/
  taskflow/
    algorithm/
      digest_observer.hpp   <-- place here
    core/
      observer.hpp
      executor.hpp

Step 1: Forward-declare in observer.hpp

In taskflow/core/observer.hpp, add a forward declaration near the top (after the namespace opens):

namespace tf {

class DigestObserver;  // forward declaration

// ... rest of observer.hpp unchanged ...

Step 2: Include in executor.hpp

In taskflow/core/executor.hpp, add the include after existing algorithm headers:

#include "../algorithm/digest_observer.hpp"

Step 3 (Optional): Environment-variable activation

To enable digest profiling via TF_ENABLE_PROFILER=digest (like the built-in profiler), modify the TFProfManager setup in executor.hpp:

Find where TFProfManager checks the TF_ENABLE_PROFILER environment variable and add a branch:

const char* env = std::getenv("TF_ENABLE_PROFILER");
if (env) {
  std::string val(env);
  if (val == "digest") {
    auto obs = executor.make_observer<tf::DigestObserver>();
    _digest_observers.push_back(obs);
  } else {
    // existing TFProfObserver logic
  }
}
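Here _digest_observers is not an existing TFProfManager member; you add it yourself. A minimal sketch, assuming the shared-pointer type returned by make_observer:

std::vector<std::shared_ptr<tf::DigestObserver>> _digest_observers;  // new member (name is illustrative)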

Then in TFProfManager's destructor, dump the digest on exit:

for (auto& obs : _digest_observers) {
  obs->digest(std::cerr);
}

With this integration, users can profile without changing any code:

TF_ENABLE_PROFILER=digest ./my_program 2>&1   # digest prints to stderr

Integrating with an existing application (e.g. OpenTimer pattern)

For applications with multiple phases where you only want to profile one phase (e.g. only the update_timing step in a static-timing analyzer, not the read_* parsing phases), use lazy attach + clear() between phases:

class MyApp {
  tf::Executor _executor;
  tf::Taskflow _taskflow;
  std::shared_ptr<tf::DigestObserver> _digest_obs;

  void run_one_phase() {
    // Lazy-attach the observer once, on first call
    if (!_digest_obs) {
      _digest_obs = _executor.make_observer<tf::DigestObserver>();
    }
    // Reset accumulators so this digest captures ONLY this phase
    _digest_obs->clear();

    _executor.run(_taskflow).wait();

    _digest_obs->digest(std::cerr);
  }
};

This is the pattern used in OpenTimer's Timer::_update_timing() — see ot/timer/timer.cpp for a full reference implementation.


API Reference

Construction

auto obs = executor.make_observer<tf::DigestObserver>();

Creates the observer and registers it with the executor. set_up() is called automatically with the executor's worker count.

Profiling Output

obs->digest(std::cerr);          // print to an output stream
std::string s = obs->digest();   // capture as string

Reset

obs->clear();                     // reset all accumulators for a new run

Query

obs->num_tasks();                 // total tasks observed
obs->num_workers();               // number of workers

Digest Output Sections

| Section | What It Shows | What to Look For |
| --- | --- | --- |
| Task Duration | Mean, max, 5-bucket histogram | Most tasks in the <1μs/1μs buckets = fine-grained workload |
| Efficiency | Parallel efficiency % + scheduling overhead %. Sched overhead is calibrated per-run from sub-1μs gaps observed in the data; output reads est. X μs @ 0.Y μs/task, calibrated from N sub-1μs gaps | Efficiency < 30% = structural problem; overhead > 20% AND mean < 2× sched_cost = task-granularity issue |
| Parallelism Shape | Avg active workers per log-time bucket. Bucket widths are clipped against the actual run window so partial buckets aren't under-counted | Should be consistent with parallel efficiency; if not, file a bug |
| Load Balance | Per-worker task count and busy % | Spread > 30% = partitioner mismatch |
| Idle Breakdown | Two signals per worker: macro-idle (wall − busy, captures workers with no work) AND micro-gap breakdown (% quick / cont / long inter-task gaps). The "long" threshold is adaptive: 100 × measured sched_cost, snapped to the nearest decade in {10μs, 100μs, 1ms} | High macro-idle = dependency starvation. High long micro-gaps = preemption / cache-cold / NUMA, NOT necessarily dep-wait |
| Top Outliers | 10 longest tasks with names, durations, workers | Straggler identification; ratio shown vs. trimmed mean (excluding the outliers themselves) |
| Diagnosis | Auto-detected pattern + fix suggestion | Actionable next step |

Auto-Diagnosed Patterns

| Pattern | Trigger | Fix |
| --- | --- | --- |
| SCHEDULING_OVERHEAD | Task mean < 2 × calibrated sched_cost AND overhead > 20% | Coarsen loops, batch tasks |
| SERIAL_BOTTLENECK | Weighted serial fraction > 70%, OR > 50% with efficiency < 50% | Restructure DAG, add level-parallel for_each |
| LOAD_IMBALANCE | Worker busy-% spread > 30% | Use DynamicPartitioner. If a single straggler explains the busiest worker, the diagnosis adds a "root cause" line |
| STRAGGLER_TASKS | Worst task > 50× trimmed mean (excludes top-K outliers from the denominator). Suppressed when LOAD_IMBALANCE already attributes it to a straggler | Equalize task sizes, add a sequential cutoff |
| RECURSIVE_OVERLAP | Efficiency > 100% | Check base-case cutoff depth |
| UNDER_DECOMPOSITION | Total tasks < 5 × workers AND efficiency < 30% | Split work finer |
| HEALTHY | No flags triggered | No task-level fix needed |
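For concreteness, a minimal sketch of how the first two triggers in this table compose. The struct and function names are illustrative, not the header's actual internals; the real observer derives these values from its streaming accumulators:

// Illustrative only: mirrors the trigger table above.
struct DigestStats {
  double mean_task_us;     // mean task duration
  double sched_cost_us;    // calibrated dispatch cost (see next section)
  double overhead_pct;     // scheduling overhead, % of busy time
  double serial_fraction;  // busy-time-weighted serial fraction
  double efficiency_pct;   // parallel efficiency
};

const char* diagnose(const DigestStats& s) {
  if (s.mean_task_us < 2.0 * s.sched_cost_us && s.overhead_pct > 20.0)
    return "SCHEDULING_OVERHEAD";
  if (s.serial_fraction > 0.70 ||
      (s.serial_fraction > 0.50 && s.efficiency_pct < 50.0))
    return "SERIAL_BOTTLENECK";
  // ... remaining patterns follow the same table order ...
  return "HEALTHY";
}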

Measurement-driven calibration

The observer makes no assumptions about scheduler behavior. The following are calibrated per-run from observed data, not hardcoded:

| Metric | How it's measured | Where it's used |
| --- | --- | --- |
| sched_cost_us | Mean of all sub-1μs inter-task gaps observed (sub-1μs gaps are by definition healthy work-steals, so their mean ≈ scheduler dispatch cost). Falls back to 0.5μs if no quick gaps were observed | SCHEDULING_OVERHEAD trigger; long-gap threshold; reported in the EFFICIENCY section header |
| Parallelism-shape bucket widths | min(bucket_end, wall_end) − max(bucket_start, wall_beg): the actual time the run spent inside each log-time bucket, not the theoretical bucket width | Parallelism-shape display; weighted parallelism in SERIAL_BOTTLENECK |
| Long-gap threshold | 100 × sched_cost_us, snapped to the nearest decade ∈ {10μs, 100μs, 1ms} | Idle Breakdown long-gap classification. Header reads 'long-gap' = >X μs (≈ 100× sched_cost, calibrated Y μs/task) |
| Straggler trimmed mean | (total_busy − sum(top-K outlier durations)) / (total_tasks − K): the mean of the non-outlier tasks, so a single 890ms task in a 795K-task pool doesn't self-dilute the straggler ratio | STRAGGLER_TASKS ratio |
| Weighted parallelism | Busy-time-weighted average of avg-workers-per-bucket. Continuous; no arbitrary < 2.0 cliff | SERIAL_BOTTLENECK weighted serial fraction |
| LOAD_IMBALANCE root cause | Detected by checking whether the worst outlier task lives on the busiest worker AND dominates that worker's busy time. Suppresses the double-counted STRAGGLER_TASKS diagnosis when true | LOAD_IMBALANCE "root cause" annotation |
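A sketch of the calibration arithmetic above. Function names are illustrative (the header computes these inline at digest time), and "snapped to the nearest decade" is read here as nearest in log space:

#include <cmath>
#include <cstdint>

// Calibrated dispatch cost: mean of sub-1μs gaps, 0.5μs fallback.
double sched_cost_us(double sub_us_gap_sum_us, uint64_t sub_us_gap_count) {
  return sub_us_gap_count ? sub_us_gap_sum_us / sub_us_gap_count : 0.5;
}

// Long-gap threshold: 100× sched_cost, snapped to {10μs, 100μs, 1ms}.
double long_gap_threshold_us(double sched_cost) {
  const double raw = 100.0 * sched_cost;
  double best = 10.0;
  for (double d : {10.0, 100.0, 1000.0}) {
    if (std::fabs(std::log10(raw) - std::log10(d)) <
        std::fabs(std::log10(raw) - std::log10(best))) best = d;
  }
  return best;
}

// Trimmed mean: drop the top-K outliers from numerator AND denominator.
double trimmed_mean_us(double total_busy_us, double outlier_sum_us,
                       uint64_t total_tasks, uint64_t k) {
  return (total_busy_us - outlier_sum_us) / double(total_tasks - k);
}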

What's not calibrated (intentionally — these are UX/policy choices, not physics):

  • Histogram bucket boundaries (<1μs, 1μs, 2-10μs, 11-100μs, >100μs)
  • Log-time bucket boundaries (1ms, 10ms, 100ms, 1s, 10s, 100s)
  • K_OUTLIERS = 10 for the top-K reservoir
  • Diagnosis percentage thresholds (30% imbalance, 50% efficiency, 70% serial fraction, 50× straggler ratio)
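Since these are compile-time policy rather than measured physics, they can live as plain constants. A sketch with illustrative names (only K_OUTLIERS is named in the header per the list above):

// Policy constants (UX choices, per the list above); names illustrative.
constexpr int    K_OUTLIERS           = 10;    // top-K outlier reservoir
constexpr double IMBALANCE_SPREAD_PCT = 30.0;  // LOAD_IMBALANCE trigger
constexpr double LOW_EFFICIENCY_PCT   = 50.0;  // SERIAL_BOTTLENECK co-trigger
constexpr double SERIAL_FRACTION_PCT  = 70.0;  // SERIAL_BOTTLENECK trigger
constexpr double STRAGGLER_RATIO      = 50.0;  // STRAGGLER_TASKS trigger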

Example: Iterative Optimization Loop

#include <taskflow/taskflow.hpp>
#include "digest_observer.hpp"

int main() {
  tf::Executor executor;
  auto obs = executor.make_observer<tf::DigestObserver>();

  tf::Taskflow taskflow;
  // ... build graph ...

  // Iteration 1: profile
  executor.run(taskflow).wait();
  obs->digest(std::cerr);
  // Read diagnosis, apply fix

  // Iteration 2: re-profile after fix
  obs->clear();
  // ... rebuild graph with fix applied ...
  executor.run(taskflow).wait();
  obs->digest(std::cerr);
  // Compare. Stop when HEALTHY or no improvement for 2 iterations.
}

Memory and Performance

| Metric | DigestObserver | TFProfObserver (built-in) |
| --- | --- | --- |
| Memory per task | O(1): fixed ~2.5 KB total | O(n): ~80 bytes/task |
| Memory at 5.6M tasks | ~2.5 KB | ~450 MB |
| Output size | ~2 KB text | 321 MB JSON |
| Runtime overhead | ~50 ns/task (streaming accumulators + 5-bucket binning) | ~80 ns/task (span storage) |
| Post-processing | None (direct to stderr) | Requires an external parser |

The ~2.5 KB per-run footprint breaks down as: one accumulator per worker (8 here, each ~80 bytes of histograms plus a 7×8-byte time-bucket array), a 10-element outlier heap, and scalar wall/origin state. It scales linearly with worker count, NOT with task count.
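For illustration, a per-worker accumulator consistent with that breakdown might look like this (field names and the exact layout are inferred from the breakdown above, not the header's actual definition):

#include <array>
#include <cstdint>

// Illustrative per-worker state; one instance per worker, never per task.
struct DigestAccumulatorSketch {
  std::array<uint64_t, 5> duration_hist{};  // <1μs, 1μs, 2-10μs, 11-100μs, >100μs
  std::array<uint64_t, 5> gap_hist{};       // <1μs, 1-10μs, 10-100μs, 100μs-1ms, >1ms
  std::array<double, 7>   time_buckets{};   // busy time per log-time bucket (1ms..100s+)
  double   busy_us     = 0.0;               // total busy time on this worker
  uint64_t tasks       = 0;                 // tasks executed on this worker
  double   last_end_us = 0.0;               // previous task's end, for gap measurement
};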


What Profiling Cannot See

  • Dependency edges — the observer sees task entry/exit, not DAG structure
  • Cache/memory behavior — use perf stat -e cache-misses for this
  • NUMA effects — cross-socket latency is invisible to task-level profiling. If you suspect NUMA, pin the executor to one node first (taskset -c 0-19,40-59 or numactl --cpunodebind=0 --membind=0) and re-profile.

When the digest says HEALTHY but performance is still poor, the bottleneck is at the memory/cache/scheduler-pinning level. Use hardware counters (perf, vtune) and OS placement controls (taskset, numactl) to investigate.


Design Notes

  • Per-worker accumulators — each worker writes only its own DigestAccumulator, no contention
  • Outlier reservoir — mutex-protected min-heap of the top-K, which only locks when a task exceeds 3× the running mean (rarely contended in practice; see the sketch after this list)
  • Duration histogram — 5 fixed buckets (<1μs, 1μs, 2-10μs, 11-100μs, >100μs) chosen to align with typical Taskflow scheduling cost (~500 ns)
  • Parallelism shape — 7 log-time buckets (1ms to 100s+) showing average active workers per time window. Bucket widths clipped against actual run window so a 0.5s run doesn't show as half-empty in the 100ms-1s bucket.
  • Gap attribution — 5-bucket inter-task gap histogram (<1μs, 1-10μs, 10-100μs, 100μs-1ms, >1ms) collected at fixed boundaries, then re-aggregated at digest time into quick / cont / long based on the calibrated long-gap threshold. Lets the same accumulator data express different threshold choices for different schedulers.
  • All key trigger thresholds use measured values — see Measurement-driven calibration. This is a deliberate design choice to make the same diagnoses portable across schedulers of different per-task dispatch cost.
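A minimal sketch of the outlier-reservoir gate from the second bullet. The real reservoir also records task names and worker IDs; this shows only the locking discipline:

#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <vector>

// Illustrative top-K reservoir: the min-heap's top is the smallest
// of the current top-K, so eviction is O(log K).
class OutlierReservoirSketch {
  static constexpr std::size_t K = 10;
  std::mutex _mtx;
  std::priority_queue<double, std::vector<double>, std::greater<double>> _heap;

public:
  void offer(double dur_us, double running_mean_us) {
    // Gate: skip the lock entirely unless this task is a candidate
    // outlier (> 3x the running mean) -- the common case stays lock-free.
    if (dur_us <= 3.0 * running_mean_us) return;
    std::lock_guard<std::mutex> lk(_mtx);
    if (_heap.size() < K) {
      _heap.push(dur_us);
    } else if (dur_us > _heap.top()) {
      _heap.pop();   // evict the smallest of the current top-K
      _heap.push(dur_us);
    }
  }
};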