Skip to content

Commit 0d0d3a3

Browse files
author
AgentPatterns
committed
feat(examples/python): add runnable example
1 parent 8f34496 commit 0d0d3a3

6 files changed

Lines changed: 575 additions & 0 deletions

File tree

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
📖 Based on article:
2+
3+
EN:
4+
https://agentpatterns.tech/en/agent-patterns/task-decomposition-agent
5+
6+
DE:
7+
https://agentpatterns.tech/de/agent-patterns/task-decomposition-agent
8+
9+
FR:
10+
https://agentpatterns.tech/fr/agent-patterns/task-decomposition-agent
11+
12+
ES:
13+
https://agentpatterns.tech/es/agent-patterns/task-decomposition-agent
14+
15+
UK:
16+
https://agentpatterns.tech/uk/agent-patterns/task-decomposition-agent
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
from __future__ import annotations
2+
3+
import hashlib
4+
import json
5+
from dataclasses import dataclass
6+
from typing import Any, Callable
7+
8+
9+
class StopRun(Exception):
10+
def __init__(self, reason: str):
11+
super().__init__(reason)
12+
self.reason = reason
13+
14+
15+
@dataclass(frozen=True)
16+
class Budget:
17+
max_plan_steps: int = 6
18+
max_execute_steps: int = 8
19+
max_tool_calls: int = 8
20+
max_seconds: int = 60
21+
22+
23+
def _stable_json(value: Any) -> str:
24+
if value is None or isinstance(value, (bool, int, float, str)):
25+
return json.dumps(value, ensure_ascii=True, sort_keys=True)
26+
if isinstance(value, list):
27+
return "[" + ",".join(_stable_json(item) for item in value) + "]"
28+
if isinstance(value, dict):
29+
parts = []
30+
for key in sorted(value):
31+
parts.append(
32+
json.dumps(str(key), ensure_ascii=True) + ":" + _stable_json(value[key])
33+
)
34+
return "{" + ",".join(parts) + "}"
35+
return json.dumps(str(value), ensure_ascii=True)
36+
37+
38+
def args_hash(args: dict[str, Any]) -> str:
39+
raw = _stable_json(args or {})
40+
return hashlib.sha256(raw.encode("utf-8")).hexdigest()[:12]
41+
42+
43+
def validate_plan_action(
44+
action: Any, *, max_plan_steps: int, allowed_tools: set[str]
45+
) -> list[dict[str, Any]]:
46+
if not isinstance(action, dict):
47+
raise StopRun("invalid_plan:not_object")
48+
49+
kind = action.get("kind")
50+
if kind == "invalid":
51+
raise StopRun("invalid_plan:non_json")
52+
if kind != "plan":
53+
raise StopRun("invalid_plan:bad_kind")
54+
55+
allowed_top_keys = {"kind", "steps"}
56+
if set(action.keys()) - allowed_top_keys:
57+
raise StopRun("invalid_plan:extra_keys")
58+
59+
steps = action.get("steps")
60+
if not isinstance(steps, list) or not steps:
61+
raise StopRun("invalid_plan:missing_steps")
62+
if len(steps) < 3:
63+
raise StopRun("invalid_plan:min_steps")
64+
if len(steps) > max_plan_steps:
65+
raise StopRun("invalid_plan:max_steps")
66+
67+
normalized: list[dict[str, Any]] = []
68+
seen_ids: set[str] = set()
69+
70+
for index, step in enumerate(steps, start=1):
71+
if not isinstance(step, dict):
72+
raise StopRun(f"invalid_plan:step_{index}_not_object")
73+
74+
allowed_step_keys = {"id", "title", "tool", "args"}
75+
if set(step.keys()) - allowed_step_keys:
76+
raise StopRun(f"invalid_plan:step_{index}_extra_keys")
77+
78+
step_id = step.get("id")
79+
if not isinstance(step_id, str) or not step_id.strip():
80+
raise StopRun(f"invalid_plan:step_{index}_missing_id")
81+
if step_id in seen_ids:
82+
raise StopRun("invalid_plan:duplicate_step_id")
83+
seen_ids.add(step_id)
84+
85+
title = step.get("title")
86+
if not isinstance(title, str) or not title.strip():
87+
raise StopRun(f"invalid_plan:step_{index}_missing_title")
88+
89+
tool = step.get("tool")
90+
if not isinstance(tool, str) or not tool.strip():
91+
raise StopRun(f"invalid_plan:step_{index}_missing_tool")
92+
tool = tool.strip()
93+
if tool not in allowed_tools:
94+
raise StopRun(f"invalid_plan:tool_not_allowed:{tool}")
95+
96+
args = step.get("args", {})
97+
if args is None:
98+
args = {}
99+
if not isinstance(args, dict):
100+
raise StopRun(f"invalid_plan:step_{index}_bad_args")
101+
102+
normalized.append(
103+
{
104+
"id": step_id.strip(),
105+
"title": title.strip(),
106+
"tool": tool,
107+
"args": args,
108+
}
109+
)
110+
111+
return normalized
112+
113+
114+
class ToolGateway:
115+
def __init__(
116+
self,
117+
*,
118+
allow: set[str],
119+
registry: dict[str, Callable[..., dict[str, Any]]],
120+
budget: Budget,
121+
):
122+
self.allow = set(allow)
123+
self.registry = registry
124+
self.budget = budget
125+
self.tool_calls = 0
126+
self.seen_calls: set[str] = set()
127+
128+
def call(self, name: str, args: dict[str, Any]) -> dict[str, Any]:
129+
self.tool_calls += 1
130+
if self.tool_calls > self.budget.max_tool_calls:
131+
raise StopRun("max_tool_calls")
132+
133+
if name not in self.allow:
134+
raise StopRun(f"tool_denied:{name}")
135+
136+
tool = self.registry.get(name)
137+
if tool is None:
138+
raise StopRun(f"tool_missing:{name}")
139+
140+
signature = f"{name}:{args_hash(args)}"
141+
if signature in self.seen_calls:
142+
raise StopRun("loop_detected")
143+
self.seen_calls.add(signature)
144+
145+
try:
146+
return tool(**args)
147+
except TypeError as exc:
148+
raise StopRun(f"tool_bad_args:{name}") from exc
149+
except Exception as exc:
150+
raise StopRun(f"tool_error:{name}") from exc
Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
from __future__ import annotations
2+
3+
import json
4+
import os
5+
from typing import Any
6+
7+
from openai import APIConnectionError, APITimeoutError, OpenAI
8+
9+
MODEL = os.getenv("OPENAI_MODEL", "gpt-4.1-mini")
10+
LLM_TIMEOUT_SECONDS = float(os.getenv("OPENAI_TIMEOUT_SECONDS", "60"))
11+
12+
13+
class LLMTimeout(Exception):
14+
pass
15+
16+
17+
class LLMEmpty(Exception):
18+
pass
19+
20+
21+
PLAN_SYSTEM_PROMPT = """
22+
You are a task decomposition planner.
23+
Return only one JSON object in this exact shape:
24+
{
25+
"kind": "plan",
26+
"steps": [
27+
{"id": "step_1", "title": "...", "tool": "...", "args": {...}}
28+
]
29+
}
30+
31+
Rules:
32+
- Create 3 to 6 steps.
33+
- Use only tools from available_tools.
34+
- Keep args minimal and valid.
35+
- Do not add extra keys.
36+
- Do not output markdown.
37+
""".strip()
38+
39+
FINAL_SYSTEM_PROMPT = """
40+
You are a reporting assistant.
41+
Write a short final summary in English for a US business audience.
42+
Include: manager name, month, gross sales (USD), refunds (USD), net sales (USD), refund rate (%), and key risk note.
43+
""".strip()
44+
45+
TOOL_CATALOG = [
46+
{
47+
"name": "get_manager_profile",
48+
"description": "Get manager profile by manager_id",
49+
"args": {"manager_id": "integer"},
50+
},
51+
{
52+
"name": "fetch_sales_data",
53+
"description": "Get daily gross sales for a month",
54+
"args": {"month": "string in YYYY-MM"},
55+
},
56+
{
57+
"name": "fetch_refund_data",
58+
"description": "Get daily refund values for a month",
59+
"args": {"month": "string in YYYY-MM"},
60+
},
61+
{
62+
"name": "calculate_monthly_kpis",
63+
"description": "Calculate gross/refunds/net/order KPIs for a month",
64+
"args": {"month": "string in YYYY-MM"},
65+
},
66+
{
67+
"name": "detect_risk_signals",
68+
"description": "Detect risk warnings for a month",
69+
"args": {"month": "string in YYYY-MM"},
70+
},
71+
]
72+
73+
74+
def _get_client() -> OpenAI:
75+
api_key = os.getenv("OPENAI_API_KEY")
76+
if not api_key:
77+
raise EnvironmentError(
78+
"OPENAI_API_KEY is not set. Run: export OPENAI_API_KEY='sk-...'"
79+
)
80+
return OpenAI(api_key=api_key)
81+
82+
83+
def create_plan(goal: str, max_plan_steps: int) -> dict[str, Any]:
84+
payload = {
85+
"goal": goal,
86+
"max_plan_steps": max_plan_steps,
87+
"available_tools": TOOL_CATALOG,
88+
}
89+
90+
client = _get_client()
91+
try:
92+
completion = client.chat.completions.create(
93+
model=MODEL,
94+
temperature=0,
95+
timeout=LLM_TIMEOUT_SECONDS,
96+
response_format={"type": "json_object"},
97+
messages=[
98+
{"role": "system", "content": PLAN_SYSTEM_PROMPT},
99+
{"role": "user", "content": json.dumps(payload, ensure_ascii=True)},
100+
],
101+
)
102+
except (APITimeoutError, APIConnectionError) as exc:
103+
raise LLMTimeout("llm_timeout") from exc
104+
105+
text = completion.choices[0].message.content or "{}"
106+
try:
107+
return json.loads(text)
108+
except json.JSONDecodeError:
109+
return {"kind": "invalid", "raw": text}
110+
111+
112+
def compose_final_answer(goal: str, history: list[dict[str, Any]]) -> str:
113+
payload = {
114+
"goal": goal,
115+
"history": history,
116+
}
117+
118+
client = _get_client()
119+
try:
120+
completion = client.chat.completions.create(
121+
model=MODEL,
122+
temperature=0,
123+
timeout=LLM_TIMEOUT_SECONDS,
124+
messages=[
125+
{"role": "system", "content": FINAL_SYSTEM_PROMPT},
126+
{"role": "user", "content": json.dumps(payload, ensure_ascii=True)},
127+
],
128+
)
129+
except (APITimeoutError, APIConnectionError) as exc:
130+
raise LLMTimeout("llm_timeout") from exc
131+
132+
text = completion.choices[0].message.content or ""
133+
text = text.strip()
134+
if not text:
135+
raise LLMEmpty("llm_empty")
136+
return text

0 commit comments

Comments
 (0)