forked from microsoft/agent-framework
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathoverride_result_with_middleware.py
More file actions
111 lines (91 loc) · 4.25 KB
/
override_result_with_middleware.py
File metadata and controls
111 lines (91 loc) · 4.25 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# Copyright (c) Microsoft. All rights reserved.
import asyncio
from collections.abc import AsyncIterable, Awaitable, Callable
from random import choice, randint
from typing import Annotated

from agent_framework import (
    AgentRunContext,
    AgentRunResponse,
    AgentRunResponseUpdate,
    ChatMessage,
    Role,
    TextContent,
)
from agent_framework.azure import AzureAIAgentClient
from azure.identity.aio import AzureCliCredential
from pydantic import Field
"""
Result Override with Middleware (Regular and Streaming)
This sample demonstrates how to use agent middleware to intercept and replace an agent's result
after execution, supporting both regular and streaming agent responses. The example shows:
- How to let the original agent run (and any tool calls it makes) complete first and then override its result
- Replacing the agent's output with a custom message or transformed data
- Using middleware for result filtering, formatting, or enhancement
- Detecting streaming vs non-streaming execution using context.is_streaming
- Overriding streaming results with custom async generators
The weather override middleware lets the original agent run (with its weather tool) execute normally,
then replaces the agent's result with a custom "perfect weather" message. For streaming responses,
it creates a custom async generator that yields the override message in chunks.
"""
# Simulated weather tool: no real lookup is performed. It returns a sentence of the form
# "The weather in <location> is <condition> with a high of <N>°C.", where <condition> is
# drawn at random from a fixed list and <N> is a random integer in [10, 30].
# NOTE(review): the docstring is kept to its original single line because it is presumably
# surfaced to the model as the tool description - confirm before expanding it.
def get_weather(
    location: Annotated[str, Field(description="The location to get the weather for.")],
) -> str:
    """Get the weather for a given location."""
    conditions = ["sunny", "cloudy", "rainy", "stormy"]
    # choice() ties the pick to the list itself, so conditions can be added or removed
    # without keeping a separate index bound in sync.
    condition = choice(conditions)
    return f"The weather in {location} is {condition} with a high of {randint(10, 30)}°C."
async def weather_override_middleware(
    context: AgentRunContext, next: Callable[[AgentRunContext], Awaitable[None]]
) -> None:
    """Run the rest of the pipeline, then swap its result for a fixed weather advisory.

    The downstream handler is awaited first. If it left a result on the context, that
    result is replaced: with an async generator of ``AgentRunResponseUpdate`` chunks when
    ``context.is_streaming`` is true, otherwise with a single ``AgentRunResponse``.
    When no result is present the context is left untouched.

    Args:
        context: The run context whose ``result`` is inspected and overwritten.
        next: The continuation that executes the remainder of the pipeline.
    """
    # The override is applied only after the downstream execution has finished.
    await next(context)

    if context.result is None:
        # Nothing was produced, so there is nothing to replace.
        return

    # The advisory text, pre-split so the streaming path can emit it piece by piece.
    advisory_parts = [
        "Weather Advisory - ",
        "due to special atmospheric conditions, ",
        "all locations are experiencing perfect weather today! ",
        "Temperature is a comfortable 22°C with gentle breezes. ",
        "Perfect day for outdoor activities!",
    ]

    if not context.is_streaming:
        # Non-streaming: a single assistant message carrying the whole advisory.
        full_text = "".join(advisory_parts)
        context.result = AgentRunResponse(messages=[ChatMessage(role=Role.ASSISTANT, text=full_text)])
        return

    # Streaming: hand back an async generator that yields one update per part.
    async def advisory_stream() -> AsyncIterable[AgentRunResponseUpdate]:
        for part in advisory_parts:
            yield AgentRunResponseUpdate(contents=[TextContent(text=part)])

    context.result = advisory_stream()
async def main() -> None:
    """Run the override middleware against one non-streaming and one streaming query.

    Creates an Azure AI agent configured with the ``get_weather`` tool and
    ``weather_override_middleware``, sends a query through ``agent.run`` and another
    through ``agent.run_stream``, and prints the responses to stdout.
    """
    print("=== Result Override Middleware Example ===")

    # For authentication, run `az login` command in terminal or replace AzureCliCredential with preferred
    # authentication option.
    async with (
        AzureCliCredential() as credential,
        AzureAIAgentClient(async_credential=credential).create_agent(
            name="WeatherAgent",
            instructions="You are a helpful weather assistant. Use the weather tool to get current conditions.",
            tools=get_weather,
            middleware=weather_override_middleware,
        ) as agent,
    ):
        # Non-streaming example
        print("\n--- Non-streaming Example ---")
        query = "What's the weather like in Seattle?"
        print(f"User: {query}")
        result = await agent.run(query)
        print(f"Agent: {result}")

        # Streaming example
        print("\n--- Streaming Example ---")
        query = "What's the weather like in Portland?"
        print(f"User: {query}")
        print("Agent: ", end="", flush=True)
        async for chunk in agent.run_stream(query):
            # Skip updates that carry no text.
            if chunk.text:
                print(chunk.text, end="", flush=True)
        # Every chunk above is written with end="", so terminate the line explicitly;
        # otherwise the program's output ends without a trailing newline.
        print()


# Script entry point: drive the async sample with a fresh event loop.
if __name__ == "__main__":
    asyncio.run(main())