Python: Add Azure Cosmos history provider package #4271
eavanvalkenburg wants to merge 2 commits into microsoft:main
Pull request overview
Adds a new Python workspace package (agent-framework-azure-cosmos) that implements an Azure Cosmos DB–backed BaseHistoryProvider for persisting conversation history, along with unit tests and a runnable sample. This supports the Agent Framework’s pluggable “context/history provider” story similarly to existing integrations (e.g., Redis).
Changes:
- Introduce agent-framework-azure-cosmos package with CosmosHistoryProvider (Cosmos DB transactional batch writes + session partitioning).
- Add unit tests and package-local sample/README for the Cosmos history provider.
- Wire the new package into the Python workspace (pyproject + uv.lock) and apply minor formatting cleanups in existing tests/modules.
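
The transactional batch writes and session partitioning mentioned above can be sketched roughly as follows. This is an illustration, not the package's implementation: the function name, the id scheme, and the integer sort_key are hypothetical, the document fields mirror the ones referenced in the review queries (session_id, source_id, sort_key, message), and the chunk size assumes the Cosmos DB limit of 100 operations per transactional batch.

```python
from typing import Any

# Assumption: a Cosmos DB transactional batch holds at most 100 operations.
MAX_BATCH_OPERATIONS = 100

# One batch operation: an operation name plus a tuple of its arguments.
Operation = tuple[str, tuple[dict[str, Any]]]


def build_upsert_batches(
    session_key: str,
    source_id: str,
    messages: list[dict[str, Any]],
    start_index: int = 0,
) -> list[list[Operation]]:
    """Turn messages into chunks of ("upsert", (document,)) operations.

    Every document carries the same session_id, so each chunk targets a
    single logical partition, which a transactional batch requires.
    """
    operations: list[Operation] = []
    for offset, message in enumerate(messages):
        sort_key = start_index + offset
        document = {
            "id": f"{source_id}:{sort_key:012d}",  # hypothetical id scheme
            "session_id": session_key,
            "source_id": source_id,
            "sort_key": sort_key,
            "message": message,
        }
        operations.append(("upsert", (document,)))

    # Split into chunks that respect the per-batch operation limit.
    return [
        operations[i : i + MAX_BATCH_OPERATIONS]
        for i in range(0, len(operations), MAX_BATCH_OPERATIONS)
    ]
```

Each inner list would then be handed to the container's batch call (the PR description names execute_item_batch) with the session key as the partition key; the exact call signature should be checked against the azure-cosmos documentation.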
Reviewed changes
Copilot reviewed 15 out of 16 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| python/uv.lock | Adds the new workspace member and locks azure-cosmos dependency. |
| python/pyproject.toml | Registers agent-framework-azure-cosmos as a uv workspace source. |
| python/packages/core/tests/core/test_skills.py | Minor whitespace-only change. |
| python/packages/core/tests/core/test_function_invocation_logic.py | Formatting of assertion messages (single-line f-strings). |
| python/packages/core/agent_framework/_skills.py | Minor formatting/line-wrapping adjustments. |
| python/packages/azure-cosmos/pyproject.toml | New package metadata, dependencies, and tooling config. |
| python/packages/azure-cosmos/agent_framework_azure_cosmos/_history_provider.py | Implements CosmosHistoryProvider (get/save/clear/list + batching + container creation). |
| python/packages/azure-cosmos/agent_framework_azure_cosmos/__init__.py | Exports CosmosHistoryProvider + version discovery. |
| python/packages/azure-cosmos/tests/test_cosmos_history_provider.py | New unit test suite for the provider (mocked Cosmos client/container). |
| python/packages/azure-cosmos/samples/cosmos_history_provider.py | Runnable sample demonstrating agent usage with Cosmos-backed history. |
| python/packages/azure-cosmos/samples/__init__.py | Samples package marker + docstring. |
| python/packages/azure-cosmos/samples/README.md | Sample runner documentation. |
| python/packages/azure-cosmos/README.md | Package-level “getting started” documentation. |
| python/packages/azure-cosmos/LICENSE | Package license file. |
| python/packages/azure-cosmos/AGENTS.md | Package agent documentation (class + import path). |
| python/packages/azure-ai-search/tests/test_aisearch_context_provider.py | Removes redundant local imports + minor formatting. |

```python
query = "SELECT c.message FROM c WHERE c.session_id = @session_id ORDER BY c.sort_key ASC"
parameters: list[dict[str, object]] = [{"name": "@session_id", "value": session_key}]
```
Documents include a source_id field, but reads/writes don’t use it: get_messages() queries only by session_id. If multiple history providers (or multiple source_ids) share the same container, histories will be mixed, and clear() may delete other providers’ data. Either remove source_id from stored documents, or (preferably) include it in the query filter (and in clear()/list_sessions() queries) so each provider instance is isolated.

```diff
- query = "SELECT c.message FROM c WHERE c.session_id = @session_id ORDER BY c.sort_key ASC"
- parameters: list[dict[str, object]] = [{"name": "@session_id", "value": session_key}]
+ query = (
+     "SELECT c.message FROM c "
+     "WHERE c.session_id = @session_id AND c.source_id = @source_id "
+     "ORDER BY c.sort_key ASC"
+ )
+ parameters: list[dict[str, object]] = [
+     {"name": "@session_id", "value": session_key},
+     {"name": "@source_id", "value": self.source_id},
+ ]
```

```python
query = "SELECT c.id FROM c WHERE c.session_id = @session_id"
parameters: list[dict[str, object]] = [{"name": "@session_id", "value": session_key}]
items = container.query_items(query=query, parameters=parameters, partition_key=session_key)
```
clear() deletes all items for the session partition without filtering by source_id. If multiple provider instances share a container, calling clear() on one instance can delete another instance’s history. Consider including source_id in the delete query (and/or using a provider-specific partition key scheme).
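
One way to scope the delete query by source_id, mirroring the filter suggested for get_messages(), might look like the sketch below. build_clear_query is a hypothetical helper introduced only for illustration; the provider's clear() may be structured differently.

```python
def build_clear_query(
    session_key: str, source_id: str
) -> tuple[str, list[dict[str, object]]]:
    """Build a parameterized query selecting only this provider's item ids."""
    query = (
        "SELECT c.id FROM c "
        "WHERE c.session_id = @session_id AND c.source_id = @source_id"
    )
    parameters: list[dict[str, object]] = [
        {"name": "@session_id", "value": session_key},
        {"name": "@source_id", "value": source_id},
    ]
    return query, parameters
```

The returned pair would replace the query and parameters passed to container.query_items(...) in clear(), so only ids written by the same source_id are collected for deletion.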

```python
query = "SELECT DISTINCT VALUE c.session_id FROM c"
items = container.query_items(query=query, enable_cross_partition_query=True)
```
list_sessions() currently lists session IDs across the entire container. If the container can contain data for multiple providers/apps, consider filtering by source_id (or documenting that the container must be dedicated to a single provider instance) to avoid returning unrelated sessions.

```diff
- query = "SELECT DISTINCT VALUE c.session_id FROM c"
- items = container.query_items(query=query, enable_cross_partition_query=True)
+ query = "SELECT DISTINCT VALUE c.session_id FROM c WHERE c.source_id = @source_id"
+ parameters: list[dict[str, object]] = [{"name": "@source_id", "value": self.source_id}]
+ items = container.query_items(
+     query=query,
+     parameters=parameters,
+     enable_cross_partition_query=True,
+ )
```

```python
credential = AzureCliCredential()
client = AzureOpenAIResponsesClient(
    project_endpoint=project_endpoint,
    deployment_name=deployment_name,
    credential=credential,
)

# 3. Create an agent that uses the history provider as a context provider.
async with (
    CosmosHistoryProvider(
        endpoint=cosmos_endpoint,
        database_name=cosmos_database_name,
        container_name=cosmos_container_name,
        credential=cosmos_key or credential,
    ) as history_provider,
    client.as_agent(
        name="CosmosHistoryAgent",
        instructions="You are a helpful assistant that remembers prior turns.",
        context_providers=[history_provider],
        default_options={"store": False},
    ) as agent,
):
    # 4. Create a session (session_id is used as the partition key).
    session = agent.create_session()

    # 5. Run a multi-turn conversation; history is persisted by CosmosHistoryProvider.
    response1 = await agent.run("My name is Ada and I enjoy distributed systems.", session=session)
    print(f"Assistant: {response1.text}")

    response2 = await agent.run("What do you remember about me?", session=session)
    print(f"Assistant: {response2.text}")
    print(f"Container: {history_provider.container_name}")
```
AzureCliCredential() (aio) is created here but never closed. Please ensure the sample disposes it properly (e.g., async with AzureCliCredential() as credential: or await credential.close() in a finally) to avoid leaking network resources during repeated runs.

```diff
- credential = AzureCliCredential()
- client = AzureOpenAIResponsesClient(
-     project_endpoint=project_endpoint,
-     deployment_name=deployment_name,
-     credential=credential,
- )
- # 3. Create an agent that uses the history provider as a context provider.
- async with (
-     CosmosHistoryProvider(
-         endpoint=cosmos_endpoint,
-         database_name=cosmos_database_name,
-         container_name=cosmos_container_name,
-         credential=cosmos_key or credential,
-     ) as history_provider,
-     client.as_agent(
-         name="CosmosHistoryAgent",
-         instructions="You are a helpful assistant that remembers prior turns.",
-         context_providers=[history_provider],
-         default_options={"store": False},
-     ) as agent,
- ):
-     # 4. Create a session (session_id is used as the partition key).
-     session = agent.create_session()
-     # 5. Run a multi-turn conversation; history is persisted by CosmosHistoryProvider.
-     response1 = await agent.run("My name is Ada and I enjoy distributed systems.", session=session)
-     print(f"Assistant: {response1.text}")
-     response2 = await agent.run("What do you remember about me?", session=session)
-     print(f"Assistant: {response2.text}")
-     print(f"Container: {history_provider.container_name}")
+ async with AzureCliCredential() as credential:
+     client = AzureOpenAIResponsesClient(
+         project_endpoint=project_endpoint,
+         deployment_name=deployment_name,
+         credential=credential,
+     )
+     # 3. Create an agent that uses the history provider as a context provider.
+     async with (
+         CosmosHistoryProvider(
+             endpoint=cosmos_endpoint,
+             database_name=cosmos_database_name,
+             container_name=cosmos_container_name,
+             credential=cosmos_key or credential,
+         ) as history_provider,
+         client.as_agent(
+             name="CosmosHistoryAgent",
+             instructions="You are a helpful assistant that remembers prior turns.",
+             context_providers=[history_provider],
+             default_options={"store": False},
+         ) as agent,
+     ):
+         # 4. Create a session (session_id is used as the partition key).
+         session = agent.create_session()
+         # 5. Run a multi-turn conversation; history is persisted by CosmosHistoryProvider.
+         response1 = await agent.run("My name is Ada and I enjoy distributed systems.", session=session)
+         print(f"Assistant: {response1.text}")
+         response2 = await agent.run("What do you remember about me?", session=session)
+         print(f"Assistant: {response2.text}")
+         print(f"Container: {history_provider.container_name}")
```
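
The other option named in the comment, closing the credential in a finally block, can be sketched as below. FakeAsyncCredential and run_sample are hypothetical stand-ins so the example runs without Azure; the only behavior assumed of the real async credential is an awaitable close().

```python
import asyncio


class FakeAsyncCredential:
    """Hypothetical stand-in for an async credential; models only close()."""

    def __init__(self) -> None:
        self.closed = False

    async def close(self) -> None:
        self.closed = True


async def run_sample(credential: FakeAsyncCredential, fail: bool = False) -> str:
    """Run the sample body and always close the credential afterwards."""
    try:
        if fail:
            # Simulates an error raised while the agent is running.
            raise RuntimeError("simulated failure")
        return "done"
    finally:
        # Runs on success and on failure, so the credential is never leaked.
        await credential.close()


if __name__ == "__main__":
    credential = FakeAsyncCredential()
    print(asyncio.run(run_sample(credential)), credential.closed)
```

The async with form in the suggestion is shorter; try/finally mainly helps when the credential is created in one place and released in another.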

```python
"""
This sample demonstrates CosmosHistoryProvider as an agent context provider.

Key components:
- AzureOpenAIResponsesClient configured with an Azure AI project endpoint
```
For consistency with existing samples, consider moving this descriptive triple-quoted block to come after the load_dotenv() call (rather than before it). This matches the structure used throughout python/samples/* and keeps environment loading in a consistent place.

```python
# Load environment variables from .env file.
load_dotenv()
```
load_dotenv() is called after the descriptive triple-quoted block; in existing samples the .env load happens immediately after imports. Consider moving load_dotenv() up so env vars are loaded before any sample configuration text/instructions are presented, matching python/samples/SAMPLE_GUIDELINES.md.

```python
self._container: ContainerProxy | None = container_client
self._owns_client = False

if self._container is not None:
```
When container_client is provided, __init__ returns early, so database_name / container_name attributes are never set (and any provided database_name/container_name args are ignored). This can lead to AttributeError for callers that log/introspect these properties. Consider setting these attributes even in the injected-container path, or clearly documenting that they may be unset when container_client is used.

```diff
- if self._container is not None:
+ if self._container is not None:
+     # When a container_client is provided, we may still want database/container
+     # names for logging or introspection purposes. Use any explicitly supplied
+     # values; they may be None if not provided.
+     self.database_name = database_name
+     self.container_name = container_name
```
Summary
- agent-framework-azure-cosmos
- CosmosHistoryProvider for conversation history in Azure Cosmos DB
- execute_item_batch and add list_sessions

Validation
- uv run --directory packages/azure-cosmos poe test
- uv run --directory packages/azure-cosmos poe lint
- uv run --directory packages/azure-cosmos poe pyright
- uv run --directory packages/azure-cosmos poe mypy

Fixes #1390