Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions src/uipath_langchain/agent/tools/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
"""Tool creation and management for LowCode agents."""

from .context_tool import create_context_tool
from .datafabric_tool import (
fetch_entity_schemas,
)
from .escalation_tool import create_escalation_tool
from .extraction_tool import create_ixp_extraction_tool
from .integration_tool import create_integration_tool
Expand All @@ -30,7 +27,6 @@
"create_escalation_tool",
"create_ixp_extraction_tool",
"create_ixp_escalation_tool",
"fetch_entity_schemas",
"UiPathToolNode",
"ToolWrapperMixin",
"wrap_tools_with_error_handling",
Expand Down
2 changes: 0 additions & 2 deletions src/uipath_langchain/agent/tools/datafabric_tool/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@

from .datafabric_tool import (
create_datafabric_query_tool,
fetch_entity_schemas,
)

__all__ = [
"create_datafabric_query_tool",
"fetch_entity_schemas",
]
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from langgraph.graph.message import add_messages
from langgraph.graph.state import CompiledStateGraph
from pydantic import BaseModel
from uipath.platform.entities import Entity, QueryRoutingOverrideContext
from uipath.platform.entities import EntitiesService, Entity

from ..datafabric_query_tool import DataFabricQueryTool
from . import datafabric_prompt_builder
Expand All @@ -42,18 +42,14 @@ class DataFabricSubgraphState(BaseModel):
class QueryExecutor:
"""Executes SQL queries against Data Fabric."""

def __init__(self, routing_context: QueryRoutingOverrideContext) -> None:
from uipath.platform import UiPath

self._sdk = UiPath()
self._routing_context = routing_context
def __init__(self, entities_service: EntitiesService) -> None:
self._entities = entities_service

async def __call__(self, sql_query: str) -> dict[str, Any]:
logger.debug("execute_sql called with SQL: %s", sql_query)
try:
records = await self._sdk.entities.query_entity_records_async(
records = await self._entities.query_entity_records_async(
sql_query=sql_query,
routing_context=self._routing_context,
)
return {
"records": records,
Expand Down Expand Up @@ -81,14 +77,14 @@ def __init__(
self,
llm: BaseChatModel,
entities: list[Entity],
routing_context: QueryRoutingOverrideContext,
entities_service: EntitiesService,
max_iterations: int = 25,
resource_description: str = "",
base_system_prompt: str = "",
) -> None:
self._max_iterations = max_iterations
self._execute_sql_tool = self._create_execute_sql_tool(
routing_context, entities
entities_service, entities
)
self._system_message = SystemMessage(
content=datafabric_prompt_builder.build(
Expand Down Expand Up @@ -175,7 +171,7 @@ def router(self, state: DataFabricSubgraphState) -> str:

def _create_execute_sql_tool(
self,
routing_context: QueryRoutingOverrideContext,
entities_service: EntitiesService,
entities: list[Entity],
) -> BaseTool:
"""Create the inner ``execute_sql`` tool."""
Expand All @@ -188,15 +184,15 @@ def _create_execute_sql_tool(
"tables and columns. Retry with a corrected query on errors."
),
args_schema=DataFabricExecuteSqlInput,
coroutine=QueryExecutor(routing_context),
coroutine=QueryExecutor(entities_service),
metadata={"tool_type": "datafabric_sql"},
)

@staticmethod
def create(
llm: BaseChatModel,
entities: list[Entity],
routing_context: QueryRoutingOverrideContext,
entities_service: EntitiesService,
max_iterations: int = 25,
resource_description: str = "",
base_system_prompt: str = "",
Expand All @@ -205,7 +201,7 @@ def create(
graph = DataFabricGraph(
llm,
entities,
routing_context,
entities_service,
max_iterations,
resource_description,
base_system_prompt,
Expand Down
74 changes: 19 additions & 55 deletions src/uipath_langchain/agent/tools/datafabric_tool/datafabric_tool.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
"""Data Fabric tool creation and resource detection.

This module provides:
1. An agentic ``query_datafabric`` tool with inner LLM sub-graph
2. Entity schema fetching from the Data Fabric API
This module provides an agentic ``query_datafabric`` tool with an inner
LLM sub-graph.

The tool accepts natural language queries, runs an inner LangGraph
sub-graph for SQL generation + execution + self-correction, and
Expand All @@ -21,7 +20,7 @@
from langchain_core.tools import BaseTool
from langgraph.graph.state import CompiledStateGraph
from uipath.agent.models.agent import AgentContextResourceConfig
from uipath.platform.entities import Entity, EntityRouting, QueryRoutingOverrideContext
from uipath.platform.entities import DataFabricEntityItem

from ..base_uipath_structured_tool import BaseUiPathStructuredTool
from .models import DataFabricQueryInput
Expand All @@ -34,28 +33,27 @@
class DataFabricTextQueryHandler:
"""Manages lazy initialization and invocation of the Data Fabric sub-graph.

On first call, fetches entity schemas from the DF API and compiles
the inner LangGraph sub-graph. Subsequent calls reuse the cached graph.
On first call, resolves entity schemas and routing via the platform
layer and compiles the inner LangGraph sub-graph. Subsequent calls
reuse the cached graph.
"""

def __init__(
self,
entity_identifiers: list[str],
routing_context: QueryRoutingOverrideContext,
entity_set: list[DataFabricEntityItem],
llm: BaseChatModel,
resource_description: str = "",
base_system_prompt: str = "",
) -> None:
self._entity_identifiers = entity_identifiers
self._routing_context = routing_context
self._entity_set = entity_set
self._llm = llm
self._resource_description = resource_description
self._base_system_prompt = base_system_prompt
self._compiled: CompiledStateGraph[Any] | None = None
self._init_lock = asyncio.Lock()

async def _ensure_datafabric_graph(self) -> CompiledStateGraph[Any]:
"""Lazy-init: fetch schemas + build sub-graph on first call.
"""Lazy-init: resolve entities + build sub-graph on first call.

Uses asyncio.Lock because the outer agent supports parallel
tool calls — two concurrent invocations could race on first call.
Expand All @@ -67,18 +65,23 @@ async def _ensure_datafabric_graph(self) -> CompiledStateGraph[Any]:
if self._compiled is not None:
return self._compiled

from uipath.platform import UiPath

from .datafabric_subgraph import DataFabricGraph

entities = await fetch_entity_schemas(self._entity_identifiers)
if not entities:
sdk = UiPath()
resolution = await sdk.entities.resolve_entity_set_async(
self._entity_set
)
if not resolution.entities:
raise ValueError(
"No Data Fabric entity schemas could be fetched. "
"Check entity identifiers and permissions."
)
self._compiled = DataFabricGraph.create(
llm=self._llm,
entities=entities,
routing_context=self._routing_context,
entities=resolution.entities,
entities_service=resolution.entities_service,
resource_description=self._resource_description,
base_system_prompt=self._base_system_prompt,
)
Expand All @@ -98,44 +101,6 @@ async def __call__(self, user_query: str) -> str:
return "Unable to generate an answer from the available data."


async def _fetch_single_entity(sdk: Any, identifier: str) -> Entity | None:
"""Fetch a single entity by identifier, returning None on failure."""
try:
entity = await sdk.entities.retrieve_async(identifier)
logger.info("Fetched schema for entity '%s'", entity.display_name)
return entity
except Exception:
logger.warning("Failed to fetch entity '%s'", identifier, exc_info=True)
return None


async def fetch_entity_schemas(entity_identifiers: list[str]) -> list[Entity]:
"""Fetch entity metadata from Data Fabric concurrently."""
from uipath.platform import UiPath

sdk = UiPath()
results = await asyncio.gather(
*[_fetch_single_entity(sdk, eid) for eid in entity_identifiers]
)
return [e for e in results if e is not None]


def _build_routing_context(
resource: AgentContextResourceConfig,
) -> QueryRoutingOverrideContext:
"""Build query routing context from entity set items.

Maps each entity to its folder so the backend resolves
entities at folder level instead of tenant level.
"""
return QueryRoutingOverrideContext(
entity_routings=[
EntityRouting(entity_name=item.name, folder_id=item.folder_id)
for item in (resource.entity_set or [])
]
)


def create_datafabric_query_tool(
resource: AgentContextResourceConfig,
llm: BaseChatModel,
Expand All @@ -153,8 +118,7 @@ def create_datafabric_query_tool(
"""
config = agent_config or {}
handler = DataFabricTextQueryHandler(
entity_identifiers=resource.datafabric_entity_identifiers,
routing_context=_build_routing_context(resource),
entity_set=resource.entity_set or [],
llm=llm,
resource_description=resource.description or "",
base_system_prompt=config.get(BASE_SYSTEM_PROMPT, ""),
Expand Down
Loading