-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
224 lines (188 loc) · 7.66 KB
/
main.py
File metadata and controls
224 lines (188 loc) · 7.66 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
import os
import json
from typing import List, Optional, Any
from fastapi import FastAPI, HTTPException
from fastapi.staticfiles import StaticFiles
from fastapi.responses import HTMLResponse, StreamingResponse
from pydantic import BaseModel
import anthropic
import uvicorn
from tools import TOOLS, ToolExecutor
# Initialize FastAPI app
# Single application instance; route handlers below attach to it and the
# /static asset directory is mounted near the bottom of the file.
app = FastAPI(title="Data Engineering Agent")
# Models for API
class Message(BaseModel):
    """A single chat message: speaker role plus its text content."""
    role: str  # e.g. "user" or "assistant"; forwarded verbatim to the Anthropic API
    content: str  # plain-text body of the message
class ChatRequest(BaseModel):
    """Request body for POST /api/chat."""
    messages: List[Message]  # conversation history; presumably oldest-first -- TODO confirm against the UI
    stream: bool = False  # when True the endpoint replies with server-sent events
class ChatResponse(BaseModel):
    """Non-streaming response body: the assistant's final message."""
    message: Message
# Get API key from environment
ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY")
if not ANTHROPIC_API_KEY:
    print("Warning: ANTHROPIC_API_KEY not set. Chat functionality will not work.")
# Initialize Anthropic client
# Deliberately left as None when the key is missing so the app can still
# serve the UI and /health; /api/chat checks for None and returns HTTP 500.
client = anthropic.Anthropic(api_key=ANTHROPIC_API_KEY) if ANTHROPIC_API_KEY else None
# Load system prompt from file
def load_system_prompt() -> str:
    """Read and return the agent system prompt from system_prompt.md.

    The file is resolved relative to this module (not the CWD) so the app
    behaves the same regardless of where it is launched from.

    Returns:
        The prompt text with surrounding whitespace stripped.

    Raises:
        OSError: if system_prompt.md is missing or unreadable.
    """
    prompt_path = os.path.join(os.path.dirname(__file__), "system_prompt.md")
    # Explicit UTF-8: the default encoding is platform-dependent and can
    # mangle non-ASCII characters in the prompt (e.g. on Windows).
    with open(prompt_path, "r", encoding="utf-8") as f:
        return f.read().strip()
# Cached once at import time; editing system_prompt.md requires a restart.
SYSTEM_PROMPT = load_system_prompt()
@app.get("/", response_class=HTMLResponse)
async def root():
    """Serve the single-page chat interface from static/index.html."""
    # Explicit UTF-8 so the HTML is decoded correctly regardless of the
    # platform's default locale encoding.
    with open("static/index.html", "r", encoding="utf-8") as f:
        return f.read()
@app.get("/health")
async def health():
    """Liveness probe: always reports the service as healthy."""
    status_report = {"status": "healthy"}
    return status_report
@app.post("/api/chat")
async def chat(request: ChatRequest):
    """Handle chat requests with tool support.

    Runs an agent loop against the Anthropic API: the model may request
    tool calls, which are executed via ToolExecutor and whose results are
    fed back until the model produces a final text-only answer.

    Args:
        request: conversation history plus a ``stream`` flag.

    Returns:
        StreamingResponse emitting server-sent events when
        ``request.stream`` is True, otherwise a ChatResponse carrying the
        assistant's final text.

    Raises:
        HTTPException: 500 when the API key is not configured, or when any
            error occurs while calling the model or executing tools.
    """
    if not client:
        raise HTTPException(status_code=500, detail="API key not configured")
    try:
        # Convert messages to Anthropic format
        anthropic_messages = [
            {"role": msg.role, "content": msg.content}
            for msg in request.messages
        ]
        if request.stream:
            # Streaming response with tool use
            async def generate():
                current_messages = anthropic_messages.copy()
                while True:
                    # Call Claude with tools.
                    # NOTE(review): client.messages.create is a blocking
                    # (synchronous) call inside an async generator; it stalls
                    # the event loop for the duration of each API round-trip.
                    response = client.messages.create(
                        model="claude-sonnet-4-5-20250929",
                        max_tokens=8192,
                        system=SYSTEM_PROMPT,
                        messages=current_messages,
                        tools=TOOLS
                    )
                    # Execute each requested tool exactly once and cache the
                    # result by content-block id. (BUGFIX: tools used to be
                    # executed a second time below when building the
                    # tool_result messages, so side-effecting tools ran twice
                    # per turn.)
                    executed = {}
                    # Process response content
                    for block in response.content:
                        if block.type == "text":
                            # Stream text content
                            yield f"data: {json.dumps({'content': block.text})}\n\n"
                        elif block.type == "tool_use":
                            # Notify that tool is being used
                            tool_use_msg = {
                                'tool_use': {
                                    'name': block.name,
                                    'input': block.input
                                }
                            }
                            yield f"data: {json.dumps(tool_use_msg)}\n\n"
                            # Execute the tool (once) and remember the result
                            tool_result = ToolExecutor.execute(block.name, block.input)
                            executed[block.id] = tool_result
                            # Send tool result status
                            tool_result_msg = {
                                'tool_result': {
                                    'name': block.name,
                                    'success': tool_result.get('success', False)
                                }
                            }
                            yield f"data: {json.dumps(tool_result_msg)}\n\n"
                    if not executed:
                        # No tool uses, we're done
                        break
                    # Add assistant response with tool uses to messages
                    current_messages.append({
                        "role": "assistant",
                        "content": response.content
                    })
                    # Add the cached tool results to messages (no re-execution)
                    tool_results = [
                        {
                            "type": "tool_result",
                            "tool_use_id": block.id,
                            "content": json.dumps(executed[block.id])
                        }
                        for block in response.content
                        if block.type == "tool_use"
                    ]
                    current_messages.append({
                        "role": "user",
                        "content": tool_results
                    })
                # Send completion signal
                yield f"data: {json.dumps({'done': True})}\n\n"
            return StreamingResponse(
                generate(),
                media_type="text/event-stream"
            )
        else:
            # Non-streaming response with tool use
            current_messages = anthropic_messages.copy()
            while True:
                response = client.messages.create(
                    model="claude-sonnet-4-5-20250929",
                    max_tokens=8192,
                    system=SYSTEM_PROMPT,
                    messages=current_messages,
                    tools=TOOLS
                )
                # Check if there are tool uses
                tool_uses = [block for block in response.content if block.type == "tool_use"]
                if not tool_uses:
                    # No tool uses: join the text blocks and return them
                    text_content = " ".join([
                        block.text for block in response.content
                        if block.type == "text"
                    ])
                    return ChatResponse(
                        message=Message(
                            role="assistant",
                            content=text_content
                        )
                    )
                # Execute tools and continue conversation
                current_messages.append({
                    "role": "assistant",
                    "content": response.content
                })
                tool_results = []
                for block in response.content:
                    if block.type == "tool_use":
                        result = ToolExecutor.execute(block.name, block.input)
                        tool_results.append({
                            "type": "tool_result",
                            "tool_use_id": block.id,
                            "content": json.dumps(result)
                        })
                current_messages.append({
                    "role": "user",
                    "content": tool_results
                })
    except Exception as e:
        # Top-level boundary: surface any failure as an HTTP 500 with detail.
        raise HTTPException(status_code=500, detail=str(e))
# Mount static files
# Serves the JS/CSS assets referenced by index.html under the /static prefix.
app.mount("/static", StaticFiles(directory="static"), name="static")
def main():
    """Configure and launch the uvicorn server for the agent app.

    Reads PORT (default "50051") and TOWER__HOSTNAME (default "(local)")
    from the environment, then binds to all interfaces.
    """
    server_port = int(os.getenv("PORT", "50051"))
    host_label = os.getenv("TOWER__HOSTNAME", "(local)")
    print(f"Starting Data Engineering Agent on {host_label}")
    print(f"Server running on http://0.0.0.0:{server_port}")
    try:
        uvicorn.run(app, host="0.0.0.0", port=server_port, log_level="info", use_colors=False)
    except Exception as e:
        print(f"Failed to start server: {str(e)}")
        raise
    finally:
        print("FastAPI application shutting down...")
if __name__ == "__main__":
    main()