Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
166 changes: 117 additions & 49 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,48 +1,44 @@
# Taskiq + FastAPI

This repository has a code to integrate FastAPI with taskiq easily.
This repository provides code to integrate FastAPI with Taskiq easily.

Taskiq and FastAPI both have dependencies and this library makes it possible to depend on
`fastapi.Request` or `starlette.requests.HTTPConnection` in taskiq tasks.
Taskiq and FastAPI both use dependency injection, but they function differently. This library bridges the gap, allowing you to depend on `fastapi.Request` or `starlette.requests.HTTPConnection` inside your Taskiq tasks.

With this library you can easily re-use your fastapi dependencies in taskiq functions.
With this library, you can easily re-use your FastAPI dependencies (like database pools or config loaders) inside your Taskiq background functions.

## How does it work?

It adds startup functions to broker so it imports your fastapi application
and creates a single worker-wide Request and HTTPConnection objects that you depend on.
### 1. Process Separation
Taskiq tasks usually run in a separate **Worker process**, not inside your FastAPI web server process. (_It does not run within event loop_)

THIS REQUEST IS NOT RELATED TO THE ACTUAL REQUESTS IN FASTAPI!
This request won't have actual data about the request you were handling while sending task.
### 2. Context Bridging
When the Worker starts, this library initializes your FastAPI application in the background.

## Usage
### 3. Dependency Injection
It creates a **dummy Request object** within the Worker. This allows functions that need FastAPI request context (like accessing `app.state`) to work identically in both the Web App and the Background Worker.

Here we have an example of function that is being used by both taskiq's task and
fastapi's handler function.
> **Note:** The injected Request object in a task is **NOT** the original HTTP request from the user. It is a simulated request context solely for accessing application state.

I have a script called `test_script.py` so my app can be found at `test_script:app`.
We use strings to resolve application to bypass circular imports.
---

Also, as you can see, we use `TaskiqDepends` for Request. That's because
taskiq dependency resolver must know that this type must be injected. FastAPI disallow
Depends for Request type. That's why we use `TaskiqDepends`.
## Usage Example

### File: `test_script.py`

```python
from fastapi import FastAPI, Request
from contextlib import asynccontextmanager
from fastapi import FastAPI, Request, Depends as FastAPIDepends
from pydantic import BaseModel
from redis.asyncio import ConnectionPool, Redis
from fastapi import Depends as FastAPIDepends
from taskiq import TaskiqDepends
from taskiq import TaskiqDepends, ZeroMQBroker
import taskiq_fastapi
from taskiq import ZeroMQBroker

broker = ZeroMQBroker()

app = FastAPI()


@app.on_event("startup")
async def app_startup():
@asynccontextmanager
async def lifespan(app: FastAPI):
print("Creating redis pool")
app.state.redis_pool = ConnectionPool.from_url("redis://localhost")
#####################
# IMPORTANT NOTE #
#####################
Expand All @@ -51,38 +47,32 @@ async def app_startup():
# create an infinite recursion. Because in worker processes
# fastapi startup will be called.
if not broker.is_worker_process:
print("Starting broker")
print("Starting broker client")
await broker.startup()
print("Creating redis pool")
app.state.redis_pool = ConnectionPool.from_url("redis://localhost")

yield # waits for shutdown

@app.on_event("shutdown")
async def app_shutdown():
#####################
# IMPORTANT NOTE #
#####################
# If you won't check that this is not
# a worker process, you'll
# create an infinite recursion. Because in worker processes
# fastapi startup will be called.
# Same as above
if not broker.is_worker_process:
print("Shutting down broker")
print("Shutting down broker client")
await broker.shutdown()

print("Stopping redis pool")
await app.state.redis_pool.disconnect()

app = FastAPI(lifespan=lifespan)

# Here we call our magic function.
taskiq_fastapi.init(broker, "test_script:app")


# We use TaskiqDepends here, because if we use FastAPIDepends fastapi
# initialization will fail.
def get_redis_pool(request: Request = TaskiqDepends()) -> ConnectionPool:
return request.app.state.redis_pool


@broker.task
async def my_redis_task(
key: str,
Expand All @@ -96,43 +86,121 @@ async def my_redis_task(
await redis.set(key, val)
print("Value set.")


class MyVal(BaseModel):
key: str
val: str


@app.post("/val")
async def setval_endpoint(val: MyVal) -> None:
await my_redis_task.kiq(
key=val.key,
val=val.val,
)
await my_redis_task.kiq(key=val.key, val=val.val)
print("Task sent")


@app.get("/val")
async def getval_endpoint(
key: str,
pool: ConnectionPool = FastAPIDepends(get_redis_pool),
) -> str:
async with Redis(connection_pool=pool, decode_responses=True) as redis:
return await redis.get(key)

```

## Manually update dependency context
---

## Key Takeaways

### `if not broker.is_worker_process`

- **True for FastAPI server (uvicorn)** → lifespan starts broker client so tasks can be sent.
- **False for Taskiq worker** → worker manages its own broker connection; lifespan still initializes dependencies.

Prevents double-starting the broker.

When using `InMemoryBroker` it may be required to update the dependency context manually. This may also be useful when setting up tests.
### `TaskiqDepends` vs `FastAPIDepends`

```py
| Purpose | Use |
|--------|------|
| Inside Taskiq tasks | `TaskiqDepends` |
| Inside HTTP routes | `FastAPIDepends` |

---

## Manual Dependency Context Update (InMemoryBroker)

When using `InMemoryBroker` (often used for unit testing) it may be required to update the dependency context manually, as there is no separate worker process to trigger the initialization.

```python
import taskiq_fastapi
from taskiq import InMemoryBroker
from fastapi import FastAPI

broker = InMemoryBroker()

app = FastAPI()

taskiq_fastapi.init(broker, "test_script:app")
taskiq_fastapi.populate_dependency_context(broker, app)
```

---

## Deployment with Docker (Single Artifact Pattern)

The best way to deploy this system is to use the Single Artifact pattern. You build one Docker image that contains your entire codebase, and you run it with different commands to start the API or the Worker.

### 1. Dockerfile

This Dockerfile copies your code and installs dependencies. It does not specify a `CMD` because we will set that in `docker-compose.yml`.

```dockerfile
FROM python:3.12-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .
```

### 2. docker-compose.yml

Notice how api and worker use the same build: . context but run different commands.

```yaml
version: "3.8"

services:
api:
build: .
container_name: fastapi_app
command: uvicorn test_script:app --host 0.0.0.0 --port 8000
ports:
- "8000:8000"
environment:
- NATS_URL=nats://nats:4222
depends_on:
- nats

worker:
build: .
container_name: taskiq_worker
command: taskiq worker test_script:broker
environment:
- NATS_URL=nats://nats:4222
depends_on:
- nats

nats:
image: nats:latest
ports:
- "4222:4222"
```

### Run the stack

```bash
docker-compose up --build
```

You may also run it individually using `docker run` or integrate into the kubernetes environment.

---