Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions .devcontainer/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Based on Azure Functions Python image with Core Tools
# Based on Azure Functions Python image with Core Tools
FROM mcr.microsoft.com/azure-functions/python:4-python3.11-core-tools

# Update and install dependencies
RUN apt-get update \
&& apt-get install -y \
curl \
npm \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/*

# Workaround for core tools hasn't support APT install for MCP version 4.0.7332-preview1 yet.
RUN npm uninstall -g azure-functions-core-tools-4 -y
# Install Azurite globally using npm
RUN npm install -g azurite
RUN npm install -g azure-functions-core-tools@4.0.7332-preview1

# Set up Azure Functions environment variables
ENV AzureFunctionsJobHost__Logging__Console__IsEnabled=true

# Expose ports
EXPOSE 7071

24 changes: 24 additions & 0 deletions .devcontainer/devcontainer.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
{
"name": "Azure Functions & Python 3",
"dockerFile": "Dockerfile",
"forwardPorts": [ 7071 ],

// Configure tool-specific properties.
"customizations": {
// Configure properties specific to VS Code.
"vscode": {
// Add the IDs of extensions you want installed when the container is created.
"extensions": [
"ms-azuretools.vscode-azurefunctions",
"ms-azuretools.vscode-docker",
"ms-python.python"
]
}
},

// Use 'postCreateCommand' to run commands after the container is created.
// "postCreateCommand": "npm install",

// Set `remoteUser` to `root` to connect as root instead. More info: https://aka.ms/vscode-remote/containers/non-root.
"remoteUser": "vscode"
}
4 changes: 4 additions & 0 deletions .gitattribute
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@

* text=auto eol=lf
*.{cmd,[cC][mM][dD]} text eol=crlf
*.{bat,[bB][aA][tT]} text eol=crlf
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ The libraries are supported on Function Apps using Python 3.9 or later. For more

## SDK Support
* Azure Storage Blob
* Azure Functions Agents Framework
* Azure Functions Agents Durable Extension

## Need help?

Expand Down
Empty file.
2 changes: 2 additions & 0 deletions azurefunctions-agents-durable/azurefunctions/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
"""Azure Functions Agents Durable Package.

This package provides a framework for calling agents from Durable Functions orchestrators
without needing explicit activity triggers. It supports multiple communication modes
including HTTP, MCP (Model Context Protocol), and Agent-to-Agent communication.

Key Features:
- Activity trigger-less notation for orchestrators
- Support for HTTP, MCP, A2A_TASK, and A2A_SYNC call modes
- Enhanced MCP support with both stdio and SSE clients
- Decorator-based framework for easy integration
- Configuration-driven agent management

Example Usage:
from azure.functions import FunctionApp
from azurefunctions.agents.durable import DFAgentFramework, orchestrator, AgentConfig, CallMode

app = FunctionApp()
framework = DFAgentFramework(app)

# Register agents
framework.register_agent(AgentConfig(
name="my_mcp_agent",
call_mode=CallMode.MCP,
client_type="stdio",
extra_config={
"command": "python",
"args": ["-m", "my_mcp_server"]
}
))

@orchestrator(framework)
async def my_orchestrator(context, agents):
tools = await agents.list_mcp_tools("my_mcp_agent")
result = await agents.call_mcp_tool("my_mcp_agent", "my_tool", {"arg": "value"})
return result
"""

from .types import CallMode, AgentConfig, AgentCallRequest, AgentCallResponse
from .framework import DFAgentFramework, AgentCaller
from .decorators import orchestrator, activity_with_agent_support, register_orchestrator_with_agents
from .call_modes import (
BaseAgentCaller,
HttpAgentCaller,
MCPAgentCaller,
A2ATaskAgentCaller,
A2ASyncAgentCaller
)
from .call_modes.mcp_caller import MCPClientHelper

__version__ = "0.1.0"

__all__ = [
# Core types
"CallMode",
"AgentConfig",
"AgentCallRequest",
"AgentCallResponse",

# Framework classes
"DFAgentFramework",
"AgentCaller",

# Decorators
"orchestrator",
"activity_with_agent_support",
"register_orchestrator_with_agents",

# MCP Helper
"MCPClientHelper",
"MCPAgentCaller",

# Caller implementations
"BaseAgentCaller",
"HttpAgentCaller",
"MCPAgentCaller",
"A2ATaskAgentCaller",
"A2ASyncAgentCaller",
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
"""Call mode implementations for the Durable Functions Agents framework."""

from .base import BaseAgentCaller
from .http_caller import HttpAgentCaller
from .mcp_caller import MCPAgentCaller
from .a2a_caller import A2ATaskAgentCaller, A2ASyncAgentCaller

__all__ = [
"BaseAgentCaller",
"HttpAgentCaller",
"MCPAgentCaller",
"A2ATaskAgentCaller",
"A2ASyncAgentCaller"
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
"""Agent-to-Agent (A2A) caller implementations."""

import json
import logging
from typing import Any, Dict, Optional, Union
import azure.functions as func
import azure.durable_functions as df
from azure.durable_functions import DurableOrchestrationClient

from .base import BaseAgentCaller
from ..types import AgentCallResponse


logger = logging.getLogger(__name__)


class A2ATaskAgentCaller(BaseAgentCaller):
"""Agent caller for task-based A2A communication."""
def register_activities(self) -> None:
"""Register A2A task activity function."""
activity_name = self.get_activity_name("default")

@self.app.activity_trigger(arg_name="req", activity_name=activity_name)
async def a2a_task_activity(req: str) -> str:
"""Activity function for A2A task agent calls."""
try:
request_data = json.loads(req)
response = await self._execute_a2a_task_call(
request_data["method"],
request_data.get("args", {}),
request_data.get("kwargs", {})
)
return json.dumps(response.__dict__)
except Exception as e:
logger.exception(f"Error in A2A task activity for {self.config.name}")
error_response = AgentCallResponse.error_response(str(e))
return json.dumps(error_response.__dict__)

def get_activity_name(self, method: str) -> str:
"""Get the activity name for A2A task calls."""
return f"call_a2a_task_agent_{self.config.name}"

def call_agent(self, context: df.DurableOrchestrationContext, method: str,
args: Dict[str, Any] = None, kwargs: Dict[str, Any] = None) -> Any:
"""Call the A2A task agent using the orchestrator context.

Args:
context: Durable orchestrator context (required)
method: The method to call on the agent
args: Positional arguments for the method
kwargs: Keyword arguments for the method

Returns:
Result from the A2A task agent call
"""
args = args or {}
kwargs = kwargs or {}

if context is None:
raise ValueError("context parameter is required for durable orchestrator framework")

# Use activity pattern with orchestrator context
activity_name = self.get_activity_name("default")
request_data = {
"method": method,
"args": args,
"kwargs": kwargs
}
return context.call_activity(activity_name, json.dumps(request_data))

async def _execute_a2a_task_call(self, method: str, args: Dict[str, Any], kwargs: Dict[str, Any]) -> AgentCallResponse:
"""Execute A2A task call by starting a new orchestration."""
try:
# Extract orchestration function name from config
extra_config = self.config.extra_config or {}
orchestration_name = extra_config.get("orchestration_name", "agent_orchestrator")

# Prepare the input for the target orchestration
orchestration_input = {
"method": method,
"args": args,
"kwargs": kwargs,
"agent_name": self.config.name
}

# This would typically be called from an orchestrator context
# For now, we'll simulate the call
# In a real scenario, you'd use:
# client = DurableOrchestrationClient(...)
# instance_id = await client.start_new(orchestration_name, None, orchestration_input)
# result = await client.wait_for_completion_or_create_check_status_response(instance_id, timeout_in_milliseconds=30000)

# Simulated response for demonstration
return AgentCallResponse.success_response({
"message": f"A2A task call to {self.config.name}.{method} initiated",
"orchestration_name": orchestration_name,
"input": orchestration_input
})

except Exception as e:
logger.exception(f"Error executing A2A task call for {self.config.name}")
return AgentCallResponse.error_response(str(e))


class A2ASyncAgentCaller(BaseAgentCaller):
"""Agent caller for synchronous A2A communication."""
def register_activities(self) -> None:
"""Register A2A sync activity function."""
activity_name = self.get_activity_name("default")

@self.app.activity_trigger(arg_name="req", activity_name=activity_name)
async def a2a_sync_activity(req: str) -> str:
"""Activity function for A2A sync agent calls."""
try:
request_data = json.loads(req)
response = await self._execute_a2a_sync_call(
request_data["method"],
request_data.get("args", {}),
request_data.get("kwargs", {})
)
return json.dumps(response.__dict__)
except Exception as e:
logger.exception(f"Error in A2A sync activity for {self.config.name}")
error_response = AgentCallResponse.error_response(str(e))
return json.dumps(error_response.__dict__)

def get_activity_name(self, method: str) -> str:
"""Get the activity name for A2A sync calls."""
return f"call_a2a_sync_agent_{self.config.name}"

def call_agent(self, context: df.DurableOrchestrationContext, method: str,
args: Dict[str, Any] = None, kwargs: Dict[str, Any] = None) -> Any:
"""Call the A2A sync agent using the orchestrator context.

Args:
context: Durable orchestrator context (required)
method: The method to call on the agent
args: Positional arguments for the method
kwargs: Keyword arguments for the method

Returns:
Result from the A2A sync agent call
"""
args = args or {}
kwargs = kwargs or {}

if context is None:
raise ValueError("context parameter is required for durable orchestrator framework")

# Use activity pattern with orchestrator context
activity_name = self.get_activity_name("default")
request_data = {
"method": method,
"args": args,
"kwargs": kwargs
}
result = yield context.call_activity(activity_name, json.dumps(request_data))
return json.loads(result)

async def _execute_a2a_sync_call(self, method: str, args: Dict[str, Any], kwargs: Dict[str, Any]) -> AgentCallResponse:
"""Execute synchronous A2A call through sub-orchestration."""
try:
# Extract sub-orchestration details from config
extra_config = self.config.extra_config or {}
sub_orchestration_name = extra_config.get("sub_orchestration_name", "agent_sub_orchestrator")

# Prepare the input for the sub-orchestration
sub_orchestration_input = {
"method": method,
"args": args,
"kwargs": kwargs,
"agent_name": self.config.name
}

# This would typically be called from an orchestrator context using:
# result = await context.call_sub_orchestrator(sub_orchestration_name, sub_orchestration_input)

# Simulated response for demonstration
return AgentCallResponse.success_response({
"message": f"A2A sync call to {self.config.name}.{method} completed",
"sub_orchestration_name": sub_orchestration_name,
"input": sub_orchestration_input
})

except Exception as e:
logger.exception(f"Error executing A2A sync call for {self.config.name}")
return AgentCallResponse.error_response(str(e))
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
"""Base interface for agent callers."""

from abc import ABC, abstractmethod
from typing import Any, Dict, Union, Optional
from azure.functions import FunctionApp
import azure.durable_functions as df

from ..types import AgentConfig, AgentCallResponse


class BaseAgentCaller(ABC):
"""Base class for all agent callers."""

def __init__(self, config: AgentConfig, app: Union[FunctionApp, df.DFApp]):
self.config = config
self.app = app @abstractmethod
def call_agent(self, context: df.DurableOrchestrationContext, method: str,
args: Dict[str, Any] = None, kwargs: Dict[str, Any] = None) -> Any:
"""Call the agent with the specified method and parameters.

Args:
context: Durable orchestrator context (required for this framework)
method: The method to call on the agent
args: Positional arguments for the method
kwargs: Keyword arguments for the method

Returns:
Result from the agent call through the activity pattern
"""
pass

@abstractmethod
def register_activities(self) -> None:
"""Register any required activity functions for this caller."""
pass

def get_activity_name(self, method: str) -> str:
"""Get the activity name for a specific method. Override in subclasses if needed."""
# Default pattern: call_{call_mode}_{agent_name}
call_mode = self.config.call_mode.value.lower()
return f"call_{call_mode}_agent_{self.config.name}"
Loading
Loading