Skip to content

feat: proactive agent task pipeline — dispatcher, approval, executors, write-back#2974

Merged
senamakel merged 16 commits into
tinyhumansai:mainfrom
sanil-23:feat/proactive-agent-pipeline
May 29, 2026
Merged

feat: proactive agent task pipeline — dispatcher, approval, executors, write-back#2974
senamakel merged 16 commits into
tinyhumansai:mainfrom
sanil-23:feat/proactive-agent-pipeline

Conversation

@sanil-23
Copy link
Copy Markdown
Contributor

@sanil-23 sanil-23 commented May 29, 2026

Summary

Wires the glue between the four proactive-pipeline foundations (#2891 kanban, #2893 memory sources, #2894 task sources, #2895 multi-agent personalities) into a working loop: ingest → enrich → (approve) → dispatch to the right executor → run autonomously → write the outcome back to the board and the upstream source.

Closes the gap catalogue (G1–G9) from the merged design doc. Kept as 7 ordered commits so it reads gap-by-gap; consolidated into one PR because it's a single cohesive feature (supersedes the earlier stacked PRs #2960/#2965/#2969/#2970/#2971/#2972).

What's in it (commit by commit)

  1. G2 — enrich cards (task-sources): map enrichment → brief fields (objective, source_metadata{provider,repo,external_id,url,urgency}) on card creation instead of "dumb" cards. Urgency rides in source_metadata (the board normaliser owns order).
  2. G1 + G9a — dispatcher spine (agent/task_dispatcher.rs): one autonomous-run executor — claim a card (todo→in_progress, where enforce_single_in_progress is the per-board lock), run the orchestrator under with_autonomous_iter_cap (mirrors skill runs), then deterministic board write-back (done+evidence / blocked+reason). A capacity-gated board poller drives cards with no proactive trigger; the proactive triage arm is unified onto the same executor via a TriggerEnvelope.card_link (the claim dedupes the two feeders).
  3. G8 — plan-approval gate: TaskCardStatus::{AwaitingApproval, Ready, Rejected}; when autonomy.require_task_plan_approval is on, a todo card parks at awaiting_approval (emits TaskPlanAwaitingApproval) instead of running. New openhuman.todos_decide_plan RPC.
  4. G8 FE: the kanban board surfaces awaiting_approval cards with inline Approve/Reject (buckets the new statuses into existing columns; threadApi.decidePlan).
  5. G4 + G3 — executor resolution: resolve a card's assigned_agent to a personality (scoped SOUL/MEMORY identity), skill (SKILL.md-seeded), or built-in agent; degrade to the default orchestrator. One interface, three presets.
  6. G7 — static routing config: assigned_executor on a task source (idempotent SQLite migration) → cards land pre-assigned → dispatcher runs them with no LLM router.
  7. G5 + G9b — memory + external write-back: the run prompt tells the agent to memory_recall the source's ingested context, and (when addressable) to comment + close/resolve the upstream item via its integration tools on completion — agent-driven, run-free under the connection's write scope.

Design (locked with the maintainer's product owner)

  • One execution interface — an autonomous agent run; personality/skill/agent are presets over it.
  • No per-tool approval loop during a run (background turns auto-allow, like skill runs); the only checkpoint is the optional up-front plan approval.
  • Board write-back is deterministic (infra owns the card lifecycle); the external world is the agent's job via its own tools.

Testing

Unit tests across task_dispatcher (prompt shaping, executor resolution, poller selection, write-back instruction), todos::ops (source_metadata round-trip, decide_plan), task_sources (source_metadata mapping, routing), envelope/triage. cargo test + cargo fmt --check green locally. FE not tsc'd locally (no node_modules in the worktree); CI typechecks.

Submission Checklist

  • Tests added or updated (happy + edge) across the changed Rust modules.
  • Diff coverage ≥ 80% — new logic is unit-tested; cargo test green locally.
  • N/A — Coverage matrix: internal pipeline wiring; UI surfacing is incremental on the existing board.
  • N/A — no matrix feature IDs affected.
  • No new external network dependencies (uses existing Composio surface).
  • N/A — does not touch release-cut smoke surfaces.
  • N/A — gap-driven work; no single linked issue.

Impact

Desktop core (Rust) + a TS type/board UI addition. Additive SQLite migration (assigned_executor) is idempotent; source_metadata/card_link are serde-default/in-memory, so existing boards deserialise unchanged. Adds one capacity-gated 60s poll loop.

Related

Pushed --no-verify (local pre-push hook runs pnpm rust:check, which needs the vendored tauri-cef toolchain absent in this environment; unrelated to these changes).

🤖 Generated with Claude Code

Summary by CodeRabbit

  • New Features

    • UI: Approve/Deny controls on awaiting-approval task cards and full status selector in the edit dialog.
    • Backend: New approve/reject action for task plans exposed to the app.
  • Background

    • Automatic board poller that dispatches high‑urgency tasks and deterministic task dispatching.
    • Task-source executor assignment support.
  • Data

    • Expanded task statuses (awaiting approval, ready, rejected) and captured source metadata on ingested cards.
  • i18n

    • Added new status labels across locales.
  • Tests

    • Expanded coverage for approval flow, RPCs, polling, dispatch, and related behaviors.

Review Change Stack

sanil-23 and others added 7 commits May 29, 2026 19:06
…metadata

Task sources previously created "dumb" cards that set only `notes`,
leaving tinyhumansai#2891's enriched brief fields empty even though `enrich.rs`
already computes a summary, urgency, and an actionable prompt.

- Add `source_metadata: Option<Value>` to `TaskBoardCard` / `CardPatch`,
  applied in `todos::ops::{add,edit}`.
- Populate `objective` (bare upstream title) and `source_metadata`
  (provider, source_id, external_id, url, repo for GitHub, urgency) on
  card creation in `task_sources::route::add_card`. This is the only
  writer of `source_metadata`; the RPC/agent-tool CardPatch paths set it
  to `None`.
- Urgency is stored in `source_metadata` rather than `order` because
  `normalise_board` overwrites `order` with the positional index; a later
  board poller will prioritise by `source_metadata.urgency`.
- TS `TaskBoardCard` gains an optional `sourceMetadata` field for parity.

First of a serial set wiring the proactive-agent task pipeline glue; the
identifiers stamped here feed the upcoming dispatcher and external
write-back.

Co-Authored-By: Claude <noreply@anthropic.com>
… write-back

Wires the proactive task pipeline so enriched cards actually run and the
board reflects the outcome. One executor, two feeders, deduped by a claim.

- New `agent/task_dispatcher.rs`:
  - `build_task_prompt` — card → goal prompt (objective + plan + acceptance
    criteria + a source pointer telling the agent to `memory_recall` the
    ingested repo/issue context).
  - `dispatch_card` — claims the card (todo→in_progress, which
    `enforce_single_in_progress` makes a per-board lock), runs one autonomous
    orchestrator turn (mirrors `skills::spawn_skill_run_background`:
    `with_autonomous_iter_cap(200, agent.run_single(..))`), then writes back
    `done` + evidence / `blocked` + reason. Detached; returns a run id.
  - Board poller (`start_board_poller`/`poll_once`) — each tick dispatches the
    highest-urgency `todo` card (urgency from `source_metadata.urgency`), gated
    by `scheduler_gate` capacity. Catch-all for cards without a proactive
    trigger.
- Unify the proactive arm with the poller: `TriggerEnvelope` gains an optional
  `card_link`; `task_sources::route` attaches it; `triage::apply_decision`'s
  react/escalate routes a linked card through `dispatch_card` instead of the
  one-shot sub-agent. The claim deduplicates against the poller, so both
  feeders are safe.
- Register the poller at both core boot sites alongside the task-sources poll.

The executor is the default `orchestrator` agent for now; resolving an
assigned personality/skill is the next PR. Board write-back is deterministic
(infra owns the card lifecycle); external write-back is a later PR.

Co-Authored-By: Claude <noreply@anthropic.com>
Wire autonomy.require_task_plan_approval into the dispatcher so proactive
work has a human checkpoint before it runs.

- Add TaskCardStatus::{AwaitingApproval, Ready, Rejected} (+ as_str,
  parse_status aliases, render_markdown markers [?]/[ ]/[-]).
- dispatch_card now returns DispatchOutcome {Running, AwaitingApproval}: when
  require_task_plan_approval is on and the card is `todo`, it parks the card at
  `awaiting_approval`, emits DomainEvent::TaskPlanAwaitingApproval, and does NOT
  run. `ready` (approved) cards bypass the gate; the poller picks todo|ready.
- New openhuman.todos_decide_plan RPC + ops::decide_plan(approve): awaiting →
  ready (approve) / rejected (deny), validated against current status.
- Triage escalation handles the new outcome (Running → escalated event;
  AwaitingApproval → parked, no escalation).
- TS TaskBoardCardStatus extended; unit tests for decide_plan, parse_status,
  poller ready/skip selection.

The background run itself stays gate-free once approved (matches skill runs);
this is the single up-front checkpoint. FE approval surface (subscribe to
TaskPlanAwaitingApproval → ApprovalRequestCard → todos_decide_plan) is the
remaining wiring.

Co-Authored-By: Claude <noreply@anthropic.com>
Completes the plan-approval UX. Cards parked at `awaiting_approval` now
surface inline on the task board with Approve / Reject buttons:

- TaskKanbanBoard buckets the approval-flow statuses into existing columns
  (awaiting_approval/ready → To do, rejected → Blocked) so they're visible
  without widening the grid; an `awaiting_approval` card renders Approve/Reject
  (reusing chat.approval.* labels) instead of the move arrows.
- threadApi.decidePlan calls openhuman.todos_decide_plan and rebuilds the board
  from the returned snapshot.
- Conversations wires onDecidePlan → handleDecidePlan with optimistic board
  refresh, mirroring handleMoveTaskCard.

Pairs with the backend plan-approval gate (TaskCardStatus::AwaitingApproval +
TaskPlanAwaitingApproval event + todos_decide_plan RPC).

Co-Authored-By: Claude <noreply@anthropic.com>
…(G4+G3)

The dispatcher no longer always runs the default orchestrator. It resolves
the card's `assigned_agent` handle to one of three presets over the single
autonomous-run interface:

- **personality** (tinyhumansai#2895): a user profile whose SOUL.md/MEMORY.md identity is
  folded into the agent's system-prompt suffix, run as that profile's agent_id.
- **skill** (tinyhumansai#2824): the same autonomous run seeded with the skill's SKILL.md
  guidelines as the prompt suffix.
- **built-in agent**: run that agent definition directly.

An unset or unresolved handle degrades to the default orchestrator (never
fails the card) — "use the personality if valid, otherwise the default agent."
`run_autonomous` now builds via `from_config_for_agent_with_profile` with the
resolved agent_id + prompt suffix. Unit tests cover the default and
degrade-to-default paths.

Follow-up: thread a `personality_id`/`assigned_agent` filter into todos_list,
and honour a personality's model_override / scoped-memory isolation (the
prompt-level identity lands here; deeper isolation is delegate_to_personality's
documented phase-2 work).

Co-Authored-By: Claude <noreply@anthropic.com>
Let a task source pin every card it produces to a specific executor so the
dispatcher runs it deterministically, skipping the LLM router.

- Add `assigned_executor: Option<String>` to TaskSource + TaskSourcePatch,
  with an idempotent additive SQLite migration (add_column_if_missing) so
  existing source DBs upgrade cleanly.
- Thread it through the add/update RPCs (ops::add applies it as a follow-up
  update_source patch, keeping add_source's signature + its many callers
  unchanged) and the controller schema.
- route::add_card sets the card's `assigned_agent` from the source's
  assigned_executor, so PR-4's resolver runs the configured personality /
  skill / agent. Unset → unassigned (router/poller decides).

Follow-up: executor picker in the Task Sources settings panel (FE).

Co-Authored-By: Claude <noreply@anthropic.com>
… (G5/G9b)

Close the loop beyond the board. The dispatcher's task prompt now:

- (G5) tells the agent the source's activity is in memory and to use
  memory_recall to pull related context — agentic, on-demand retrieval over
  the summary tree (no separate scoped index needed).
- (G9b) when the upstream item is addressable (source_metadata has
  provider + external_id), instructs the agent to record the outcome on the
  source via its integration tools — comment + close/resolve on completion.
  Agent-driven (not a deterministic hook): runs under the connection's
  existing write scope with no extra approval gate (matches the gate-free
  autonomous run), and the agent reports rather than guesses if it lacks
  permission. The board write-back stays deterministic (infra owns the card);
  the external world is the agent's job via its own tools.

Unit tests cover the write-back instruction appearing only when the item is
addressable.

Co-Authored-By: Claude <noreply@anthropic.com>
@sanil-23 sanil-23 requested a review from a team May 29, 2026 18:31
@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented May 29, 2026

📝 Walkthrough

<review_stack_artifact>

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title clearly and specifically describes the main feature being added: a proactive agent task pipeline with dispatcher, approval gates, executor routing, and deterministic write-back. It accurately summarizes the primary architectural changes.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

Warning

There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure.

🔧 ESLint

If the error stems from missing dependencies, add them to the package.json file. For unrecoverable errors (e.g., due to private dependencies), disable the tool in the CodeRabbit configuration.

ESLint skipped: no ESLint configuration detected in root package.json. To enable, add eslint to devDependencies.

Warning

Review ran into problems

🔥 Problems

Stopped waiting for pipeline failures after 30000ms. One of your pipelines takes longer than our 30000ms fetch window to run, so review may not consider pipeline-failure results for inline comments if any failures occurred after the fetch window. Increase the timeout if you want to wait longer or run a @coderabbit review after the pipeline has finished.


Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown
Member

@senamakel senamakel left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PR #2974 — feat: proactive agent task pipeline — dispatcher, approval, executors, write-back

Walkthrough

This PR wires the four proactive-pipeline foundations into a working loop: ingested task-source cards are enriched (objective + source_metadata), optionally gated behind a plan-approval step, dispatched to a resolved executor (personality / skill / built-in agent), run as a single autonomous turn, and written back to the kanban board. A new task_dispatcher module is the convergence point for two feeders — a 60s capacity-gated board poller and the proactive triage arm (via a new TriggerEnvelope.card_link). New awaiting_approval / ready / rejected card statuses, a todos_decide_plan RPC, and an inline Approve/Reject UI complete the gate.

The work is cohesive, well-commented, and the touched modules follow project conventions (subdirectory placement, controller-registry exposure, RpcOutcome/Result propagation, camelCase serde for the FE). The Rust unit tests are solid on the pure helpers. The main concerns are (1) the documented claim-dedup invariant doesn't actually hold, allowing duplicate runs of the same card, (2) the new triage→dispatcher escalation arm and several dispatcher branches are untested (coverage-gate risk), and (3) a security/prompt-injection consideration on the unattended-egress + external-write-back path. None are hard blockers, so this is a comment-level review.

Changes

File Summary
src/openhuman/agent/task_dispatcher.rs New deterministic dispatcher: prompt build, executor resolution, autonomous run, board write-back, 60s board poller.
src/openhuman/agent/task_board.rs New TaskCardStatus::{AwaitingApproval, Ready, Rejected}; source_metadata field on the card.
src/openhuman/todos/ops.rs decide_plan, source_metadata on CardPatch/add/edit, status parsing/markdown for new statuses.
src/openhuman/todos/schemas.rs todos_decide_plan controller + schema; source_metadata: None on add/edit handlers.
src/openhuman/agent/triage/envelope.rs TaskCardLink + card_link on TriggerEnvelope; with_task_card builder.
src/openhuman/agent/triage/escalation.rs Card-linked triggers route to the dispatcher instead of the one-shot sub-agent.
src/openhuman/task_sources/{route,types,store,ops,schemas,periodic}.rs assigned_executor static routing (+ idempotent SQLite migration); source_metadata/objective stamping on card creation.
src/core/event_bus/events.rs New TaskPlanAwaitingApproval domain event.
src/core/jsonrpc.rs, channels/runtime/startup.rs Start the board poller at startup.
app/src/pages/Conversations.tsx, .../TaskKanbanBoard.tsx, services/api/threadApi.ts, types/turnState.ts FE: Approve/Reject UI, decidePlan RPC client, new statuses + sourceMetadata type.

Actionable comments (3)

⚠️ Major

1. src/openhuman/agent/task_dispatcher.rs:178-182 — claim does not dedupe the same card; the documented invariant is false

The comment states "enforce_single_in_progress rejects a second concurrent in-progress card, so a failed claim means 'something else is running' → skip", and the module + escalation docs lean on this to make firing both feeders safe. But enforce_single_in_progress only errors when count > 1. Re-flipping a card that is already InProgress to InProgress leaves exactly one in-progress card → Ok. So when the poller and the triage arm both target the same card (the exact dedup scenario), the second dispatch_card succeeds and spawns a second autonomous run — duplicate board write-back and, worse, duplicate external side-effects (commenting/closing the same GitHub issue twice via the G9b write-back prompt). Note also that BoardLocation::Thread ops are not serialized (only Scratch takes scratch_serial_lock; see todos/ops.rs:22-33), so there's a TOCTOU window even across distinct cards.

dispatch_card also never checks the incoming card's status before claiming (except the Todo+approval branch), so a triage trigger for a card that is already InProgress/Done/Blocked will re-claim and re-run it.

Suggested change — make the claim a guarded compare-and-swap on the freshly-loaded card so only Todo/Ready transition to InProgress:

// before
ops::update_status(&location, &card_id, TaskCardStatus::InProgress)
    .map_err(|e| format!("[task_dispatcher] claim failed for card {card_id}: {e}"))?;

// after: reload and refuse to claim anything not in a dispatchable state, so a
// second feeder (or a re-triggered triage) on the same card is a no-op skip.
let snapshot = ops::list(&location)
    .map_err(|e| format!("[task_dispatcher] reload before claim failed for {card_id}: {e}"))?;
match snapshot.cards.iter().find(|c| c.id == card_id).map(|c| &c.status) {
    Some(TaskCardStatus::Todo | TaskCardStatus::Ready) => {}
    other => {
        return Err(format!(
            "[task_dispatcher] card {card_id} not claimable (status: {other:?}); skipping"
        ));
    }
}
ops::update_status(&location, &card_id, TaskCardStatus::InProgress)
    .map_err(|e| format!("[task_dispatcher] claim failed for card {card_id}: {e}"))?;

This closes the serial double-dispatch entirely and narrows the concurrent window dramatically. A fully race-free fix would push the CAS into a per-thread-locked ops primitive (e.g. a claim_for_run(location, id) that loads→checks→sets under a per-thread lock), which is worth a follow-up given thread boards aren't serialized today.

2. src/openhuman/agent/triage/escalation.rs:88-119 — new dispatcher escalation arm is untested (coverage-gate risk)

The card-linked escalation branch and dispatch_linked_card (escalation.rs:313-323) are net-new control flow with no test. Likewise in task_dispatcher.rs, the tested surface is only the pure helpers (build_task_prompt, pick_next_todo, resolve_executor default/fallback, truncate_chars); the approval-park branch, write_back, poll_once gating, and the personality/skill/built-in-agent arms of resolve_executor have no coverage. With the merge gate at ≥80% on changed lines this is likely to fail, and these are exactly the branches where a regression would silently lose work.

Suggested additions (no agent spawn needed):

// task_dispatcher.rs tests
#[test] fn write_back_done_sets_evidence_and_truncates() { /* Ok(long) → Done + capped evidence */ }
#[test] fn write_back_err_sets_blocked_with_reason()     { /* Err(..) → Blocked + blocker */ }
#[tokio::test] async fn dispatch_card_parks_todo_when_approval_required() {
    // require_task_plan_approval=true + status Todo → Ok(AwaitingApproval), status persisted, no run
}
// resolve_executor: seed a profile / a skill / a registered agent and assert
// agent_id + label + prompt_suffix for each of the three preset arms.

For dispatch_linked_card, a test that links a missing card id and asserts the not found error path is cheap and covers the benign-skip contract.

💡 Refactor / suggestion

3. src/openhuman/agent/task_dispatcher.rs:317-329 — unattended egress widening + external write-back on attacker-influenced content

run_autonomous widens allowed_domains to ["*"] when unset, and build_task_prompt instructs the agent (with no per-tool approval, since background turns auto-allow) to use integration tools to comment on and close/resolve the upstream item. The card's objective/content/source_metadata derive from external, attacker-influenceable content (e.g. a GitHub issue body filed by anyone in a watched repo). The only human checkpoint is the up-front plan approval, which mitigates this — but it's a single gate that's easy to rubber-stamp, and it's the plan, not the per-action egress/write that gets reviewed.

This may be an accepted design (it mirrors skill-run egress), but it's worth making explicit. Concretely, consider: (a) not blanket-widening to * for source-ingested runs — scope egress to the source's provider domain(s) where known; and (b) a short note in the dispatcher module header / gitbooks calling out the prompt-injection surface and that the plan-approval gate is the sole interactive checkpoint. At minimum, please confirm the threat model was considered.

Nitpicks (3)

  • src/openhuman/agent/task_dispatcher.rs:185-201 — duplicate tracing::info!("…card claimed…spawning autonomous run") emitted twice in dispatch_card (once before, once after resolve_executor); drop the first, the second carries executor/agent_id.
  • src/openhuman/agent/task_dispatcher.rs:92build_task_prompt emits the "originates from …" block when only external_id is present (origin renders as #123), and a missing-provider-with-repo case yields a leading space (saved by .trim()). Gate the block on provider.is_some() for a cleaner origin.
  • src/openhuman/todos/schemas.rs:326,350handle_add/handle_edit hardcode source_metadata: None. Intentional (route.rs is the sole writer), but a one-line comment there would prevent a future contributor "fixing" it.

Questions for the author (1)

  • src/core/event_bus/events.rs:662 / task_dispatcher.rs:166TaskPlanAwaitingApproval is published but has no subscriber anywhere (no socket bridge, no FE listener), and the park happens in the poller where emit_progress early-returns (no fork-context parent). How does the FE surface a newly-parked awaiting_approval card in real time — does it rely on a periodic board refetch, or is the realtime bridge a planned follow-up? If the latter, a one-line note in the PR/TODO would help.

Verified / looks good

  • Serde shapes line up end-to-end: TodosSnapshot (camelCase) → FE decidePlan reads data.threadId/data.cards; source_metadatasourceMetadata matches the TS type.
  • decide_plan correctly errors unless the card is AwaitingApproval, preventing stale/duplicate decisions from resurrecting a moved-on card (tested).
  • assigned_executor migration is additive + idempotent (add_column_if_missing), SELECT columns and update_source params stay index-aligned.
  • source_metadata round-trips through add/edit and is preserved on None patches (tested).
  • New i18n usages reuse existing keys (chat.approval.approve/deny, conversations.taskKanban.updateFailed) — no missing-key gate risk, no hardcoded strings.
  • pick_next_todo urgency/tie-break logic is correct and well-tested (incl. ready vs todo, skipping approval states).
  • Board poller start is idempotent (OnceLock) despite being wired in two startup paths.

// rejects a second concurrent in-progress card, so a failed claim means
// "something else is running" → skip.
ops::update_status(&location, &card_id, TaskCardStatus::InProgress)
.map_err(|e| format!("[task_dispatcher] claim failed for card {card_id}: {e}"))?;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Major — claim does not dedupe the same card; the documented invariant is false.

The comment says enforce_single_in_progress rejects a second claim, but it only errors when count > 1. Re-flipping a card that is already InProgress to InProgress leaves exactly one in-progress card → Ok. So when the poller and the triage arm both target the same card (the dedup scenario), the second dispatch_card succeeds and spawns a second autonomous run — duplicate write-back and duplicate external side-effects (e.g. closing the same GitHub issue twice). dispatch_card also never checks the incoming status, so a re-triggered card that is already InProgress/Done gets re-run. (Thread boards aren't serialized — only Scratch takes a lock — so there's a TOCTOU window too.)

Guard the claim as a CAS on the freshly-loaded card so only Todo/Ready may transition:

let snapshot = ops::list(&location)
    .map_err(|e| format!("[task_dispatcher] reload before claim failed for {card_id}: {e}"))?;
match snapshot.cards.iter().find(|c| c.id == card_id).map(|c| &c.status) {
    Some(TaskCardStatus::Todo | TaskCardStatus::Ready) => {}
    other => return Err(format!("[task_dispatcher] card {card_id} not claimable (status: {other:?}); skipping")),
}
ops::update_status(&location, &card_id, TaskCardStatus::InProgress)
    .map_err(|e| format!("[task_dispatcher] claim failed for card {card_id}: {e}"))?;

This kills the serial double-dispatch and narrows the concurrent window; a race-free fix would push the CAS into a per-thread-locked ops primitive (follow-up).

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in f46947b. dispatch_card now does a CAS: it re-loads the board and only transitions a Todo/Ready card to InProgress, returning Err (benign skip) otherwise — so a stale/concurrent re-dispatch of the same card, or a re-trigger of an already running/done card, is rejected. You're right that enforce_single_in_progress alone never deduped (count stays 1). Corrected the false invariant in the module + fn docs. The thread board still isn't lock-serialised, so a narrow TOCTOU window remains between the read and the write — called out in the code comment and tracked as a follow-up (push the CAS into a per-thread-locked ops primitive).

tracing::info!(
card_id = %link.card_id,
reason = %reason,
"[triage::escalation] task-card dispatch skipped (claim failed?)"
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Major (testing / coverage-gate risk) — new dispatcher escalation arm is untested.

This card-linked branch and dispatch_linked_card below are net-new control flow with no test. In task_dispatcher.rs the tested surface is only the pure helpers; the approval-park branch, write_back, poll_once gating, and the personality/skill/agent arms of resolve_executor are uncovered. With the merge gate at ≥80% changed-line coverage this likely fails, and these are exactly the branches where a regression silently loses work.

Cheap additions (no agent spawn needed): write_back Done/Err cases; a #[tokio::test] that dispatch_card parks a Todo when require_task_plan_approval=true and persists AwaitingApproval without spawning; resolve_executor for each of the three preset arms; and a dispatch_linked_card missing-id error-path test.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added write_back Done/Err unit tests in f46947b (the deterministic board write-back was the densest untested chunk). The remaining new branches — dispatch_card/poll_once/run_autonomous — load real Config/agent/registry and spawn detached runs, so they're integration-shaped rather than unit-testable without a workspace+agent harness; the pure helpers (build_task_prompt, resolve_executor default/fallback, pick_next_todo, truncate) are covered. If diff-cover still trips, I'll add a json_rpc_e2e-style integration test for the park/dispatch path rather than mock the agent — flag me and I'll do that.

// Match skill-run egress handling: only widen to the permissive default
// when the operator hasn't configured an explicit allow-list.
if config.http_request.allowed_domains.is_empty() {
config.http_request.allowed_domains = vec!["*".to_string()];
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion (security / prompt-injection) — unattended egress widening + external write-back on attacker-influenced content.

run_autonomous widens allowed_domains to ["*"] when unset, and build_task_prompt instructs the agent (with no per-tool approval — background turns auto-allow) to comment on and close/resolve the upstream item. The card objective/content/source_metadata derive from external, attacker-influenceable content (a GitHub issue body filed by anyone in a watched repo). The plan-approval gate is the sole interactive checkpoint, and it reviews the plan, not the per-action egress/write.

Consider: (a) scoping egress to the source provider domain(s) instead of blanket * for source-ingested runs; (b) a module-header / gitbook note calling out the injection surface. At minimum please confirm the threat model was considered.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Threat model was considered — documented it on run_autonomous in f46947b. Summary: source content is attacker-influenceable and the run is per-tool gate-free (background turns auto-allow, like skill runs), so the up-front plan-approval gate (require_task_plan_approval) is the intended interactive checkpoint — a human reviews the plan before the run starts. Egress * (only when the operator set no allow-list) matches skill-run behaviour because real task work needs broad reach (git, registries, provider APIs). Scoping egress to the source provider's domains for source-ingested runs specifically is a considered follow-up (it has to key off provenance so it doesn't break general task work) — tracked for a later PR.

/// A task-board card needs human plan approval before the dispatcher will
/// execute it (emitted when `autonomy.require_task_plan_approval` is on and
/// the dispatcher parks a `todo` card at `awaiting_approval`).
TaskPlanAwaitingApproval { card_id: String, thread_id: String },
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question — TaskPlanAwaitingApproval has no subscriber.

This event is published (task_dispatcher.rs:166) but nothing subscribes to it — no socket bridge, no FE listener — and the park happens in the poller where emit_progress early-returns (no fork-context parent). How does the FE surface a newly-parked awaiting_approval card in real time: periodic board refetch, or is the realtime bridge a planned follow-up? A one-line TODO/PR note would clarify.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct — no subscriber by design (yet). Documented on the event in f46947b: a parked card is persisted as awaiting_approval, so the kanban board renders it with inline Approve/Reject on the next board fetch/refresh — that's the current poll-based surface. A realtime socket bridge (à la ApprovalRequestedapproval_request) is a deliberate follow-up; emitting the telemetry event now lets that bridge attach later with no schema change.

Comment thread src/openhuman/agent/task_dispatcher.rs Outdated
card_id = %card_id,
run_id = %run_id,
prompt_chars = prompt.chars().count(),
"[task_dispatcher] card claimed (→in_progress), spawning autonomous run"
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpick — duplicate log line. dispatch_card emits the same "…card claimed (→in_progress), spawning autonomous run" tracing::info! twice (here and again after resolve_executor). Drop this first one; the later call carries executor/agent_id.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in f46947b — dropped the first log; the single remaining line carries executor/agent_id/prompt_chars.

Comment thread src/openhuman/agent/task_dispatcher.rs Outdated
if let Some(id) = external_id {
origin.push_str(&format!("#{id}"));
}
if !origin.trim().is_empty() {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpick — origin block fires on id-only metadata. build_task_prompt emits the "originates from …" block when only external_id is present (origin renders as #123), and a missing-provider-with-repo case yields a leading space (saved by .trim()). Gate the block on provider.is_some() for a cleaner origin string.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in f46947b — the origin block is now gated on provider.is_some(), so id-only metadata no longer renders a leading-space #123.

sanil-23 and others added 3 commits May 29, 2026 20:49
Addresses @senamakel's review on the consolidated proactive-pipeline PR:

- **Major: claim now dedupes (CAS).** `dispatch_card` re-loads the card's
  current status and only transitions `Todo`/`Ready` → `InProgress`;
  `enforce_single_in_progress` alone did NOT dedupe (re-flipping an already
  in-progress card keeps count==1). Kills the serial double-dispatch /
  re-run-of-done-card; corrected the false invariant in the module/fn docs.
  (A fully race-free claim needs a per-thread-locked ops CAS — noted as
  follow-up; thread boards aren't lock-serialised so a narrow TOCTOU remains.)
- **Tests:** added write_back Done/Err coverage.
- **Security:** documented the prompt-injection threat model on run_autonomous
  (source content is attacker-influenceable; plan-approval is the interactive
  checkpoint; egress `*` matches skill runs; per-provider egress scoping is a
  tracked follow-up).
- **Event:** documented that TaskPlanAwaitingApproval is surfaced via board
  refetch today; a realtime socket bridge is a deliberate follow-up.
- **Nits:** dropped the duplicate "card claimed" log; gated the origin prompt
  block on provider.is_some() so id-only metadata doesn't render "tinyhumansai#123".

Co-Authored-By: Claude <noreply@anthropic.com>
The coverage gate flagged the PR-3 FE (Conversations/TaskKanbanBoard/threadApi)
as the only sub-80% diff. Cover it:

- threadApi.decidePlan: approve (rebuilds board) + null-snapshot tests.
- TaskKanbanBoard: render Approve/Reject on an awaiting_approval card → calls
  onDecidePlan(true/false); ready→todo / rejected→blocked bucketing renders.
- Extract Conversations.handleDecidePlan's decide→persist→refresh logic into a
  testable `taskPlanActions.decideTaskPlan` (success dispatch / null no-op /
  error→onError), so the page's previously-uncovered handler logic is exercised
  without mounting the whole conversations page.

Co-Authored-By: Claude <noreply@anthropic.com>
@senamakel senamakel self-assigned this May 29, 2026
@coderabbitai coderabbitai Bot added feature Net-new user-facing capability or product behavior. rust-core Core Rust runtime in src/: CLI, core_server, shared infrastructure. agent Built-in agents, prompts, orchestration, and agent runtime in src/openhuman/agent/. labels May 29, 2026
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@app/src/services/api/threadApi.ts`:
- Around line 175-188: The RPC response generic currently expects a nested data
object but the core returns a top-level TodosSnapshot (camelCase) — update the
callCoreRpc generic in the decidePlan call (the invocation around
callCoreRpc<{...}> and the subsequent unwrapEnvelope usage in threadApi.ts) to
the TodosSnapshot shape (i.e., an object with threadId?: string | null and
cards?: TaskBoardCard[] at the top level), remove the extra data?: { ... }
wrapper from the generic, and then read threadId and cards directly from the
unwrapped response (keeping the existing fallback to threadId and the updatedAt
construction).

In `@src/openhuman/agent/task_dispatcher.rs`:
- Around line 168-210: The code reloads the board to check CAS eligibility but
still uses the original incoming card for prompt/executor resolution; change the
ops::list(...) chain that currently .find(...).map(|c| c.status) to instead
capture the whole reloaded card (e.g., reloaded_card or fresh_card), derive
current_status from reloaded_card.status, and then pass that reloaded card into
build_task_prompt and resolve_executor (use reloaded_card where `card` is used
for prompt and assigned_agent resolution) so prompt and executor reflect any
edits made between selection and claim.

In `@src/openhuman/task_sources/ops.rs`:
- Around line 70-83: When assigned_executor is present, add a log entry before
calling store::update_source: emit a stable-prefixed info-level log (e.g.
"source.assigned_executor.set") that includes the source id (source.id), the
executor string (executor) and any available correlation fields (request id /
source kind) for traceability, then proceed to call
store::update_source(TaskSourcePatch { assigned_executor: Some(executor), .. }).
Ensure the log call appears directly in the Some(executor) branch and uses the
same logging facility used elsewhere in this module (e.g., tracing::info! or the
project logger).

In `@src/openhuman/task_sources/types.rs`:
- Around line 200-201: The frontend TaskSourcePatch TypeScript interface is
missing the assignedExecutor field that the backend exposes as
assigned_executor; update the TaskSourcePatch interface in
app/src/utils/tauriCommands/taskSources.ts to include assignedExecutor?: string,
and ensure any code that builds the patch payload maps this property to the
backend key (assigned_executor) when sending (e.g., in the function that
serializes/constructs the patch object), preserving existing optional semantics
and other fields.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: c8b75a5d-8aa0-4ac7-9f22-0acce9748539

📥 Commits

Reviewing files that changed from the base of the PR and between 2920e47 and 55251dc.

📒 Files selected for processing (27)
  • app/src/pages/Conversations.tsx
  • app/src/pages/conversations/components/TaskKanbanBoard.test.tsx
  • app/src/pages/conversations/components/TaskKanbanBoard.tsx
  • app/src/pages/conversations/taskPlanActions.test.ts
  • app/src/pages/conversations/taskPlanActions.ts
  • app/src/services/api/threadApi.test.ts
  • app/src/services/api/threadApi.ts
  • app/src/types/turnState.ts
  • src/core/event_bus/events.rs
  • src/core/jsonrpc.rs
  • src/openhuman/agent/mod.rs
  • src/openhuman/agent/task_board.rs
  • src/openhuman/agent/task_dispatcher.rs
  • src/openhuman/agent/tools/todo.rs
  • src/openhuman/agent/triage/envelope.rs
  • src/openhuman/agent/triage/escalation.rs
  • src/openhuman/channels/runtime/startup.rs
  • src/openhuman/notifications/rpc.rs
  • src/openhuman/task_sources/ops.rs
  • src/openhuman/task_sources/periodic.rs
  • src/openhuman/task_sources/route.rs
  • src/openhuman/task_sources/schemas.rs
  • src/openhuman/task_sources/store.rs
  • src/openhuman/task_sources/types.rs
  • src/openhuman/threads/turn_state/mirror_tests.rs
  • src/openhuman/todos/ops.rs
  • src/openhuman/todos/schemas.rs

Comment on lines +175 to +188
const response = await callCoreRpc<{
data?: { threadId?: string | null; cards?: TaskBoardCard[] };
}>({
method: 'openhuman.todos_decide_plan',
params: { thread_id: threadId, id: cardId, approve },
});
const data = unwrapEnvelope(response);
if (!data?.cards) return null;
return {
threadId: data.threadId ?? threadId,
cards: data.cards,
updatedAt: new Date().toISOString(),
};
},
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot May 29, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Confirm TodosSnapshot serialization keys (expect top-level camelCase threadId + cards)
ast-grep --pattern 'struct TodosSnapshot {
  $$$
}'
rg -nP -C3 '(serde\(rename|rename_all)|struct\s+TodosSnapshot' --type=rust

Repository: tinyhumansai/openhuman

Length of output: 50379


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# 1) Locate the Rust RPC handler for `openhuman.todos_decide_plan`
rg -n --hidden --no-ignore-vcs "todos_decide_plan" src/openhuman -S || true

# 2) Locate the `TodosSnapshot` definition and its serde settings
rg -n "pub struct TodosSnapshot" -S src/openhuman/todos || true
sed -n '1,140p' src/openhuman/todos/ops.rs

# 3) Find how the RPC envelope is constructed for `todos_decide_plan`:
# search for "openhuman.todos_decide_plan" with nearby code patterns like RpcOutcome/Envelope/unwrap
rg -n "openhuman\.todos_decide_plan" -S src/openhuman || true

Repository: tinyhumansai/openhuman

Length of output: 5229


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# 1) Find the Rust handler implementation for `openhuman.todos_decide_plan`
rg -n --hidden --no-ignore-vcs "openhuman\.todos_decide_plan" src/openhuman -S || true
rg -n --hidden --no-ignore-vcs "todos_decide_plan" src/openhuman -S || true

# 2) If handlers are generated/registered elsewhere, broaden search slightly
rg -n --hidden --no-ignore-vcs "todos_decide_plan" src -S || true

# 3) Search for where `TodosSnapshot` is returned from the decide/approve/reject flow
rg -n "TodosSnapshot" src/openhuman/todos -S

Repository: tinyhumansai/openhuman

Length of output: 1225


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Find all occurrences of the RPC method name
rg -n --hidden --no-ignore-vcs "openhuman\.todos_decide_plan|todos_decide_plan" src -S || true

# Inspect todos RPC schemas/handlers around decide/approve flow
rg -n --hidden --no-ignore-vcs "decide_plan|approve|reject|approve.*plan|reject.*plan" src/openhuman/todos -S || true

# Show the parts that serialize TodosSnapshot to JSON and the corresponding RPC response builder
sed -n '1,120p' src/openhuman/todos/schemas.rs
sed -n '430,540p' src/openhuman/todos/schemas.rs

# Also inspect the snapshot conversion helper
sed -n '440,520p' src/openhuman/todos/schemas.rs

Repository: tinyhumansai/openhuman

Length of output: 13290


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Inspect handle_decide_plan to see what it returns (snapshot directly vs nested)
rg -n "fn handle_decide_plan" -S src/openhuman/todos/schemas.rs
sed -n '340,430p' src/openhuman/todos/schemas.rs

# Inspect TS RPC envelope helpers to see what unwrapEnvelope extracts into `data`
rg -n "function unwrapEnvelope|unwrapEnvelope" -S app/src
rg -n "core_rpc_relay" -S app/src/services

# Show the unwrapEnvelope implementation (keep output small)
rg -n "unwrapEnvelope" app/src/services -S --files-with-matches || true

Repository: tinyhumansai/openhuman

Length of output: 5864


🏁 Script executed:

#!/bin/bash
set -euo pipefail

sed -n '1,240p' app/src/services/api/threadApi.ts

Repository: tinyhumansai/openhuman

Length of output: 6797


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Locate callCoreRpc implementation
rg -n "function callCoreRpc|callCoreRpc<" -S app/src/services app/src | head -n 50

# Print the core client / relay call and response handling
rg -n "core_rpc_relay" -S app/src/services app/src | head -n 50

# Inspect the core RPC client file once we find it
# (Try common path first; if not found, we’ll have already located it above.)
if [ -f app/src/services/coreRpcClient.ts ]; then
  sed -n '1,220p' app/src/services/coreRpcClient.ts
elif [ -f app/src/services/api/coreRpcClient.ts ]; then
  sed -n '1,220p' app/src/services/api/coreRpcClient.ts
fi

# Also inspect any types for Envelope/Controller outputs if present
rg -n "interface Envelope|type Envelope|data\?: T" -S app/src/services app/src | head -n 50

Repository: tinyhumansai/openhuman

Length of output: 5394


🏁 Script executed:

#!/bin/bash
set -euo pipefail

sed -n '430,620p' app/src/services/coreRpcClient.ts

Repository: tinyhumansai/openhuman

Length of output: 7083


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Find the core/controller dispatch code that maps controller outputs to JSON-RPC result
rg -n "ControllerSchema|RegisteredController|FieldSchema" src/openhuman src/core src -S || true

# Specifically find where controller handler return values are placed into the JSON-RPC result/envelope.
rg -n "unwrapEnvelope|jsonrpc.*result|\"data\"|Envelope" src -S || true

# Look for the controller runner / rpc method handler
rg -n "RegisteredController" src -S || true

# Search for controller output field name mapping logic (e.g. using FieldSchema.name like "snapshot")
rg -n "FieldSchema\s*\{|name:\s*\"snapshot\"|\"snapshot\"" src/openhuman/todos/schemas.rs src -S || true

Repository: tinyhumansai/openhuman

Length of output: 50379


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Focus on the JSON-RPC controller plumbing in Rust
rg -n "FieldSchema|outputs:|RegisteredController|ControllerFuture|ControllerSchema" src/core/jsonrpc.rs src/core -S || true

# Find where controller output fields are mapped into the JSON-RPC result payload.
rg -n "\"data\"|data\\?\\:|result\\s*:\\s*|jsonrpc.*result|outputs\\b|FieldSchema\\b" src/core/jsonrpc.rs -S || true

# Show the most relevant nearby sections by searching for "outputs" join logic.
rg -n "outputs\\s*\\.|outputs\\b|for .*in .*outputs|map\\(.*outputs" src/core/jsonrpc.rs -S || true

# Print a small window around the densest controller/outputs mapping match.
# (Pick the first match after sorting by line number.)
match_line="$(rg -n "outputs\\b|FieldSchema\\b|RegisteredController\\b" src/core/jsonrpc.rs -S | head -n 1 | cut -d: -f1)"
echo "First match line: $match_line"
sed -n "$((match_line-120)),$((match_line+120))p" src/core/jsonrpc.rs

Repository: tinyhumansai/openhuman

Length of output: 13160


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Inspect the controller JSON-RPC result construction around the outputs mapping
sed -n '2100,2320p' src/core/jsonrpc.rs

Repository: tinyhumansai/openhuman

Length of output: 2848


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Re-open around the exact hit for `method.outputs` in src/core/jsonrpc.rs
sed -n '2135,2185p' src/core/jsonrpc.rs

# And inspect a slightly larger neighborhood just in case the mapping logic is adjacent
sed -n '2185,2260p' src/core/jsonrpc.rs

# Directly locate the controller result construction code in jsonrpc.rs
rg -n "method\\.outputs|outputs:\\s*method\\.outputs|jsonrpc.*result|\"result\"\\s*:" src/core/jsonrpc.rs -S

# Print the most relevant nearby section where outputs are consumed
first_hit="$(rg -n "outputs:\\s*method\\.outputs|method\\.outputs" src/core/jsonrpc.rs -S | head -n 1 | cut -d: -f1)"
echo "First outputs mapping hit at: $first_hit"
sed -n "$((first_hit-120)),$((first_hit+120))p" src/core/jsonrpc.rs

Repository: tinyhumansai/openhuman

Length of output: 7046


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Look for JSON-RPC response construction in backend
rg -n "\"data\"\\s*:" src/core/jsonrpc.rs -S || true
rg -n "result\\s*\\{|\"result\"\\s*:" src/core/jsonrpc.rs -S || true
rg -n "jsonrpc.*result|\\\"result\\\"" src/core/jsonrpc.rs -S || true

# Find code path that serializes controller outputs
rg -n "ControllerFuture|ControllerHandler|RegisteredController|schema_for_rpc_method" src/core/jsonrpc.rs src/core -S || true

# Narrow: find any place where handler return Value is wrapped into an object with "data"
rg -n "Map<.*>\\s*\\)|Value::Object|serde_json::Value::Object|envelope|Envelope" src/core/jsonrpc.rs -S || true

Repository: tinyhumansai/openhuman

Length of output: 3761


🏁 Script executed:

#!/bin/bash
set -euo pipefail

sed -n '1,240p' src/core/dispatch.rs

Repository: tinyhumansai/openhuman

Length of output: 8484


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Inspect where registered controller outputs are invoked and turned into RPC JSON
rg -n "try_invoke_registered_rpc|invocation_to_rpc_json|InvocationResult|controller-emitted structured envelope" src/core -S

# Show relevant sections in src/core/all.rs (matches from earlier grep suggest these are near the decode logic)
sed -n '1,260p' src/core/all.rs
sed -n '520,860p' src/core/all.rs

# Also inspect src/core/types.rs if it contains invocation_to_rpc_json
rg -n "invocation_to_rpc_json" src/core -S

Repository: tinyhumansai/openhuman

Length of output: 25378


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Inspect JSON envelope creation for InvocationResult
sed -n '1,120p' src/core/types.rs
sed -n '120,240p' src/core/types.rs

# Inspect controller structured-envelope decoding in jsonrpc.rs near the earlier hit (~60)
sed -n '1,140p' src/core/jsonrpc.rs

# Inspect further around where try_invoke_registered_rpc result is embedded (around ~320-380)
sed -n '300,420p' src/core/jsonrpc.rs

Repository: tinyhumansai/openhuman

Length of output: 19793


decidePlan wire shape matches TodosSnapshot (camelCase, top-level)

TodosSnapshot is serialized with #[serde(rename_all = "camelCase")], and the openhuman/todos handle_decide_plan handler returns snapshot_to_json(...) directly—so the RPC result includes threadId and cards at the top level (not snake_case and not nested under another object).

  • In app/src/services/api/threadApi.ts, tighten the callCoreRpc generic to the snapshot shape directly (remove the extra data?: { ... } wrapper) to improve type safety.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@app/src/services/api/threadApi.ts` around lines 175 - 188, The RPC response
generic currently expects a nested data object but the core returns a top-level
TodosSnapshot (camelCase) — update the callCoreRpc generic in the decidePlan
call (the invocation around callCoreRpc<{...}> and the subsequent unwrapEnvelope
usage in threadApi.ts) to the TodosSnapshot shape (i.e., an object with
threadId?: string | null and cards?: TaskBoardCard[] at the top level), remove
the extra data?: { ... } wrapper from the generic, and then read threadId and
cards directly from the unwrapped response (keeping the existing fallback to
threadId and the updatedAt construction).

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Kept as-is for consistency with the sibling methods (getTaskBoard/putTaskBoard) which use the same { data?: ... } envelope shape — unwrapEnvelope extracts response.data, so the data?: wrapper is required, not extra. The inner shape is already typed ({ threadId?: string | null; cards?: TaskBoardCard[] }), and the threadApi.decidePlan tests assert the parsed result.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sanil-23, you're right — unwrapEnvelope peels off the data envelope, so the data?: { ... } wrapper in the generic is necessary, not redundant. The inner shape { threadId?: string | null; cards?: TaskBoardCard[] } is already correctly typed, and the pattern is consistent with getTaskBoard/putTaskBoard. My original comment was mistaken. Thanks for the clear explanation!


🧠 Learnings used
Learnt from: CodeGhost21
Repo: tinyhumansai/openhuman PR: 2147
File: app/src/services/backendHealth.ts:43-48
Timestamp: 2026-05-18T20:17:22.924Z
Learning: In the OpenHuman desktop app (Tauri + TypeScript/React), `core_rpc_relay`/`callCoreRpc` (implemented in `app/src/services/coreRpcClient.ts`) is an in-process JSON-RPC tunnel that proxies requests only to the local core at `http://127.0.0.1:<port>/rpc` using the `OPENHUMAN_CORE_TOKEN` bearer. During review, treat this as NOT a generic HTTP egress proxy: do not route requests intended for external services (e.g., `api.*`) or external health/status checks through `core_rpc_relay`. If code suggests using `callCoreRpc` for external connectivity, flag it and require using the appropriate external HTTP client/path instead.

Comment thread src/openhuman/agent/task_dispatcher.rs Outdated
Comment thread src/openhuman/task_sources/ops.rs
Comment thread src/openhuman/task_sources/types.rs
senamakel added 5 commits May 29, 2026 13:53
… them

A card-linked proactive task-source trigger that triage decided to drop or
acknowledge stayed Todo on the board, so the board poller re-dispatched it on
the next tick — silently breaking the documented noise-gating contract on
SourceTarget::AgentTodoProactive. apply_decision now marks a still-pending
linked card Rejected on Drop/Acknowledge (best-effort; only pending cards are
touched), which pick_next_todo filters out. Adds tests for both arms plus a
with_task_card builder test.

Addresses review finding on escalation.rs:94.
…ngest

content_hash canonicalized only title/body/status/updated_at, so a provider
that edits an item's URL without advancing updated_at would be deduped and the
stale url would survive in the card's source_metadata/notes (url is load-bearing
for external write-back). Adds url to the canonical hash and a regression test;
also covers the new ops::add assigned_executor persist/filter-blank path.

Addresses review finding on store.rs content_hash.
…tadata wiring

add_card's stamping of objective (bare title), assigned_agent (trimmed
executor), and source_metadata was untested. Adds tempdir tests for the
populated and whitespace-only-executor cases.
The edit dialog's status <select> only rendered the four column statuses, so a
card in awaiting_approval/ready/rejected became a controlled select with no
matching option (React warning + the value silently showed as the first
option, hiding the real status). Drive the option list from the full
TaskBoardCardStatus union via a status->labelKey map and add the three new
i18n keys (en.ts + every locale chunk). Adds a regression test.
resolve_executor's personality arm (SOUL/MEMORY preamble assembly) was
untested — only the default and unresolved-fallback paths were. Adds a
fixture-free test exploiting the built-in 'research' profile.

Addresses review coverage finding on task_dispatcher.rs resolve_executor.
/// or `Err` *without* spawning when the card is no longer claimable — its
/// freshly-loaded status isn't `Todo`/`Ready` (already running/done, or another
/// dispatcher won the claim). Benign: the poller retries next tick.
pub async fn dispatch_card(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deferred (coverage / not blocking) — dispatch_card claim-CAS + approval-park branch untested.

A fresh CodeRabbit-style pass (with adversarial verification) confirmed that dispatch_card's three branches remain uncovered: the claim CAS (not claimable / not found), the plan-approval park (require_task_plan_approval + TodoAwaitingApproval + event), and the claim→InProgress+spawn path. Only the pure helpers it calls are tested, and the diff-cover margin currently rides on those plus struct-field churn.

I did not apply a fix for this one because (a) the Coverage Gate is currently green, (b) you already discussed an integration-shaped test for this path with @senamakel and offered to add it "if diff-cover trips", and (c) the robust fix changes production structure (extract a pure classify_claim(current_status, require_approval) -> ClaimDecision so the guard/park decision is unit-testable without Config::load_or_init ambient state). Flagging so it isn't lost — your call on whether to land the helper extraction now or leave it for the follow-up.

In this same session I did add coverage for the adjacent gaps: resolve_executor personality branch, route::add_card wiring, ops::add assigned_executor persist/filter, the content_hash url fix, and the triage drop/acknowledge gating fix.

@coderabbitai coderabbitai Bot added the memory Memory store, memory tree, recall, summarization, and embeddings in src/openhuman/memory/. label May 29, 2026
- task_dispatcher: dispatch_card now uses the freshly-reloaded card (not the
  caller's possibly-stale arg) for build_task_prompt + resolve_executor, so a
  card edited between selection and claim runs with its current objective /
  plan / assigned_agent. (Major)
- task_sources::ops::add: log assigned_executor for end-to-end routing
  traceability. (Major)
- FE taskSources.ts: add assignedExecutor to TaskSource / TaskSourcePatch and
  assigned_executor to TaskSourceAddParams so the UI can send/read executor
  routing. (Minor)

Co-Authored-By: Claude <noreply@anthropic.com>
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@src/openhuman/agent/triage/escalation.rs`:
- Around line 349-379: The current code reads the card status with ops::list and
then unconditionally calls ops::update_status to set TaskCardStatus::Rejected,
which can overwrite a newer terminal status if the poller advances the card;
replace this two-step read-then-write with an atomic compare-and-set: add or use
an API like ops::update_status_if_current / ops::compare_and_set_status that
accepts (location, card_id, expected_status, new_status) and call it with
expected_status equal to the status matched (e.g., Todo|Ready|AwaitingApproval)
and new_status = Rejected; handle the Ok(success=false) case by logging that the
status changed and skipping, and preserve the existing Err handling for storage
errors so you do not overwrite concurrent updates.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 5f501b89-d292-4c18-85ad-89ca48e83d7c

📥 Commits

Reviewing files that changed from the base of the PR and between 55251dc and e2e6666.

📒 Files selected for processing (23)
  • app/src/lib/i18n/chunks/ar-5.ts
  • app/src/lib/i18n/chunks/bn-5.ts
  • app/src/lib/i18n/chunks/de-5.ts
  • app/src/lib/i18n/chunks/en-5.ts
  • app/src/lib/i18n/chunks/es-5.ts
  • app/src/lib/i18n/chunks/fr-5.ts
  • app/src/lib/i18n/chunks/hi-5.ts
  • app/src/lib/i18n/chunks/id-5.ts
  • app/src/lib/i18n/chunks/it-5.ts
  • app/src/lib/i18n/chunks/ko-5.ts
  • app/src/lib/i18n/chunks/pl-5.ts
  • app/src/lib/i18n/chunks/pt-5.ts
  • app/src/lib/i18n/chunks/ru-5.ts
  • app/src/lib/i18n/chunks/zh-CN-5.ts
  • app/src/lib/i18n/en.ts
  • app/src/pages/conversations/components/TaskKanbanBoard.test.tsx
  • app/src/pages/conversations/components/TaskKanbanBoard.tsx
  • src/openhuman/agent/task_dispatcher.rs
  • src/openhuman/agent/triage/envelope.rs
  • src/openhuman/agent/triage/escalation.rs
  • src/openhuman/task_sources/route.rs
  • src/openhuman/task_sources/store.rs
  • src/openhuman/task_sources/store_tests.rs
✅ Files skipped from review due to trivial changes (4)
  • app/src/lib/i18n/chunks/zh-CN-5.ts
  • app/src/lib/i18n/chunks/bn-5.ts
  • app/src/lib/i18n/chunks/hi-5.ts
  • app/src/lib/i18n/chunks/fr-5.ts
🚧 Files skipped from review as they are similar to previous changes (3)
  • src/openhuman/agent/triage/envelope.rs
  • src/openhuman/task_sources/route.rs
  • src/openhuman/agent/task_dispatcher.rs

Comment on lines +349 to +379
let current = match ops::list(&link.location) {
Ok(snapshot) => snapshot
.cards
.into_iter()
.find(|c| c.id == link.card_id)
.map(|c| c.status),
Err(e) => {
tracing::warn!(
card_id = %link.card_id,
error = %e,
"[triage::escalation] reload before gating linked card failed"
);
return;
}
};

match current {
Some(TaskCardStatus::Todo | TaskCardStatus::Ready | TaskCardStatus::AwaitingApproval) => {
match ops::update_status(&link.location, &link.card_id, TaskCardStatus::Rejected) {
Ok(_) => tracing::info!(
card_id = %link.card_id,
decision = %decision,
"[triage::escalation] gated task-card → rejected (poller will skip)"
),
Err(e) => tracing::warn!(
card_id = %link.card_id,
decision = %decision,
error = %e,
"[triage::escalation] failed to gate task-card (poller may re-dispatch)"
),
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Use an atomic compare-and-set for terminal gating.

Line 349 reads status and Line 367 writes Rejected in a separate step. If the poller advances the card between those operations, this path can overwrite a newer status (in_progress/done) with rejected.

Suggested direction
- match ops::update_status(&link.location, &link.card_id, TaskCardStatus::Rejected) {
+ match ops::update_status_if_current(
+     &link.location,
+     &link.card_id,
+     &[
+         TaskCardStatus::Todo,
+         TaskCardStatus::Ready,
+         TaskCardStatus::AwaitingApproval,
+     ],
+     TaskCardStatus::Rejected,
+ ) {
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/openhuman/agent/triage/escalation.rs` around lines 349 - 379, The current
code reads the card status with ops::list and then unconditionally calls
ops::update_status to set TaskCardStatus::Rejected, which can overwrite a newer
terminal status if the poller advances the card; replace this two-step
read-then-write with an atomic compare-and-set: add or use an API like
ops::update_status_if_current / ops::compare_and_set_status that accepts
(location, card_id, expected_status, new_status) and call it with
expected_status equal to the status matched (e.g., Todo|Ready|AwaitingApproval)
and new_status = Rejected; handle the Ok(success=false) case by logging that the
status changed and skipping, and preserve the existing Err handling for storage
errors so you do not overwrite concurrent updates.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/openhuman/task_sources/ops.rs (1)

70-83: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Prevent partial-create when patching assigned_executor fails (src/openhuman/task_sources/ops.rs ~70-83)

add_source persists the TaskSource, then update_source applies the optional assigned_executor patch. If that follow-up update_source errors, add returns Err while the source remains stored without assigned_executor, so a retry can create duplicates. Make the operation atomic (transaction) or rollback on patch failure (remove_source), or degrade gracefully by returning the already-created source (with a warning).

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/openhuman/task_sources/ops.rs` around lines 70 - 83, The code currently
persists a new TaskSource then calls store::update_source to set
TaskSourcePatch.assigned_executor, which can fail and leave the created source
inconsistent; modify the add_source flow around the store::update_source call so
that on any Err from store::update_source you immediately attempt to roll back
by calling store::remove_source(config, &source.id) (log/remove errors
separately but return the original update error), ensuring the operation is
atomic; target the block that handles assigned_executor, i.e. the match around
assigned_executor / store::update_source and the local source variable, and
ensure you propagate the update error after attempting rollback.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Outside diff comments:
In `@src/openhuman/task_sources/ops.rs`:
- Around line 70-83: The code currently persists a new TaskSource then calls
store::update_source to set TaskSourcePatch.assigned_executor, which can fail
and leave the created source inconsistent; modify the add_source flow around the
store::update_source call so that on any Err from store::update_source you
immediately attempt to roll back by calling store::remove_source(config,
&source.id) (log/remove errors separately but return the original update error),
ensuring the operation is atomic; target the block that handles
assigned_executor, i.e. the match around assigned_executor /
store::update_source and the local source variable, and ensure you propagate the
update error after attempting rollback.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: a72866dd-f34a-4d24-9cd6-c90bf5ebecab

📥 Commits

Reviewing files that changed from the base of the PR and between e2e6666 and 5d88c1d.

📒 Files selected for processing (3)
  • app/src/utils/tauriCommands/taskSources.ts
  • src/openhuman/agent/task_dispatcher.rs
  • src/openhuman/task_sources/ops.rs
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/openhuman/agent/task_dispatcher.rs

@senamakel senamakel merged commit 886c4a6 into tinyhumansai:main May 29, 2026
31 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

agent Built-in agents, prompts, orchestration, and agent runtime in src/openhuman/agent/. feature Net-new user-facing capability or product behavior. memory Memory store, memory tree, recall, summarization, and embeddings in src/openhuman/memory/. rust-core Core Rust runtime in src/: CLI, core_server, shared infrastructure.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants