feat(agent): task dispatcher — autonomous runner, board poller, board write-back#2965
feat(agent): task dispatcher — autonomous runner, board poller, board write-back#2965sanil-23 wants to merge 2 commits into
Conversation
…metadata Task sources previously created "dumb" cards that set only `notes`, leaving tinyhumansai#2891's enriched brief fields empty even though `enrich.rs` already computes a summary, urgency, and an actionable prompt. - Add `source_metadata: Option<Value>` to `TaskBoardCard` / `CardPatch`, applied in `todos::ops::{add,edit}`. - Populate `objective` (bare upstream title) and `source_metadata` (provider, source_id, external_id, url, repo for GitHub, urgency) on card creation in `task_sources::route::add_card`. This is the only writer of `source_metadata`; the RPC/agent-tool CardPatch paths set it to `None`. - Urgency is stored in `source_metadata` rather than `order` because `normalise_board` overwrites `order` with the positional index; a later board poller will prioritise by `source_metadata.urgency`. - TS `TaskBoardCard` gains an optional `sourceMetadata` field for parity. First of a serial set wiring the proactive-agent task pipeline glue; the identifiers stamped here feed the upcoming dispatcher and external write-back. Co-Authored-By: Claude <noreply@anthropic.com>
… write-back
Wires the proactive task pipeline so enriched cards actually run and the
board reflects the outcome. One executor, two feeders, deduped by a claim.
- New `agent/task_dispatcher.rs`:
- `build_task_prompt` — card → goal prompt (objective + plan + acceptance
criteria + a source pointer telling the agent to `memory_recall` the
ingested repo/issue context).
- `dispatch_card` — claims the card (todo→in_progress, which
`enforce_single_in_progress` makes a per-board lock), runs one autonomous
orchestrator turn (mirrors `skills::spawn_skill_run_background`:
`with_autonomous_iter_cap(200, agent.run_single(..))`), then writes back
`done` + evidence / `blocked` + reason. Detached; returns a run id.
- Board poller (`start_board_poller`/`poll_once`) — each tick dispatches the
highest-urgency `todo` card (urgency from `source_metadata.urgency`), gated
by `scheduler_gate` capacity. Catch-all for cards without a proactive
trigger.
- Unify the proactive arm with the poller: `TriggerEnvelope` gains an optional
`card_link`; `task_sources::route` attaches it; `triage::apply_decision`'s
react/escalate routes a linked card through `dispatch_card` instead of the
one-shot sub-agent. The claim deduplicates against the poller, so both
feeders are safe.
- Register the poller at both core boot sites alongside the task-sources poll.
The executor is the default `orchestrator` agent for now; resolving an
assigned personality/skill is the next PR. Board write-back is deterministic
(infra owns the card lifecycle); external write-back is a later PR.
Co-Authored-By: Claude <noreply@anthropic.com>
📝 WalkthroughWalkthroughThis PR introduces an autonomous task board dispatcher that reads highest-urgency ChangesAutonomous Task Dispatcher with Board Integration
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related PRs
Suggested labels
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. Warning There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure. 🔧 ESLint
ESLint skipped: no ESLint configuration detected in root package.json. To enable, add Comment |
There was a problem hiding this comment.
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/openhuman/agent/task_dispatcher.rs`:
- Around line 121-137: The code claims a card as InProgress via
ops::update_status then detaches run_autonomous in tokio::spawn without any
durability or lease/heartbeat, so crashes leave the card permanently InProgress;
fix by making the claim durable with a run identifier and lease metadata (e.g.,
update_status should set status=InProgress with run_id and a
lease_expiry/heartbeat timestamp), then spawn a supervisor task that (1) runs
run_autonomous(&prompt, &run_id), (2) periodically renews the lease/heartbeat
for that run while running, and (3) retries or guarantees
write_back(&location_for_run, &card_id, &run_id, outcome) and clears the lease
on completion or failure; also update poll_once to detect and reclaim stale
InProgress records whose lease_expiry has passed so orphaned runs can be
retried.
In `@src/openhuman/agent/triage/escalation.rs`:
- Around line 94-115: The current handling of dispatch_linked_card(link) treats
all errors as benign skips; change it to distinguish claim-related benign errors
from real failures: update the match on dispatch_linked_card(link).await to
pattern-match the error variants (or call a helper like is_benign_claim_error)
and, for benign claim failures keep the tracing::info and return Ok(()), but for
storage/runtime/dispatcher failures emit the TriggerEscalationFailed event
(e.g., events::publish_trigger_escalation_failed or similar), log at error level
with reason and card_id, and return Err(reason) so the failure is propagated;
ensure envelope.card_link branch still publishes events::publish_escalated only
on success and that failing paths use the new error-handling logic.
In `@src/openhuman/task_sources/route.rs`:
- Around line 46-48: The match arm handling SourceTarget::AgentTodoProactive
(inside route_enriched) currently awaits dispatch_triage(...) and propagates
errors after add_card() has already persisted the card; change it to call
dispatch_triage(...).await but treat failures as best-effort: capture the Result
(e.g. if let Err(e) = dispatch_triage(...).await { log the error with context
including card_id and source } ) and always return Ok(card_id). Apply the same
pattern to the other dispatch_triage calls referenced in the file (the blocks
mentioned around the 188-253 range) so triage failures are logged but not
returned as Err from route_enriched.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: ec55bc21-98ab-4951-83bc-2433d31a7c19
📒 Files selected for processing (14)
app/src/types/turnState.tssrc/core/jsonrpc.rssrc/openhuman/agent/mod.rssrc/openhuman/agent/task_board.rssrc/openhuman/agent/task_dispatcher.rssrc/openhuman/agent/triage/envelope.rssrc/openhuman/agent/triage/escalation.rssrc/openhuman/channels/runtime/startup.rssrc/openhuman/notifications/rpc.rssrc/openhuman/task_sources/route.rssrc/openhuman/threads/turn_state/mirror_tests.rssrc/openhuman/todos/ops.rssrc/openhuman/todos/schemas.rssrc/openhuman/tools/impl/agent/todo.rs
| ops::update_status(&location, &card_id, TaskCardStatus::InProgress) | ||
| .map_err(|e| format!("[task_dispatcher] claim failed for card {card_id}: {e}"))?; | ||
|
|
||
| let run_id = uuid::Uuid::new_v4().to_string(); | ||
| tracing::info!( | ||
| card_id = %card_id, | ||
| run_id = %run_id, | ||
| prompt_chars = prompt.chars().count(), | ||
| "[task_dispatcher] card claimed (todo→in_progress), spawning autonomous run" | ||
| ); | ||
|
|
||
| let run_id_for_return = run_id.clone(); | ||
| let location_for_run = location.clone(); | ||
| tokio::spawn(async move { | ||
| let outcome = run_autonomous(&prompt, &run_id).await; | ||
| write_back(&location_for_run, &card_id, &run_id, outcome); | ||
| }); |
There was a problem hiding this comment.
in_progress claims need a recovery path.
Line 121 persists the lock before the run is durable, and Lines 134-137 detach the executor without any lease/heartbeat. A shutdown, panic, or final write-back failure leaves the card stuck InProgress; poll_once then idles forever because it refuses to dispatch while any card is running.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/openhuman/agent/task_dispatcher.rs` around lines 121 - 137, The code
claims a card as InProgress via ops::update_status then detaches run_autonomous
in tokio::spawn without any durability or lease/heartbeat, so crashes leave the
card permanently InProgress; fix by making the claim durable with a run
identifier and lease metadata (e.g., update_status should set status=InProgress
with run_id and a lease_expiry/heartbeat timestamp), then spawn a supervisor
task that (1) runs run_autonomous(&prompt, &run_id), (2) periodically renews the
lease/heartbeat for that run while running, and (3) retries or guarantees
write_back(&location_for_run, &card_id, &run_id, outcome) and clears the lease
on completion or failure; also update poll_once to detect and reclaim stale
InProgress records whose lease_expiry has passed so orphaned runs can be
retried.
| if let Some(link) = &envelope.card_link { | ||
| match dispatch_linked_card(link).await { | ||
| Ok(run_id) => { | ||
| tracing::info!( | ||
| card_id = %link.card_id, | ||
| run_id = %run_id, | ||
| "[triage::escalation] task-card dispatched to deterministic runner" | ||
| ); | ||
| events::publish_escalated(envelope, "task_dispatcher"); | ||
| } | ||
| Err(reason) => { | ||
| // A failed claim (another card already in progress, or | ||
| // the card vanished) is benign — the poller retries. | ||
| tracing::info!( | ||
| card_id = %link.card_id, | ||
| reason = %reason, | ||
| "[triage::escalation] task-card dispatch skipped (claim failed?)" | ||
| ); | ||
| } | ||
| } | ||
| return Ok(()); | ||
| } |
There was a problem hiding this comment.
Differentiate benign card-claim skips from real dispatcher failures.
This branch treats every dispatch_linked_card() error as a harmless skip and returns Ok(()). But dispatch_linked_card() also covers board-load and downstream dispatch work, so storage/runtime failures here will be downgraded to an info log with no TriggerEscalationFailed event and no error propagation. That makes linked-card triage look successful even when dispatch is actually broken.
Also applies to: 304-312
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/openhuman/agent/triage/escalation.rs` around lines 94 - 115, The current
handling of dispatch_linked_card(link) treats all errors as benign skips; change
it to distinguish claim-related benign errors from real failures: update the
match on dispatch_linked_card(link).await to pattern-match the error variants
(or call a helper like is_benign_claim_error) and, for benign claim failures
keep the tracing::info and return Ok(()), but for storage/runtime/dispatcher
failures emit the TriggerEscalationFailed event (e.g.,
events::publish_trigger_escalation_failed or similar), log at error level with
reason and card_id, and return Err(reason) so the failure is propagated; ensure
envelope.card_link branch still publishes events::publish_escalated only on
success and that failing paths use the new error-handling logic.
| SourceTarget::AgentTodoProactive => { | ||
| dispatch_triage(source, enriched).await?; | ||
| dispatch_triage(config, source, enriched, &card_id).await?; | ||
| Ok(card_id) |
There was a problem hiding this comment.
Don't fail the route after the card has already been persisted.
Once add_card() succeeds, this function has already mutated the board. Propagating a transient dispatch_triage() failure from here means callers see Err even though the card exists, which is a bad contract for retry/dedup flows and can create duplicate cards on retry. The proactive triage should be best-effort here and logged, while route_enriched still returns the new card_id.
Suggested change
SourceTarget::AgentTodoProactive => {
- dispatch_triage(config, source, enriched, &card_id).await?;
+ if let Err(err) = dispatch_triage(config, source, enriched, &card_id).await {
+ tracing::warn!(
+ source_id = %source.id,
+ card_id = %card_id,
+ external_id = %enriched.task.external_id,
+ error = %err,
+ "[task_sources:route] proactive triage failed after card creation; leaving card on board"
+ );
+ }
Ok(card_id)
}Also applies to: 188-253
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/openhuman/task_sources/route.rs` around lines 46 - 48, The match arm
handling SourceTarget::AgentTodoProactive (inside route_enriched) currently
awaits dispatch_triage(...) and propagates errors after add_card() has already
persisted the card; change it to call dispatch_triage(...).await but treat
failures as best-effort: capture the Result (e.g. if let Err(e) =
dispatch_triage(...).await { log the error with context including card_id and
source } ) and always return Ok(card_id). Apply the same pattern to the other
dispatch_triage calls referenced in the file (the blocks mentioned around the
188-253 range) so triage failures are logged but not returned as Err from
route_enriched.
|
Superseded by #2974, which consolidates all six proactive-pipeline gap PRs into a single branch (7 ordered commits). Closing in favour of that. |
Summary
agent/task_dispatcher.rs: the deterministic executor that turns an enriched task card into an autonomous agent run and writes the outcome back to the board.todocard each tick (gated byscheduler_gatecapacity) — the catch-all for cards with no proactive trigger.todo→in_progress), so nothing runs twice.done+ evidence on success,blocked+ reason on failure (G9a).Problem
After #2894/#2891, enriched cards landed on the board but nothing ran them deterministically and nothing wrote status back. The proactive arm executed a one-shot triage sub-agent on ingest but never touched the card; cards that arrived without a proactive trigger (
TodoOnly, manual) were never picked up. There was no Dispatcher and no board poller.Solution
dispatch_card): claim the card (todo→in_progress—enforce_single_in_progressmakes this a per-board mutual-exclusion lock), run one autonomousorchestratorturn (with_autonomous_iter_cap(200, agent.run_single(..)), mirroringskills::spawn_skill_run_background), thenwrite_back→done+evidence /blocked+reason. Detached; returns a run id.build_task_prompt): objective + plan + acceptance criteria + a source pointer instructing the agent tomemory_recallthe ingested repo/issue context (the deeper memory scoping is a later PR).start_board_poller/poll_once): picks the highest-urgencytodocard (source_metadata.urgency), gated by background-AI capacity; registered at both core boot sites.TriggerEnvelopegains an optionalcard_link;task_sources::routeattaches it;triage::apply_decisionroutes a linked card throughdispatch_card(not the one-shot sub-agent). Non-card triggers (composio/webhook/cron/notification) are unchanged. The claim deduplicates the two feeders.The executor is the default
orchestratoragent; resolving an assigned personality/skill is the next PR. External write-back (close the GitHub issue, etc.) is a later PR; this PR owns only the board.Submission Checklist
task_dispatcherunit tests: prompt shaping (objective/title fallback, plan+criteria, source+memory pointer, omission), poller selection (highest-urgency, status filtering, tie-break toward lower order, empty), truncation. Envelope card-link + triage reroute compile-covered.build_task_prompt,pick_next_todo,truncate_chars,card_urgency) is unit-tested;cargo testgreen. The detached run / write-back glue is integration-shaped (real agent) and exercised via the runner contract.Impact
AgentTodoProactivetask-source triggers: they now run via the autonomous dispatcher with board write-back instead of the one-shot triage sub-agent. Non-task-source triggers (composio/webhook/cron/notification) are untouched.card_linkis in-memory on the envelope; board fields come from feat(task-sources): enrich ingested cards with brief fields + source_metadata #2960).Related
AI Authored PR Metadata
Linear Issue
Commit & Branch
Summary by CodeRabbit
Release Notes