-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathserver.py
More file actions
151 lines (118 loc) · 4.5 KB
/
server.py
File metadata and controls
151 lines (118 loc) · 4.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
import argparse
import asyncio
import logging
import threading
from contextlib import asynccontextmanager
import uvicorn
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from fastapi.staticfiles import StaticFiles
from core import AsyncEngine
from use_cases.travel import TravelUseCase
from use_cases.music import MusicUseCase
from use_cases.multi import MultiUseCase
# Registry of selectable demos, keyed by the CLI --use-case value (see __main__).
USE_CASES = {
    "travel": TravelUseCase,
    "music": MusicUseCase,
    "multi": MultiUseCase,
}
# ---------------------------------------------------------------------------
# Logging
# ---------------------------------------------------------------------------
# Root logger stays at WARNING so third-party libraries stay quiet; only the
# application's own "async_tools" logger is raised to DEBUG below.
logging.basicConfig(
    level=logging.WARNING,
    format="%(asctime)s.%(msecs)03d [%(levelname)s] %(message)s",
    datefmt="%H:%M:%S",
)
log = logging.getLogger("async_tools")
log.setLevel(logging.DEBUG)

# ---------------------------------------------------------------------------
# Engine — created in __main__ after CLI args are parsed.
# Exposed at module level so tests and tooling can import it directly.
# ---------------------------------------------------------------------------
# NOTE(review): this is None until __main__ assigns it; the route handlers
# below assume the server was started via __main__ (or that a test set it).
engine: AsyncEngine = None  # type: ignore[assignment]
# ---------------------------------------------------------------------------
# FastAPI app
# ---------------------------------------------------------------------------
@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI lifespan hook: capture the server's event loop for the engine.

    The engine stores the loop (presumably so non-async code can schedule
    SSE pushes onto it — confirm against AsyncEngine); we grab it here
    because this coroutine runs on the live server loop at startup.
    """
    # asyncio.get_event_loop() is deprecated inside a coroutine since
    # Python 3.10; get_running_loop() is the correct, unambiguous call.
    engine._sse_loop = asyncio.get_running_loop()
    yield


app = FastAPI(lifespan=lifespan)
@app.get("/config")
def config():
    """Use-case metadata consumed by the frontend at page load."""
    uc = engine.use_case
    return {
        "display_name": uc.display_name,
        "placeholder": uc.input_placeholder,
    }
@app.get("/stream")
async def stream():
    """Server-Sent Events endpoint: forwards engine events to one client."""
    queue: asyncio.Queue = asyncio.Queue()
    # Register this client's queue; the engine fans events out to each one.
    engine._sse_clients.append(queue)

    async def pump():
        try:
            # Emit each queued payload as a single SSE frame.
            while True:
                payload = await queue.get()
                yield f"data: {payload}\n\n"
        except asyncio.CancelledError:
            pass  # client disconnected — fall through to cleanup
        finally:
            # Deregister so the engine stops queueing for a dead client.
            engine._sse_clients.remove(queue)

    sse_headers = {"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}
    return StreamingResponse(
        pump(),
        media_type="text/event-stream",
        headers=sse_headers,
    )
@app.post("/chat")
def chat(body: dict):
    """Handle one user turn: record it, call the LLM, broadcast the reply."""
    user_text = (body.get("message") or "").strip()
    if not user_text:
        return {"ok": False, "error": "empty message"}

    log.info("USER MSG %r", user_text[:100])

    # Serialize conversation mutations: history append + LLM round-trip
    # happen under the engine lock.
    with engine._lock:
        log.debug("LOCK acquired by /chat")
        engine.messages.append({"role": "user", "content": user_text})
        bot_text = engine.handle_response(engine.call_openai())
        log.debug("LOCK releasing from /chat")

    engine.push_event("assistant", {"content": bot_text})
    return {"ok": True}
@app.post("/reset")
def reset():
    """Clear conversation history (keep system prompt)."""
    engine.reset()
    # Tell connected SSE clients to wipe their rendered transcript too.
    engine.push_event("reset", {})
    return {"ok": True}
# Serve static files last so API routes take precedence: the "/" mount is a
# catch-all, and route registration order decides which handler wins.
app.mount("/", StaticFiles(directory="static", html=True), name="static")
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    # --- CLI ---------------------------------------------------------------
    parser = argparse.ArgumentParser(description="Async Tools Demo server")
    parser.add_argument(
        "--use-case",
        default="travel",
        choices=list(USE_CASES),
        help="Which use case to run (default: travel)",
    )
    parser.add_argument(
        "--injection-mode",
        default="tool",
        choices=["user", "system", "tool"],
        help=(
            "How completed background job results are injected into the LLM context.\n"
            " user — appended as a user-role message\n"
            " system — appended as a system-role message\n"
            " tool — injected as a synthetic assistant tool_call + tool result pair (default)"
        ),
    )
    args = parser.parse_args()

    # --- Engine + server ---------------------------------------------------
    # Assign the module-level engine so route handlers (and imports) see it.
    engine = AsyncEngine(USE_CASES[args.use_case], injection_mode=args.injection_mode)

    banner = "=" * 50
    log.info(banner)
    log.info("Starting server use_case=%s injection_mode=%s", args.use_case, args.injection_mode)
    log.info(banner)
    print(f"Starting server: use_case={args.use_case!r} injection_mode={args.injection_mode!r}")

    uvicorn.run(app, host="0.0.0.0", port=7862, log_level="warning")