forked from satyamsoni2211/vscode_debugging_configurations
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathfastapi_demo.py
More file actions
61 lines (51 loc) · 1.71 KB
/
fastapi_demo.py
File metadata and controls
61 lines (51 loc) · 1.71 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
import json
import asyncio
from uvicorn import run, Server, Config
from fastapi import FastAPI, Query, Request
from fastapi.responses import JSONResponse
app = FastAPI()
@app.on_event("startup")
async def startup():
queue = asyncio.Queue()
app.state.queue = queue
event = asyncio.Event()
app.state.event = event
tasks = asyncio.create_task(worker(queue, event), name="worker")
app.state.task = tasks
async def worker(queue: asyncio.Queue, event: asyncio.Event):
try:
task = asyncio.tasks.current_task()
name = task.get_name()
print(f"started {name} ... ")
while not event.is_set():
try:
elem = queue.get_nowait()
print(f"{name} got {elem=}")
queue.task_done()
await asyncio.sleep(1)
except asyncio.QueueEmpty:
await asyncio.sleep(1)
print("quitting !")
except asyncio.CancelledError as e:
print(f"exception occurred ! {e}")
raise e
@app.on_event("shutdown")
async def shutdown():
event = app.state.event
event.set()
task = app.state.task
t_ = task.cancel(msg=f"Cancelling {task.get_name()}...")
print(f"cancelled {task.get_name()} {t_=}")
print("waiting for tasks to finish ...")
exceptions = await asyncio.gather(task, return_exceptions=True)
print(exceptions)
@app.get("/")
async def home(request: Request):
await request.app.state.queue.put("from home ")
return {"message": "Hello World"}
if __name__ == "__main__":
# run("demo:app", host="0.0.0.0", port=8000, reload=True)
config = Config(app='demo:app', host="0.0.0.0"
)
server = Server(config=config)
server.run()