From 702ffc802fff44423be2b7b88cef44d58deb9c99 Mon Sep 17 00:00:00 2001 From: radu-mocanu Date: Fri, 22 May 2026 14:11:23 +0300 Subject: [PATCH] fix: auto-resume conversational llamaindex agents --- packages/uipath-llamaindex/pyproject.toml | 2 +- .../src/uipath_llamaindex/runtime/factory.py | 18 ++ .../src/uipath_llamaindex/runtime/runtime.py | 23 ++- .../runtime/test_factory_is_conversational.py | 96 +++++++++ .../runtime/test_runtime_resume_decision.py | 184 ++++++++++++++++++ packages/uipath-llamaindex/uv.lock | 4 +- 6 files changed, 322 insertions(+), 5 deletions(-) create mode 100644 packages/uipath-llamaindex/tests/runtime/test_factory_is_conversational.py create mode 100644 packages/uipath-llamaindex/tests/runtime/test_runtime_resume_decision.py diff --git a/packages/uipath-llamaindex/pyproject.toml b/packages/uipath-llamaindex/pyproject.toml index 4f81c206..5bfe8348 100644 --- a/packages/uipath-llamaindex/pyproject.toml +++ b/packages/uipath-llamaindex/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "uipath-llamaindex" -version = "0.5.13" +version = "0.5.14" description = "Python SDK that enables developers to build and deploy LlamaIndex agents to the UiPath Cloud Platform" readme = { file = "README.md", content-type = "text/markdown" } requires-python = ">=3.11" diff --git a/packages/uipath-llamaindex/src/uipath_llamaindex/runtime/factory.py b/packages/uipath-llamaindex/src/uipath_llamaindex/runtime/factory.py index d9535faf..54253afa 100644 --- a/packages/uipath-llamaindex/src/uipath_llamaindex/runtime/factory.py +++ b/packages/uipath-llamaindex/src/uipath_llamaindex/runtime/factory.py @@ -1,7 +1,9 @@ """Factory for creating LlamaIndex runtimes from llama_index.json configuration.""" import asyncio +import json import os +from functools import cached_property from openinference.instrumentation.llama_index import ( LlamaIndexInstrumentor, @@ -109,6 +111,21 @@ def _load_config(self) -> LlamaIndexConfig: self._config = LlamaIndexConfig() return self._config + @cached_property + def is_conversational(self) -> bool: + """Read runtimeOptions.isConversational from uipath.json. + + Returns False when the file is absent, unreadable, or the flag is + missing. + """ + try: + with open(self.context.config_path, "r") as f: + config = json.load(f) + except (OSError, json.JSONDecodeError): + return False + + return bool(config.get("runtimeOptions", {}).get("isConversational", False)) + async def _load_workflow(self, entrypoint: str) -> Workflow: """ Load a workflow for the given entrypoint. @@ -254,6 +271,7 @@ async def _create_runtime_instance( entrypoint=entrypoint, storage=storage, debug_mode=self.context.command == "debug", + is_conversational=self.is_conversational, ) trigger_manager = UiPathResumeTriggerHandler() diff --git a/packages/uipath-llamaindex/src/uipath_llamaindex/runtime/runtime.py b/packages/uipath-llamaindex/src/uipath_llamaindex/runtime/runtime.py index 9951afa1..3fb31ab5 100644 --- a/packages/uipath-llamaindex/src/uipath_llamaindex/runtime/runtime.py +++ b/packages/uipath-llamaindex/src/uipath_llamaindex/runtime/runtime.py @@ -65,6 +65,7 @@ def __init__( entrypoint: str | None = None, storage: SqliteResumableStorage | None = None, debug_mode: bool = False, + is_conversational: bool = False, ): """ Initialize the runtime. @@ -73,13 +74,21 @@ def __init__( workflow: The Workflow to execute runtime_id: Unique identifier for this runtime instance entrypoint: Optional entrypoint name (for schema generation) + storage: Optional storage backend for workflow context + debug_mode: Whether to enable breakpoint injection + is_conversational: When True, the workflow accumulates state + across turns. Any execution that finds prior stored context + continues from it; only the first turn (no stored context) + starts a fresh workflow run. """ self.workflow: Workflow = workflow self.runtime_id: str = runtime_id or "default" self.entrypoint: str | None = entrypoint self.storage: SqliteResumableStorage | None = storage self.debug_mode: bool = debug_mode + self.is_conversational: bool = is_conversational self._context: Context | None = None + self._has_prior_state: bool = False if debug_mode: inject_breakpoints(self.workflow) @@ -139,10 +148,13 @@ async def _run_workflow( Core workflow execution logic used by both execute() and stream(). """ workflow_input = UiPathChatMessagesMapper.map_input(input or {}) - is_resuming = bool(options and options.resume) self._context = await self._load_context() + is_resuming = bool(options and options.resume) or ( + self.is_conversational and self._has_prior_state + ) + if is_resuming: handler: WorkflowHandler = self.workflow.run(ctx=self._context) if workflow_input: @@ -329,7 +341,13 @@ def _create_runtime_error(self, e: Exception) -> UiPathLlamaIndexRuntimeError: ) async def _load_context(self) -> Context: - """Load the workflow context from storage.""" + """Load the workflow context from storage, or create a fresh one. + + Sets ``self._has_prior_state`` to indicate whether the returned + context was restored from storage (True) or freshly created (False). + """ + self._has_prior_state = False + if not self.storage: return Context(self.workflow) @@ -339,6 +357,7 @@ async def _load_context(self) -> Context: from workflows.context.serializers import JsonPickleSerializer serializer = JsonPickleSerializer() + self._has_prior_state = True return Context.from_dict( self.workflow, context_dict, diff --git a/packages/uipath-llamaindex/tests/runtime/test_factory_is_conversational.py b/packages/uipath-llamaindex/tests/runtime/test_factory_is_conversational.py new file mode 100644 index 00000000..a71cd112 --- /dev/null +++ b/packages/uipath-llamaindex/tests/runtime/test_factory_is_conversational.py @@ -0,0 +1,96 @@ +"""Tests for `UiPathLlamaIndexRuntimeFactory.is_conversational`.""" + +import json +from pathlib import Path +from unittest.mock import MagicMock, patch + +from uipath_llamaindex.runtime.factory import UiPathLlamaIndexRuntimeFactory + + +def _make_factory(config_path: str) -> UiPathLlamaIndexRuntimeFactory: + """Build a factory with a minimal context pointing at the given config path. + + We bypass `__init__` to avoid the instrumentation side effects in + `_setup_instrumentation`, which are not relevant to this property. + """ + factory = UiPathLlamaIndexRuntimeFactory.__new__(UiPathLlamaIndexRuntimeFactory) + factory.context = MagicMock() + factory.context.config_path = config_path + return factory + + +def test_is_conversational_true_when_flag_set(tmp_path: Path): + config_path = tmp_path / "uipath.json" + config_path.write_text(json.dumps({"runtimeOptions": {"isConversational": True}})) + + factory = _make_factory(str(config_path)) + + assert factory.is_conversational is True + + +def test_is_conversational_false_when_flag_set_false(tmp_path: Path): + config_path = tmp_path / "uipath.json" + config_path.write_text(json.dumps({"runtimeOptions": {"isConversational": False}})) + + factory = _make_factory(str(config_path)) + + assert factory.is_conversational is False + + +def test_is_conversational_false_when_flag_missing(tmp_path: Path): + config_path = tmp_path / "uipath.json" + config_path.write_text(json.dumps({"runtimeOptions": {}})) + + factory = _make_factory(str(config_path)) + + assert factory.is_conversational is False + + +def test_is_conversational_false_when_runtime_options_missing(tmp_path: Path): + config_path = tmp_path / "uipath.json" + config_path.write_text(json.dumps({})) + + factory = _make_factory(str(config_path)) + + assert factory.is_conversational is False + + +def test_is_conversational_false_when_file_missing(tmp_path: Path): + factory = _make_factory(str(tmp_path / "does-not-exist.json")) + + assert factory.is_conversational is False + + +def test_is_conversational_false_when_file_unparseable(tmp_path: Path): + config_path = tmp_path / "uipath.json" + config_path.write_text("{ not json") + + factory = _make_factory(str(config_path)) + + assert factory.is_conversational is False + + +def test_is_conversational_false_when_file_unreadable(tmp_path: Path): + config_path = tmp_path / "uipath.json" + config_path.write_text(json.dumps({"runtimeOptions": {"isConversational": True}})) + + factory = _make_factory(str(config_path)) + + with patch("builtins.open", side_effect=PermissionError("denied")): + assert factory.is_conversational is False + + +def test_is_conversational_is_cached(tmp_path: Path): + """`cached_property` should only read the file once across accesses.""" + config_path = tmp_path / "uipath.json" + config_path.write_text(json.dumps({"runtimeOptions": {"isConversational": True}})) + + factory = _make_factory(str(config_path)) + + # First access reads the file. + assert factory.is_conversational is True + + # If we now delete the file, a re-read would return False — but the cached + # value must still be True. + config_path.unlink() + assert factory.is_conversational is True diff --git a/packages/uipath-llamaindex/tests/runtime/test_runtime_resume_decision.py b/packages/uipath-llamaindex/tests/runtime/test_runtime_resume_decision.py new file mode 100644 index 00000000..eba9f27f --- /dev/null +++ b/packages/uipath-llamaindex/tests/runtime/test_runtime_resume_decision.py @@ -0,0 +1,184 @@ +"""Tests for the resume-vs-start branch selection in `UiPathLlamaIndexRuntime`. + +Covers three scenarios: + 1. First turn with no stored context starts via `start_event`. + 2. Subsequent turn with stored context resumes and sends `HumanResponseEvent`, + even when `options.resume` is False, provided `is_conversational=True`. + 3. Non-conversational agents keep the existing behavior: without + `options.resume`, a fresh `start_event` is used even when context exists. +""" + +from typing import Any, AsyncIterator +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest +from uipath.runtime import UiPathExecuteOptions +from workflows.events import HumanResponseEvent + +from uipath_llamaindex.runtime.runtime import UiPathLlamaIndexRuntime + + +class _FakeStartEvent: + """Stand-in for the workflow's start event class.""" + + def __init__(self, **kwargs: Any) -> None: + self.kwargs = kwargs + + +class _FakeHandler: + """Stand-in for WorkflowHandler: empty event stream, awaitable, mockable ctx.""" + + def __init__(self) -> None: + self.ctx = MagicMock() + self.ctx.send_event = MagicMock() + self.cancel_run = AsyncMock() + + def stream_events(self, *, expose_internal: bool = False) -> AsyncIterator[Any]: + async def _gen() -> AsyncIterator[Any]: + if False: # pragma: no cover - typing only + yield None + + return _gen() + + def __await__(self): + async def _result(): + return {} + + return _result().__await__() + + +def _make_handler() -> _FakeHandler: + return _FakeHandler() + + +def _make_workflow() -> MagicMock: + workflow = MagicMock() + workflow._start_event_class = _FakeStartEvent + workflow.run = MagicMock(return_value=_make_handler()) + return workflow + + +def _make_runtime( + workflow: MagicMock, + *, + is_conversational: bool, + stored_context: dict[str, Any] | None, +) -> UiPathLlamaIndexRuntime: + storage = MagicMock() + storage.load_context = AsyncMock(return_value=stored_context) + storage.save_context = AsyncMock() + storage.get_value = AsyncMock(return_value=None) + storage.set_value = AsyncMock() + + return UiPathLlamaIndexRuntime( + workflow=workflow, + runtime_id="test-runtime", + storage=storage, + is_conversational=is_conversational, + ) + + +async def _drive( + runtime: UiPathLlamaIndexRuntime, + input: dict[str, Any] | None, + options: UiPathExecuteOptions | None, +) -> None: + """Consume the workflow event stream to completion.""" + async for _ in runtime._run_workflow(input, options, stream_events=False): + pass + + +@pytest.mark.asyncio +async def test_first_turn_no_stored_context_starts_fresh(): + """Conversational agent, turn 1: no prior state -> start_event path.""" + workflow = _make_workflow() + runtime = _make_runtime(workflow, is_conversational=True, stored_context=None) + + with patch.object( + UiPathLlamaIndexRuntime, "_load_context", autospec=True + ) as load_ctx: + # Simulate the real method: no stored context -> fresh Context, flag stays False. + async def fake_load(self): + self._has_prior_state = False + return MagicMock() + + load_ctx.side_effect = fake_load + await _drive(runtime, input={"messages": "hi"}, options=None) + + # Workflow was started with a start_event (not resumed). + assert workflow.run.call_count == 1 + call_kwargs = workflow.run.call_args.kwargs + assert "start_event" in call_kwargs, ( + "first turn must use start_event, got: %s" % call_kwargs + ) + + # No HumanResponseEvent was injected. + sent_events = [ + c.args[0] for c in workflow.run.return_value.ctx.send_event.call_args_list + ] + assert not any(isinstance(e, HumanResponseEvent) for e in sent_events) + + +@pytest.mark.asyncio +async def test_subsequent_turn_with_stored_context_auto_resumes(): + """Conversational agent, turn 2: prior state -> resume without options.resume.""" + workflow = _make_workflow() + runtime = _make_runtime( + workflow, is_conversational=True, stored_context={"some": "state"} + ) + + with patch.object( + UiPathLlamaIndexRuntime, "_load_context", autospec=True + ) as load_ctx: + + async def fake_load(self): + self._has_prior_state = True + return MagicMock() + + load_ctx.side_effect = fake_load + await _drive( + runtime, + input={"messages": "follow-up"}, + options=None, # no explicit resume signal + ) + + # Workflow was resumed with ctx only — no start_event. + assert workflow.run.call_count == 1 + call_kwargs = workflow.run.call_args.kwargs + assert "start_event" not in call_kwargs, ( + "conversational resume must omit start_event, got: %s" % call_kwargs + ) + assert "ctx" in call_kwargs + + # A HumanResponseEvent was injected with the new input. + sent_events = [ + c.args[0] for c in workflow.run.return_value.ctx.send_event.call_args_list + ] + assert any(isinstance(e, HumanResponseEvent) for e in sent_events) + + +@pytest.mark.asyncio +async def test_non_conversational_with_stored_context_still_starts_fresh(): + """Non-conversational agent: stored context alone does not trigger resume.""" + workflow = _make_workflow() + runtime = _make_runtime( + workflow, is_conversational=False, stored_context={"some": "state"} + ) + + with patch.object( + UiPathLlamaIndexRuntime, "_load_context", autospec=True + ) as load_ctx: + + async def fake_load(self): + self._has_prior_state = True + return MagicMock() + + load_ctx.side_effect = fake_load + await _drive(runtime, input={"messages": "hi"}, options=None) + + # Without an explicit resume signal and without is_conversational, we start fresh. + assert workflow.run.call_count == 1 + call_kwargs = workflow.run.call_args.kwargs + assert "start_event" in call_kwargs, ( + "non-conversational agent without options.resume must use start_event" + ) diff --git a/packages/uipath-llamaindex/uv.lock b/packages/uipath-llamaindex/uv.lock index 26f3c9e5..2da9bb5c 100644 --- a/packages/uipath-llamaindex/uv.lock +++ b/packages/uipath-llamaindex/uv.lock @@ -7,7 +7,7 @@ resolution-markers = [ ] [options] -exclude-newer = "0001-01-01T00:00:00Z" # This has no effect and is included for backwards compatibility when using relative exclude-newer values. +exclude-newer = "2026-05-20T10:21:30.5200072Z" exclude-newer-span = "P2D" [options.exclude-newer-package] @@ -3506,7 +3506,7 @@ wheels = [ [[package]] name = "uipath-llamaindex" -version = "0.5.13" +version = "0.5.14" source = { editable = "." } dependencies = [ { name = "aiosqlite" },