From de7b1b2cca6b0e5204d0881bdafc9e966369af49 Mon Sep 17 00:00:00 2001 From: kunaluipath Date: Wed, 20 May 2026 18:31:33 +0530 Subject: [PATCH] feat: bind HITL escalation recipients to upstream tool outputs Adds support for resolving escalation recipients at runtime from the output of upstream tools, in addition to the existing literal / agent-input bindings. Works for User, Group, Workload, RoundRobin, and CustomAssignees criteria. - _extract_tool_output_value: walks state.messages backwards, finds the latest ToolMessage by name, parses content as JSON, extracts a top-level field (fail-loud on missing tool / missing path). - _build_tool_output_task_recipient: maps the extracted value to the appropriate TaskRecipient shape per criteria (list to Workload-style, string to single, CustomAssignees comma-split). - resolve_recipient_value: new ToolOutputRecipient branch. - resolve_channel_recipients: threads tool_messages through; tool-output takes precedence over CustomAssignees aggregation. - escalation_wrapper: captures state.messages into tool.metadata so escalation_tool_fn can read it without changing public signatures. - create_escalation_tool: auto-augments tool description with a hint listing the tool dependencies for the LLM to plan tool calls. Backwards-compatible: existing recipients without source parse and resolve identically. Co-Authored-By: Claude Opus 4.7 --- .../guardrails/actions/escalate_action.py | 9 + .../agent/tools/escalation_tool.py | 266 +++++++- tests/agent/tools/test_escalation_tool.py | 577 +++++++++++++++++- 3 files changed, 847 insertions(+), 5 deletions(-) diff --git a/src/uipath_langchain/agent/guardrails/actions/escalate_action.py b/src/uipath_langchain/agent/guardrails/actions/escalate_action.py index ab8d235df..9a513f63d 100644 --- a/src/uipath_langchain/agent/guardrails/actions/escalate_action.py +++ b/src/uipath_langchain/agent/guardrails/actions/escalate_action.py @@ -16,7 +16,10 @@ from uipath.agent.models.agent import ( AgentEscalationRecipient, AssetRecipient, + CustomAssigneesRecipient, + RoundRobinRecipient, StandardRecipient, + WorkloadRecipient, ) from uipath.platform import UiPath from uipath.platform.action_center.tasks import Task, TaskRecipient @@ -140,6 +143,12 @@ async def _create_task_node( metadata["escalation_data"]["assigned_to"] = ( task_recipient.value if task_recipient else None ) + elif isinstance(self.recipient, (WorkloadRecipient, RoundRobinRecipient)): + metadata["escalation_data"]["assigned_to"] = self.recipient.display_name + elif isinstance(self.recipient, CustomAssigneesRecipient): + metadata["escalation_data"]["assigned_to"] = ( + self.recipient.display_name or self.recipient.value + ) # Validate message count based on execution stage _validate_message_count(state, execution_stage) diff --git a/src/uipath_langchain/agent/tools/escalation_tool.py b/src/uipath_langchain/agent/tools/escalation_tool.py index 72aebeece..d2e9a13a2 100644 --- a/src/uipath_langchain/agent/tools/escalation_tool.py +++ b/src/uipath_langchain/agent/tools/escalation_tool.py @@ -4,8 +4,9 @@ import logging import os from enum import Enum -from typing import Any, Literal +from typing import Any, Literal, Sequence +from langchain_core.messages import BaseMessage, ToolMessage from langchain_core.messages.tool import ToolCall from langchain_core.tools import BaseTool, StructuredTool from pydantic import BaseModel @@ -17,8 +18,12 @@ ArgumentEmailRecipient, ArgumentGroupNameRecipient, AssetRecipient, + CustomAssigneesRecipient, LowCodeAgentDefinition, + RoundRobinRecipient, StandardRecipient, + ToolOutputRecipient, + WorkloadRecipient, ) from uipath.agent.utils.text_tokens import safe_get_nested from uipath.eval.mocks import mockable @@ -66,11 +71,137 @@ class EscalationAction(str, Enum): END = "end" +_logger = logging.getLogger(__name__) + + +def _extract_tool_output_value( + tool_messages: Sequence[BaseMessage], + tool_name: str, + output_path: str, +) -> Any: + """Walk the agent's message history backwards for the latest ToolMessage matching + ``tool_name``, parse its content as JSON, and return the field at ``output_path``. + + ``output_path`` is a top-level field name (v1). If the path is empty, the whole + parsed content is returned. Raises ``ValueError`` (fail-loud) when the tool was + never called or the path doesn't exist. + """ + for msg in reversed(tool_messages): + if isinstance(msg, ToolMessage) and getattr(msg, "name", None) == tool_name: + content = msg.content + # ToolMessage content is typically a string (the stringified tool output). + # If it's already structured, use it as-is. + parsed: Any + if isinstance(content, str): + try: + parsed = json.loads(content) + except (json.JSONDecodeError, ValueError): + parsed = content + else: + parsed = content + + if not output_path: + return parsed + + if isinstance(parsed, dict): + if output_path not in parsed: + raise ValueError( + f"Tool '{tool_name}' output does not contain field " + f"'{output_path}'. Available fields: {list(parsed.keys())}." + ) + return parsed[output_path] + + raise ValueError( + f"Tool '{tool_name}' output is not a JSON object — cannot extract " + f"field '{output_path}' from output of type {type(parsed).__name__}." + ) + + raise ValueError( + f"Tool '{tool_name}' has not been called yet; cannot resolve recipient " + f"binding (expected tool output field '{output_path}'). Make sure the agent " + f"invokes '{tool_name}' before this escalation." + ) + + +def _build_tool_output_task_recipient( + recipient_type: AgentEscalationRecipientType, + value: Any, +) -> TaskRecipient | None: + """Map an extracted tool-output value to a TaskRecipient appropriate for the + target criteria type. Lists of emails go through the Workload path (matching + CustomAssignees semantics); single strings go through the type-specific path. + """ + if isinstance(value, list): + # Filter to truthy strings — tool outputs may contain nulls/empty entries. + emails = [str(v) for v in value if v] + if not emails: + raise ValueError( + f"Tool-output recipient resolved to an empty list for criteria " + f"{recipient_type.value}." + ) + return TaskRecipient( + value=emails[0], + values=emails, + type=TaskRecipientType.WORKLOAD, + ) + + value_str = str(value) if value is not None else "" + if not value_str: + raise ValueError( + f"Tool-output recipient resolved to an empty value for criteria " + f"{recipient_type.value}." + ) + + if recipient_type == AgentEscalationRecipientType.USER_ID: + return TaskRecipient(value=value_str, type=TaskRecipientType.USER_ID) + if recipient_type == AgentEscalationRecipientType.GROUP_ID: + return TaskRecipient(value=value_str, type=TaskRecipientType.GROUP_ID) + if recipient_type == AgentEscalationRecipientType.WORKLOAD: + return TaskRecipient( + value=value_str, + values=[value_str], + type=TaskRecipientType.WORKLOAD, + ) + if recipient_type == AgentEscalationRecipientType.ROUND_ROBIN: + return TaskRecipient( + value=value_str, + values=[value_str], + type=TaskRecipientType.ROUND_ROBIN, + ) + # CustomAssignees with a single string value — treat as comma-separated emails. + if recipient_type == AgentEscalationRecipientType.CUSTOM_ASSIGNEES: + emails = [s.strip() for s in value_str.split(",") if s.strip()] + if not emails: + return None + return TaskRecipient( + value=emails[0], + values=emails, + type=TaskRecipientType.WORKLOAD, + ) + return None + + async def resolve_recipient_value( recipient: AgentEscalationRecipient, input_args: dict[str, Any] | None = None, + tool_messages: Sequence[BaseMessage] | None = None, ) -> TaskRecipient | None: - """Resolve recipient value based on recipient type.""" + """Resolve recipient value based on recipient type. + + ``tool_messages`` is the agent's full message history (passed through from + the escalation wrapper). It's only consulted for ``ToolOutputRecipient``; + other recipient types ignore it. + """ + if isinstance(recipient, ToolOutputRecipient): + # Fail loud: a misconfigured tool-output binding should not silently + # create an unassigned task. + value = _extract_tool_output_value( + tool_messages or [], + recipient.tool_name, + recipient.output_path, + ) + return _build_tool_output_task_recipient(recipient.type, value) + if isinstance(recipient, AssetRecipient): value = await resolve_asset(recipient.asset_name, get_execution_folder_path()) type = None @@ -100,6 +231,34 @@ async def resolve_recipient_value( value=value, type=TaskRecipientType.GROUP_NAME, displayName=value ) + if isinstance(recipient, WorkloadRecipient): + # Action Center expects the group NAME in assigneeNamesOrEmails; + # `value` on the agent model is the group identifier, `display_name` is the name. + return TaskRecipient( + value=recipient.display_name, + type=TaskRecipientType.WORKLOAD, + displayName=recipient.display_name, + ) + + if isinstance(recipient, RoundRobinRecipient): + return TaskRecipient( + value=recipient.display_name, + type=TaskRecipientType.ROUND_ROBIN, + displayName=recipient.display_name, + ) + + if isinstance(recipient, CustomAssigneesRecipient): + # A single CustomAssignees recipient becomes a one-element Workload assignment. + # Multi-assignee aggregation across recipients[] is handled by resolve_channel_recipients. + if not recipient.value: + return None + return TaskRecipient( + value=recipient.value, + values=[recipient.value], + type=TaskRecipientType.WORKLOAD, + displayName=recipient.display_name, + ) + if isinstance(recipient, StandardRecipient): type = TaskRecipientType(recipient.type) if recipient.type == AgentEscalationRecipientType.USER_EMAIL: @@ -111,6 +270,50 @@ async def resolve_recipient_value( return None +async def resolve_channel_recipients( + recipients: list[AgentEscalationRecipient], + input_args: dict[str, Any] | None = None, + tool_messages: Sequence[BaseMessage] | None = None, +) -> TaskRecipient | None: + """Resolve a channel's full recipients list into a single TaskRecipient. + + For ``CustomAssignees`` channels — which carry one recipient per assignee email — + all values are collected into a single Workload assignment with the full email list. + For all other types only the first recipient is used (the channel always has one). + + ``tool_messages`` is the agent's message history, threaded through to support + ``ToolOutputRecipient`` resolution. + """ + if not recipients: + return None + + # Tool-output binding takes precedence over per-type aggregation: if the first + # recipient is a tool-output, we delegate to the resolver and let it figure + # out the right TaskRecipient shape for the criteria type. + if isinstance(recipients[0], ToolOutputRecipient): + return await resolve_recipient_value( + recipients[0], input_args=input_args, tool_messages=tool_messages + ) + + if isinstance(recipients[0], CustomAssigneesRecipient): + emails = [ + r.value + for r in recipients + if isinstance(r, CustomAssigneesRecipient) and r.value + ] + if not emails: + return None + return TaskRecipient( + value=emails[0], + values=emails, + type=TaskRecipientType.WORKLOAD, + ) + + return await resolve_recipient_value( + recipients[0], input_args=input_args, tool_messages=tool_messages + ) + + async def resolve_asset(asset_name: str, folder_path: str | None) -> str | None: """Retrieve asset value.""" try: @@ -282,8 +485,17 @@ async def escalation_tool_fn(**kwargs: Any) -> dict[str, Any]: agent_input: dict[str, Any] = ( tool.metadata.get("agent_input") if tool.metadata else None ) or {} + # Tool-output recipient bindings resolve by walking the agent's message + # history. The wrapper stashes them in metadata before invoking the tool. + tool_messages: list[BaseMessage] = ( + tool.metadata.get("agent_messages") if tool.metadata else None + ) or [] recipient: TaskRecipient | None = ( - await resolve_recipient_value(channel.recipients[0], input_args=agent_input) + await resolve_channel_recipients( + channel.recipients, + input_args=agent_input, + tool_messages=tool_messages, + ) if channel.recipients else None ) @@ -452,6 +664,18 @@ async def escalation_wrapper( k: v for k, v in state_dict.items() if k not in internal_fields } + # Expose the raw message history to the tool fn so it can resolve + # `ToolOutputRecipient` bindings against prior tool calls. We pull + # directly from `state` (not `state_dict`) so we preserve the original + # message objects (sanitized dicts lose `isinstance(..., ToolMessage)`). + # `state` may be either a Pydantic model (runtime) or a plain dict (tests). + raw_messages = ( + getattr(state, "messages", None) + if not isinstance(state, dict) + else state.get("messages") + ) + tool.metadata["agent_messages"] = list(raw_messages or []) + tool.metadata["_call_id"] = call.get("id") tool.metadata["_call_args"] = dict(call.get("args", {})) @@ -478,9 +702,43 @@ async def escalation_wrapper( "assigned_to": result.get("assigned_to"), } + # Augment the description so the LLM understands tool-output recipient + # dependencies: when a recipient is bound to the output of a specific tool, + # the LLM must call that tool first before invoking this escalation. Without + # this hint the dependency is invisible to the LLM (it doesn't see the + # recipient binding, only the tool's input schema). + description = resource.description + tool_output_deps = [ + (r.tool_name, r.output_path) + for r in channel.recipients + if isinstance(r, ToolOutputRecipient) + ] + if tool_output_deps: + # Deduplicate while preserving order. + seen: set[tuple[str, str]] = set() + unique_deps: list[tuple[str, str]] = [] + for dep in tool_output_deps: + if dep not in seen: + seen.add(dep) + unique_deps.append(dep) + dep_lines = "\n".join( + f"- Output of tool `{tn}` (field `{op}`)" + if op + else f"- Output of tool `{tn}`" + for tn, op in unique_deps + ) + description = ( + f"{description or ''}\n\n" + "**Recipient routing notes:** this escalation's task assignment is " + "derived from the output of upstream tools. Before invoking this " + "escalation, make sure the following tools have been called and their " + "outputs are available in the agent's tool message history:\n" + f"{dep_lines}" + ).strip() + tool = StructuredToolWithArgumentProperties( name=tool_name, - description=resource.description, + description=description, args_schema=input_model, output_type=output_model, coroutine=escalation_tool_fn, diff --git a/tests/agent/tools/test_escalation_tool.py b/tests/agent/tools/test_escalation_tool.py index 78b685c8c..b3b04f91b 100644 --- a/tests/agent/tools/test_escalation_tool.py +++ b/tests/agent/tools/test_escalation_tool.py @@ -4,14 +4,18 @@ from unittest.mock import AsyncMock, MagicMock, patch import pytest -from langchain_core.messages import ToolCall +from langchain_core.messages import AIMessage, HumanMessage, ToolCall, ToolMessage from uipath.agent.models.agent import ( AgentEscalationChannel, AgentEscalationChannelProperties, AgentEscalationRecipientType, AgentEscalationResourceConfig, AssetRecipient, + CustomAssigneesRecipient, + RoundRobinRecipient, StandardRecipient, + ToolOutputRecipient, + WorkloadRecipient, ) from uipath.platform.action_center.tasks import Task, TaskRecipient, TaskRecipientType @@ -21,9 +25,11 @@ ) from uipath_langchain.agent.tools.escalation_tool import ( _build_escalation_memory_payload, + _extract_tool_output_value, _parse_task_data, create_escalation_tool, resolve_asset, + resolve_channel_recipients, resolve_recipient_value, ) @@ -206,6 +212,186 @@ async def test_resolve_recipient_no_value(self): assert result is None + @pytest.mark.asyncio + async def test_resolve_recipient_workload(self): + """WorkloadRecipient resolves to TaskRecipient with WORKLOAD type using displayName.""" + recipient = WorkloadRecipient( + type=AgentEscalationRecipientType.WORKLOAD, + value="group-id-1", + displayName="Support Team", + ) + + result = await resolve_recipient_value(recipient) + + assert result == TaskRecipient( + value="Support Team", + type=TaskRecipientType.WORKLOAD, + displayName="Support Team", + ) + + @pytest.mark.asyncio + async def test_resolve_recipient_round_robin(self): + """RoundRobinRecipient resolves to TaskRecipient with ROUND_ROBIN type.""" + recipient = RoundRobinRecipient( + type=AgentEscalationRecipientType.ROUND_ROBIN, + value="group-id-1", + displayName="Support Team", + ) + + result = await resolve_recipient_value(recipient) + + assert result == TaskRecipient( + value="Support Team", + type=TaskRecipientType.ROUND_ROBIN, + displayName="Support Team", + ) + + @pytest.mark.asyncio + async def test_resolve_recipient_custom_assignees_single(self): + """A single CustomAssigneesRecipient becomes a one-element Workload assignment.""" + recipient = CustomAssigneesRecipient( + type=AgentEscalationRecipientType.CUSTOM_ASSIGNEES, + value="alice@example.com", + displayName="Alice", + ) + + result = await resolve_recipient_value(recipient) + + assert result == TaskRecipient( + value="alice@example.com", + values=["alice@example.com"], + type=TaskRecipientType.WORKLOAD, + displayName="Alice", + ) + + @pytest.mark.asyncio + async def test_resolve_recipient_custom_assignees_empty_value_returns_none(self): + """Empty-value CustomAssignees sentinel resolves to None.""" + recipient = CustomAssigneesRecipient( + type=AgentEscalationRecipientType.CUSTOM_ASSIGNEES, + value="", + ) + + result = await resolve_recipient_value(recipient) + + assert result is None + + +class TestResolveChannelRecipients: + """Tests for the channel-level recipient aggregator.""" + + @pytest.mark.asyncio + async def test_empty_recipients_returns_none(self): + result = await resolve_channel_recipients([]) + assert result is None + + @pytest.mark.asyncio + async def test_single_workload_recipient_delegates_to_resolve_recipient_value(self): + recipient = WorkloadRecipient( + type=AgentEscalationRecipientType.WORKLOAD, + value="group-1", + displayName="Support Team", + ) + + result = await resolve_channel_recipients([recipient]) + + assert result == TaskRecipient( + value="Support Team", + type=TaskRecipientType.WORKLOAD, + displayName="Support Team", + ) + + @pytest.mark.asyncio + async def test_multiple_custom_assignees_collected_into_single_workload(self): + """Multiple CustomAssignees recipients aggregate into one Workload + values list.""" + recipients = [ + CustomAssigneesRecipient( + type=AgentEscalationRecipientType.CUSTOM_ASSIGNEES, + value="alice@example.com", + displayName="Alice", + ), + CustomAssigneesRecipient( + type=AgentEscalationRecipientType.CUSTOM_ASSIGNEES, + value="bob@example.com", + displayName="Bob", + ), + ] + + result = await resolve_channel_recipients(recipients) + + assert result == TaskRecipient( + value="alice@example.com", + values=["alice@example.com", "bob@example.com"], + type=TaskRecipientType.WORKLOAD, + ) + + @pytest.mark.asyncio + async def test_single_custom_assignee_collected_into_single_workload(self): + recipients = [ + CustomAssigneesRecipient( + type=AgentEscalationRecipientType.CUSTOM_ASSIGNEES, + value="alice@example.com", + ), + ] + + result = await resolve_channel_recipients(recipients) + + assert result == TaskRecipient( + value="alice@example.com", + values=["alice@example.com"], + type=TaskRecipientType.WORKLOAD, + ) + + @pytest.mark.asyncio + async def test_custom_assignees_filters_empty_sentinel_values(self): + """Empty-value sentinel CustomAssignees are skipped, not included in the array.""" + recipients = [ + CustomAssigneesRecipient( + type=AgentEscalationRecipientType.CUSTOM_ASSIGNEES, + value="", + ), + CustomAssigneesRecipient( + type=AgentEscalationRecipientType.CUSTOM_ASSIGNEES, + value="alice@example.com", + ), + ] + + result = await resolve_channel_recipients(recipients) + + assert result == TaskRecipient( + value="alice@example.com", + values=["alice@example.com"], + type=TaskRecipientType.WORKLOAD, + ) + + @pytest.mark.asyncio + async def test_all_empty_custom_assignees_returns_none(self): + recipients = [ + CustomAssigneesRecipient( + type=AgentEscalationRecipientType.CUSTOM_ASSIGNEES, + value="", + ), + ] + + result = await resolve_channel_recipients(recipients) + assert result is None + + @pytest.mark.asyncio + async def test_standard_recipient_uses_first_recipient_only(self): + """Non-CustomAssignees channels only use recipients[0].""" + recipient = StandardRecipient( + type=AgentEscalationRecipientType.USER_EMAIL, + value="alice@example.com", + ) + + result = await resolve_channel_recipients([recipient]) + + assert result == TaskRecipient( + value="alice@example.com", + type=TaskRecipientType.EMAIL, + displayName="alice@example.com", + ) + class TestEscalationToolMetadata: """Test that escalation tool has correct metadata for observability.""" @@ -1320,3 +1506,392 @@ def test_builds_trace_and_search_payloads(self): } assert attributes == {"arguments": serialized_input} assert "escalation-input" not in attributes + + +class TestExtractToolOutputValue: + """Tests for the helper that walks message history to extract tool output values.""" + + def test_extracts_top_level_field_from_json_content(self): + messages = [ + HumanMessage(content="hello"), + ToolMessage( + name="API workflow A", + tool_call_id="1", + content='{"includeEmails": ["alice@x.com", "bob@x.com"], "expiresAt": "2026-01-01"}', + ), + ] + result = _extract_tool_output_value(messages, "API workflow A", "includeEmails") + assert result == ["alice@x.com", "bob@x.com"] + + def test_returns_whole_parsed_content_when_path_is_empty(self): + messages = [ + ToolMessage( + name="getApprovers", + tool_call_id="1", + content='["alice@x.com", "bob@x.com"]', + ), + ] + result = _extract_tool_output_value(messages, "getApprovers", "") + assert result == ["alice@x.com", "bob@x.com"] + + def test_picks_latest_invocation_when_called_multiple_times(self): + messages = [ + ToolMessage( + name="A", tool_call_id="1", content='{"emails": ["old@x.com"]}' + ), + ToolMessage( + name="A", tool_call_id="2", content='{"emails": ["new@x.com"]}' + ), + ] + result = _extract_tool_output_value(messages, "A", "emails") + assert result == ["new@x.com"] + + def test_raises_when_tool_was_never_called(self): + messages = [ + ToolMessage(name="getUsers", tool_call_id="1", content="[]"), + ] + with pytest.raises(ValueError, match="has not been called yet"): + _extract_tool_output_value(messages, "API workflow A", "emails") + + def test_raises_when_field_missing_from_output(self): + messages = [ + ToolMessage(name="A", tool_call_id="1", content='{"otherField": []}'), + ] + with pytest.raises(ValueError, match="does not contain field 'emails'"): + _extract_tool_output_value(messages, "A", "emails") + + def test_raises_when_output_is_not_a_json_object_but_path_requested(self): + messages = [ + ToolMessage(name="A", tool_call_id="1", content='["a", "b"]'), + ] + with pytest.raises(ValueError, match="is not a JSON object"): + _extract_tool_output_value(messages, "A", "emails") + + def test_handles_non_json_string_content_gracefully(self): + # If the content isn't JSON, treat it as a raw string (the whole "output"). + messages = [ + ToolMessage(name="A", tool_call_id="1", content="some raw text"), + ] + # With no path, the raw string is returned. + result = _extract_tool_output_value(messages, "A", "") + assert result == "some raw text" + + def test_ignores_non_matching_tool_messages(self): + messages = [ + ToolMessage( + name="otherTool", tool_call_id="1", content='{"emails": ["wrong"]}' + ), + ToolMessage( + name="A", tool_call_id="2", content='{"emails": ["right@x.com"]}' + ), + AIMessage(content="thinking"), + ] + result = _extract_tool_output_value(messages, "A", "emails") + assert result == ["right@x.com"] + + def test_returns_empty_string_when_content_is_empty_and_no_path(self): + # Empty content with no path falls through the JSON parse to the raw "" string. + messages = [ToolMessage(name="A", tool_call_id="1", content="")] + result = _extract_tool_output_value(messages, "A", "") + assert result == "" + + def test_raises_when_malformed_json_and_path_requested(self): + # Malformed JSON content falls back to the raw string; attempting to extract a + # path then fails the same way as any other non-object output. + messages = [ + ToolMessage(name="A", tool_call_id="1", content='{"emails": ["a@x.com"'), + ] + with pytest.raises(ValueError, match="is not a JSON object"): + _extract_tool_output_value(messages, "A", "emails") + + def test_returns_null_when_field_value_is_explicitly_null(self): + # A null field value at the requested path is returned as Python None; the + # downstream recipient builder is the one that rejects None/empty values. + messages = [ + ToolMessage(name="A", tool_call_id="1", content='{"emails": null}'), + ] + result = _extract_tool_output_value(messages, "A", "emails") + assert result is None + + +class TestToolOutputRecipientResolution: + """Tests for resolving ToolOutputRecipient via resolve_recipient_value.""" + + @pytest.mark.asyncio + async def test_custom_assignees_with_array_of_emails(self): + recipient = ToolOutputRecipient( + type=AgentEscalationRecipientType.CUSTOM_ASSIGNEES, + source="toolOutput", + toolName="API workflow A", + outputPath="includeEmails", + ) + messages = [ + ToolMessage( + name="API workflow A", + tool_call_id="1", + content='{"includeEmails": ["alice@x.com", "bob@x.com"]}', + ), + ] + + result = await resolve_recipient_value(recipient, tool_messages=messages) + + assert result == TaskRecipient( + value="alice@x.com", + values=["alice@x.com", "bob@x.com"], + type=TaskRecipientType.WORKLOAD, + ) + + @pytest.mark.asyncio + async def test_custom_assignees_with_comma_separated_string(self): + recipient = ToolOutputRecipient( + type=AgentEscalationRecipientType.CUSTOM_ASSIGNEES, + source="toolOutput", + toolName="A", + outputPath="emails", + ) + messages = [ + ToolMessage( + name="A", + tool_call_id="1", + content='{"emails": "alice@x.com, bob@x.com, carol@x.com"}', + ), + ] + + result = await resolve_recipient_value(recipient, tool_messages=messages) + + assert result is not None + assert result.type == TaskRecipientType.WORKLOAD + assert result.values == ["alice@x.com", "bob@x.com", "carol@x.com"] + + @pytest.mark.asyncio + async def test_workload_with_single_group_name_string(self): + recipient = ToolOutputRecipient( + type=AgentEscalationRecipientType.WORKLOAD, + source="toolOutput", + toolName="findGroup", + outputPath="groupName", + ) + messages = [ + ToolMessage( + name="findGroup", + tool_call_id="1", + content='{"groupName": "Support Team"}', + ), + ] + + result = await resolve_recipient_value(recipient, tool_messages=messages) + + assert result == TaskRecipient( + value="Support Team", + values=["Support Team"], + type=TaskRecipientType.WORKLOAD, + ) + + @pytest.mark.asyncio + async def test_round_robin_with_single_group_name(self): + recipient = ToolOutputRecipient( + type=AgentEscalationRecipientType.ROUND_ROBIN, + source="toolOutput", + toolName="findGroup", + outputPath="groupName", + ) + messages = [ + ToolMessage( + name="findGroup", + tool_call_id="1", + content='{"groupName": "Support Team"}', + ), + ] + + result = await resolve_recipient_value(recipient, tool_messages=messages) + + assert result == TaskRecipient( + value="Support Team", + values=["Support Team"], + type=TaskRecipientType.ROUND_ROBIN, + ) + + @pytest.mark.asyncio + async def test_user_with_string_value(self): + recipient = ToolOutputRecipient( + type=AgentEscalationRecipientType.USER_ID, + source="toolOutput", + toolName="findUser", + outputPath="userId", + ) + messages = [ + ToolMessage( + name="findUser", + tool_call_id="1", + content='{"userId": "user-123"}', + ), + ] + + result = await resolve_recipient_value(recipient, tool_messages=messages) + + assert result == TaskRecipient(value="user-123", type=TaskRecipientType.USER_ID) + + @pytest.mark.asyncio + async def test_raises_when_tool_not_called(self): + recipient = ToolOutputRecipient( + type=AgentEscalationRecipientType.CUSTOM_ASSIGNEES, + source="toolOutput", + toolName="API workflow A", + outputPath="includeEmails", + ) + with pytest.raises(ValueError, match="has not been called yet"): + await resolve_recipient_value(recipient, tool_messages=[]) + + @pytest.mark.asyncio + async def test_raises_when_resolved_value_is_empty_list(self): + recipient = ToolOutputRecipient( + type=AgentEscalationRecipientType.CUSTOM_ASSIGNEES, + source="toolOutput", + toolName="A", + outputPath="emails", + ) + messages = [ + ToolMessage(name="A", tool_call_id="1", content='{"emails": []}'), + ] + with pytest.raises(ValueError, match="empty list"): + await resolve_recipient_value(recipient, tool_messages=messages) + + +class TestResolveChannelRecipientsWithToolOutput: + """Tests for resolve_channel_recipients with tool-output bindings.""" + + @pytest.mark.asyncio + async def test_tool_output_recipient_delegates_to_resolver(self): + recipient = ToolOutputRecipient( + type=AgentEscalationRecipientType.CUSTOM_ASSIGNEES, + source="toolOutput", + toolName="A", + outputPath="emails", + ) + messages = [ + ToolMessage( + name="A", + tool_call_id="1", + content='{"emails": ["a@b.com", "c@d.com"]}', + ), + ] + + result = await resolve_channel_recipients([recipient], tool_messages=messages) + + assert result is not None + assert result.values == ["a@b.com", "c@d.com"] + + @pytest.mark.asyncio + async def test_tool_output_takes_precedence_over_custom_assignees_aggregation(self): + # Even if subsequent recipients are CustomAssigneesRecipient, a leading + # ToolOutputRecipient owns the entire channel resolution. + recipients = [ + ToolOutputRecipient( + type=AgentEscalationRecipientType.CUSTOM_ASSIGNEES, + source="toolOutput", + toolName="A", + outputPath="emails", + ), + CustomAssigneesRecipient( + type=AgentEscalationRecipientType.CUSTOM_ASSIGNEES, + value="manual@x.com", + ), + ] + messages = [ + ToolMessage( + name="A", + tool_call_id="1", + content='{"emails": ["resolved@x.com"]}', + ), + ] + + result = await resolve_channel_recipients(recipients, tool_messages=messages) + + assert result is not None + # Tool-output wins; manual recipient is ignored. + assert result.values == ["resolved@x.com"] + + +class TestEscalationToolDescriptionAugmentation: + """Tests for the LLM-facing description hint added when tool-output bindings exist.""" + + def _make_channel(self, recipients: list) -> AgentEscalationChannel: + return AgentEscalationChannel( + id="ch-1", + type="actionCenter", + name="ch", + description="", + inputSchema={"type": "object", "properties": {}}, + outputSchema={"type": "object", "properties": {}}, + recipients=recipients, + properties=AgentEscalationChannelProperties( + appName="my-app", + resourceKey="rk", + folderName=None, + appVersion=1, + isActionableMessageEnabled=False, + actionableMessageMetaData=None, + ), + ) + + def _make_resource( + self, channel: AgentEscalationChannel + ) -> AgentEscalationResourceConfig: + return AgentEscalationResourceConfig( + resourceType="escalation", + id="esc-1", + name="approve_expense", + description="Escalate an expense for approval.", + channels=[channel], + ) + + def test_description_unchanged_when_no_tool_output_bindings(self): + channel = self._make_channel( + recipients=[ + StandardRecipient( + type=AgentEscalationRecipientType.USER_ID, + value="u1", + displayName="User 1", + ) + ] + ) + tool = create_escalation_tool(self._make_resource(channel)) + assert tool.description == "Escalate an expense for approval." + assert "Recipient routing notes" not in tool.description + + def test_description_includes_dependency_hint_for_tool_output_binding(self): + channel = self._make_channel( + recipients=[ + ToolOutputRecipient( + type=AgentEscalationRecipientType.CUSTOM_ASSIGNEES, + source="toolOutput", + toolName="API workflow A", + outputPath="includeEmails", + ) + ] + ) + tool = create_escalation_tool(self._make_resource(channel)) + assert "Recipient routing notes" in tool.description + assert "API workflow A" in tool.description + assert "includeEmails" in tool.description + + def test_description_deduplicates_repeated_tool_dependencies(self): + channel = self._make_channel( + recipients=[ + ToolOutputRecipient( + type=AgentEscalationRecipientType.CUSTOM_ASSIGNEES, + source="toolOutput", + toolName="A", + outputPath="emails", + ), + ToolOutputRecipient( + type=AgentEscalationRecipientType.CUSTOM_ASSIGNEES, + source="toolOutput", + toolName="A", + outputPath="emails", + ), + ] + ) + tool = create_escalation_tool(self._make_resource(channel)) + # The hint mentions tool A exactly once. + assert tool.description.count("`A`") == 1