Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@
from uipath.agent.models.agent import (
AgentEscalationRecipient,
AssetRecipient,
CustomAssigneesRecipient,
RoundRobinRecipient,
StandardRecipient,
WorkloadRecipient,
)
from uipath.platform import UiPath
from uipath.platform.action_center.tasks import Task, TaskRecipient
Expand Down Expand Up @@ -140,6 +143,12 @@ async def _create_task_node(
metadata["escalation_data"]["assigned_to"] = (
task_recipient.value if task_recipient else None
)
elif isinstance(self.recipient, (WorkloadRecipient, RoundRobinRecipient)):
metadata["escalation_data"]["assigned_to"] = self.recipient.display_name
elif isinstance(self.recipient, CustomAssigneesRecipient):
metadata["escalation_data"]["assigned_to"] = (
self.recipient.display_name or self.recipient.value
)

# Validate message count based on execution stage
_validate_message_count(state, execution_stage)
Expand Down
266 changes: 262 additions & 4 deletions src/uipath_langchain/agent/tools/escalation_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@
import logging
import os
from enum import Enum
from typing import Any, Literal
from typing import Any, Literal, Sequence

from langchain_core.messages import BaseMessage, ToolMessage
from langchain_core.messages.tool import ToolCall
from langchain_core.tools import BaseTool, StructuredTool
from pydantic import BaseModel
Expand All @@ -17,8 +18,12 @@
ArgumentEmailRecipient,
ArgumentGroupNameRecipient,
AssetRecipient,
CustomAssigneesRecipient,
LowCodeAgentDefinition,
RoundRobinRecipient,
StandardRecipient,
ToolOutputRecipient,
WorkloadRecipient,
)
from uipath.agent.utils.text_tokens import safe_get_nested
from uipath.eval.mocks import mockable
Expand Down Expand Up @@ -66,11 +71,137 @@
END = "end"


_logger = logging.getLogger(__name__)


def _extract_tool_output_value(

Check failure on line 77 in src/uipath_langchain/agent/tools/escalation_tool.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this function to reduce its Cognitive Complexity from 22 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=UiPath_uipath-langchain-python&issues=AZ5Fh5-sbZI0YepzRHrR&open=AZ5Fh5-sbZI0YepzRHrR&pullRequest=867
tool_messages: Sequence[BaseMessage],
tool_name: str,
output_path: str,
) -> Any:
"""Walk the agent's message history backwards for the latest ToolMessage matching
``tool_name``, parse its content as JSON, and return the field at ``output_path``.

``output_path`` is a top-level field name (v1). If the path is empty, the whole
parsed content is returned. Raises ``ValueError`` (fail-loud) when the tool was
never called or the path doesn't exist.
"""
for msg in reversed(tool_messages):
if isinstance(msg, ToolMessage) and getattr(msg, "name", None) == tool_name:
content = msg.content
# ToolMessage content is typically a string (the stringified tool output).
# If it's already structured, use it as-is.
parsed: Any
if isinstance(content, str):
try:
parsed = json.loads(content)
except (json.JSONDecodeError, ValueError):

Check warning on line 98 in src/uipath_langchain/agent/tools/escalation_tool.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove this redundant Exception class; it derives from another which is already caught.

See more on https://sonarcloud.io/project/issues?id=UiPath_uipath-langchain-python&issues=AZ5Fh5-sbZI0YepzRHrS&open=AZ5Fh5-sbZI0YepzRHrS&pullRequest=867
parsed = content
else:
parsed = content

if not output_path:
return parsed

if isinstance(parsed, dict):
if output_path not in parsed:
raise ValueError(
f"Tool '{tool_name}' output does not contain field "
f"'{output_path}'. Available fields: {list(parsed.keys())}."
)
return parsed[output_path]

raise ValueError(
f"Tool '{tool_name}' output is not a JSON object — cannot extract "
f"field '{output_path}' from output of type {type(parsed).__name__}."
)

raise ValueError(
f"Tool '{tool_name}' has not been called yet; cannot resolve recipient "
f"binding (expected tool output field '{output_path}'). Make sure the agent "
f"invokes '{tool_name}' before this escalation."
)


def _build_tool_output_task_recipient(
recipient_type: AgentEscalationRecipientType,
value: Any,
) -> TaskRecipient | None:
"""Map an extracted tool-output value to a TaskRecipient appropriate for the
target criteria type. Lists of emails go through the Workload path (matching
CustomAssignees semantics); single strings go through the type-specific path.
"""
if isinstance(value, list):
# Filter to truthy strings — tool outputs may contain nulls/empty entries.
emails = [str(v) for v in value if v]
if not emails:
raise ValueError(
f"Tool-output recipient resolved to an empty list for criteria "
f"{recipient_type.value}."
)
return TaskRecipient(
value=emails[0],
values=emails,
type=TaskRecipientType.WORKLOAD,
)

value_str = str(value) if value is not None else ""
if not value_str:
raise ValueError(
f"Tool-output recipient resolved to an empty value for criteria "
f"{recipient_type.value}."
)

if recipient_type == AgentEscalationRecipientType.USER_ID:
return TaskRecipient(value=value_str, type=TaskRecipientType.USER_ID)
if recipient_type == AgentEscalationRecipientType.GROUP_ID:
return TaskRecipient(value=value_str, type=TaskRecipientType.GROUP_ID)
if recipient_type == AgentEscalationRecipientType.WORKLOAD:
return TaskRecipient(
value=value_str,
values=[value_str],
type=TaskRecipientType.WORKLOAD,
)
if recipient_type == AgentEscalationRecipientType.ROUND_ROBIN:
return TaskRecipient(
value=value_str,
values=[value_str],
type=TaskRecipientType.ROUND_ROBIN,
)
# CustomAssignees with a single string value — treat as comma-separated emails.
if recipient_type == AgentEscalationRecipientType.CUSTOM_ASSIGNEES:
emails = [s.strip() for s in value_str.split(",") if s.strip()]
if not emails:
return None
return TaskRecipient(
value=emails[0],
values=emails,
type=TaskRecipientType.WORKLOAD,
)
return None


async def resolve_recipient_value(

Check failure on line 184 in src/uipath_langchain/agent/tools/escalation_tool.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this function to reduce its Cognitive Complexity from 22 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=UiPath_uipath-langchain-python&issues=AZ5Fh5-sbZI0YepzRHrT&open=AZ5Fh5-sbZI0YepzRHrT&pullRequest=867
recipient: AgentEscalationRecipient,
input_args: dict[str, Any] | None = None,
tool_messages: Sequence[BaseMessage] | None = None,
) -> TaskRecipient | None:
"""Resolve recipient value based on recipient type."""
"""Resolve recipient value based on recipient type.

``tool_messages`` is the agent's full message history (passed through from
the escalation wrapper). It's only consulted for ``ToolOutputRecipient``;
other recipient types ignore it.
"""
if isinstance(recipient, ToolOutputRecipient):
# Fail loud: a misconfigured tool-output binding should not silently
# create an unassigned task.
value = _extract_tool_output_value(
tool_messages or [],
recipient.tool_name,
recipient.output_path,
)
return _build_tool_output_task_recipient(recipient.type, value)

if isinstance(recipient, AssetRecipient):
value = await resolve_asset(recipient.asset_name, get_execution_folder_path())
type = None
Expand Down Expand Up @@ -100,6 +231,34 @@
value=value, type=TaskRecipientType.GROUP_NAME, displayName=value
)

if isinstance(recipient, WorkloadRecipient):
# Action Center expects the group NAME in assigneeNamesOrEmails;
# `value` on the agent model is the group identifier, `display_name` is the name.
return TaskRecipient(
value=recipient.display_name,
type=TaskRecipientType.WORKLOAD,
displayName=recipient.display_name,
)

if isinstance(recipient, RoundRobinRecipient):
return TaskRecipient(
value=recipient.display_name,
type=TaskRecipientType.ROUND_ROBIN,
displayName=recipient.display_name,
)

if isinstance(recipient, CustomAssigneesRecipient):
# A single CustomAssignees recipient becomes a one-element Workload assignment.
# Multi-assignee aggregation across recipients[] is handled by resolve_channel_recipients.
if not recipient.value:
return None
return TaskRecipient(
value=recipient.value,
values=[recipient.value],
type=TaskRecipientType.WORKLOAD,
displayName=recipient.display_name,
)

if isinstance(recipient, StandardRecipient):
type = TaskRecipientType(recipient.type)
if recipient.type == AgentEscalationRecipientType.USER_EMAIL:
Expand All @@ -111,6 +270,50 @@
return None


async def resolve_channel_recipients(
recipients: list[AgentEscalationRecipient],
input_args: dict[str, Any] | None = None,
tool_messages: Sequence[BaseMessage] | None = None,
) -> TaskRecipient | None:
"""Resolve a channel's full recipients list into a single TaskRecipient.

For ``CustomAssignees`` channels — which carry one recipient per assignee email —
all values are collected into a single Workload assignment with the full email list.
For all other types only the first recipient is used (the channel always has one).

``tool_messages`` is the agent's message history, threaded through to support
``ToolOutputRecipient`` resolution.
"""
if not recipients:
return None

# Tool-output binding takes precedence over per-type aggregation: if the first
# recipient is a tool-output, we delegate to the resolver and let it figure
# out the right TaskRecipient shape for the criteria type.
if isinstance(recipients[0], ToolOutputRecipient):
return await resolve_recipient_value(
recipients[0], input_args=input_args, tool_messages=tool_messages
)

if isinstance(recipients[0], CustomAssigneesRecipient):
emails = [
r.value
for r in recipients
if isinstance(r, CustomAssigneesRecipient) and r.value
]
if not emails:
return None
return TaskRecipient(
value=emails[0],
values=emails,
type=TaskRecipientType.WORKLOAD,
)

return await resolve_recipient_value(
recipients[0], input_args=input_args, tool_messages=tool_messages
)


async def resolve_asset(asset_name: str, folder_path: str | None) -> str | None:
"""Retrieve asset value."""
try:
Expand Down Expand Up @@ -282,8 +485,17 @@
agent_input: dict[str, Any] = (
tool.metadata.get("agent_input") if tool.metadata else None
) or {}
# Tool-output recipient bindings resolve by walking the agent's message
# history. The wrapper stashes them in metadata before invoking the tool.
tool_messages: list[BaseMessage] = (
tool.metadata.get("agent_messages") if tool.metadata else None
) or []
recipient: TaskRecipient | None = (
await resolve_recipient_value(channel.recipients[0], input_args=agent_input)
await resolve_channel_recipients(
channel.recipients,
input_args=agent_input,
tool_messages=tool_messages,
)
if channel.recipients
else None
)
Expand Down Expand Up @@ -452,6 +664,18 @@
k: v for k, v in state_dict.items() if k not in internal_fields
}

# Expose the raw message history to the tool fn so it can resolve
# `ToolOutputRecipient` bindings against prior tool calls. We pull
# directly from `state` (not `state_dict`) so we preserve the original
# message objects (sanitized dicts lose `isinstance(..., ToolMessage)`).
# `state` may be either a Pydantic model (runtime) or a plain dict (tests).
raw_messages = (
getattr(state, "messages", None)
if not isinstance(state, dict)
else state.get("messages")
)
tool.metadata["agent_messages"] = list(raw_messages or [])

tool.metadata["_call_id"] = call.get("id")
tool.metadata["_call_args"] = dict(call.get("args", {}))

Expand All @@ -478,9 +702,43 @@
"assigned_to": result.get("assigned_to"),
}

# Augment the description so the LLM understands tool-output recipient
# dependencies: when a recipient is bound to the output of a specific tool,
# the LLM must call that tool first before invoking this escalation. Without
# this hint the dependency is invisible to the LLM (it doesn't see the
# recipient binding, only the tool's input schema).
description = resource.description
tool_output_deps = [
(r.tool_name, r.output_path)
for r in channel.recipients
if isinstance(r, ToolOutputRecipient)
]
if tool_output_deps:
# Deduplicate while preserving order.
seen: set[tuple[str, str]] = set()
unique_deps: list[tuple[str, str]] = []
for dep in tool_output_deps:
if dep not in seen:
seen.add(dep)
unique_deps.append(dep)
dep_lines = "\n".join(
f"- Output of tool `{tn}` (field `{op}`)"
if op
else f"- Output of tool `{tn}`"
for tn, op in unique_deps
)
description = (
f"{description or ''}\n\n"
"**Recipient routing notes:** this escalation's task assignment is "
"derived from the output of upstream tools. Before invoking this "
"escalation, make sure the following tools have been called and their "
"outputs are available in the agent's tool message history:\n"
f"{dep_lines}"
).strip()

tool = StructuredToolWithArgumentProperties(
name=tool_name,
description=resource.description,
description=description,
args_schema=input_model,
output_type=output_model,
coroutine=escalation_tool_fn,
Expand Down
Loading
Loading