From 71471701a83e77451f471b9328e8d73dd379ba88 Mon Sep 17 00:00:00 2001 From: Frank Molinaro <56936412+frank005@users.noreply.github.com> Date: Fri, 24 Apr 2026 11:44:34 -0700 Subject: [PATCH] feat: add cekura_metrics_python extension for observability Adds an optional observability extension that forwards transcripts, per-module latency, and session lifecycle events to Cekura (https://cekura.ai) with zero changes to core framework files. Extension: ai_agents/agents/ten_packages/extension/cekura_metrics_python/ - Subscribes to graph-routed signals emitted by ten_ai_base: asr_result (user transcript), text_data (assistant transcript), and metrics (STT/TTS/LLM latency). - Consumes on_user_joined / on_user_left from agora_rtc for session lifecycle, auto-flushing snapshots on an interval while open. - Stays idle with a warning if CEKURA_API_KEY is unset so demos still run unmodified. - Includes unit tests, a detailed README with routing notes, and requirements.txt (aiohttp). voice-assistant example wiring: - Adds the cekura_metrics node to tenapp/property.json with ${env:CEKURA_API_KEY|} / ${env:CEKURA_ASSISTANT_ID|} placeholders; no hardcoded credentials. - Registers the extension path in manifest.json / manifest-lock.json. - Documents the env vars and opt-in behavior in the example README. Notes for reviewers: - No changes to main_python, message_collector2, or any framework code. Tool-call and LLM reasoning streams (routed via explicit set_dests/return_result in main_python) remain out of scope for this PR and can be added later behind an opt-in hook if needed. Made-with: Cursor --- .../agents/examples/voice-assistant/README.md | 12 + .../voice-assistant/tenapp/manifest-lock.json | 13 + .../voice-assistant/tenapp/manifest.json | 3 + .../voice-assistant/tenapp/property.json | 66 ++ .../extension/cekura_metrics_python/README.md | 425 ++++++++++++ .../cekura_metrics_python/__init__.py | 2 + .../extension/cekura_metrics_python/addon.py | 14 + .../extension/cekura_metrics_python/client.py | 59 ++ .../extension/cekura_metrics_python/config.py | 40 ++ .../cekura_metrics_python/extension.py | 613 ++++++++++++++++++ .../cekura_metrics_python/helpers.py | 174 +++++ .../cekura_metrics_python/manifest.json | 249 +++++++ .../cekura_metrics_python/property.json | 12 + .../cekura_metrics_python/requirements.txt | 1 + .../cekura_metrics_python/session.py | 169 +++++ .../cekura_metrics_python/tests/__init__.py | 1 + .../tests/test_config.py | 81 +++ .../tests/test_session.py | 163 +++++ 18 files changed, 2097 insertions(+) create mode 100644 ai_agents/agents/ten_packages/extension/cekura_metrics_python/README.md create mode 100644 ai_agents/agents/ten_packages/extension/cekura_metrics_python/__init__.py create mode 100644 ai_agents/agents/ten_packages/extension/cekura_metrics_python/addon.py create mode 100644 ai_agents/agents/ten_packages/extension/cekura_metrics_python/client.py create mode 100644 ai_agents/agents/ten_packages/extension/cekura_metrics_python/config.py create mode 100644 ai_agents/agents/ten_packages/extension/cekura_metrics_python/extension.py create mode 100644 ai_agents/agents/ten_packages/extension/cekura_metrics_python/helpers.py create mode 100644 ai_agents/agents/ten_packages/extension/cekura_metrics_python/manifest.json create mode 100644 ai_agents/agents/ten_packages/extension/cekura_metrics_python/property.json create mode 100644 ai_agents/agents/ten_packages/extension/cekura_metrics_python/requirements.txt create mode 100644 
ai_agents/agents/ten_packages/extension/cekura_metrics_python/session.py create mode 100644 ai_agents/agents/ten_packages/extension/cekura_metrics_python/tests/__init__.py create mode 100644 ai_agents/agents/ten_packages/extension/cekura_metrics_python/tests/test_config.py create mode 100644 ai_agents/agents/ten_packages/extension/cekura_metrics_python/tests/test_session.py diff --git a/ai_agents/agents/examples/voice-assistant/README.md b/ai_agents/agents/examples/voice-assistant/README.md index 906643ee82..ac786f124c 100644 --- a/ai_agents/agents/examples/voice-assistant/README.md +++ b/ai_agents/agents/examples/voice-assistant/README.md @@ -29,6 +29,18 @@ A comprehensive voice assistant with real-time conversation capabilities using A - `OPENAI_PROXY_URL` - Proxy URL for OpenAI API (optional) - `WEATHERAPI_API_KEY` - Weather API key for weather tool (optional) +### Optional: Cekura observability + +The default `voice_assistant` graph includes the **`cekura_metrics`** extension. If `CEKURA_API_KEY` is not set, it stays disabled and the demo runs unchanged. + +When enabled, set: + +- `CEKURA_API_KEY` — required to POST observability payloads. +- **`CEKURA_ASSISTANT_ID`** *or* set numeric `agent_id` in `tenapp/property.json` on the `cekura_metrics` node (the example uses `assistant_id`: `${env:CEKURA_ASSISTANT_ID|}` with `agent_id` 0). +- `CEKURA_METRIC_IDS` — optional comma-separated Cekura metric ids. + +See `ten_packages/extension/cekura_metrics_python/README.md` for graph details and notes on what is and isn't captured when `main_python` is left unmodified. + ## Setup ### 1. Set Environment Variables diff --git a/ai_agents/agents/examples/voice-assistant/tenapp/manifest-lock.json b/ai_agents/agents/examples/voice-assistant/tenapp/manifest-lock.json index 7c41d3f623..10ec1ed3f8 100644 --- a/ai_agents/agents/examples/voice-assistant/tenapp/manifest-lock.json +++ b/ai_agents/agents/examples/voice-assistant/tenapp/manifest-lock.json @@ -751,6 +751,19 @@ ], "path": "../../../ten_packages/extension/message_collector2" }, + { + "type": "extension", + "name": "cekura_metrics_python", + "version": "0.1.0", + "hash": "1001e0dac91508bd721de22e02a143c7d2a89767846ab09b25fb18b85cbcc8b6", + "dependencies": [ + { + "type": "system", + "name": "ten_runtime_python" + } + ], + "path": "../../../ten_packages/extension/cekura_metrics_python" + }, { "type": "extension", "name": "weatherapi_tool_python", diff --git a/ai_agents/agents/examples/voice-assistant/tenapp/manifest.json b/ai_agents/agents/examples/voice-assistant/tenapp/manifest.json index 97f3d5c3a2..e1f61b9d67 100644 --- a/ai_agents/agents/examples/voice-assistant/tenapp/manifest.json +++ b/ai_agents/agents/examples/voice-assistant/tenapp/manifest.json @@ -144,6 +144,9 @@ { "path": "../../../ten_packages/extension/message_collector2" }, + { + "path": "../../../ten_packages/extension/cekura_metrics_python" + }, { "path": "../../../ten_packages/extension/weatherapi_tool_python" }, diff --git a/ai_agents/agents/examples/voice-assistant/tenapp/property.json b/ai_agents/agents/examples/voice-assistant/tenapp/property.json index dcd0b8e214..18ceb5fd4d 100644 --- a/ai_agents/agents/examples/voice-assistant/tenapp/property.json +++ b/ai_agents/agents/examples/voice-assistant/tenapp/property.json @@ -85,6 +85,24 @@ "extension_group": "transcriber", "property": {} }, + { + "type": "extension", + "name": "cekura_metrics", + "addon": "cekura_metrics_python", + "extension_group": "default", + "property": { + "api_key": "${env:CEKURA_API_KEY|}", + "agent_id": 
0,
+          "assistant_id": "${env:CEKURA_ASSISTANT_ID|}",
+          "base_url": "https://api.cekura.ai",
+          "auto_flush": true,
+          "auto_flush_interval_ms": 5000,
+          "metric_ids": "${env:CEKURA_METRIC_IDS|}",
+          "collect_latency": true,
+          "collect_transcripts": true,
+          "collect_tool_calls": true
+        }
+      },
       {
         "type": "extension",
         "name": "weatherapi_tool_python",
@@ -181,6 +199,54 @@
           ]
         }
       ]
+    },
+    {
+      "extension": "cekura_metrics",
+      "cmd": [
+        {
+          "names": [
+            "on_user_joined",
+            "on_user_left"
+          ],
+          "source": [
+            {
+              "extension": "agora_rtc"
+            }
+          ]
+        }
+      ],
+      "data": [
+        {
+          "name": "asr_result",
+          "source": [
+            {
+              "extension": "stt"
+            }
+          ]
+        },
+        {
+          "name": "text_data",
+          "source": [
+            {
+              "extension": "tts"
+            }
+          ]
+        },
+        {
+          "name": "metrics",
+          "source": [
+            {
+              "extension": "stt"
+            },
+            {
+              "extension": "tts"
+            },
+            {
+              "extension": "llm"
+            }
+          ]
+        }
+      ]
+    }
   ]
 }
diff --git a/ai_agents/agents/ten_packages/extension/cekura_metrics_python/README.md b/ai_agents/agents/ten_packages/extension/cekura_metrics_python/README.md
new file mode 100644
index 0000000000..48f4d34eea
--- /dev/null
+++ b/ai_agents/agents/ten_packages/extension/cekura_metrics_python/README.md
@@ -0,0 +1,425 @@
+# Cekura Metrics Extension for TEN Framework
+
+This extension collects metrics and transcripts from TEN agent components (ASR/STT, TTS, LLM, etc.) and POSTs them to [Cekura](https://cekura.ai) observability for evaluation and monitoring.
+
+## Keys and secrets (what we sync to git)
+
+**There are no real Cekura credentials in the tracked files.** In the repo you should only see:
+
+| Location | What appears |
+|----------|----------------|
+| `property.json` examples | `"api_key": "${env:CEKURA_API_KEY\|}"` and similar `${env:…}` placeholders (TEN optional default after `\|`). |
+| This README | Illustrative placeholders like `your-api-key-here`, never live tokens. |
+| Unit tests | Dummy values such as `"test-key"` (not a production key). |
+
+Never commit literal API keys, Groq/OpenAI-style `sk-…` strings, or Agora/Cekura secrets into `property.json` or the README. Use environment substitution only.
+
+## Suggested pull-request split
+
+| PR | Scope | Purpose |
+|----|--------|---------|
+| **1** | This extension + `manifest.json` / `property.json` wiring (as in `examples/voice-assistant`) + this README | Ship Cekura with **graph-only** integration and document **what works vs what does not** without changing `main_python`. |
+| **2** (optional) | `main_python` (or your control extension) | Emit `transcript` / `llm_response` / `tool_call` data to `cekura_metrics` (e.g. via `helpers.py` and explicit `Loc` destinations) so Cekura sees **assistant** lines and **tools**, not only STT + module metrics. |
+
+PR 1 is mergeable on its own; PR 2 is needed only if you want full transcript parity with the in-app conversation log.
+
+---
+
+## What works without changing `main_python`
+
+When the graph wires the same channels as the stock `voice_assistant` example:
+
+- **Session lifecycle**: Fan out `on_user_joined` / `on_user_left` from `agora_rtc` → `cekura_metrics` starts a session on first join and flushes on last leave.
+- **User speech (final)**: Fan out `asr_result` from STT → `cekura_metrics` (in parallel with `main_control`).
+- **Assistant speech**: Fan out `text_data` from TTS → `cekura_metrics`. `ten_ai_base`'s TTS base classes emit `text_data` via `send_data` **without** `set_dests`, so the runtime routes it along graph edges. Any subscriber (including Cekura) can tap it.
This carries the **spoken** assistant transcript (what TTS is producing audio for). See "Why this works — routing proof" below. +- **Latency**: `metrics` data from STT / TTS / LLM (`ten_ai_base` module metrics). + +If `CEKURA_API_KEY` is unset or configuration is invalid, the extension **stays idle** (logs a warning) so the rest of the agent still runs. + +--- + +## What does *not* work from graph wiring alone + +Two specific traffic shapes stay invisible to `cekura_metrics` unless `main_python` (or another bridge) explicitly fans them out: + +1. **The UI-bound, chunked transcript** that `main_python` posts to `message_collector`. It is sent with **`_send_data(..., dest="message_collector", ...)`** (a fixed `Loc` in `helper.py`). The runtime delivers it **only** to `message_collector`, bypassing graph fan-out. Whether you also want this shape in Cekura depends on whether TTS `text_data` is enough. +2. **LLM reasoning deltas and `tool_call` events**. `openai_llm2_python` (and other `AsyncLLM2BaseExtension` children) stream `chat_completion` **back through `return_result` (CmdResult)** to whoever called it — which is `main_python`. The `LLMResponseToolCall` / reasoning fragments surface there and never hit any `send_data` edge. To capture them in Cekura, `main_python` has to emit something Cekura subscribes to. + +So the **optional PR 2** exists specifically for (1) if you want UI-parity transcripts and (2) for tool calls / reasoning. + +### Can we add a “translator” in this extension instead of changing `main_python`? + +**Partially — for `text_data` and `asr_result`, yes; for the `message_collector`-bound stream and tool calls, no.** + +A handler in `cekura_metrics_python` only runs for **`Data` / `Cmd` that the runtime actually delivers to this extension**, which per `core/src/ten_runtime/extension/extension.c` (`ten_extension_determine_out_msgs`) means one of: + +- the message was sent **without** `set_dests`, in which case the runtime consults graph connections (`ten_extension_determine_out_msg_dest_from_graph`) and dispatches to every subscriber, or +- the message was sent **with** an explicit `set_dests([Loc(...)])`, in which case the runtime ships it only to those extensions (`ten_extension_determine_out_msg_dest_from_msg`). + +`main_python` → `message_collector` falls in the second bucket, so `cekura_metrics` is never in the delivery list. There is nothing to "translate" in-process. + +Summary: + +- **Yes**, this extension already translates formats it receives — `asr_result` → Cekura transcript rows, `text_data` → Cekura transcript rows, `metrics` → Cekura latency. That is normal `on_data` logic. +- **No**, it cannot turn `message_collector`-targeted traffic, nor `chat_completion` `CmdResult` streams, into Cekura rows without one of: a second `send_data` from `main_python` (or shared `helper.py`), a change to `message_collector2` to duplicate/forward, or a bridge extension placed on a path that already carries those messages. + +### Why this works — routing proof + +From `core/src/ten_runtime/extension/extension.c`: + +- If `ten_msg_get_dest_cnt(msg) > 0` → route by explicit dests (`determine_out_msg_dest_from_msg`). No graph fan-out. +- Else → look up graph connections (`determine_out_msg_dest_from_graph`) and **clone to every subscriber**. + +From `voice-assistant/tenapp/ten_packages/system/ten_ai_base/interface/ten_ai_base/tts.py` and `tts2.py`: the TTS base classes call `ten_env.send_data(data)` on a freshly `Data.create("text_data")` without ever calling `set_dests`. 
That puts the message in the "graph-routed" path, so a `data.source = [{ "extension": "tts" }]` subscription on `cekura_metrics` is sufficient to receive it in parallel with whoever else consumes it (e.g. `main_control`).
+
+From `voice-assistant/tenapp/ten_packages/extension/main_python/helper.py` (`_send_data`): explicit `data.set_dests([Loc("", "", dest)])`. That is the second bucket and bypasses graph fan-out by design.
+
+That is why the optional **second PR** exists for UI-parity transcripts and tool-call events — not for basic caller/assistant transcripts.
+
+---
+
+## For coding assistants (how to wire Cekura in)
+
+Use this checklist when editing a TEN **example app** (paths are relative to `tenapp/`):
+
+1. **Register the addon**
+   In `manifest.json` → `dependencies`, add:
+   ```json
+   { "path": "../../../ten_packages/extension/cekura_metrics_python" }
+   ```
+   (Adjust `../` depth if your app lives elsewhere.)
+
+2. **Lock file**
+   Run `tman install` inside `tenapp/` so `manifest-lock.json` picks up the new package (or merge the lock entry from `examples/voice-assistant`).
+
+3. **Graph node**
+   Under `ten.predefined_graphs[].graph.nodes`, add an extension node named e.g. `cekura_metrics` with `addon`: `cekura_metrics_python`, `extension_group`: `default`, and a `property` block (see [Configuration](#configuration)).
+
+4. **Graph connections**
+   Under the same graph’s `connections`, append a block for `cekura_metrics`:
+   - **`cmd`**: `on_user_joined`, `on_user_left` with `source` → `agora_rtc` (same command names `main_control` already uses).
+   - **`data`**: `asr_result` with `source` → your STT extension (same name STT already sends to `main_control`).
+   - **`data`**: `text_data` with `source` → your TTS extension (assistant transcript from `ten_ai_base` TTS base classes).
+   - **`data`**: `metrics` with `source` → STT, TTS, and LLM extensions that emit `ten_ai_base` metrics.
+
+5. **Environment**
+   Set `CEKURA_API_KEY` and either a numeric `agent_id` in the node's property JSON **or** `CEKURA_ASSISTANT_ID` (the `assistant_id` string). The example app uses `${env:CEKURA_ASSISTANT_ID|}` plus `agent_id: 0` when only the assistant id is needed.
+
+6. **Python deps**
+   Ensure `aiohttp` is installed for this addon (see `requirements.txt`).
+
+Do **not** commit real API keys; use `${env:...}` only.
+
+---
+
+## For humans (manual wiring)
+
+You can achieve the same result as above in two ways:
+
+1. **TMAN Designer** (if your TEN build includes it)
+   Open the designer, add the `cekura_metrics_python` extension node, set properties from the table below, then draw **data/cmd connections** equivalent to the JSON in the [Reference fragment](#reference-graph-fragment-voice_assistant-example). Export or save so `property.json` / graph state updates.
+
+2. **Edit `property.json` by hand**
+   Merge the [Reference fragment](#reference-graph-fragment-voice_assistant-example) into your graph’s `nodes` and `connections`, and merge the manifest path as in step 1 for assistants.
+
+If your UI only exposes a subset of fields, fall back to raw `property.json` for advanced links (multi-source `metrics`, duplicate RTC commands).
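+
+### Sketch: the two routing buckets in code
+
+To make the two buckets from "Why this works — routing proof" concrete, here is a minimal, hypothetical sketch of both send paths. It assumes the `ten_runtime` Python API used elsewhere in this PR (`Data.create`, `set_property_string`, `send_data`) and that `Loc` is importable from `ten_runtime`, as `main_python/helper.py` uses it:
+
+```python
+from ten_runtime import AsyncTenEnv, Data, Loc
+
+
+async def emit_both_ways(ten_env: AsyncTenEnv) -> None:
+    # Bucket 1: graph-routed. No set_dests, so the runtime consults the
+    # graph connections and clones the message to every subscriber.
+    # This is how the ten_ai_base TTS classes emit "text_data".
+    fanout = Data.create("text_data")
+    fanout.set_property_string("text", "hello")
+    await ten_env.send_data(fanout)
+
+    # Bucket 2: explicit destinations. Only the listed extension receives
+    # the message and graph fan-out is bypassed. This mirrors the
+    # _send_data helper in main_python/helper.py targeting message_collector.
+    direct = Data.create("text_data")
+    direct.set_property_string("text", "hello")
+    direct.set_dests([Loc("", "", "message_collector")])
+    await ten_env.send_data(direct)
+```
+
+Anything `cekura_metrics` should receive has to travel the first path, or name `cekura_metrics` explicitly in `set_dests`.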
+ +--- + +## Reference graph fragment (`voice_assistant` example) + +**Node** (place with other extensions): + +```json +{ + "type": "extension", + "name": "cekura_metrics", + "addon": "cekura_metrics_python", + "extension_group": "default", + "property": { + "api_key": "${env:CEKURA_API_KEY|}", + "agent_id": 0, + "assistant_id": "${env:CEKURA_ASSISTANT_ID|}", + "base_url": "https://api.cekura.ai", + "auto_flush": true, + "auto_flush_interval_ms": 5000, + "metric_ids": "${env:CEKURA_METRIC_IDS|}", + "collect_latency": true, + "collect_transcripts": true, + "collect_tool_calls": true + } +} +``` + +Set **`agent_id`** to your Cekura numeric agent id when you are not using `assistant_id`. + +**Connections** (append under the same graph’s `connections` array): + +```json +{ + "extension": "cekura_metrics", + "cmd": [ + { + "names": ["on_user_joined", "on_user_left"], + "source": [{ "extension": "agora_rtc" }] + } + ], + "data": [ + { + "name": "asr_result", + "source": [{ "extension": "stt" }] + }, + { + "name": "text_data", + "source": [{ "extension": "tts" }] + }, + { + "name": "metrics", + "source": [ + { "extension": "stt" }, + { "extension": "tts" }, + { "extension": "llm" } + ] + } + ] +} +``` + +Rename `stt` / `tts` / `llm` / `agora_rtc` if your graph uses different extension instance names. + +--- + +## Installation + +Copy this folder to your agent’s extensions tree (same level as other Python addons), or depend on it via `manifest.json` `path` as in the official examples layout: + +`ai_agents/agents/ten_packages/extension/cekura_metrics_python` + +Install Python dependency: + +```bash +pip install aiohttp +``` + +--- + +## Configuration + +### Property (`property.json` node `property` or root) + +```json +{ + "api_key": "${env:CEKURA_API_KEY}", + "agent_id": 123, + "assistant_id": "", + "base_url": "https://api.cekura.ai", + "auto_flush": true, + "auto_flush_interval_ms": 5000, + "metric_ids": "1,2,3", + "collect_latency": true, + "collect_transcripts": true, + "collect_tool_calls": true +} +``` + +### Properties + +| Property | Type | Default | Description | +|----------|------|---------|-------------| +| `api_key` | string | `""` | Cekura API key; if empty after env substitution, extension does nothing. | +| `agent_id` | int | `0` | Cekura agent id (required unless `assistant_id` is set). | +| `assistant_id` | string | `""` | External assistant id (min length enforced at API). | +| `base_url` | string | `https://api.cekura.ai` | API base URL. | +| `auto_flush` | bool | `true` | POST full snapshots on an interval while session is open. | +| `auto_flush_interval_ms` | int | `5000` | Interval for auto-flush. | +| `metric_ids` | string | `""` | Comma-separated Cekura metric ids to evaluate. | +| `collect_latency` | bool | `true` | Ingest `metrics` / latency payloads. | +| `collect_transcripts` | bool | `true` | Ingest transcript-style payloads where connected. | +| `collect_tool_calls` | bool | `true` | Ingest `tool_call` where connected. | + +--- + +## Runtime behaviour (when extension is enabled) + +1. Configure `api_key` and `agent_id` or `assistant_id`. +2. Wire the graph (see above) or send `session_start` / data from control code using `helpers.py`. +3. With **auto RTC** wiring, the first `on_user_joined` opens a session; last `on_user_left` ends and POSTs. With **`auto_flush`**, snapshots POST on a timer while the session is open. +4. Listen for **`metrics_sent`** after each HTTP POST. 
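+
+A minimal sketch of the manual path in step 2, using this package's `helpers.py` (the session id and channel name are illustrative; delivery still depends on your graph routing these channels, or explicit `Loc` destinations, to `cekura_metrics`):
+
+```python
+import uuid
+
+from ten_runtime import AsyncTenEnv
+
+from cekura_metrics_python.helpers import (
+    end_session,
+    send_transcript,
+    start_session,
+)
+
+
+async def run_scripted_call(ten_env: AsyncTenEnv) -> None:
+    session_id = str(uuid.uuid4())
+    # Opens a session in cekura_metrics; with auto_flush enabled,
+    # periodic snapshot POSTs begin immediately.
+    await start_session(ten_env, session_id=session_id, channel_name="room-abc")
+    # Rows are recorded only while a session is open.
+    await send_transcript(ten_env, text="Hello!", role="assistant", is_final=True)
+    # Ends the session and POSTs the final snapshot to Cekura.
+    await end_session(ten_env, session_id=session_id, ended_reason="demo-finished")
+```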
+
+---
+
+## API
+
+### Data inputs
+
+#### `transcript`
+
+```json
+{
+  "text": "Hello, how can I help you?",
+  "role": "assistant",
+  "is_final": true,
+  "start_time": 1.5,
+  "end_time": 3.2
+}
+```
+
+#### `llm_response`
+
+```json
+{
+  "text": "I can help you with that.",
+  "latency_ms": 250.5,
+  "tokens_in": 50,
+  "tokens_out": 25,
+  "model": "gpt-4o"
+}
+```
+
+#### `tts_audio`
+
+```json
+{
+  "text": "Hello there",
+  "latency_ms": 150.0,
+  "duration_ms": 1200.0,
+  "vendor": "elevenlabs"
+}
+```
+
+#### `asr_result`
+
+JSON root from STT (`ten_ai_base`) or flat properties — see `extension.py`.
+
+#### `metrics`
+
+`ModuleMetrics` JSON root from STT/TTS/LLM.
+
+#### `tool_call`
+
+```json
+{
+  "name": "get_weather",
+  "arguments": "{\"location\": \"NYC\"}",
+  "result": "{\"temp\": 72}",
+  "success": true,
+  "latency_ms": 500.0
+}
+```
+
+### Commands
+
+- `session_start` / `session_end` / `flush` — see inline docstrings in `extension.py`.
+- `on_user_joined` / `on_user_left` — optional RTC lifecycle from `agora_rtc`.
+
+### Command output
+
+- `metrics_sent` — `session_id`, `success`, optional `call_log_id`.
+
+---
+
+## Helpers (`helpers.py`)
+
+Optional typed helpers for sending data and session commands to `cekura_metrics` from other extensions. If your graph does not already route these channels to `cekura_metrics`, set explicit destinations the same way `main_python/helper.py` does (`Loc`); plain graph routing is enough when the producer emits on a connected channel.
+
+---
+
+## Cekura metrics catalogue
+
+See [Cekura pre-defined metrics](https://docs.cekura.ai/documentation/key-concepts/metrics/pre-defined-metrics).
+
+---
+
+## Environment variables (full list)
+
+| Variable | Required when Cekura is enabled? | Purpose |
+|----------|----------------------------------|---------|
+| `CEKURA_API_KEY` | **Yes** (to send anything) | Cekura API key (`X-CEKURA-API-KEY` on observe). If unset, the extension disables itself. |
+| `CEKURA_ASSISTANT_ID` | **Either** this env var or a numeric `agent_id` in JSON | External assistant id (e.g. `asst_…`). Used when `property.json` sets `"assistant_id": "${env:CEKURA_ASSISTANT_ID\|}"` and `agent_id` is `0`. |
+| `CEKURA_METRIC_IDS` | No | Comma-separated metric ids to evaluate on each observe call (e.g. `1,2,3`). |
+
+**Not an env var:** you may instead set a numeric **`agent_id`** directly in the `cekura_metrics` node in `property.json` (no secret there — it is a public id in Cekura). You still need `CEKURA_API_KEY` for auth.
+
+No other Cekura-specific env vars are required by this extension. (Your Agora / STT / LLM / TTS keys stay separate.)
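+
+A small sketch of how these values behave after substitution, using `config.py` from this package (the `asst_demo` id is illustrative, and the absolute import path assumes the package is importable as in the `helpers.py` usage notes):
+
+```python
+from cekura_metrics_python.config import CekuraMetricsConfig
+
+# With CEKURA_API_KEY unset, "${env:CEKURA_API_KEY|}" substitutes to "",
+# so on_init sees an empty api_key, logs a warning, and leaves the
+# extension disabled instead of raising.
+disabled = CekuraMetricsConfig.from_json('{"api_key": "", "agent_id": 0}')
+assert not disabled.api_key  # disabled path: no client is created
+
+# With a key present, validate() enforces the identity rule: either a
+# non-zero agent_id or a non-empty assistant_id.
+enabled = CekuraMetricsConfig.from_json(
+    '{"api_key": "test-key", "assistant_id": "asst_demo"}'
+)
+enabled.validate()  # passes: assistant_id alone is enough
+```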
+ +--- + +## Copy-paste examples + +### `.env` (Cekura only; combine with your existing TEN `.env`) + +```bash +# Required to enable POSTs to Cekura +CEKURA_API_KEY= + +# Pick ONE identity style: +# A) Assistant id from Cekura / provider (matches voice_assistant example property.json) +CEKURA_ASSISTANT_ID= + +# B) If you prefer numeric agent id, leave CEKURA_ASSISTANT_ID unset and put agent_id in property.json (see below) + +# Optional — limit which Cekura metrics run on each call +# CEKURA_METRIC_IDS=1,2,3 +``` + +### `property.json` node — **assistant id via env** (matches `examples/voice-assistant`) + +```json +{ + "type": "extension", + "name": "cekura_metrics", + "addon": "cekura_metrics_python", + "extension_group": "default", + "property": { + "api_key": "${env:CEKURA_API_KEY|}", + "agent_id": 0, + "assistant_id": "${env:CEKURA_ASSISTANT_ID|}", + "base_url": "https://api.cekura.ai", + "auto_flush": true, + "auto_flush_interval_ms": 5000, + "metric_ids": "${env:CEKURA_METRIC_IDS|}", + "collect_latency": true, + "collect_transcripts": true, + "collect_tool_calls": true + } +} +``` + +### `property.json` node — **numeric agent id** (no `CEKURA_ASSISTANT_ID` needed) + +Replace `12345` with your Cekura dashboard agent id (integer, not secret): + +```json +{ + "type": "extension", + "name": "cekura_metrics", + "addon": "cekura_metrics_python", + "extension_group": "default", + "property": { + "api_key": "${env:CEKURA_API_KEY|}", + "agent_id": 12345, + "assistant_id": "", + "base_url": "https://api.cekura.ai", + "auto_flush": true, + "auto_flush_interval_ms": 5000, + "metric_ids": "${env:CEKURA_METRIC_IDS|}", + "collect_latency": true, + "collect_transcripts": true, + "collect_tool_calls": true + } +} +``` + +### `manifest.json` dependency (path layout for `agents/examples/.../tenapp`) + +```json +{ + "path": "../../../ten_packages/extension/cekura_metrics_python" +} +``` + +--- + +## License + +MIT diff --git a/ai_agents/agents/ten_packages/extension/cekura_metrics_python/__init__.py b/ai_agents/agents/ten_packages/extension/cekura_metrics_python/__init__.py new file mode 100644 index 0000000000..f725cc8505 --- /dev/null +++ b/ai_agents/agents/ten_packages/extension/cekura_metrics_python/__init__.py @@ -0,0 +1,2 @@ +# Cekura Metrics Extension for TEN Framework +from . 
import addon diff --git a/ai_agents/agents/ten_packages/extension/cekura_metrics_python/addon.py b/ai_agents/agents/ten_packages/extension/cekura_metrics_python/addon.py new file mode 100644 index 0000000000..2f734329d9 --- /dev/null +++ b/ai_agents/agents/ten_packages/extension/cekura_metrics_python/addon.py @@ -0,0 +1,14 @@ +from ten_runtime import ( + Addon, + register_addon_as_extension, + TenEnv, +) + + +@register_addon_as_extension("cekura_metrics_python") +class CekuraMetricsExtensionAddon(Addon): + def on_create_instance(self, ten_env: TenEnv, name: str, context) -> None: + from .extension import CekuraMetricsExtension + + ten_env.log_info("Cekura Metrics: on_create_instance") + ten_env.on_create_instance_done(CekuraMetricsExtension(name), context) diff --git a/ai_agents/agents/ten_packages/extension/cekura_metrics_python/client.py b/ai_agents/agents/ten_packages/extension/cekura_metrics_python/client.py new file mode 100644 index 0000000000..51354d6509 --- /dev/null +++ b/ai_agents/agents/ten_packages/extension/cekura_metrics_python/client.py @@ -0,0 +1,59 @@ +import aiohttp +import json +from typing import Optional, Any + +from .config import CekuraMetricsConfig +from .session import Session + + +class CekuraClient: + def __init__(self, config: CekuraMetricsConfig): + self.config = config + self._session: Optional[aiohttp.ClientSession] = None + + async def _get_session(self) -> aiohttp.ClientSession: + if self._session is None or self._session.closed: + self._session = aiohttp.ClientSession( + headers={ + "X-CEKURA-API-KEY": self.config.api_key, + "Content-Type": "application/json", + }, + timeout=aiohttp.ClientTimeout(total=30), + ) + return self._session + + async def close(self) -> None: + if self._session and not self._session.closed: + await self._session.close() + self._session = None + + async def send_session(self, session: Session) -> dict[str, Any]: + payload = session.to_observe_payload( + agent_id=self.config.agent_id, + assistant_id=self.config.assistant_id, + metric_ids=self.config.metric_ids, + ) + + http_session = await self._get_session() + + async with http_session.post( + self.config.observe_endpoint, + json=payload, + ) as response: + response_text = await response.text() + + if response.status == 201: + return json.loads(response_text) + else: + raise CekuraAPIError( + f"Failed to send session to Cekura: {response.status} - {response_text}", + status_code=response.status, + response_body=response_text, + ) + + +class CekuraAPIError(Exception): + def __init__(self, message: str, status_code: int = 0, response_body: str = ""): + super().__init__(message) + self.status_code = status_code + self.response_body = response_body diff --git a/ai_agents/agents/ten_packages/extension/cekura_metrics_python/config.py b/ai_agents/agents/ten_packages/extension/cekura_metrics_python/config.py new file mode 100644 index 0000000000..ec58487f66 --- /dev/null +++ b/ai_agents/agents/ten_packages/extension/cekura_metrics_python/config.py @@ -0,0 +1,40 @@ +from dataclasses import dataclass, field +from typing import Optional +import json + + +@dataclass +class CekuraMetricsConfig: + api_key: str = "" + agent_id: int = 0 + assistant_id: str = "" + base_url: str = "https://api.cekura.ai" + auto_flush: bool = True + auto_flush_interval_ms: int = 5000 + metric_ids: str = "" + collect_latency: bool = True + collect_transcripts: bool = True + collect_tool_calls: bool = True + + @classmethod + def from_json(cls, json_str: str) -> "CekuraMetricsConfig": + data = json.loads(json_str) + if 
isinstance(data, dict): + inner = data.get("property") + if isinstance(inner, dict) and ( + "api_key" in inner + or "agent_id" in inner + or "assistant_id" in inner + ): + data = inner + return cls(**{k: v for k, v in data.items() if k in cls.__dataclass_fields__}) + + def validate(self) -> None: + if not self.api_key: + raise ValueError("api_key is required for Cekura metrics") + if not self.agent_id and not self.assistant_id: + raise ValueError("Either agent_id or assistant_id must be provided") + + @property + def observe_endpoint(self) -> str: + return f"{self.base_url.rstrip('/')}/observability/v1/observe/" diff --git a/ai_agents/agents/ten_packages/extension/cekura_metrics_python/extension.py b/ai_agents/agents/ten_packages/extension/cekura_metrics_python/extension.py new file mode 100644 index 0000000000..a34c7b39ad --- /dev/null +++ b/ai_agents/agents/ten_packages/extension/cekura_metrics_python/extension.py @@ -0,0 +1,613 @@ +""" +Cekura Metrics Extension for TEN Framework + +This extension collects metrics from various TEN components (ASR, STT, TTS, LLM, etc.) +and sends them to Cekura's observability API for analysis and evaluation. + +Supported data inputs: +- transcript: Text from ASR/STT with role and timing +- llm_response: LLM responses with latency and token counts (if connected) +- tts_audio: TTS synthesis events with latency (if connected) +- asr_result: ASR transcripts (JSON root from ten_ai_base ASRResult) +- metrics: ModuleMetrics from STT/TTS/LLM (ttfw, ttlw, ttfb, ttft, …) when connected +- tool_call: Tool/function call events with success status + +Commands: +- session_start: Begin a new metrics collection session (starts periodic auto-flush when enabled) +- session_end: End session and flush metrics to Cekura +- flush: Manually flush current session to Cekura (final send; clears session) +- on_user_joined / on_user_left: Optional RTC lifecycle from `agora_rtc` (same as main control); + first join starts a session with a generated call id; last leave ends and flushes. + +Auto-flush (when `auto_flush` is true) POSTs a full session snapshot on each interval while the +session is active, so data is pushed before `session_end` if the call is long. +""" + +import asyncio +import json +import uuid +from typing import Any, Optional +from datetime import datetime + +from ten_runtime import ( + AsyncExtension, + AsyncTenEnv, + Cmd, + CmdResult, + Data, + StatusCode, +) + +from .config import CekuraMetricsConfig +from .session import Session +from .client import CekuraClient, CekuraAPIError + + +def _msg_prop_str(msg: Cmd | Data, key: str, default: str = "") -> str: + """Msg.get_property_string returns (value, TenError | None), not a bare string.""" + v, err = msg.get_property_string(key) + return default if err is not None else v + + +def _msg_prop_bool(msg: Cmd | Data, key: str, default: bool = False) -> bool: + v, err = msg.get_property_bool(key) + return default if err is not None else v + + +def _msg_prop_float(msg: Cmd | Data, key: str, default: float = 0.0) -> float: + v, err = msg.get_property_float(key) + return default if err is not None else float(v) + + +def _msg_json_root_dict(msg: Cmd | Data) -> dict[str, Any] | None: + """Payload from set_property_from_json(None, json). 
STT sends ASRResult this way (no flat keys).""" + raw, err = msg.get_property_to_json(None) + if err is not None or not raw: + return None + if isinstance(raw, dict): + return raw + if isinstance(raw, str): + try: + parsed = json.loads(raw) + return parsed if isinstance(parsed, dict) else None + except Exception: + return None + return None + + +class CekuraMetricsExtension(AsyncExtension): + def __init__(self, name: str) -> None: + super().__init__(name) + self.config: Optional[CekuraMetricsConfig] = None + self.client: Optional[CekuraClient] = None + self.current_session: Optional[Session] = None + self._auto_flush_task: Optional[asyncio.Task] = None + self._ten_env: Optional[AsyncTenEnv] = None + self._rtc_user_count: int = 0 + + async def on_configure(self, ten_env: AsyncTenEnv) -> None: + ten_env.log_info("Cekura Metrics: on_configure") + + async def on_init(self, ten_env: AsyncTenEnv) -> None: + ten_env.log_info("Cekura Metrics: on_init") + await super().on_init(ten_env) + + config_str, prop_err = await ten_env.get_property_to_json("") + if prop_err is not None: + raise RuntimeError(f"get_property_to_json: {prop_err}") + + self.config = None + self.client = None + try: + cfg = CekuraMetricsConfig.from_json(config_str) + if not (cfg.api_key or "").strip(): + ten_env.log_warn( + "Cekura Metrics: no api_key (e.g. CEKURA_API_KEY); extension disabled for this run." + ) + return + cfg.validate() + self.config = cfg + self.client = CekuraClient(self.config) + ten_env.log_info( + f"Cekura Metrics: configured with agent_id={self.config.agent_id}" + ) + except Exception as e: + ten_env.log_error( + f"Cekura Metrics: invalid configuration — extension disabled: {e}" + ) + self.config = None + self.client = None + + async def on_start(self, ten_env: AsyncTenEnv) -> None: + ten_env.log_info("Cekura Metrics: on_start") + self._ten_env = ten_env + + async def on_stop(self, ten_env: AsyncTenEnv) -> None: + ten_env.log_info("Cekura Metrics: on_stop") + + if self._auto_flush_task and not self._auto_flush_task.done(): + self._auto_flush_task.cancel() + try: + await self._auto_flush_task + except asyncio.CancelledError: + pass + + if self.current_session: + await self._flush_session(ten_env) + self._rtc_user_count = 0 + + async def on_deinit(self, ten_env: AsyncTenEnv) -> None: + ten_env.log_info("Cekura Metrics: on_deinit") + if self.client: + await self.client.close() + + async def on_cmd(self, ten_env: AsyncTenEnv, cmd: Cmd) -> None: + cmd_name = cmd.get_name() + ten_env.log_debug(f"Cekura Metrics: on_cmd - {cmd_name}") + + if not self.client: + cmd_result = CmdResult.create(StatusCode.OK, cmd) + await ten_env.return_result(cmd_result) + return + + try: + if cmd_name == "session_start": + await self._handle_session_start(ten_env, cmd) + elif cmd_name == "session_end": + await self._handle_session_end(ten_env, cmd) + elif cmd_name == "flush": + await self._handle_flush(ten_env, cmd) + elif cmd_name == "on_user_joined": + await self._handle_agora_user_joined(ten_env, cmd) + elif cmd_name == "on_user_left": + await self._handle_agora_user_left(ten_env, cmd) + else: + ten_env.log_warn(f"Cekura Metrics: unknown command - {cmd_name}") + cmd_result = CmdResult.create(StatusCode.ERROR, cmd) + cmd_result.set_property_string("error", f"Unknown command: {cmd_name}") + await ten_env.return_result(cmd_result) + return + + cmd_result = CmdResult.create(StatusCode.OK, cmd) + await ten_env.return_result(cmd_result) + except Exception as e: + ten_env.log_error(f"Cekura Metrics: error handling command {cmd_name} - 
{e}") + cmd_result = CmdResult.create(StatusCode.ERROR, cmd) + cmd_result.set_property_string("error", str(e)) + await ten_env.return_result(cmd_result) + + async def on_data(self, ten_env: AsyncTenEnv, data: Data) -> None: + data_name = data.get_name() + ten_env.log_debug(f"Cekura Metrics: on_data - {data_name}") + + if not self.client: + return + + if not self.current_session: + ten_env.log_warn(f"Cekura Metrics: received data '{data_name}' but no active session") + return + + try: + if data_name == "transcript": + await self._handle_transcript(ten_env, data) + elif data_name == "llm_response": + await self._handle_llm_response(ten_env, data) + elif data_name == "tts_audio": + await self._handle_tts_audio(ten_env, data) + elif data_name == "asr_result": + await self._handle_asr_result(ten_env, data) + elif data_name == "text_data": + await self._handle_text_data(ten_env, data) + elif data_name == "metrics": + await self._handle_module_metrics(ten_env, data) + elif data_name == "tool_call": + await self._handle_tool_call(ten_env, data) + else: + ten_env.log_debug(f"Cekura Metrics: ignoring unknown data type - {data_name}") + except Exception as e: + ten_env.log_error(f"Cekura Metrics: error handling data {data_name} - {e}") + + async def _handle_session_start(self, ten_env: AsyncTenEnv, cmd: Cmd) -> None: + if self.current_session: + ten_env.log_warn("Cekura Metrics: starting new session while previous session active, flushing previous") + await self._flush_session(ten_env) + + session_id = _msg_prop_str(cmd, "session_id") + channel_name = _msg_prop_str(cmd, "channel_name") + customer_number = _msg_prop_str(cmd, "customer_number") + metadata = {} + metadata_str = _msg_prop_str(cmd, "metadata") + if metadata_str: + try: + metadata = json.loads(metadata_str) + except Exception: + pass + + self.current_session = Session( + session_id=session_id, + channel_name=channel_name, + customer_number=customer_number, + metadata=metadata, + ) + + ten_env.log_info(f"Cekura Metrics: session started - {session_id}") + + if self.config.auto_flush: + self._start_auto_flush(ten_env) + + async def _handle_session_end(self, ten_env: AsyncTenEnv, cmd: Cmd) -> None: + if not self.current_session: + ten_env.log_warn("Cekura Metrics: session_end received but no active session") + return + + ended_reason = _msg_prop_str(cmd, "ended_reason") + + self.current_session.end(ended_reason) + await self._flush_session(ten_env) + + async def _handle_flush(self, ten_env: AsyncTenEnv, cmd: Cmd) -> None: + if not self.current_session: + ten_env.log_warn("Cekura Metrics: flush received but no active session") + return + + await self._flush_session(ten_env) + + def _channel_name_from_rtc_cmd(self, cmd: Cmd) -> str: + """Best-effort channel name from Agora RTC on_user_* cmd payload (if present).""" + try: + raw, err = cmd.get_property_to_json(None) + if err or not raw: + return "" + if isinstance(raw, str): + data = json.loads(raw) + else: + data = raw + if isinstance(data, dict): + return str( + data.get("channel") or data.get("channel_name") or "" + ) + except Exception: + pass + return "" + + async def _handle_agora_user_joined(self, ten_env: AsyncTenEnv, cmd: Cmd) -> None: + """Mirror main_control: first RTC user starts a Cekura call session (no other extension edits).""" + self._rtc_user_count += 1 + if self._rtc_user_count != 1: + return + + if self.current_session: + ten_env.log_warn( + "Cekura Metrics: on_user_joined while session active; flushing previous" + ) + await self._flush_session(ten_env) + + channel = 
self._channel_name_from_rtc_cmd(cmd) + session_id = str(uuid.uuid4()) + metadata = {"session_source": "agora_rtc"} + if channel: + metadata["rtc_channel"] = channel + + self.current_session = Session( + session_id=session_id, + channel_name=channel, + customer_number="", + metadata=metadata, + ) + ten_env.log_info(f"Cekura Metrics: RTC session started (auto) — {session_id}") + + if self.config.auto_flush: + self._start_auto_flush(ten_env) + + async def _handle_agora_user_left(self, ten_env: AsyncTenEnv, cmd: Cmd) -> None: + """Last RTC user left: end and flush (matches single-user voice agent).""" + self._rtc_user_count -= 1 + if self._rtc_user_count > 0: + return + self._rtc_user_count = 0 + + if not self.current_session: + ten_env.log_warn("Cekura Metrics: on_user_left but no active session") + return + + self.current_session.end("agora_on_user_left") + await self._flush_session(ten_env) + + async def _handle_transcript(self, ten_env: AsyncTenEnv, data: Data) -> None: + if not self.config.collect_transcripts: + return + + text = _msg_prop_str(data, "text") + role = _msg_prop_str(data, "role") + + is_final = _msg_prop_bool(data, "is_final", True) + + if not is_final: + return + + start_time = _msg_prop_float(data, "start_time") + end_time = _msg_prop_float(data, "end_time") + + cekura_role = "Main Agent" if role.lower() in ["bot", "assistant", "agent", "ai"] else "Testing Agent" + + self.current_session.add_transcript( + role=cekura_role, + content=text, + start_time=start_time, + end_time=end_time, + ) + ten_env.log_debug(f"Cekura Metrics: transcript added - role={cekura_role}, len={len(text)}") + + async def _handle_module_metrics(self, ten_env: AsyncTenEnv, data: Data) -> None: + """TEN AI base sends ModuleMetrics as data name \"metrics\" (ttfb, ttfw, ttft, etc.).""" + if not self.config.collect_latency: + return + root = _msg_json_root_dict(data) + if not root: + return + module = str(root.get("module", "") or "") + vendor = str(root.get("vendor", "") or "") + m = root.get("metrics") + if not isinstance(m, dict): + return + + metadata: dict[str, Any] = {} + if vendor: + metadata["vendor"] = vendor + meta = root.get("metadata") + if isinstance(meta, dict): + for k in ("session_id", "turn_id", "model_id", "voice_id"): + if k in meta and meta[k] is not None: + metadata[k] = meta[k] + + def _emit(kind: str, ms: float) -> None: + self.current_session.add_latency_metric(kind, ms, **metadata) + ten_env.log_debug( + f"Cekura Metrics: {kind} latency from metrics msg: {ms}ms module={module}" + ) + + if module == "asr": + # Prefer word-timing; skip actual_send / vendor_metrics (noisy or huge dicts). 
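+            # ttfw / ttlw = time to first / last word in ms (the
+            # ten_ai_base word-timing fields); each positive value present
+            # becomes one "asr" latency sample via _emit below.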
+ for key in ("ttfw", "ttlw"): + v = m.get(key) + if isinstance(v, (int, float)) and float(v) > 0: + _emit("asr", float(v)) + return + elif module == "tts": + v = m.get("ttfb") + if isinstance(v, (int, float)) and float(v) > 0: + _emit("tts", float(v)) + elif module == "llm": + for key in ("ttft", "ttfs"): + v = m.get(key) + if isinstance(v, (int, float)) and float(v) > 0: + _emit("llm", float(v)) + return + + async def _handle_llm_response(self, ten_env: AsyncTenEnv, data: Data) -> None: + if not self.config.collect_latency: + return + + latency_ms = _msg_prop_float(data, "latency_ms") + + metadata = {} + ti, err_ti = data.get_property_int("tokens_in") + if err_ti is None: + metadata["tokens_in"] = ti + to, err_to = data.get_property_int("tokens_out") + if err_to is None: + metadata["tokens_out"] = to + model = _msg_prop_str(data, "model") + if model: + metadata["model"] = model + + if latency_ms > 0: + self.current_session.add_latency_metric("llm", latency_ms, **metadata) + ten_env.log_debug(f"Cekura Metrics: LLM latency recorded - {latency_ms}ms") + + async def _handle_tts_audio(self, ten_env: AsyncTenEnv, data: Data) -> None: + if not self.config.collect_latency: + return + + latency_ms = _msg_prop_float(data, "latency_ms") + + metadata = {} + dur, err_dur = data.get_property_float("duration_ms") + if err_dur is None: + metadata["duration_ms"] = dur + vendor = _msg_prop_str(data, "vendor") + if vendor: + metadata["vendor"] = vendor + + if latency_ms > 0: + self.current_session.add_latency_metric("tts", latency_ms, **metadata) + ten_env.log_debug(f"Cekura Metrics: TTS latency recorded - {latency_ms}ms") + + async def _handle_asr_result(self, ten_env: AsyncTenEnv, data: Data) -> None: + root = _msg_json_root_dict(data) + if root: + text = str(root.get("text", "") or "") + is_final = bool(root.get("final", False)) + latency_ms = float(root.get("latency_ms", 0) or 0) + else: + text = _msg_prop_str(data, "text") + is_final = _msg_prop_bool(data, "is_final", False) + latency_ms = _msg_prop_float(data, "latency_ms") + + if self.config.collect_transcripts and is_final and text: + self.current_session.add_transcript( + role="Testing Agent", + content=text, + start_time=datetime.now().timestamp(), + end_time=datetime.now().timestamp(), + ) + + if self.config.collect_latency and latency_ms > 0: + metadata = {} + if root: + c = root.get("confidence") + if isinstance(c, (int, float)): + metadata["confidence"] = float(c) + v = root.get("vendor") + if isinstance(v, str) and v: + metadata["vendor"] = v + else: + conf, err_conf = data.get_property_float("confidence") + if err_conf is None: + metadata["confidence"] = conf + vendor = _msg_prop_str(data, "vendor") + if vendor: + metadata["vendor"] = vendor + + self.current_session.add_latency_metric("asr", latency_ms, **metadata) + ten_env.log_debug(f"Cekura Metrics: ASR latency recorded - {latency_ms}ms") + + async def _handle_text_data(self, ten_env: AsyncTenEnv, data: Data) -> None: + """ + Handle `text_data` emitted by ten_ai_base TTS (AssistantTranscription JSON). + + TTS base classes call send_data(Data.create("text_data")) without + set_dests, so the runtime routes via graph connections. Subscribing to + text_data from the TTS extension gives us the assistant-spoken + transcript without touching main_python. 
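+
+        Expected JSON root fields, per the parsing below: `object`
+        ("assistant.transcription"), `text`, `is_final` / `final`, and
+        optional `start_ms` / `duration_ms` (milliseconds) for timing.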
+ """ + if not self.config.collect_transcripts: + return + + root = _msg_json_root_dict(data) + if not root: + return + + obj = str(root.get("object", "") or "") + if obj and obj != "assistant.transcription": + ten_env.log_debug( + f"Cekura Metrics: ignoring text_data with object={obj}" + ) + return + + text = str(root.get("text", "") or "").strip() + is_final = bool(root.get("is_final", False)) or bool( + root.get("final", False) + ) + if not text or not is_final: + return + + start_ms = root.get("start_ms") + duration_ms = root.get("duration_ms") + now = datetime.now().timestamp() + try: + start_time = float(start_ms) / 1000.0 if start_ms is not None else now + except Exception: + start_time = now + try: + end_time = ( + start_time + (float(duration_ms) / 1000.0) + if duration_ms is not None + else now + ) + except Exception: + end_time = now + + self.current_session.add_transcript( + role="Main Agent", + content=text, + start_time=start_time, + end_time=end_time, + ) + ten_env.log_debug( + f"Cekura Metrics: assistant transcript added via text_data - len={len(text)}" + ) + + async def _handle_tool_call(self, ten_env: AsyncTenEnv, data: Data) -> None: + if not self.config.collect_tool_calls: + return + + name = _msg_prop_str(data, "name") + arguments = _msg_prop_str(data, "arguments") + result = _msg_prop_str(data, "result") + success = _msg_prop_bool(data, "success", True) + latency_ms = _msg_prop_float(data, "latency_ms") + + self.current_session.add_tool_call( + name=name, + arguments=arguments, + result=result, + success=success, + latency_ms=latency_ms, + ) + ten_env.log_debug(f"Cekura Metrics: tool call recorded - {name}, success={success}") + + def _start_auto_flush(self, ten_env: AsyncTenEnv) -> None: + if not self.config or not self.config.auto_flush: + return + if self._auto_flush_task and not self._auto_flush_task.done(): + self._auto_flush_task.cancel() + + async def auto_flush_loop() -> None: + while self.current_session and not self.current_session.ended_at: + await asyncio.sleep(self.config.auto_flush_interval_ms / 1000) + if not self.current_session or self.current_session.ended_at: + break + if not self.current_session.has_observe_payload(): + continue + ten_env.log_debug("Cekura Metrics: auto-flush sending snapshot") + await self._post_session(ten_env, self.current_session) + + self._auto_flush_task = asyncio.create_task(auto_flush_loop()) + + async def _post_session(self, ten_env: AsyncTenEnv, session: Session) -> None: + """POST one session snapshot to Cekura and emit metrics_sent (success or failure).""" + if not self.client: + return + try: + ten_env.log_info(f"Cekura Metrics: sending session {session.session_id}") + result = await self.client.send_session(session) + + call_log_id = result.get("id", 0) + ten_env.log_info( + f"Cekura Metrics: session sent successfully, call_log_id={call_log_id}" + ) + + out_cmd = Cmd.create("metrics_sent") + out_cmd.set_property_string("session_id", session.session_id) + out_cmd.set_property_bool("success", True) + out_cmd.set_property_int("call_log_id", call_log_id) + await ten_env.send_cmd(out_cmd) + + except CekuraAPIError as e: + ten_env.log_error(f"Cekura Metrics: API error - {e}") + + out_cmd = Cmd.create("metrics_sent") + out_cmd.set_property_string("session_id", session.session_id) + out_cmd.set_property_bool("success", False) + await ten_env.send_cmd(out_cmd) + + except Exception as e: + ten_env.log_error(f"Cekura Metrics: unexpected error - {e}") + + out_cmd = Cmd.create("metrics_sent") + 
out_cmd.set_property_string("session_id", session.session_id) + out_cmd.set_property_bool("success", False) + await ten_env.send_cmd(out_cmd) + + async def _flush_session(self, ten_env: AsyncTenEnv) -> None: + if not self.current_session: + return + + if self._auto_flush_task and not self._auto_flush_task.done(): + self._auto_flush_task.cancel() + try: + await self._auto_flush_task + except asyncio.CancelledError: + pass + + session = self.current_session + self.current_session = None + + if not session.has_observe_payload(): + ten_env.log_warn( + f"Cekura Metrics: session {session.session_id} has no data to send" + ) + return + + await self._post_session(ten_env, session) diff --git a/ai_agents/agents/ten_packages/extension/cekura_metrics_python/helpers.py b/ai_agents/agents/ten_packages/extension/cekura_metrics_python/helpers.py new file mode 100644 index 0000000000..6ad431a354 --- /dev/null +++ b/ai_agents/agents/ten_packages/extension/cekura_metrics_python/helpers.py @@ -0,0 +1,174 @@ +""" +Helper functions for other TEN extensions to send metrics to the Cekura Metrics extension. + +Usage in your extension: + + from cekura_metrics_python.helpers import ( + send_transcript, + send_llm_response, + send_tts_latency, + send_asr_result, + send_tool_call, + start_session, + end_session, + ) + + # In your on_start or when a call begins: + await start_session(ten_env, session_id="call-123", channel_name="room-abc") + + # When you have transcript data: + await send_transcript(ten_env, text="Hello!", role="assistant", is_final=True) + + # When LLM responds: + await send_llm_response(ten_env, text="Response", latency_ms=250, model="gpt-4o") + + # When TTS generates audio: + await send_tts_latency(ten_env, latency_ms=150, vendor="elevenlabs") + + # When ASR transcribes speech: + await send_asr_result(ten_env, text="User said this", latency_ms=200, confidence=0.95) + + # When a tool is called: + await send_tool_call(ten_env, name="get_weather", arguments='{"city":"NYC"}', result='{"temp":72}', success=True) + + # When the call ends: + await end_session(ten_env, session_id="call-123", ended_reason="customer-hangup") +""" + +import json +from typing import Optional, Any + +from ten_runtime import AsyncTenEnv, Data, Cmd + + +async def send_transcript( + ten_env: AsyncTenEnv, + text: str, + role: str = "assistant", + is_final: bool = True, + start_time: float = 0.0, + end_time: float = 0.0, +) -> None: + """Send a transcript message to the Cekura Metrics extension.""" + data = Data.create("transcript") + data.set_property_string("text", text) + data.set_property_string("role", role) + data.set_property_bool("is_final", is_final) + data.set_property_float("start_time", start_time) + data.set_property_float("end_time", end_time) + await ten_env.send_data(data) + + +async def send_llm_response( + ten_env: AsyncTenEnv, + text: str = "", + latency_ms: float = 0.0, + tokens_in: int = 0, + tokens_out: int = 0, + model: str = "", +) -> None: + """Send LLM response metrics to the Cekura Metrics extension.""" + data = Data.create("llm_response") + if text: + data.set_property_string("text", text) + data.set_property_float("latency_ms", latency_ms) + data.set_property_int("tokens_in", tokens_in) + data.set_property_int("tokens_out", tokens_out) + if model: + data.set_property_string("model", model) + await ten_env.send_data(data) + + +async def send_tts_latency( + ten_env: AsyncTenEnv, + latency_ms: float, + text: str = "", + duration_ms: float = 0.0, + vendor: str = "", +) -> None: + """Send TTS latency metrics 
to the Cekura Metrics extension.""" + data = Data.create("tts_audio") + data.set_property_float("latency_ms", latency_ms) + if text: + data.set_property_string("text", text) + if duration_ms > 0: + data.set_property_float("duration_ms", duration_ms) + if vendor: + data.set_property_string("vendor", vendor) + await ten_env.send_data(data) + + +async def send_asr_result( + ten_env: AsyncTenEnv, + text: str, + is_final: bool = True, + latency_ms: float = 0.0, + confidence: float = 0.0, + vendor: str = "", +) -> None: + """Send ASR result to the Cekura Metrics extension.""" + data = Data.create("asr_result") + data.set_property_string("text", text) + data.set_property_bool("is_final", is_final) + data.set_property_float("latency_ms", latency_ms) + if confidence > 0: + data.set_property_float("confidence", confidence) + if vendor: + data.set_property_string("vendor", vendor) + await ten_env.send_data(data) + + +async def send_tool_call( + ten_env: AsyncTenEnv, + name: str, + arguments: str = "", + result: str = "", + success: bool = True, + latency_ms: float = 0.0, +) -> None: + """Send a tool call event to the Cekura Metrics extension.""" + data = Data.create("tool_call") + data.set_property_string("name", name) + data.set_property_string("arguments", arguments) + data.set_property_string("result", result) + data.set_property_bool("success", success) + data.set_property_float("latency_ms", latency_ms) + await ten_env.send_data(data) + + +async def start_session( + ten_env: AsyncTenEnv, + session_id: str, + channel_name: str = "", + customer_number: str = "", + metadata: Optional[dict[str, Any]] = None, +) -> None: + """Start a new metrics collection session.""" + cmd = Cmd.create("session_start") + cmd.set_property_string("session_id", session_id) + if channel_name: + cmd.set_property_string("channel_name", channel_name) + if customer_number: + cmd.set_property_string("customer_number", customer_number) + if metadata: + cmd.set_property_string("metadata", json.dumps(metadata)) + await ten_env.send_cmd(cmd) + + +async def end_session( + ten_env: AsyncTenEnv, + session_id: str, + ended_reason: str = "", +) -> None: + """End the current session and flush metrics to Cekura.""" + cmd = Cmd.create("session_end") + cmd.set_property_string("session_id", session_id) + if ended_reason: + cmd.set_property_string("ended_reason", ended_reason) + await ten_env.send_cmd(cmd) + + +async def flush_session(ten_env: AsyncTenEnv) -> None: + """Manually flush the current session to Cekura.""" + cmd = Cmd.create("flush") + await ten_env.send_cmd(cmd) diff --git a/ai_agents/agents/ten_packages/extension/cekura_metrics_python/manifest.json b/ai_agents/agents/ten_packages/extension/cekura_metrics_python/manifest.json new file mode 100644 index 0000000000..d1c0c41f21 --- /dev/null +++ b/ai_agents/agents/ten_packages/extension/cekura_metrics_python/manifest.json @@ -0,0 +1,249 @@ +{ + "type": "extension", + "name": "cekura_metrics_python", + "version": "0.1.0", + "dependencies": [ + { + "type": "system", + "name": "ten_runtime_python", + "version": "0.11" + } + ], + "package": { + "include": [ + "manifest.json", + "property.json", + "**.tent", + "**.py", + "README.md", + "requirements.txt" + ] + }, + "api": { + "data_in": [ + { + "name": "transcript", + "property": { + "properties": { + "text": { + "type": "string" + }, + "role": { + "type": "string" + }, + "is_final": { + "type": "bool" + }, + "start_time": { + "type": "float64" + }, + "end_time": { + "type": "float64" + } + }, + "required": ["text", "role"] + } + }, + 
{ + "name": "llm_response", + "property": { + "properties": { + "text": { + "type": "string" + }, + "latency_ms": { + "type": "float64" + }, + "tokens_in": { + "type": "int64" + }, + "tokens_out": { + "type": "int64" + }, + "model": { + "type": "string" + } + } + } + }, + { + "name": "tts_audio", + "property": { + "properties": { + "text": { + "type": "string" + }, + "latency_ms": { + "type": "float64" + }, + "duration_ms": { + "type": "float64" + }, + "vendor": { + "type": "string" + } + } + } + }, + { + "name": "metrics", + "property": { + "properties": {} + } + }, + { + "name": "asr_result", + "property": { + "properties": { + "text": { + "type": "string" + }, + "is_final": { + "type": "bool" + }, + "latency_ms": { + "type": "float64" + }, + "confidence": { + "type": "float64" + }, + "vendor": { + "type": "string" + } + } + } + }, + { + "name": "text_data", + "property": { + "properties": {} + } + }, + { + "name": "tool_call", + "property": { + "properties": { + "name": { + "type": "string" + }, + "arguments": { + "type": "string" + }, + "result": { + "type": "string" + }, + "success": { + "type": "bool" + }, + "latency_ms": { + "type": "float64" + } + }, + "required": ["name"] + } + } + ], + "cmd_in": [ + { + "name": "on_user_joined", + "property": { + "properties": {} + } + }, + { + "name": "on_user_left", + "property": { + "properties": {} + } + }, + { + "name": "session_start", + "property": { + "properties": { + "session_id": { + "type": "string" + }, + "channel_name": { + "type": "string" + }, + "customer_number": { + "type": "string" + }, + "metadata": { + "type": "string" + } + }, + "required": ["session_id"] + } + }, + { + "name": "session_end", + "property": { + "properties": { + "session_id": { + "type": "string" + }, + "ended_reason": { + "type": "string" + } + } + } + }, + { + "name": "flush" + } + ], + "cmd_out": [ + { + "name": "metrics_sent", + "property": { + "properties": { + "session_id": { + "type": "string" + }, + "success": { + "type": "bool" + }, + "call_log_id": { + "type": "int64" + } + } + } + } + ], + "property": { + "properties": { + "api_key": { + "type": "string" + }, + "agent_id": { + "type": "int64" + }, + "assistant_id": { + "type": "string" + }, + "base_url": { + "type": "string" + }, + "auto_flush": { + "type": "bool" + }, + "auto_flush_interval_ms": { + "type": "int64" + }, + "metric_ids": { + "type": "string" + }, + "collect_latency": { + "type": "bool" + }, + "collect_transcripts": { + "type": "bool" + }, + "collect_tool_calls": { + "type": "bool" + } + } + } + } +} diff --git a/ai_agents/agents/ten_packages/extension/cekura_metrics_python/property.json b/ai_agents/agents/ten_packages/extension/cekura_metrics_python/property.json new file mode 100644 index 0000000000..5ed01686f4 --- /dev/null +++ b/ai_agents/agents/ten_packages/extension/cekura_metrics_python/property.json @@ -0,0 +1,12 @@ +{ + "api_key": "${env:CEKURA_API_KEY|}", + "agent_id": 0, + "assistant_id": "", + "base_url": "https://api.cekura.ai", + "auto_flush": true, + "auto_flush_interval_ms": 5000, + "metric_ids": "", + "collect_latency": true, + "collect_transcripts": true, + "collect_tool_calls": true +} diff --git a/ai_agents/agents/ten_packages/extension/cekura_metrics_python/requirements.txt b/ai_agents/agents/ten_packages/extension/cekura_metrics_python/requirements.txt new file mode 100644 index 0000000000..3beb7cbaf9 --- /dev/null +++ b/ai_agents/agents/ten_packages/extension/cekura_metrics_python/requirements.txt @@ -0,0 +1 @@ +aiohttp>=3.9.0 diff --git 
a/ai_agents/agents/ten_packages/extension/cekura_metrics_python/session.py b/ai_agents/agents/ten_packages/extension/cekura_metrics_python/session.py
new file mode 100644
index 0000000000..999d7ec119
--- /dev/null
+++ b/ai_agents/agents/ten_packages/extension/cekura_metrics_python/session.py
@@ -0,0 +1,169 @@
+from dataclasses import dataclass, field
+from datetime import datetime, timezone
+from typing import Optional
+
+
+def _utcnow() -> datetime:
+    # Timezone-aware UTC, so isoformat() carries an explicit offset.
+    return datetime.now(timezone.utc)
+
+
+@dataclass
+class TranscriptMessage:
+    """One user or assistant utterance captured from the graph."""
+
+    role: str
+    content: str
+    start_time: float = 0.0
+    end_time: float = 0.0
+
+    def to_dict(self) -> dict:
+        return {
+            "role": self.role,
+            "content": self.content,
+            "start_time": self.start_time,
+            "end_time": self.end_time,
+        }
+
+
+@dataclass
+class ToolCallRecord:
+    """A single tool invocation and its outcome."""
+
+    name: str
+    arguments: str
+    result: str
+    success: bool
+    latency_ms: float
+    timestamp: float = field(default_factory=lambda: datetime.now().timestamp())
+
+    def to_dict(self) -> dict:
+        # Rendered as a pseudo-message so tool calls can be interleaved
+        # with transcripts in the Cekura transcript view.
+        ts = self.timestamp
+        return {
+            "role": "function_call",
+            "content": f"Tool: {self.name}",
+            "start_time": ts,
+            "end_time": ts,
+            "data": {
+                "name": self.name,
+                "arguments": self.arguments,
+                "result": self.result,
+                "success": self.success,
+                "latency_ms": self.latency_ms,
+                "timestamp": ts,
+            },
+        }
+
+
+@dataclass
+class LatencyMetric:
+    """One latency sample for a pipeline component (asr/llm/tts)."""
+
+    component: str
+    latency_ms: float
+    timestamp: float = field(default_factory=lambda: datetime.now().timestamp())
+    metadata: dict = field(default_factory=dict)
+
+
+@dataclass
+class Session:
+    """Accumulates everything collected for one call until it is flushed."""
+
+    session_id: str
+    channel_name: str = ""
+    customer_number: str = ""
+    metadata: dict = field(default_factory=dict)
+    started_at: datetime = field(default_factory=_utcnow)
+    ended_at: Optional[datetime] = None
+    ended_reason: str = ""
+
+    transcripts: list[TranscriptMessage] = field(default_factory=list)
+    tool_calls: list[ToolCallRecord] = field(default_factory=list)
+    latency_metrics: list[LatencyMetric] = field(default_factory=list)
+
+    llm_latencies: list[float] = field(default_factory=list)
+    tts_latencies: list[float] = field(default_factory=list)
+    asr_latencies: list[float] = field(default_factory=list)
+
+    def add_transcript(
+        self, role: str, content: str, start_time: float = 0.0, end_time: float = 0.0
+    ) -> None:
+        self.transcripts.append(
+            TranscriptMessage(
+                role=role,
+                content=content,
+                start_time=start_time,
+                end_time=end_time,
+            )
+        )
+
+    def add_tool_call(
+        self, name: str, arguments: str, result: str, success: bool, latency_ms: float
+    ) -> None:
+        self.tool_calls.append(
+            ToolCallRecord(
+                name=name,
+                arguments=arguments,
+                result=result,
+                success=success,
+                latency_ms=latency_ms,
+            )
+        )
+
+    def add_latency_metric(self, component: str, latency_ms: float, **metadata) -> None:
+        self.latency_metrics.append(
+            LatencyMetric(
+                component=component,
+                latency_ms=latency_ms,
+                metadata=metadata,
+            )
+        )
+        if component == "llm":
+            self.llm_latencies.append(latency_ms)
+        elif component == "tts":
+            self.tts_latencies.append(latency_ms)
+        elif component == "asr":
+            self.asr_latencies.append(latency_ms)
+
+    def end(self, reason: str = "") -> None:
+        self.ended_at = _utcnow()
+        self.ended_reason = reason
+
+    def has_observe_payload(self) -> bool:
+        """True if there is data to POST (transcripts, tool calls, or latency samples)."""
+        return bool(self.transcripts or self.tool_calls or self.latency_metrics)
+
+    def build_transcript_json(self) -> list[dict]:
+        """Merge transcripts and tool calls, ordered by start time."""
+        messages = [t.to_dict() for t in self.transcripts]
+        messages.extend(tc.to_dict() for tc in self.tool_calls)
+        messages.sort(key=lambda x: float(x.get("start_time", 0) or 0))
+        return messages
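+
+    # Sketch (reviewer suggestion, not part of the original patch): a small
+    # duration helper for dashboards; assumes end() has been called before
+    # metrics are flushed, and that both datetimes are timezone-aware.
+    def duration_seconds(self) -> float:
+        if self.ended_at is None:
+            return 0.0
+        return (self.ended_at - self.started_at).total_seconds()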
+
+    def build_metadata(self) -> dict:
+        """Session metadata plus aggregate latency stats per component."""
+        meta = dict(self.metadata)
+        meta["channel_name"] = self.channel_name
+
+        for name, samples in (
+            ("llm", self.llm_latencies),
+            ("tts", self.tts_latencies),
+            ("asr", self.asr_latencies),
+        ):
+            if samples:
+                meta[f"{name}_avg_latency_ms"] = sum(samples) / len(samples)
+                meta[f"{name}_max_latency_ms"] = max(samples)
+                meta[f"{name}_min_latency_ms"] = min(samples)
+
+        meta["total_tool_calls"] = len(self.tool_calls)
+        meta["failed_tool_calls"] = sum(1 for tc in self.tool_calls if not tc.success)
+
+        return meta
+
+    def to_observe_payload(
+        self, agent_id: int = 0, assistant_id: str = "", metric_ids: str = ""
+    ) -> dict:
+        payload = {
+            "call_id": self.session_id,
+            "transcript_type": "cekura",
+            "transcript_json": self.build_transcript_json(),
+            # started_at is timezone-aware UTC; render the offset as "Z".
+            "timestamp": self.started_at.isoformat().replace("+00:00", "Z"),
+            "metadata": self.build_metadata(),
+        }
+
+        if agent_id:
+            payload["agent"] = agent_id
+        if assistant_id:
+            payload["assistant_id"] = assistant_id
+        if self.customer_number:
+            payload["customer_number"] = self.customer_number
+        if self.ended_reason:
+            payload["call_ended_reason"] = self.ended_reason
+        if metric_ids:
+            payload["metric_ids"] = metric_ids
+
+        return payload
diff --git a/ai_agents/agents/ten_packages/extension/cekura_metrics_python/tests/__init__.py b/ai_agents/agents/ten_packages/extension/cekura_metrics_python/tests/__init__.py
new file mode 100644
index 0000000000..2dfea55034
--- /dev/null
+++ b/ai_agents/agents/ten_packages/extension/cekura_metrics_python/tests/__init__.py
@@ -0,0 +1 @@
+# Cekura Metrics Extension Tests
diff --git a/ai_agents/agents/ten_packages/extension/cekura_metrics_python/tests/test_config.py b/ai_agents/agents/ten_packages/extension/cekura_metrics_python/tests/test_config.py
new file mode 100644
index 0000000000..e184da8ff8
--- /dev/null
+++ b/ai_agents/agents/ten_packages/extension/cekura_metrics_python/tests/test_config.py
@@ -0,0 +1,81 @@
+import pytest
+
+from ..config import CekuraMetricsConfig
+
+
+class TestCekuraMetricsConfig:
+    def test_from_json(self):
+        json_str = '''
+        {
+            "api_key": "test-key",
+            "agent_id": 123,
+            "base_url": "https://api.cekura.ai",
+            "auto_flush": true,
+            "collect_latency": true
+        }
+        '''
+
+        config = CekuraMetricsConfig.from_json(json_str)
+
+        assert config.api_key == "test-key"
+        assert config.agent_id == 123
+        assert config.base_url == "https://api.cekura.ai"
+        assert config.auto_flush is True
+        assert config.collect_latency is True
+
+    def test_validate_requires_api_key(self):
+        config = CekuraMetricsConfig(
+            api_key="",
+            agent_id=123,
+        )
+
+        with pytest.raises(ValueError, match="api_key is required"):
+            config.validate()
+
+    def test_validate_requires_agent_or_assistant(self):
+        config = CekuraMetricsConfig(
+            api_key="test-key",
+            agent_id=0,
+            assistant_id="",
+        )
+
+        with pytest.raises(ValueError, match="agent_id or assistant_id"):
+            config.validate()
+
+    def test_validate_success_with_agent_id(self):
+        config = CekuraMetricsConfig(
+            api_key="test-key",
+            agent_id=123,
+        )
+
+        config.validate()  # should not raise
+
+    def test_validate_success_with_assistant_id(self):
+        config = CekuraMetricsConfig(
+            api_key="test-key",
+            assistant_id="asst_abc123",
+        )
+
+        config.validate()  # should not raise
+
+    def test_observe_endpoint(self):
+        config = CekuraMetricsConfig(
+            api_key="test-key",
+            agent_id=123,
+            base_url="https://api.cekura.ai",
+        )
+
+        assert config.observe_endpoint == "https://api.cekura.ai/observability/v1/observe/"
+
+        # A trailing slash on base_url must not double up in the endpoint.
+        config.base_url = "https://api.cekura.ai/"
+        assert config.observe_endpoint == "https://api.cekura.ai/observability/v1/observe/"
+
+    def test_default_values(self):
+        config = CekuraMetricsConfig()
+
+        assert config.base_url == "https://api.cekura.ai"
+        assert config.auto_flush is True
+        assert config.auto_flush_interval_ms == 5000
+        assert config.collect_latency is True
+        assert config.collect_transcripts is True
+        assert config.collect_tool_calls is True
diff --git a/ai_agents/agents/ten_packages/extension/cekura_metrics_python/tests/test_session.py b/ai_agents/agents/ten_packages/extension/cekura_metrics_python/tests/test_session.py
new file mode 100644
index 0000000000..7b5db1e865
--- /dev/null
+++ b/ai_agents/agents/ten_packages/extension/cekura_metrics_python/tests/test_session.py
@@ -0,0 +1,163 @@
+from ..session import Session, TranscriptMessage, ToolCallRecord
+
+
+class TestSession:
+    def test_create_session(self):
+        session = Session(
+            session_id="test-123",
+            channel_name="room-abc",
+            customer_number="+1234567890",
+        )
+
+        assert session.session_id == "test-123"
+        assert session.channel_name == "room-abc"
+        assert session.customer_number == "+1234567890"
+        assert session.ended_at is None
+        assert len(session.transcripts) == 0
+        assert len(session.tool_calls) == 0
+
+    def test_add_transcript(self):
+        session = Session(session_id="test-123")
+
+        session.add_transcript(
+            role="Main Agent",
+            content="Hello, how can I help?",
+            start_time=1.0,
+            end_time=2.5,
+        )
+
+        assert len(session.transcripts) == 1
+        assert session.transcripts[0].role == "Main Agent"
+        assert session.transcripts[0].content == "Hello, how can I help?"
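+
+    # Sketch (reviewer addition, not in the original patch): the tool-call
+    # counters that build_metadata emits are not covered elsewhere; this
+    # assumes the "total_tool_calls" / "failed_tool_calls" keys used in
+    # session.py above.
+    def test_metadata_counts_failed_tool_calls(self):
+        session = Session(session_id="test-123")
+        session.add_tool_call("lookup", "{}", "", success=False, latency_ms=10.0)
+        session.add_tool_call("lookup", "{}", "{}", success=True, latency_ms=12.0)
+
+        meta = session.build_metadata()
+
+        assert meta["total_tool_calls"] == 2
+        assert meta["failed_tool_calls"] == 1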
+ + def test_add_tool_call(self): + session = Session(session_id="test-123") + + session.add_tool_call( + name="get_weather", + arguments='{"city": "NYC"}', + result='{"temp": 72}', + success=True, + latency_ms=250.0, + ) + + assert len(session.tool_calls) == 1 + assert session.tool_calls[0].name == "get_weather" + assert session.tool_calls[0].success is True + + def test_add_latency_metrics(self): + session = Session(session_id="test-123") + + session.add_latency_metric("llm", 250.0, model="gpt-4o") + session.add_latency_metric("llm", 300.0, model="gpt-4o") + session.add_latency_metric("tts", 150.0, vendor="elevenlabs") + session.add_latency_metric("asr", 200.0, vendor="deepgram") + + assert len(session.llm_latencies) == 2 + assert len(session.tts_latencies) == 1 + assert len(session.asr_latencies) == 1 + + metadata = session.build_metadata() + assert metadata["llm_avg_latency_ms"] == 275.0 + assert metadata["llm_max_latency_ms"] == 300.0 + assert metadata["llm_min_latency_ms"] == 250.0 + + def test_end_session(self): + session = Session(session_id="test-123") + assert session.ended_at is None + + session.end("customer-hangup") + + assert session.ended_at is not None + assert session.ended_reason == "customer-hangup" + + def test_has_observe_payload(self): + empty = Session(session_id="empty") + assert empty.has_observe_payload() is False + + t = Session(session_id="t") + t.add_transcript("Main Agent", "Hi") + assert t.has_observe_payload() is True + + tc = Session(session_id="tc") + tc.add_tool_call("x", "", "", True, 0.0) + assert tc.has_observe_payload() is True + + lat = Session(session_id="lat") + lat.add_latency_metric("llm", 100.0) + assert lat.has_observe_payload() is True + + def test_build_transcript_json(self): + session = Session(session_id="test-123") + + session.add_transcript("Main Agent", "Hello!", start_time=1.0, end_time=2.0) + session.add_transcript("Testing Agent", "Hi there", start_time=2.5, end_time=3.5) + + transcript_json = session.build_transcript_json() + + assert len(transcript_json) == 2 + assert transcript_json[0]["role"] == "Main Agent" + assert transcript_json[1]["role"] == "Testing Agent" + + def test_to_observe_payload(self): + session = Session( + session_id="test-123", + channel_name="room-abc", + customer_number="+1234567890", + ) + + session.add_transcript("Main Agent", "Hello!", start_time=1.0, end_time=2.0) + session.end("completed") + + payload = session.to_observe_payload( + agent_id=123, + metric_ids="1,2,3", + ) + + assert payload["call_id"] == "test-123" + assert payload["agent"] == 123 + assert payload["customer_number"] == "+1234567890" + assert payload["call_ended_reason"] == "completed" + assert payload["metric_ids"] == "1,2,3" + assert payload["transcript_type"] == "cekura" + assert len(payload["transcript_json"]) == 1 + + +class TestTranscriptMessage: + def test_to_dict(self): + msg = TranscriptMessage( + role="Main Agent", + content="Hello", + start_time=1.0, + end_time=2.0, + ) + + d = msg.to_dict() + + assert d["role"] == "Main Agent" + assert d["content"] == "Hello" + assert d["start_time"] == 1.0 + assert d["end_time"] == 2.0 + + +class TestToolCallRecord: + def test_to_dict(self): + tc = ToolCallRecord( + name="get_weather", + arguments='{"city": "NYC"}', + result='{"temp": 72}', + success=True, + latency_ms=250.0, + ) + + d = tc.to_dict() + + assert d["role"] == "function_call" + assert "get_weather" in d["content"] + assert "start_time" in d and d["start_time"] == tc.timestamp + assert d["data"]["name"] == "get_weather" + assert 
d["data"]["success"] is True + assert d["data"]["timestamp"] == tc.timestamp
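+
+
+class TestTranscriptOrdering:
+    # Sketch (reviewer addition, not in the original patch): verifies that
+    # build_transcript_json interleaves tool calls with transcripts by start
+    # time, which is what Cekura needs to render the call in order. The
+    # tool-call timestamp is pinned manually since it defaults to "now".
+    def test_tool_calls_interleaved_by_time(self):
+        session = Session(session_id="order-1")
+        session.add_transcript("Main Agent", "first", start_time=1.0, end_time=1.5)
+        session.add_transcript("Main Agent", "second", start_time=5.0, end_time=5.5)
+        session.add_tool_call("get_weather", "{}", "{}", True, 10.0)
+        # Pin the auto-generated timestamp between the two transcripts.
+        session.tool_calls[0].timestamp = 3.0
+
+        merged = session.build_transcript_json()
+
+        assert [m["role"] for m in merged] == [
+            "Main Agent",
+            "function_call",
+            "Main Agent",
+        ]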