diff --git a/.devcontainer/Dockerfile b/.devcontainer/Dockerfile new file mode 100644 index 0000000..5d5b6d4 --- /dev/null +++ b/.devcontainer/Dockerfile @@ -0,0 +1,24 @@ +# Based on Azure Functions Python image with Core Tools +# Based on Azure Functions Python image with Core Tools +FROM mcr.microsoft.com/azure-functions/python:4-python3.11-core-tools + +# Update and install dependencies +RUN apt-get update \ + && apt-get install -y \ + curl \ + npm \ + && apt-get clean \ + && rm -rf /var/lib/apt/lists/* + + # Workaround for core tools hasn't support APT install for MCP version 4.0.7332-preview1 yet. +RUN npm uninstall -g azure-functions-core-tools-4 -y +# Install Azurite globally using npm +RUN npm install -g azurite +RUN npm install -g azure-functions-core-tools@4.0.7332-preview1 + +# Set up Azure Functions environment variables +ENV AzureFunctionsJobHost__Logging__Console__IsEnabled=true + +# Expose ports +EXPOSE 7071 + diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json new file mode 100644 index 0000000..2390576 --- /dev/null +++ b/.devcontainer/devcontainer.json @@ -0,0 +1,24 @@ +{ + "name": "Azure Functions & Python 3", + "dockerFile": "Dockerfile", + "forwardPorts": [ 7071 ], + + // Configure tool-specific properties. + "customizations": { + // Configure properties specific to VS Code. + "vscode": { + // Add the IDs of extensions you want installed when the container is created. + "extensions": [ + "ms-azuretools.vscode-azurefunctions", + "ms-azuretools.vscode-docker", + "ms-python.python" + ] + } + }, + + // Use 'postCreateCommand' to run commands after the container is created. + // "postCreateCommand": "npm install", + + // Set `remoteUser` to `root` to connect as root instead. More info: https://aka.ms/vscode-remote/containers/non-root. + "remoteUser": "vscode" +} \ No newline at end of file diff --git a/.gitattribute b/.gitattribute new file mode 100644 index 0000000..b4da691 --- /dev/null +++ b/.gitattribute @@ -0,0 +1,4 @@ + +* text=auto eol=lf +*.{cmd,[cC][mM][dD]} text eol=crlf +*.{bat,[bB][aA][tT]} text eol=crlf diff --git a/README.md b/README.md index 698f964..423fefc 100644 --- a/README.md +++ b/README.md @@ -17,6 +17,8 @@ The libraries are supported on Function Apps using Python 3.9 or later. For more ## SDK Support * Azure Storage Blob +* Azure Functions Agents Framework +* Azure Functions Agents Durable Extension ## Need help? diff --git a/azurefunctions-agents-durable/README.md b/azurefunctions-agents-durable/README.md new file mode 100644 index 0000000..e69de29 diff --git a/azurefunctions-agents-durable/azurefunctions/__init__.py b/azurefunctions-agents-durable/azurefunctions/__init__.py new file mode 100644 index 0000000..5b7f7a9 --- /dev/null +++ b/azurefunctions-agents-durable/azurefunctions/__init__.py @@ -0,0 +1,2 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. diff --git a/azurefunctions-agents-durable/azurefunctions/agents/__init__.py b/azurefunctions-agents-durable/azurefunctions/agents/__init__.py new file mode 100644 index 0000000..5b7f7a9 --- /dev/null +++ b/azurefunctions-agents-durable/azurefunctions/agents/__init__.py @@ -0,0 +1,2 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. diff --git a/azurefunctions-agents-durable/azurefunctions/agents/durable/__init__.py b/azurefunctions-agents-durable/azurefunctions/agents/durable/__init__.py new file mode 100644 index 0000000..033aa36 --- /dev/null +++ b/azurefunctions-agents-durable/azurefunctions/agents/durable/__init__.py @@ -0,0 +1,79 @@ +"""Azure Functions Agents Durable Package. + +This package provides a framework for calling agents from Durable Functions orchestrators +without needing explicit activity triggers. It supports multiple communication modes +including HTTP, MCP (Model Context Protocol), and Agent-to-Agent communication. + +Key Features: +- Activity trigger-less notation for orchestrators +- Support for HTTP, MCP, A2A_TASK, and A2A_SYNC call modes +- Enhanced MCP support with both stdio and SSE clients +- Decorator-based framework for easy integration +- Configuration-driven agent management + +Example Usage: + from azure.functions import FunctionApp + from azurefunctions.agents.durable import DFAgentFramework, orchestrator, AgentConfig, CallMode + + app = FunctionApp() + framework = DFAgentFramework(app) + + # Register agents + framework.register_agent(AgentConfig( + name="my_mcp_agent", + call_mode=CallMode.MCP, + client_type="stdio", + extra_config={ + "command": "python", + "args": ["-m", "my_mcp_server"] + } + )) + + @orchestrator(framework) + async def my_orchestrator(context, agents): + tools = await agents.list_mcp_tools("my_mcp_agent") + result = await agents.call_mcp_tool("my_mcp_agent", "my_tool", {"arg": "value"}) + return result +""" + +from .types import CallMode, AgentConfig, AgentCallRequest, AgentCallResponse +from .framework import DFAgentFramework, AgentCaller +from .decorators import orchestrator, activity_with_agent_support, register_orchestrator_with_agents +from .call_modes import ( + BaseAgentCaller, + HttpAgentCaller, + MCPAgentCaller, + A2ATaskAgentCaller, + A2ASyncAgentCaller +) +from .call_modes.mcp_caller import MCPClientHelper + +__version__ = "0.1.0" + +__all__ = [ + # Core types + "CallMode", + "AgentConfig", + "AgentCallRequest", + "AgentCallResponse", + + # Framework classes + "DFAgentFramework", + "AgentCaller", + + # Decorators + "orchestrator", + "activity_with_agent_support", + "register_orchestrator_with_agents", + + # MCP Helper + "MCPClientHelper", + "MCPAgentCaller", + + # Caller implementations + "BaseAgentCaller", + "HttpAgentCaller", + "MCPAgentCaller", + "A2ATaskAgentCaller", + "A2ASyncAgentCaller", +] \ No newline at end of file diff --git a/azurefunctions-agents-durable/azurefunctions/agents/durable/activity.py b/azurefunctions-agents-durable/azurefunctions/agents/durable/activity.py new file mode 100644 index 0000000..e69de29 diff --git a/azurefunctions-agents-durable/azurefunctions/agents/durable/call_modes/__init__.py b/azurefunctions-agents-durable/azurefunctions/agents/durable/call_modes/__init__.py new file mode 100644 index 0000000..e2600ff --- /dev/null +++ b/azurefunctions-agents-durable/azurefunctions/agents/durable/call_modes/__init__.py @@ -0,0 +1,14 @@ +"""Call mode implementations for the Durable Functions Agents framework.""" + +from .base import BaseAgentCaller +from .http_caller import HttpAgentCaller +from .mcp_caller import MCPAgentCaller +from .a2a_caller import A2ATaskAgentCaller, A2ASyncAgentCaller + +__all__ = [ + "BaseAgentCaller", + "HttpAgentCaller", + "MCPAgentCaller", + "A2ATaskAgentCaller", + "A2ASyncAgentCaller" +] \ No newline at end of file diff --git a/azurefunctions-agents-durable/azurefunctions/agents/durable/call_modes/a2a_caller.py b/azurefunctions-agents-durable/azurefunctions/agents/durable/call_modes/a2a_caller.py new file mode 100644 index 0000000..025c839 --- /dev/null +++ b/azurefunctions-agents-durable/azurefunctions/agents/durable/call_modes/a2a_caller.py @@ -0,0 +1,187 @@ +"""Agent-to-Agent (A2A) caller implementations.""" + +import json +import logging +from typing import Any, Dict, Optional, Union +import azure.functions as func +import azure.durable_functions as df +from azure.durable_functions import DurableOrchestrationClient + +from .base import BaseAgentCaller +from ..types import AgentCallResponse + + +logger = logging.getLogger(__name__) + + +class A2ATaskAgentCaller(BaseAgentCaller): + """Agent caller for task-based A2A communication.""" + def register_activities(self) -> None: + """Register A2A task activity function.""" + activity_name = self.get_activity_name("default") + + @self.app.activity_trigger(arg_name="req", activity_name=activity_name) + async def a2a_task_activity(req: str) -> str: + """Activity function for A2A task agent calls.""" + try: + request_data = json.loads(req) + response = await self._execute_a2a_task_call( + request_data["method"], + request_data.get("args", {}), + request_data.get("kwargs", {}) + ) + return json.dumps(response.__dict__) + except Exception as e: + logger.exception(f"Error in A2A task activity for {self.config.name}") + error_response = AgentCallResponse.error_response(str(e)) + return json.dumps(error_response.__dict__) + + def get_activity_name(self, method: str) -> str: + """Get the activity name for A2A task calls.""" + return f"call_a2a_task_agent_{self.config.name}" + + def call_agent(self, context: df.DurableOrchestrationContext, method: str, + args: Dict[str, Any] = None, kwargs: Dict[str, Any] = None) -> Any: + """Call the A2A task agent using the orchestrator context. + + Args: + context: Durable orchestrator context (required) + method: The method to call on the agent + args: Positional arguments for the method + kwargs: Keyword arguments for the method + + Returns: + Result from the A2A task agent call + """ + args = args or {} + kwargs = kwargs or {} + + if context is None: + raise ValueError("context parameter is required for durable orchestrator framework") + + # Use activity pattern with orchestrator context + activity_name = self.get_activity_name("default") + request_data = { + "method": method, + "args": args, + "kwargs": kwargs + } + return context.call_activity(activity_name, json.dumps(request_data)) + + async def _execute_a2a_task_call(self, method: str, args: Dict[str, Any], kwargs: Dict[str, Any]) -> AgentCallResponse: + """Execute A2A task call by starting a new orchestration.""" + try: + # Extract orchestration function name from config + extra_config = self.config.extra_config or {} + orchestration_name = extra_config.get("orchestration_name", "agent_orchestrator") + + # Prepare the input for the target orchestration + orchestration_input = { + "method": method, + "args": args, + "kwargs": kwargs, + "agent_name": self.config.name + } + + # This would typically be called from an orchestrator context + # For now, we'll simulate the call + # In a real scenario, you'd use: + # client = DurableOrchestrationClient(...) + # instance_id = await client.start_new(orchestration_name, None, orchestration_input) + # result = await client.wait_for_completion_or_create_check_status_response(instance_id, timeout_in_milliseconds=30000) + + # Simulated response for demonstration + return AgentCallResponse.success_response({ + "message": f"A2A task call to {self.config.name}.{method} initiated", + "orchestration_name": orchestration_name, + "input": orchestration_input + }) + + except Exception as e: + logger.exception(f"Error executing A2A task call for {self.config.name}") + return AgentCallResponse.error_response(str(e)) + + +class A2ASyncAgentCaller(BaseAgentCaller): + """Agent caller for synchronous A2A communication.""" + def register_activities(self) -> None: + """Register A2A sync activity function.""" + activity_name = self.get_activity_name("default") + + @self.app.activity_trigger(arg_name="req", activity_name=activity_name) + async def a2a_sync_activity(req: str) -> str: + """Activity function for A2A sync agent calls.""" + try: + request_data = json.loads(req) + response = await self._execute_a2a_sync_call( + request_data["method"], + request_data.get("args", {}), + request_data.get("kwargs", {}) + ) + return json.dumps(response.__dict__) + except Exception as e: + logger.exception(f"Error in A2A sync activity for {self.config.name}") + error_response = AgentCallResponse.error_response(str(e)) + return json.dumps(error_response.__dict__) + + def get_activity_name(self, method: str) -> str: + """Get the activity name for A2A sync calls.""" + return f"call_a2a_sync_agent_{self.config.name}" + + def call_agent(self, context: df.DurableOrchestrationContext, method: str, + args: Dict[str, Any] = None, kwargs: Dict[str, Any] = None) -> Any: + """Call the A2A sync agent using the orchestrator context. + + Args: + context: Durable orchestrator context (required) + method: The method to call on the agent + args: Positional arguments for the method + kwargs: Keyword arguments for the method + + Returns: + Result from the A2A sync agent call + """ + args = args or {} + kwargs = kwargs or {} + + if context is None: + raise ValueError("context parameter is required for durable orchestrator framework") + + # Use activity pattern with orchestrator context + activity_name = self.get_activity_name("default") + request_data = { + "method": method, + "args": args, + "kwargs": kwargs + } + result = yield context.call_activity(activity_name, json.dumps(request_data)) + return json.loads(result) + + async def _execute_a2a_sync_call(self, method: str, args: Dict[str, Any], kwargs: Dict[str, Any]) -> AgentCallResponse: + """Execute synchronous A2A call through sub-orchestration.""" + try: + # Extract sub-orchestration details from config + extra_config = self.config.extra_config or {} + sub_orchestration_name = extra_config.get("sub_orchestration_name", "agent_sub_orchestrator") + + # Prepare the input for the sub-orchestration + sub_orchestration_input = { + "method": method, + "args": args, + "kwargs": kwargs, + "agent_name": self.config.name + } + + # This would typically be called from an orchestrator context using: + # result = await context.call_sub_orchestrator(sub_orchestration_name, sub_orchestration_input) + + # Simulated response for demonstration + return AgentCallResponse.success_response({ + "message": f"A2A sync call to {self.config.name}.{method} completed", + "sub_orchestration_name": sub_orchestration_name, + "input": sub_orchestration_input + }) + + except Exception as e: + logger.exception(f"Error executing A2A sync call for {self.config.name}") + return AgentCallResponse.error_response(str(e)) diff --git a/azurefunctions-agents-durable/azurefunctions/agents/durable/call_modes/base.py b/azurefunctions-agents-durable/azurefunctions/agents/durable/call_modes/base.py new file mode 100644 index 0000000..59e2d0d --- /dev/null +++ b/azurefunctions-agents-durable/azurefunctions/agents/durable/call_modes/base.py @@ -0,0 +1,41 @@ +"""Base interface for agent callers.""" + +from abc import ABC, abstractmethod +from typing import Any, Dict, Union, Optional +from azure.functions import FunctionApp +import azure.durable_functions as df + +from ..types import AgentConfig, AgentCallResponse + + +class BaseAgentCaller(ABC): + """Base class for all agent callers.""" + + def __init__(self, config: AgentConfig, app: Union[FunctionApp, df.DFApp]): + self.config = config + self.app = app @abstractmethod + def call_agent(self, context: df.DurableOrchestrationContext, method: str, + args: Dict[str, Any] = None, kwargs: Dict[str, Any] = None) -> Any: + """Call the agent with the specified method and parameters. + + Args: + context: Durable orchestrator context (required for this framework) + method: The method to call on the agent + args: Positional arguments for the method + kwargs: Keyword arguments for the method + + Returns: + Result from the agent call through the activity pattern + """ + pass + + @abstractmethod + def register_activities(self) -> None: + """Register any required activity functions for this caller.""" + pass + + def get_activity_name(self, method: str) -> str: + """Get the activity name for a specific method. Override in subclasses if needed.""" + # Default pattern: call_{call_mode}_{agent_name} + call_mode = self.config.call_mode.value.lower() + return f"call_{call_mode}_agent_{self.config.name}" \ No newline at end of file diff --git a/azurefunctions-agents-durable/azurefunctions/agents/durable/call_modes/http_caller.py b/azurefunctions-agents-durable/azurefunctions/agents/durable/call_modes/http_caller.py new file mode 100644 index 0000000..c844181 --- /dev/null +++ b/azurefunctions-agents-durable/azurefunctions/agents/durable/call_modes/http_caller.py @@ -0,0 +1,103 @@ +"""HTTP agent caller implementation.""" + +import json +import logging +from typing import Any, Dict, Union +import aiohttp +import azure.functions as func +import azure.durable_functions as df + +from .base import BaseAgentCaller +from ..types import AgentCallResponse + + +logger = logging.getLogger(__name__) + + +class HttpAgentCaller(BaseAgentCaller): + """Agent caller for HTTP endpoints.""" + def register_activities(self) -> None: + """Register HTTP activity function.""" + activity_name = self.get_activity_name("default") + + @self.app.activity_trigger(arg_name="req", activity_name=activity_name) + async def http_activity(req: str) -> str: + """Activity function for HTTP agent calls.""" + try: + request_data = json.loads(req) + response = await self._make_http_call( + request_data["method"], + request_data.get("args", {}), + request_data.get("kwargs", {}) + ) + return json.dumps(response.__dict__) + except Exception as e: + error_response = AgentCallResponse.error_response(str(e)) + return json.dumps(error_response.__dict__) + + def get_activity_name(self, method: str) -> str: + """Get the activity name for HTTP calls.""" + return f"call_http_agent_{self.config.name}" + + def call_agent(self, context: df.DurableOrchestrationContext, method: str, + args: Dict[str, Any] = None, kwargs: Dict[str, Any] = None) -> Any: + """Call the HTTP agent using the orchestrator context. + + Args: + context: Durable orchestrator context (required) + method: The method to call on the agent + args: Positional arguments for the method + kwargs: Keyword arguments for the method + + Returns: + Result from the HTTP agent call + """ + args = args or {} + kwargs = kwargs or {} + + if context is None: + raise ValueError("context parameter is required for durable orchestrator framework") + + # Use activity pattern with orchestrator context + activity_name = self.get_activity_name("default") + request_data = { + "method": method, + "args": args, + "kwargs": kwargs + } + return context.call_activity(activity_name, json.dumps(request_data)) + + async def _make_http_call(self, method: str, args: Dict[str, Any], kwargs: Dict[str, Any]) -> AgentCallResponse: + """Make the actual HTTP call.""" + try: + headers = {} + if self.config.auth_token: + headers["Authorization"] = f"Bearer {self.config.auth_token}" + + payload = { + "method": method, + "args": args, + "kwargs": kwargs + } + + async with aiohttp.ClientSession() as session: + async with session.post( + self.config.endpoint, + json=payload, + headers=headers, + timeout=aiohttp.ClientTimeout(total=30) + ) as response: + if response.status == 200: + result = await response.json() + return AgentCallResponse.success_response(result) + else: + error_text = await response.text() + return AgentCallResponse.error_response(f"HTTP {response.status}: {error_text}") + + except aiohttp.ClientTimeout: + return AgentCallResponse.error_response("Request timeout") + except aiohttp.ClientError as e: + return AgentCallResponse.error_response(f"HTTP client error: {str(e)}") + except Exception as e: + logger.exception(f"Unexpected error calling HTTP agent {self.config.name}") + return AgentCallResponse.error_response(f"Unexpected error: {str(e)}") diff --git a/azurefunctions-agents-durable/azurefunctions/agents/durable/call_modes/mcp_caller.py b/azurefunctions-agents-durable/azurefunctions/agents/durable/call_modes/mcp_caller.py new file mode 100644 index 0000000..6a01c5e --- /dev/null +++ b/azurefunctions-agents-durable/azurefunctions/agents/durable/call_modes/mcp_caller.py @@ -0,0 +1,399 @@ +"""MCP (Model Context Protocol) agent caller implementation.""" + +import json +import logging +import subprocess +import asyncio +from typing import Any, Dict, List, Optional, Union +import azure.functions as func +import azure.durable_functions as df + +from .base import BaseAgentCaller +from ..types import AgentCallResponse + +logger = logging.getLogger(__name__) + +# MCP imports (will be optional if not available) +try: + from mcp.client.sse import sse_client + from mcp.client.stdio import stdio_client + from mcp import ClientSession + MCP_AVAILABLE = True +except ImportError: + MCP_AVAILABLE = False + logger.warning("MCP client library not available. MCP functionality will be simulated.") + + +class MCPAgentCaller(BaseAgentCaller): + """Agent caller for MCP (Model Context Protocol) endpoints with full SSE and stdio support.""" + + def register_activities(self) -> None: + """Register MCP activity function.""" + activity_name = self.get_activity_name("default") + + @self.app.activity_trigger(arg_name="req", activity_name=activity_name) + async def mcp_activity(req: str) -> str: + """Activity function for MCP agent calls.""" + try: + request_data = json.loads(req) + response = await self._execute_mcp_call( + request_data["method"], + request_data.get("args", {}), + request_data.get("kwargs", {}) + ) + return json.dumps(response.__dict__) + except Exception as e: + logger.exception(f"Error in MCP activity for {self.config.name}") + error_response = AgentCallResponse.error_response(str(e)) + return json.dumps(error_response.__dict__) + + def get_activity_name(self, method: str) -> str: + """Get the activity name for MCP calls.""" + return f"call_mcp_agent_{self.config.name}" + + def call_agent(self, context: df.DurableOrchestrationContext, method: str, + args: Dict[str, Any] = None, kwargs: Dict[str, Any] = None) -> Any: + """Call the MCP agent using the orchestrator context. + + Args: + context: Durable orchestrator context (required) + method: The method to call on the agent + args: Positional arguments for the method + kwargs: Keyword arguments for the method + + Returns: + Result from the MCP agent call + """ + args = args or {} + kwargs = kwargs or {} + + if context is None: + raise ValueError("context parameter is required for durable orchestrator framework") + + # Use activity pattern with orchestrator context + activity_name = self.get_activity_name("default") + request_data = { + "method": method, + "args": args, + "kwargs": kwargs + } + return context.call_activity(activity_name, json.dumps(request_data)) + + async def call_mcp_tool(self, context: df.DurableOrchestrationContext, tool_name: str, + arguments: Dict[str, Any] = None) -> Any: + """Convenience method to call an MCP tool. + + Args: + context: Durable orchestrator context (required) + tool_name: Name of the tool to call + arguments: Arguments to pass to the tool + """ + return await self.call_agent("call_tool", {"name": tool_name, "arguments": arguments or {}}, context=context) + + async def list_mcp_tools(self, context: df.DurableOrchestrationContext) -> Any: + """List available tools on the MCP server. + + Args: + context: Durable orchestrator context (required) + """ + return await self.call_agent("list_tools", context=context) + + async def list_mcp_resources(self, context: df.DurableOrchestrationContext) -> AgentCallResponse: + """List available resources on the MCP server.""" + return await self.call_agent(context, "list_resources") + + async def read_mcp_resource(self, context: df.DurableOrchestrationContext, uri: str) -> AgentCallResponse: + """Read a resource from the MCP server.""" + return await self.call_agent(context, "read_resource", {"uri": uri}) + + async def _execute_mcp_call(self, method: str, args: Dict[str, Any], kwargs: Dict[str, Any]) -> AgentCallResponse: + """Execute MCP call based on the configured client type.""" + try: + extra_config = self.config.extra_config or {} + client_type = extra_config.get("client_type", "sse") + + if client_type == "sse": + return await self._call_via_sse(method, args, kwargs) + elif client_type == "stdio": + return await self._call_via_stdio(method, args, kwargs) + else: + return AgentCallResponse.error_response(f"Unsupported MCP client type: {client_type}") + + except Exception as e: + logger.exception(f"Error executing MCP call for {self.config.name}") + return AgentCallResponse.error_response(str(e)) + + async def _call_via_sse(self, method: str, args: Dict[str, Any], kwargs: Dict[str, Any]) -> AgentCallResponse: + """Execute MCP call via Server-Sent Events with full MCP protocol implementation.""" + if not MCP_AVAILABLE: + return AgentCallResponse.error_response("MCP client library not available") + + try: + # Extract SSE configuration + extra_config = self.config.extra_config or {} + sse_url = extra_config.get("sse_url") or self.config.endpoint + + if not sse_url: + return AgentCallResponse.error_response("Missing SSE URL for SSE MCP client") + + # Extract tool name and arguments from method call + tool_name = method + arguments = args.copy() + arguments.update(kwargs) + + # Connect to MCP server via SSE + async with sse_client(url=sse_url) as (read, write): + async with ClientSession(read, write) as session: + await session.initialize() + logger.debug(f"Connected to MCP server at {sse_url}") + + # Handle different method types + if method == "list_tools": + response = await session.list_tools() + tools = [{"name": tool.name, "description": tool.description} for tool in response.tools] + return AgentCallResponse.success_response(tools) + + elif method == "call_tool": + tool_name = arguments.get("name") or arguments.get("tool_name") + tool_arguments = arguments.get("arguments", {}) + + if not tool_name: + return AgentCallResponse.error_response("Missing tool name for call_tool") + + response = await session.call_tool(name=tool_name, arguments=tool_arguments) + + # Convert MCP response to serializable format + serializable_result = self._serialize_mcp_response(response) + return AgentCallResponse.success_response(serializable_result) + + elif method == "list_resources": + response = await session.list_resources() + resources = [{"uri": res.uri, "name": res.name} for res in response.resources] + return AgentCallResponse.success_response(resources) + + elif method == "read_resource": + uri = arguments.get("uri") + if not uri: + return AgentCallResponse.error_response("Missing URI for read_resource") + + response = await session.read_resource(uri=uri) + return AgentCallResponse.success_response({"uri": uri, "content": response.contents}) + + else: + # Generic tool call + response = await session.call_tool(name=method, arguments=arguments) + serializable_result = self._serialize_mcp_response(response) + return AgentCallResponse.success_response(serializable_result) + + except Exception as e: + logger.exception(f"MCP SSE call failed for {self.config.name}") + return AgentCallResponse.error_response(f"MCP SSE call failed: {str(e)}") + + async def _call_via_stdio(self, method: str, args: Dict[str, Any], kwargs: Dict[str, Any]) -> AgentCallResponse: + """Execute MCP call via stdio with full MCP protocol implementation.""" + if not MCP_AVAILABLE: + return AgentCallResponse.error_response("MCP client library not available") + + try: + # Extract stdio configuration + extra_config = self.config.extra_config or {} + command = extra_config.get("command") + command_args = extra_config.get("args", []) + + if not command: + return AgentCallResponse.error_response("Missing 'command' in extra_config for stdio MCP client") + + # Extract tool name and arguments from method call + tool_name = method + arguments = args.copy() + arguments.update(kwargs) + + # Connect to MCP server via stdio + async with stdio_client(command=command, args=command_args) as (read, write): + async with ClientSession(read, write) as session: + await session.initialize() + logger.debug(f"Connected to MCP server via stdio: {command}") + + # Handle different method types + if method == "list_tools": + response = await session.list_tools() + tools = [{"name": tool.name, "description": tool.description} for tool in response.tools] + return AgentCallResponse.success_response(tools) + + elif method == "call_tool": + tool_name = arguments.get("name") or arguments.get("tool_name") + tool_arguments = arguments.get("arguments", {}) + + if not tool_name: + return AgentCallResponse.error_response("Missing tool name for call_tool") + + response = await session.call_tool(name=tool_name, arguments=tool_arguments) + + # Convert MCP response to serializable format + serializable_result = self._serialize_mcp_response(response) + return AgentCallResponse.success_response(serializable_result) + + elif method == "list_resources": + response = await session.list_resources() + resources = [{"uri": res.uri, "name": res.name} for res in response.resources] + return AgentCallResponse.success_response(resources) + + elif method == "read_resource": + uri = arguments.get("uri") + if not uri: + return AgentCallResponse.error_response("Missing URI for read_resource") + + response = await session.read_resource(uri=uri) + return AgentCallResponse.success_response({"uri": uri, "content": response.contents}) + + else: + # Generic tool call + response = await session.call_tool(name=method, arguments=arguments) + serializable_result = self._serialize_mcp_response(response) + return AgentCallResponse.success_response(serializable_result) + + except Exception as e: + logger.exception(f"MCP stdio call failed for {self.config.name}") + return AgentCallResponse.error_response(f"MCP stdio call failed: {str(e)}") + + def _serialize_mcp_response(self, response) -> Dict[str, Any]: + """Convert MCP response to JSON serializable format.""" + try: + serializable_result = {} + + if hasattr(response, 'content') and response.content: + # Extract text from content array + text_content = [] + for content_item in response.content: + if hasattr(content_item, 'text'): + text_content.append(content_item.text) + elif hasattr(content_item, 'type') and content_item.type == 'text': + text_content.append(str(content_item)) + else: + text_content.append(str(content_item)) + + serializable_result = { + "content": text_content, + "isError": getattr(response, 'isError', False) + } + else: + # Fallback to string representation + serializable_result = {"content": [str(response)], "isError": False} + + return serializable_result + + except Exception as e: + logger.warning(f"Failed to serialize MCP response: {e}") + return {"content": [str(response)], "isError": False} + + +class MCPClientHelper: + """Convenience helper class that provides the exact prototype pattern from mcp_client.py.""" + + def __init__(self, app: df.DFApp, sse_url: str = "http://localhost:7071/runtime/webhooks/mcp/sse"): + """Initialize MCP client helper with prototype pattern compatibility. + + Parameters + ---------- + app : df.DFApp + Durable Functions application instance + sse_url : str + MCP SSE endpoint URL + """ + self.app = app + self.sse_url = sse_url + # Register the necessary activity functions using the same pattern as mcp_client.py + self._register_mcp_activities() + + def _register_mcp_activities(self): + """Register necessary MCP activity functions using the exact prototype pattern.""" + + @self.app.activity_trigger(input_name="mcp_request") + async def mcp_call_service(mcp_request: Dict[str, Any]) -> Dict[str, Any]: + """Generic MCP service call activity function.""" + tool_name = mcp_request.get("tool_name") + arguments = mcp_request.get("arguments", {}) + sse_url = mcp_request.get("sse_url", self.sse_url) + + response = None + try: + if not MCP_AVAILABLE: + return {"status": "error", "error": "MCP client library not available"} + + # MCP client initialization + async with sse_client(url=sse_url) as (read, write): + async with ClientSession(read, write) as session: + await session.initialize() + logger.debug("Connected successfully to MCP server!") + + # Tool call + response = await session.call_tool( + name=tool_name, + arguments=arguments + ) + + except Exception as e: + logger.error("Unexpected exception in mcp_call_service: %s", str(e)) + logger.exception("Full stack trace:") + return {"status": "error", "error": str(e)} + + # Convert MCP response to JSON serializable format + if response is None: + return {"status": "error", "error": "Failed to call MCP service"} + + # Extract text content from CallToolResult (same as mcp_client.py) + serializable_result = {} + if hasattr(response, 'content') and response.content: + # Extract text from content array + text_content = [] + for content_item in response.content: + if hasattr(content_item, 'text'): + text_content.append(content_item.text) + elif hasattr(content_item, 'type') and content_item.type == 'text': + text_content.append(str(content_item)) + + serializable_result = { + "content": text_content, + "isError": getattr(response, 'isError', False) + } + else: + # Fallback to string representation + serializable_result = {"content": [str(response)], "isError": False} + + result = { + "status": "success", + "data": f"Called {tool_name} with arguments {arguments}", + "result": serializable_result + } + return result + + def call_service(self, context: df.DurableOrchestrationContext, + tool_name: str, arguments: Optional[Dict] = None, + sse_url: Optional[str] = None) -> Any: + """Helper method to call MCP service from orchestrator (exact prototype pattern). + + Parameters + ---------- + context : df.DurableOrchestrationContext + Orchestration context + tool_name : str + MCP tool name to call + arguments : Optional[Dict] + Tool arguments + sse_url : Optional[str] + Custom SSE URL (optional) + + Returns + ------- + Any + Service call result + """ + mcp_request = { + "tool_name": tool_name, + "arguments": arguments or {}, + "sse_url": sse_url or self.sse_url + } + + # Call pre-registered activity (use yield in orchestrator) + result = yield context.call_activity("mcp_call_service", mcp_request) + return result \ No newline at end of file diff --git a/azurefunctions-agents-durable/azurefunctions/agents/durable/client.py b/azurefunctions-agents-durable/azurefunctions/agents/durable/client.py new file mode 100644 index 0000000..e69de29 diff --git a/azurefunctions-agents-durable/azurefunctions/agents/durable/decorators.py b/azurefunctions-agents-durable/azurefunctions/agents/durable/decorators.py new file mode 100644 index 0000000..95f0be9 --- /dev/null +++ b/azurefunctions-agents-durable/azurefunctions/agents/durable/decorators.py @@ -0,0 +1,151 @@ +"""Decorators for the Durable Functions Agents framework.""" + +import functools +import logging +from typing import Callable, Any, Optional +from azure.durable_functions import DurableOrchestrationContext + +from .framework import DFAgentFramework, AgentCaller + + +logger = logging.getLogger(__name__) + + +def orchestrator(framework: DFAgentFramework, name: Optional[str] = None, agent_caller: Optional[AgentCaller] = None): + """Decorator that provides activity trigger-less notation for orchestrators. + + This decorator automatically injects an AgentCaller instance into the orchestrator + function, allowing direct agent calls without explicit activity triggers. + It also registers the orchestrator function with the framework's app. + + Args: + framework: The DFAgentFramework instance + name: Optional name for the orchestrator function + agent_caller: Optional pre-configured AgentCaller instance + + Example: + @orchestrator(framework, name="my_orchestrator") + async def my_orchestrator(context: DurableOrchestrationContext, agents: AgentCaller): + # Call agents directly using yield (not await) in orchestrator context + result = yield agents.call_agent(context, "my_agent", "my_method", {"arg1": "value1"}) + yield result # Use yield to return final result in async generator + """ + def decorator(func: Callable) -> Callable: + @functools.wraps(func) + async def wrapper(context: DurableOrchestrationContext, *args, **kwargs) -> Any: + # Create or use provided agent caller + caller = agent_caller if agent_caller is not None else framework.create_agent_caller() + + # Call the original function with the agent caller + try: + # For orchestrator functions, we need to handle both regular functions + # and async generators (when yield is used) + result = func(context, caller, *args, **kwargs) + + # If it's an async generator, we need to handle it properly + if hasattr(result, '__aiter__'): + # It's an async generator - iterate through it + final_result = None + async for value in result: + final_result = value + return final_result + else: + # It's a regular async function + return await result + + except Exception as e: + logger.exception(f"Error in orchestrator {func.__name__}") + raise + + # Register the orchestrator with the framework's app + orchestrator_name = name or func.__name__ + framework.app.orchestration_trigger(arg_name="context", orchestrator_name=orchestrator_name)(wrapper) + + return wrapper + return decorator + + +def activity_with_agent_support(framework: DFAgentFramework): + """Decorator for activity functions that need to call agents. + + This decorator provides agent calling capabilities to activity functions, + which is useful for activities that need to make agent calls. + + Args: + framework: The DFAgentFramework instance + + Example: + @activity_with_agent_support(framework) + async def my_activity(context, agents: AgentCaller, data): + result = await agents.call("my_agent", "process", data) + return result + """ + def decorator(func: Callable) -> Callable: + @functools.wraps(func) + async def wrapper(*args, **kwargs) -> Any: + # Create agent caller + caller = framework.create_agent_caller() + + # Inject the agent caller as the second parameter + # First parameter is typically the activity context + if args: + new_args = (args[0], caller) + args[1:] + else: + new_args = (caller,) + + try: + return await func(*new_args, **kwargs) + except Exception as e: + logger.exception(f"Error in activity {func.__name__}") + raise + + return wrapper + return decorator + + +def register_orchestrator_with_agents(app, framework: DFAgentFramework, orchestrator_name: Optional[str] = None): + """Decorator that combines orchestrator registration with agent support. + + This is a convenience decorator that both registers the function as a Durable Functions + orchestrator and provides agent calling capabilities. + + Args: + app: The FunctionApp instance + framework: The DFAgentFramework instance + orchestrator_name: Optional custom name for the orchestrator + + Example: + @register_orchestrator_with_agents(app, framework) + async def my_orchestrator(context: DurableOrchestrationContext, agents: AgentCaller): + # Use yield for orchestrator calls, not await + result = yield agents.call_agent(context, "my_agent", "my_method") + yield result # Use yield to return final result + """ + def decorator(func: Callable) -> Callable: + @functools.wraps(func) + async def wrapper(context: DurableOrchestrationContext, *args, **kwargs) -> Any: + # Create agent caller + caller = framework.create_agent_caller() + + try: + # Call the original function with the agent caller + result = func(context, caller, *args, **kwargs) + + # Handle async generators (when yield is used in orchestrators) + if hasattr(result, '__aiter__'): + final_result = None + async for value in result: + final_result = value + return final_result + else: + return await result + + except Exception as e: + logger.exception(f"Error in orchestrator {func.__name__}") + raise + + # Register with Durable Functions + name = orchestrator_name or func.__name__ + return app.orchestration_trigger(arg_name="context", orchestrator_name=name)(wrapper) + + return decorator \ No newline at end of file diff --git a/azurefunctions-agents-durable/azurefunctions/agents/durable/framework.py b/azurefunctions-agents-durable/azurefunctions/agents/durable/framework.py new file mode 100644 index 0000000..05ee857 --- /dev/null +++ b/azurefunctions-agents-durable/azurefunctions/agents/durable/framework.py @@ -0,0 +1,180 @@ +"""Core framework for Durable Functions Agents.""" + +import json +import logging +from typing import Dict, List, Any, Optional, Union +from azure.functions import FunctionApp +import azure.durable_functions as df + +from .types import AgentConfig, CallMode +from .call_modes import ( + BaseAgentCaller, + HttpAgentCaller, + MCPAgentCaller, + A2ATaskAgentCaller, + A2ASyncAgentCaller +) + + +logger = logging.getLogger(__name__) + + +class DFAgentFramework: + """Framework for managing agent calls in Durable Functions.""" + + def __init__(self, app: Union[FunctionApp, df.DFApp]): + self.app = app + self.agents: Dict[str, BaseAgentCaller] = {} + self._caller_classes = { + CallMode.HTTP: HttpAgentCaller, + CallMode.MCP: MCPAgentCaller, + CallMode.A2A_TASK: A2ATaskAgentCaller, + CallMode.A2A_SYNC: A2ASyncAgentCaller + } + + def register_agent(self, config: AgentConfig) -> None: + """Register an agent with the framework.""" + if config.name in self.agents: + logger.warning(f"Agent '{config.name}' is already registered. Overwriting.") + + # Create the appropriate caller based on call mode + caller_class = self._caller_classes.get(config.call_mode) + if not caller_class: + raise ValueError(f"Unsupported call mode: {config.call_mode}") + + # Create caller instance + caller = caller_class(config, self.app) + + # Register the caller's activity functions + caller.register_activities() + + # Store the caller + self.agents[config.name] = caller + + logger.info(f"Registered agent '{config.name}' with call mode '{config.call_mode.value}'") + + def register_agents_from_config(self, config_data: List[Dict[str, Any]]) -> None: + """Register multiple agents from configuration data.""" + for agent_data in config_data: + config = AgentConfig.from_dict(agent_data) + self.register_agent(config) + + def register_agents_from_file(self, config_file_path: str) -> None: + """Register agents from a JSON configuration file.""" + try: + with open(config_file_path, 'r') as f: + config_data = json.load(f) + + agents_config = config_data.get("agents", []) + self.register_agents_from_config(agents_config) + + except FileNotFoundError: + raise FileNotFoundError(f"Configuration file not found: {config_file_path}") + except json.JSONDecodeError as e: + raise ValueError(f"Invalid JSON in configuration file: {e}") + + def get_agent(self, agent_name: str) -> Optional[BaseAgentCaller]: + """Get a registered agent caller by name.""" + return self.agents.get(agent_name) + + def list_agents(self) -> List[str]: + """List all registered agent names.""" + return list(self.agents.keys()) + + def create_agent_caller(self) -> "AgentCaller": + """Create an agent caller for use in orchestrators.""" + return AgentCaller(self) + + def create_mcp_client_helper(self, sse_url: str = "http://localhost:7071/runtime/webhooks/mcp/sse"): + """Create an MCP client helper with the exact prototype pattern from mcp_client.py. + + Args: + sse_url: MCP SSE endpoint URL + + Returns: + MCPClientHelper instance with prototype pattern compatibility + """ + # Import here to avoid circular imports + from .call_modes.mcp_caller import MCPClientHelper + + # Ensure we have a DFApp for the helper + if hasattr(self.app, '__class__') and 'DFApp' in str(self.app.__class__): + return MCPClientHelper(self.app, sse_url) + else: + # If we have a FunctionApp, we need to wrap it or convert it + # For now, assume direct compatibility + return MCPClientHelper(self.app, sse_url) + + +class AgentCaller: + """Agent caller interface for use within orchestrators.""" + def __init__(self, framework: DFAgentFramework): + self.framework = framework + + def call_agent(self, context: df.DurableOrchestrationContext, agent_name: str, method: str, + args: Dict[str, Any] = None, kwargs: Dict[str, Any] = None) -> Any: + """Call an agent method using the orchestrator context. + + Args: + context: Durable orchestrator context (required for this framework) + agent_name: Name of the agent to call + method: Method name to call on the agent + args: Positional arguments for the method + kwargs: Keyword arguments for the method + + Returns: + The result from the agent call + + Raises: + ValueError: If the agent is not found + Exception: If the agent call fails + """ + agent = self.framework.get_agent(agent_name) + if not agent: + raise ValueError(f"Agent '{agent_name}' not found. Available agents: {self.framework.list_agents()}") + # Use the unified call_agent method with context support + return agent.call_agent(context, method, args, kwargs) + + def call_service(self, context: df.DurableOrchestrationContext, agent_name: str, method: str, + args: Dict[str, Any] = None, kwargs: Dict[str, Any] = None) -> Any: + """Alias for call_agent - matches MCPClientHelper pattern.""" + return self.call_agent(context, agent_name, method, args, kwargs) + + # Orchestrator-compatible convenience methods + def call_http_agent(self, context: df.DurableOrchestrationContext, agent_name: str, method: str, + args: Dict[str, Any] = None, kwargs: Dict[str, Any] = None) -> Any: + """Convenience method for calling HTTP agents.""" + return self.call_agent(context, agent_name, method, args, kwargs) + + def call_mcp_agent(self, context: df.DurableOrchestrationContext, agent_name: str, method: str, + args: Dict[str, Any] = None, kwargs: Dict[str, Any] = None) -> Any: + """Convenience method for calling MCP agents.""" + return self.call_agent(context, agent_name, method, args, kwargs) + + def call_a2a_task_agent(self, context: df.DurableOrchestrationContext, agent_name: str, method: str, + args: Dict[str, Any] = None, kwargs: Dict[str, Any] = None) -> Any: + """Convenience method for calling A2A task agents.""" + return self.call_agent(context, agent_name, method, args, kwargs) + + def call_a2a_sync_agent(self, context: df.DurableOrchestrationContext, agent_name: str, method: str, + args: Dict[str, Any] = None, kwargs: Dict[str, Any] = None) -> Any: + """Convenience method for calling A2A sync agents.""" + return self.call_agent(context, agent_name, method, args, kwargs) + + # MCP-specific convenience methods + def list_mcp_tools(self, context: df.DurableOrchestrationContext, agent_name: str) -> Any: + """List tools available on an MCP agent.""" + return self.call_agent(context, agent_name, "list_tools") + + def call_mcp_tool(self, context: df.DurableOrchestrationContext, agent_name: str, tool_name: str, + arguments: Dict[str, Any] = None) -> Any: + """Call a specific tool on an MCP agent.""" + return self.call_agent(context, agent_name, "call_tool", {"name": tool_name, "arguments": arguments or {}}) + + def list_mcp_resources(self, context: df.DurableOrchestrationContext, agent_name: str) -> Any: + """List resources available on an MCP agent.""" + return self.call_agent(context, agent_name, "list_resources") + + def read_mcp_resource(self, context: df.DurableOrchestrationContext, agent_name: str, uri: str) -> Any: + """Read a specific resource from an MCP agent.""" + return self.call_agent(context, agent_name, "read_resource", {"uri": uri}) \ No newline at end of file diff --git a/azurefunctions-agents-durable/azurefunctions/agents/durable/types.py b/azurefunctions-agents-durable/azurefunctions/agents/durable/types.py new file mode 100644 index 0000000..d7f7e7a --- /dev/null +++ b/azurefunctions-agents-durable/azurefunctions/agents/durable/types.py @@ -0,0 +1,64 @@ +"""Core types for the Durable Functions Agents framework.""" + +from enum import Enum +from typing import Any, Dict, Optional, Union +from dataclasses import dataclass + + +class CallMode(Enum): + """Supported agent call modes.""" + HTTP = "http" + MCP = "mcp" + A2A_TASK = "a2a_task" + A2A_SYNC = "a2a_sync" + + +@dataclass +class AgentConfig: + """Configuration for an agent.""" + name: str + call_mode: CallMode + endpoint: Optional[str] = None + auth_token: Optional[str] = None + client_type: Optional[str] = None # For MCP: "stdio" or "sse" + extra_config: Optional[Dict[str, Any]] = None + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> "AgentConfig": + """Create AgentConfig from dictionary.""" + call_mode = CallMode(data["call_mode"]) if isinstance(data["call_mode"], str) else data["call_mode"] + return cls( + name=data["name"], + call_mode=call_mode, + endpoint=data.get("endpoint"), + auth_token=data.get("auth_token"), + client_type=data.get("client_type"), + extra_config=data.get("extra_config") + ) + + +@dataclass +class AgentCallRequest: + """Request for calling an agent.""" + agent_name: str + method: str + args: Optional[Dict[str, Any]] = None + kwargs: Optional[Dict[str, Any]] = None + + +@dataclass +class AgentCallResponse: + """Response from calling an agent.""" + success: bool + result: Optional[Any] = None + error: Optional[str] = None + + @classmethod + def success_response(cls, result: Any) -> "AgentCallResponse": + """Create a successful response.""" + return cls(success=True, result=result) + + @classmethod + def error_response(cls, error: str) -> "AgentCallResponse": + """Create an error response.""" + return cls(success=False, error=error) \ No newline at end of file diff --git a/azurefunctions-agents-durable/pyproject.toml b/azurefunctions-agents-durable/pyproject.toml new file mode 100644 index 0000000..ceceb8d --- /dev/null +++ b/azurefunctions-agents-durable/pyproject.toml @@ -0,0 +1,89 @@ +[build-system] +requires = ["setuptools >= 61.0"] +build-backend = "setuptools.build_meta" + +[project] +name = "azurefunctions-agents-durable" +dynamic = ["version"] +requires-python = ">=3.10" +authors = [{ name = "Azure Functions team at Microsoft Corp.", email = "azurefunctions@microsoft.com"}] +description = "Durable Functions extension for Azure Functions Agents Framework." +readme = "README.md" +license = {text = "MIT License"} +classifiers= [ + 'License :: OSI Approved :: MIT License', + 'Intended Audience :: Developers', + 'Programming Language :: Python :: 3', 'Programming Language :: Python :: 3.10', + 'Programming Language :: Python :: 3.11', + 'Programming Language :: Python :: 3.12', + 'Programming Language :: Python :: 3.13', + 'Operating System :: Microsoft :: Windows', + 'Operating System :: POSIX', + 'Operating System :: MacOS :: MacOS X', + 'Environment :: Web Environment', + 'Development Status :: 5 - Production/Stable', + ] +dependencies = [ + 'azure-functions>=1.19.0', + 'azure-functions-durable>=1.2.9', + 'aiohttp>=3.8.0' + ] + +[project.optional-dependencies] +mcp = [ + 'mcp>=1.0.0', +] +test = [ + 'pytest>=7.0.0', + 'pytest-asyncio>=0.21.0', +] +dev = [ + 'flake8', + 'mypy', + 'pytest', + 'pytest-cov', + 'coverage', + 'pytest-instafail', + 'pre-commit' +] +all = [ + 'mcp>=1.0.0', + 'pytest>=7.0.0', + 'pytest-asyncio>=0.21.0', + 'flake8', + 'mypy', + 'pytest-cov', + 'coverage', +] + +[tool.setuptools.dynamic] +version = {attr = "azurefunctions.agents.durable.__version__"} + +[tool.setuptools.packages.find] +exclude = [ + 'azurefunctions.agents','azurefunctions', 'tests', 'samples' + ] + +[tool.black] +line-length = 88 +target-version = ['py38'] +include = '\.pyi?$' +exclude = ''' +/( + \.git + | \.mypy_cache + | \.venv + | build + | dist +)/ +''' + +[tool.isort] +profile = "black" +line_length = 88 +known_first_party = ["azurefunctions"] + +[dependency-groups] +dev = [ + "pytest>=8.4.0", +] diff --git a/azurefunctions-agents-durable/samples/README.md b/azurefunctions-agents-durable/samples/README.md new file mode 100644 index 0000000..6915af5 --- /dev/null +++ b/azurefunctions-agents-durable/samples/README.md @@ -0,0 +1,33 @@ +# Combined Agents with Durable Orchestrator Samples + +This directory contains samples demonstrating how to use the Azure Functions Agents Durable framework to create and call different types of agents from Durable Functions orchestrators. + +## Samples + +### [Combined Sample](./combined_sample) + +A comprehensive sample that demonstrates how to: +- Create simple HTTP endpoints as an HTTP agent +- Create MCP tools in the same function app +- Call both agent types from a single Durable Functions orchestrator +- Use the new context-first parameter pattern + +This is the recommended starting point for understanding the framework. + +## Key Concepts + +The samples demonstrate several key concepts: + +1. **Agent Registration** - How to register HTTP and MCP agents with the framework +2. **Context-First Pattern** - The new API design requiring context as the first parameter +3. **Orchestrator Integration** - How to use agents inside durable orchestrators +4. **MCP Integration** - Working with Model Context Protocol tools + +## Getting Started + +Each sample includes: +- A README with detailed instructions +- A complete function app implementation +- Required configuration files + +Follow the README in each sample directory for specific setup and execution instructions. diff --git a/azurefunctions-agents-durable/samples/combined_sample/.gitignore b/azurefunctions-agents-durable/samples/combined_sample/.gitignore new file mode 100644 index 0000000..46cc430 --- /dev/null +++ b/azurefunctions-agents-durable/samples/combined_sample/.gitignore @@ -0,0 +1 @@ +local.settings.json diff --git a/azurefunctions-agents-durable/samples/combined_sample/README.md b/azurefunctions-agents-durable/samples/combined_sample/README.md new file mode 100644 index 0000000..78a4e4b --- /dev/null +++ b/azurefunctions-agents-durable/samples/combined_sample/README.md @@ -0,0 +1,97 @@ +# Combined Agents with Durable Orchestrator Sample + +This sample demonstrates how to use the Azure Functions Agents Durable framework to create and call both HTTP and MCP agents from a single Durable Functions orchestrator. + +## Sample Overview + +This combined sample includes: + +1. **HTTP Agent** - Simple Hello World functions exposed via HTTP endpoints +2. **MCP Server** - A Model Context Protocol server exposing tools +3. **Durable Orchestrator** - A Durable Functions orchestrator that calls both agents + +All three components are combined into a single function app for simplicity. + +## Prerequisites + +- Python 3.8 or later +- Azure Functions Core Tools v4 +- Azure Storage Emulator or Azure Storage Account + +## Setup & Installation + +1. Install the required packages: + +```bash +pip install -r requirements.txt +``` + +2. Configure your Azure OpenAI settings in `local.settings.json` if you're planning to use AI capabilities. + +3. Start the Function App: + +```bash +func start +``` + +## How It Works + +### HTTP Agent + +The sample includes two simple HTTP endpoints: + +- `/api/hello_world` - Returns "Hello, World!" +- `/api/hello_name` - Takes a name parameter and returns a personalized greeting + +These endpoints are registered as an HTTP agent named "hello_world_agent" in the Durable Agents framework. + +### MCP Server + +The sample includes an MCP tool: + +- `hello_mcp` - Takes a user name parameter and returns a greeting + +The MCP server is registered as an MCP agent named "mcp_agent" in the Durable Agents framework. + +### Durable Orchestrator + +The sample includes a durable orchestrator that: + +1. Calls the Hello World agent via HTTP +2. Calls the MCP agent via the MCP protocol +3. Also demonstrates calling an MCP tool directly via an activity function +4. Returns all responses in a combined result + +## Testing the Sample + +1. Start the orchestrator by sending a request: + +``` +GET http://localhost:7071/api/start_orchestration?name=YourName +``` + +2. Use the returned instance ID to check the status: + +``` +GET http://localhost:7071/api/orchestrator_status/{instance_id} +``` + +3. The orchestration result will include responses from both agents. + +## Key Concepts Demonstrated + +1. **Context-First Parameter Pattern** - All agent calls use the `context` parameter as the first argument +2. **Agent Registration** - Both HTTP and MCP agents are registered with the framework +3. **Multiple Agent Types** - A single orchestrator calls multiple agent types +4. **MCP Integration** - Direct and framework-based MCP tool calls + +## Code Structure + +- `function_app.py` - Contains all function code divided into sections + - Part 1: Hello World Agent Definition + - Part 2: MCP Server Definition + - Part 3: Durable Orchestrator + - Part 4: HTTP Triggers +- `host.json` - Function app configuration +- `local.settings.json` - Environment settings +- `requirements.txt` - Package dependencies diff --git a/azurefunctions-agents-durable/samples/combined_sample/function_app.py b/azurefunctions-agents-durable/samples/combined_sample/function_app.py new file mode 100644 index 0000000..7553f5c --- /dev/null +++ b/azurefunctions-agents-durable/samples/combined_sample/function_app.py @@ -0,0 +1,227 @@ +""" +Combined Sample: Durable Orchestrator calling both Hello World Agent and MCP Server +This demonstrates how to use the azure-functions-agents-durable framework to call different agent types +""" + +# Apply the import patch to fix azurefunctions.agents imports +import patch_imports +patch_imports.apply_patch() + +import json +import logging +import os +from typing import Dict, Any + +import azure.functions as func +from azure.functions import AuthLevel + +import azure.durable_functions as df +from azurefunctions.agents.durable import ( + DFAgentFramework, AgentCaller, AgentConfig, CallMode, orchestrator +) +from azurefunctions.agent import AgentFunctionApp, MCPServerStdio, LLMConfig, LLMProvider + +# Create the Function app and setup the agent framework +app = df.DFApp() +framework = DFAgentFramework(app) + +######################################### +# PART 1: Hello World Agent Definition +######################################### + +@app.route(route="hello_world", auth_level=func.AuthLevel.ANONYMOUS) +def hello_world_http(req: func.HttpRequest) -> func.HttpResponse: + """Simple HTTP endpoint that returns Hello, World!""" + return func.HttpResponse("Hello, World!") + +@app.route(route="hello_name", auth_level=func.AuthLevel.ANONYMOUS) +def hello_name_http(req: func.HttpRequest) -> func.HttpResponse: + """HTTP endpoint that greets a person by name""" + name = req.params.get('name', req.get_json().get('name', 'Anonymous') if req.get_body() else 'Anonymous') + return func.HttpResponse(f"Hello, {name}! Nice to meet you!") + +from azurefunctions.agents import AgentFunctionApp, MCPServerStdio + +# Create Git MCP server based on your JSON configuration +git_mcp_server = MCPServerStdio( + params={ + "command":"uvx", # Command from the JSON config + "args":["mcp-server-git"] # Args array from JSON + }, + name="git" # Server name from the JSON key +) + +# Create the Hello World Agent Function App + +# Configure LLM for conversational AI +llm_config = LLMConfig( + provider=LLMProvider.AzureOpenAI, + model_name="gpt-4o", # Using a cost-effective model + temperature=0.7, + max_tokens=1000, + # API key will be read from OPENAI_API_KEY environment variable + # Or you can set it explicitly: api_key="your-api-key-here" +) + +# TODO What is the agent_mode? +helloAgent = AgentFunctionApp( + name="hello_world_agent", + instructions="Say hello to the user", + mcp_servers=[git_mcp_server], # Conditional MCP + http_auth_level=AuthLevel.ANONYMOUS, # For easier testing - change for production + llm_config=llm_config, + enable_conversational_agent=True, + version="1.0.0", + description="A helpful weather assistant agent that provides current conditions, forecasts, and weather" +) +# MCP tools are automatically available alongside regular function tools. + +@helloAgent.tool(name="hello_world") +async def hello_world(): + """ + Simple tool that responds with "Hello, World!". + + Args: + req: HTTP request object + + Returns: + HTTP response with "Hello, World!" + """ + return "Hello, World!" + +# Register the Hello World Agent (HTTP mode) +framework.register_agent(AgentConfig( + name="hello_world_agent", + call_mode=CallMode.HTTP, + endpoint="http://localhost:7071/api", # Will call the same function app for demo purposes + timeout=30 +)) + +######################################### +# PART 2: MCP Server Definition +######################################### + +from mcp.client import MCPClientHelper +from mcp.schema import ToolDefinition, ParameterDefinition + +# Create MCP client helper for this app +mcp = MCPClientHelper(app) + +@app.generic_trigger( + arg_name="context", + type="mcpToolTrigger", + toolName="hello_mcp", + description="Say hello to a user through MCP", + toolProperties=json.dumps([{ + "propertyName": "user_name", + "propertyType": "string", + "description": "The name of the user." + }]) +) +def hello_mcp(context) -> str: + """MCP tool that greets a user by name""" + content = json.loads(context) + arguments = content.get("arguments", {}) + user_name = arguments.get("user_name", "Anonymous") + return f"Hello {user_name}, I am MCPTool!" + +@app.activity_trigger(input_name="req") +def mcp_call_service(req: Dict[str, Any]) -> str: + """Activity to call MCP service""" + tool_name = req.get("tool_name", "") + arguments = req.get("arguments", {}) + + if tool_name == "hello_mcp": + user_name = arguments.get("user_name", "Anonymous") + return f"Hello {user_name}, I am MCPTool!" + else: + return f"Unknown tool: {tool_name}" + +# Register the MCP agent +framework.register_agent(AgentConfig( + name="mcp_agent", + call_mode=CallMode.MCP, + timeout=30, + extra_config={ + "client_type": "stdio", + "server_command": ["python", "-m", "mcp.server"] # Uses the mcp.server module as a fallback + } +)) + +######################################### +# PART 3: Durable Orchestrator +######################################### + +@orchestrator(framework) +def multi_agent_orchestrator(context: df.DurableOrchestrationContext, agents: AgentCaller): + """ + Orchestrator function that calls both HTTP agent and MCP server + + Args: + context: Durable orchestration context + agents: Agent caller provided by the framework + + Returns: + Object containing responses from both agents + """ + # Get input parameters or use defaults + input_data = context.get_input() or {} + name = input_data.get("name", "Durable Orchestrator") + + # Call the Hello World agent over HTTP + logging.info("Calling Hello World agent via HTTP...") + hello_name_response = yield agents.call_http_agent( + context, + "hello_world_agent", + "hello_name", + {"name": name} + ) + + # Call the MCP tool using the agent framework + logging.info("Calling MCP agent...") + mcp_response = yield agents.call_mcp_tool( + context, + "mcp_agent", + "hello_mcp", + {"user_name": name} + ) + + # Return all responses + return { + "hello_name_response": hello_name_response, + "mcp_response": mcp_response + } + +######################################### +# PART 4: HTTP Triggers +######################################### + +@app.route(route="start_orchestration", auth_level=func.AuthLevel.ANONYMOUS) +@app.durable_client_input(client_name="client") +async def start_orchestration(req: func.HttpRequest, client: df.DurableOrchestrationClient): + """HTTP trigger to start the orchestrator""" + # Get name from query params, body, or use default + try: + body = req.get_json() if req.get_body() else {} + except ValueError: + body = {} + + name = req.params.get("name", body.get("name", "Anonymous")) + + instance_id = await client.start_new( + "multi_agent_orchestrator", + client_input={"name": name} + ) + + return client.create_check_status_response(req, instance_id) + +@app.route(route="orchestrator_status/{instance_id}", auth_level=func.AuthLevel.ANONYMOUS) +@app.durable_client_input(client_name="client") +async def get_status(req: func.HttpRequest, client: df.DurableOrchestrationClient, instance_id: str): + """HTTP trigger to check orchestrator status""" + status = await client.get_status(instance_id) + + return func.HttpResponse( + body=json.dumps(status), + mimetype="application/json" + ) diff --git a/azurefunctions-agents-durable/samples/combined_sample/host.json b/azurefunctions-agents-durable/samples/combined_sample/host.json new file mode 100644 index 0000000..0c4b3c2 --- /dev/null +++ b/azurefunctions-agents-durable/samples/combined_sample/host.json @@ -0,0 +1,29 @@ +{ + "version": "2.0", + "logging": { + "applicationInsights": { + "samplingSettings": { + "isEnabled": true, + "excludedTypes": "Request" + } + } + }, + "extensions": { + "durableTask": { + "hubName": "DurableAgentsHub", + "storageProvider": { + "connectionStringName": "AzureWebJobsStorage" + } + }, + "http": { + "routePrefix": "api", + "maxOutstandingRequests": 200, + "maxConcurrentRequests": 100, + "dynamicThrottlesEnabled": true + } + }, + "extensionBundle": { + "id": "Microsoft.Azure.Functions.ExtensionBundle", + "version": "[4.*, 5.0.0)" + } +} diff --git a/azurefunctions-agents-durable/samples/combined_sample/local.settings.json b/azurefunctions-agents-durable/samples/combined_sample/local.settings.json new file mode 100644 index 0000000..eae9160 --- /dev/null +++ b/azurefunctions-agents-durable/samples/combined_sample/local.settings.json @@ -0,0 +1,10 @@ +{ + "IsEncrypted": false, + "Values": { + "AzureWebJobsStorage": "UseDevelopmentStorage=true", + "FUNCTIONS_WORKER_RUNTIME": "python", + "AZURE_OPENAI_ENDPOINT": "https://your-azure-openai-endpoint.openai.azure.com/", + "AZURE_OPENAI_API_KEY": "your-azure-openai-api-key", + "AZURE_OPENAI_DEPLOYMENT_NAME": "your-model-deployment-name" + } +} diff --git a/azurefunctions-agents-durable/samples/combined_sample/local.settings.template.json b/azurefunctions-agents-durable/samples/combined_sample/local.settings.template.json new file mode 100644 index 0000000..d4921bf --- /dev/null +++ b/azurefunctions-agents-durable/samples/combined_sample/local.settings.template.json @@ -0,0 +1,10 @@ +{ + "IsEncrypted": false, + "Values": { + "AzureWebJobsStorage": "UseDevelopmentStorage=true", + "FUNCTIONS_WORKER_RUNTIME": "python", + "AZURE_OPENAI_ENDPOINT": "", + "AZURE_OPENAI_API_KEY": "", + "AZURE_OPENAI_DEPLOYMENT_NAME": "" + } +} diff --git a/azurefunctions-agents-durable/samples/combined_sample/requirements.txt b/azurefunctions-agents-durable/samples/combined_sample/requirements.txt new file mode 100644 index 0000000..dcc0d4c --- /dev/null +++ b/azurefunctions-agents-durable/samples/combined_sample/requirements.txt @@ -0,0 +1,8 @@ +# /workspaces/azure-functions-python-extensions/azurefunctions-agents-durable/samples/combined_sample/requirements.txt +azure-functions +azure-functions-durable +mcp[cli]>=1.6.0 + +# Local development packages are installed in editable mode via the setup script + -e /workspaces/azure-functions-python-extensions/azurefunctions-agents-framework + -e /workspaces/azure-functions-python-extensions/azurefunctions-agents-durable \ No newline at end of file diff --git a/azurefunctions-agents-durable/tests/__init__.py b/azurefunctions-agents-durable/tests/__init__.py new file mode 100644 index 0000000..e69de29