Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions python/packages/core/AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ agent_framework/
- **`SessionContext`** - Context object for session-scoped data during agent runs
- **`ContextProvider`** - Base class for context providers (RAG, memory systems)
- **`HistoryProvider`** - Base class for conversation history storage
- **`InMemoryHistoryProvider`** - Built-in session-state history provider for local runs
- **`FileHistoryProvider`** - JSON Lines file-backed history provider storing one file per session with one message record per line

### Skills (`_skills.py`)

Expand Down
2 changes: 2 additions & 0 deletions python/packages/core/agent_framework/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@
from ._sessions import (
AgentSession,
ContextProvider,
FileHistoryProvider,
HistoryProvider,
InMemoryHistoryProvider,
SessionContext,
Expand Down Expand Up @@ -318,6 +319,7 @@
"FanInEdgeGroup",
"FanOutEdgeGroup",
"FileCheckpointStorage",
"FileHistoryProvider",
"FinalT",
"FinishReason",
"FinishReasonLiteral",
Expand Down
1 change: 1 addition & 0 deletions python/packages/core/agent_framework/_feature_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class ExperimentalFeature(str, Enum):
"""

EVALS = "EVALS"
FILE_HISTORY = "FILE_HISTORY"
SKILLS = "SKILLS"


Expand Down
265 changes: 264 additions & 1 deletion python/packages/core/agent_framework/_sessions.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,24 @@
- HistoryProvider: Base class for history storage providers
- AgentSession: Lightweight session state container
- InMemoryHistoryProvider: Built-in in-memory history provider
- FileHistoryProvider: Built-in JSON Lines file history provider
"""

from __future__ import annotations

import asyncio
import copy
import json
import threading
import uuid
import weakref
from abc import abstractmethod
from base64 import urlsafe_b64encode
from collections.abc import Awaitable, Callable, Mapping, Sequence
from typing import TYPE_CHECKING, Any, ClassVar, TypeGuard, cast
from pathlib import Path
from typing import TYPE_CHECKING, Any, ClassVar, TypeAlias, TypeGuard, cast

from ._feature_stage import ExperimentalFeature, experimental
from ._middleware import ChatContext, ChatMiddleware
from ._types import AgentResponse, ChatResponse, Message, ResponseStream
from .exceptions import ChatClientInvalidResponseException
Expand All @@ -30,6 +38,17 @@
# Registry of known types for state deserialization
_STATE_TYPE_REGISTRY: dict[str, type] = {}

JsonDumps: TypeAlias = Callable[[Any], str | bytes]
JsonLoads: TypeAlias = Callable[[str | bytes], Any]


def _default_json_dumps(value: Any) -> str:
return json.dumps(value, ensure_ascii=False)


def _default_json_loads(value: str | bytes) -> Any:
return json.loads(value)


def _is_middleware_sequence(
middleware: MiddlewareTypes | Sequence[MiddlewareTypes],
Expand Down Expand Up @@ -837,3 +856,247 @@ async def save_messages(
return
existing = state.get("messages", [])
state["messages"] = [*existing, *messages]


@experimental(feature_id=ExperimentalFeature.FILE_HISTORY)
class FileHistoryProvider(HistoryProvider):
    """File-backed history provider that stores one JSON Lines file per session.

    Each persisted message is written as a single JSON object per line. The
    provider does not serialize full session snapshots into the file. By default
    it uses the standard library ``json`` module, but callers can inject
    alternative ``dumps`` and ``loads`` callables compatible with the JSON
    Lines format.

    Security posture:
        Persisted history is stored as plaintext JSONL on the local filesystem.
        Treat ``storage_path`` as trusted application storage, not as a secret
        store. Encoded fallback filenames and resolved-path validation help
        prevent path traversal via ``session_id``, but they do not encrypt file
        contents or provide cross-process / cross-host locking. Use OS-level
        file permissions, trusted directories, and carefully review what agent
        or tool output is allowed to be persisted.
    """

    DEFAULT_SOURCE_ID: ClassVar[str] = "file_history"
    DEFAULT_SESSION_FILE_STEM: ClassVar[str] = "default"
    FILE_EXTENSION: ClassVar[str] = ".jsonl"
    # Bounded lock striping: files hash onto a fixed pool of locks so the lock
    # count stays constant no matter how many sessions are active.
    _FILE_LOCK_STRIPE_COUNT: ClassVar[int] = 64
    _ENCODED_SESSION_PREFIX: ClassVar[str] = "~session-"
    _FILE_WRITE_LOCKS: ClassVar[tuple[threading.Lock, ...]] = tuple(
        threading.Lock() for _ in range(_FILE_LOCK_STRIPE_COUNT)
    )
    # Windows device names that cannot be used as the leading component of a
    # filename (see Win32 "Naming Files, Paths, and Namespaces").
    _WINDOWS_RESERVED_FILE_STEMS: ClassVar[frozenset[str]] = frozenset({
        "CON",
        "PRN",
        "AUX",
        "NUL",
        "COM1",
        "COM2",
        "COM3",
        "COM4",
        "COM5",
        "COM6",
        "COM7",
        "COM8",
        "COM9",
        "LPT1",
        "LPT2",
        "LPT3",
        "LPT4",
        "LPT5",
        "LPT6",
        "LPT7",
        "LPT8",
        "LPT9",
    })

    def __init__(
        self,
        storage_path: str | Path,
        *,
        source_id: str = DEFAULT_SOURCE_ID,
        load_messages: bool = True,
        store_inputs: bool = True,
        store_context_messages: bool = False,
        store_context_from: set[str] | None = None,
        store_outputs: bool = True,
        skip_excluded: bool = False,
        dumps: JsonDumps | None = None,
        loads: JsonLoads | None = None,
    ) -> None:
        """Initialize the file history provider.

        Args:
            storage_path: Directory path where session history files will be stored.
                Created (including parents) if it does not already exist.

        Keyword Args:
            source_id: Unique identifier for this provider instance.
            load_messages: Whether to load messages before invocation.
            store_inputs: Whether to store input messages.
            store_context_messages: Whether to store context from other providers.
            store_context_from: If set, only store context from these source_ids.
            store_outputs: Whether to store response messages.
            skip_excluded: When True, ``get_messages`` omits messages whose
                ``additional_properties["_excluded"]`` is truthy.
            dumps: Callable that serializes a message payload dict to JSON text
                or UTF-8 bytes. The returned JSON must fit on a single line.
            loads: Callable that deserializes JSON text or bytes back to a
                message payload dict.
        """
        super().__init__(
            source_id=source_id,
            load_messages=load_messages,
            store_inputs=store_inputs,
            store_context_messages=store_context_messages,
            store_context_from=store_context_from,
            store_outputs=store_outputs,
        )
        self.storage_path = Path(storage_path)
        self.storage_path.mkdir(parents=True, exist_ok=True)
        # Resolved root used for the path-escape check in _session_file_path.
        self._storage_root = self.storage_path.resolve()
        self.skip_excluded = skip_excluded
        self.dumps = dumps or _default_json_dumps
        self.loads = loads or _default_json_loads
        # asyncio.Lock objects are bound to the loop they were created on, so
        # keep a separate stripe pool per event loop; the WeakKeyDictionary
        # drops entries once a loop is garbage collected.
        self._async_write_locks_by_loop: weakref.WeakKeyDictionary[
            asyncio.AbstractEventLoop,
            tuple[asyncio.Lock, ...],
        ] = weakref.WeakKeyDictionary()

    async def get_messages(
        self,
        session_id: str | None,
        *,
        state: dict[str, Any] | None = None,
        **kwargs: Any,
    ) -> list[Message]:
        """Retrieve messages from the session's JSON Lines file.

        Raises:
            ValueError: If a line cannot be deserialized, does not deserialize
                to a mapping, or is not a valid ``Message`` payload.
        """
        del state, kwargs
        file_path = self._session_file_path(session_id)
        async_lock = self._session_async_write_lock(file_path)
        thread_lock = self._session_write_lock(file_path)

        def _read_messages() -> list[Message]:
            with thread_lock:
                if not file_path.exists():
                    return []

                messages: list[Message] = []
                with file_path.open(encoding="utf-8") as file_handle:
                    for line_number, line in enumerate(file_handle, start=1):
                        serialized = line.strip()
                        if not serialized:
                            # Blank lines are tolerated (e.g. trailing newline).
                            continue
                        try:
                            payload = self.loads(serialized)
                        except (TypeError, ValueError) as exc:
                            raise ValueError(
                                f"Failed to deserialize history line {line_number} from '{file_path}'."
                            ) from exc
                        if not isinstance(payload, Mapping):
                            raise ValueError(
                                f"History line {line_number} in '{file_path}' did not deserialize to a mapping."
                            )

                        try:
                            message = Message.from_dict(dict(cast(Mapping[str, Any], payload)))
                        except ValueError as exc:
                            raise ValueError(
                                f"History line {line_number} in '{file_path}' is not a valid Message payload."
                            ) from exc
                        messages.append(message)
                return messages

        # Hold the loop-local async lock across the threaded read so a
        # concurrent save on the same loop cannot interleave mid-read.
        async with async_lock:
            messages = await asyncio.to_thread(_read_messages)
        if self.skip_excluded:
            messages = [m for m in messages if not m.additional_properties.get("_excluded", False)]
        return messages

    async def save_messages(
        self,
        session_id: str | None,
        messages: Sequence[Message],
        *,
        state: dict[str, Any] | None = None,
        **kwargs: Any,
    ) -> None:
        """Append messages to the session's JSON Lines file."""
        del state, kwargs
        if not messages:
            return

        file_path = self._session_file_path(session_id)
        async_lock = self._session_async_write_lock(file_path)
        file_lock = self._session_write_lock(file_path)

        def _append_messages() -> None:
            # Serialize everything before touching the file so a dumps failure
            # cannot leave a partially written batch behind.
            records = [self._serialize_message(message) for message in messages]
            with file_lock, file_path.open("a", encoding="utf-8") as file_handle:
                for record in records:
                    file_handle.write(f"{record}\n")

        async with async_lock:
            await asyncio.to_thread(_append_messages)

    def _serialize_message(self, message: Message) -> str:
        """Serialize a message payload to a single JSON Lines record.

        Raises:
            TypeError: If the injected ``dumps`` returns neither str nor bytes.
            ValueError: If the serialized record spans multiple lines, which
                would corrupt the JSON Lines framing.
        """
        serialized = self.dumps(message.to_dict())
        if isinstance(serialized, bytes):
            serialized_text = serialized.decode("utf-8")
        elif isinstance(serialized, str):
            serialized_text = serialized
        else:
            raise TypeError("FileHistoryProvider.dumps must return str or bytes.")

        if "\n" in serialized_text or "\r" in serialized_text:
            raise ValueError("FileHistoryProvider.dumps must return single-line JSON for JSON Lines storage.")
        return serialized_text

    def _session_file_path(self, session_id: str | None) -> Path:
        """Resolve the on-disk history file path for a session.

        Raises:
            ValueError: If the resolved path escapes the storage directory
                (defense in depth against path traversal via ``session_id``).
        """
        file_path = (self._storage_root / f"{self._session_file_stem(session_id)}{self.FILE_EXTENSION}").resolve()
        if not file_path.is_relative_to(self._storage_root):
            raise ValueError(f"Session history path escaped storage directory: {session_id!r}")
        return file_path

    def _session_file_stem(self, session_id: str | None) -> str:
        """Return the filename stem for a session.

        IDs that are safe to use literally are kept readable; anything else is
        base64url-encoded under a recognizable prefix so distinct session IDs
        never collide on disk.
        """
        raw_session_id = session_id or self.DEFAULT_SESSION_FILE_STEM
        if self._is_literal_session_file_stem_safe(raw_session_id):
            return raw_session_id

        encoded_session_id = urlsafe_b64encode(raw_session_id.encode("utf-8")).decode("ascii").rstrip("=")
        return f"{self._ENCODED_SESSION_PREFIX}{encoded_session_id or self.DEFAULT_SESSION_FILE_STEM}"

    def _session_async_write_lock(self, file_path: Path) -> asyncio.Lock:
        """Return the event-loop-local async lock for a session history file."""
        loop = asyncio.get_running_loop()
        locks = self._async_write_locks_by_loop.get(loop)
        if locks is None:
            locks = tuple(asyncio.Lock() for _ in range(self._FILE_LOCK_STRIPE_COUNT))
            self._async_write_locks_by_loop[loop] = locks
        return locks[self._lock_index(file_path)]

    @classmethod
    def _session_write_lock(cls, file_path: Path) -> threading.Lock:
        """Return the process-local thread lock for a session history file."""
        return cls._FILE_WRITE_LOCKS[cls._lock_index(file_path)]

    @classmethod
    def _lock_index(cls, file_path: Path) -> int:
        """Map a session history file to a bounded lock stripe."""
        return hash(file_path) % cls._FILE_LOCK_STRIPE_COUNT

    @classmethod
    def _is_literal_session_file_stem_safe(cls, session_id: str) -> bool:
        """Return whether the session ID can be used directly as a filename stem."""
        if not session_id or session_id.startswith(".") or session_id.endswith((" ", ".")):
            return False
        # Windows reserves device names (CON, NUL, COM1, ...) based on the
        # filename's portion before its FIRST dot, so "con.log.jsonl" is just
        # as invalid as "con.jsonl". Check the leading dot-separated component
        # rather than only the whole stem.
        if session_id.split(".", 1)[0].upper() in cls._WINDOWS_RESERVED_FILE_STEMS:
            return False
        # Control characters are unsafe in filenames on all platforms.
        if any(ord(character) < 32 for character in session_id):
            return False
        return all(character.isalnum() or character in "._-" for character in session_id)
Loading
Loading