Skip to content

Commit 4580f8b

Browse files
committed
feat(core): implement taskiq scheduler and slack app home dashboard
Details: - Configured TaskiqScheduler with RedisScheduleSource in broker. - Created check_scheduled_posts cron task to process due drafts automatically. - Injected AsyncSession into Taskiq dependencies. - Transformed Slack App Home into a dynamic dashboard displaying recent drafts. - Added scheduler and all targets to Makefile.
1 parent bcc1b12 commit 4580f8b

File tree

8 files changed

+189
-54
lines changed

8 files changed

+189
-54
lines changed

Makefile

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,13 @@
1-
.PHONY: worker api
1+
.PHONY: worker api scheduler all
22

33
worker:
44
poetry run taskiq worker backend.workers.broker:broker backend.workers.tasks
55

6+
scheduler:
7+
poetry run taskiq scheduler backend.workers.broker:scheduler
8+
69
api:
7-
poetry run uvicorn backend.api.main:app --host 127.0.0.1 --port 8001 --reload
10+
poetry run uvicorn backend.api.main:app --host 127.0.0.1 --port 8001 --reload
11+
12+
all:
13+
make -j 3 api worker scheduler

backend/api/routes/feedback.py

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,13 @@
33

44
import httpx
55
import structlog
6-
from fastapi import APIRouter, HTTPException, Request, Response
6+
from fastapi import APIRouter, Depends, HTTPException, Request, Response
7+
from sqlalchemy.ext.asyncio import AsyncSession
78

89
from backend.config.lexicon import SLACK_UI
910
from backend.config.settings import settings
11+
from backend.repositories.draft_repository import DraftRepository
12+
from backend.workers.dependencies import get_db_session
1013
from backend.workers.tasks.generate_draft import generate_draft_task
1114
from backend.workers.tasks.ingest_guideline import ingest_guideline_task
1215
from backend.workers.tasks.publish_post import publish_post_task
@@ -321,29 +324,35 @@ async def slack_interactions(request: Request):
321324

322325

323326
@router.post("/events")
324-
async def slack_events(request: Request):
327+
async def slack_events(
328+
request: Request,
329+
session: AsyncSession = Depends(get_db_session), # noqa: B008
330+
):
325331
"""Обробка Events API (наприклад, відкриття вкладки Home)."""
326332
data = await request.json()
327333

328-
# 1. Підтвердження URL для Slack (виконується один раз при налаштуванні)
329334
if data.get("type") == "url_verification":
330335
return {"challenge": data.get("challenge")}
331336

332337
event = data.get("event", {})
333338
user_id = event.get("user")
334339

335-
# 2. Коли користувач відкриває вкладку Home — малюємо йому дашборд
336340
if event.get("type") == "app_home_opened":
341+
# 1. Витягуємо останні 10 драфтів
342+
repo = DraftRepository(session)
343+
recent_drafts = await repo.get_recent_drafts(limit=10)
344+
345+
# 2. Рендеримо дашборд
337346
slack_token = (
338347
settings.SLACK_BOT_TOKEN.get_secret_value()
339-
if settings.SLACK_BOT_TOKEN
340-
else ""
348+
if hasattr(settings.SLACK_BOT_TOKEN, "get_secret_value")
349+
else settings.SLACK_BOT_TOKEN
341350
)
342351
async with httpx.AsyncClient() as client:
343352
await client.post(
344353
"https://slack.com/api/views.publish",
345354
headers={"Authorization": f"Bearer {slack_token}"},
346-
json={"user_id": user_id, "view": build_app_home()},
355+
json={"user_id": user_id, "view": build_app_home(drafts=recent_drafts)},
347356
)
348357

349358
return Response(status_code=200)

backend/config/lexicon.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,10 @@
4646
"home_description": "Це твій центр керування медичним контентом. Натисни кнопку нижче, щоб почати.",
4747
"home_btn_create": "✨ Створити новий пост",
4848
"home_btn_upload": "📚 Завантажити гайдлайн",
49+
"home_drafts_header": "🗓 Останні драфти",
50+
"home_drafts_empty": "_Поки що немає жодного драфту._",
51+
"home_draft_card_text": "*{topic}*\nПлатформа: *{platform}* | Статус: {status_emoji} `{status}`",
52+
"home_draft_open_btn": "📝 Відкрити",
4953
# --- Upload Modal ---
5054
"upload_modal_title": "База знань",
5155
"upload_modal_submit": "Завантажити",

backend/workers/broker.py

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,33 @@
11
from typing import Any
22

3-
from taskiq_redis import ListQueueBroker, RedisAsyncResultBackend
3+
from taskiq import TaskiqScheduler
4+
from taskiq.schedule_sources import LabelScheduleSource
5+
from taskiq_redis import ListQueueBroker, RedisAsyncResultBackend, RedisScheduleSource
46

5-
# from .scheduled_post import scheduled_post_task
67
from backend.config.settings import settings
78
from backend.workers.middlewares.logging import StructlogMiddleware
89
from backend.workers.middlewares.metrics import PrometheusMetricsMiddleware
910
from backend.workers.middlewares.retry import RetryTrackerMiddleware
1011

11-
# from .scheduled_post import scheduled_post_task
1212
redis_url = f"redis://{settings.REDIS_HOST}:{settings.REDIS_PORT}/0"
1313

14-
# Ініціалізуємо брокер з явно вказаною чергою (ізоляція від інших проєктів)
14+
# Ініціалізуємо брокер з явно вказаною чергою
1515
broker = ListQueueBroker(redis_url, queue_name="seratonin_tasks")
1616

17-
# Додаємо явну типізацію для усунення помилок Pylance
1817
result_backend: RedisAsyncResultBackend[Any] = RedisAsyncResultBackend(
1918
redis_url, result_ex_time=3600
2019
)
2120
broker.with_result_backend(result_backend)
2221

23-
# Підключення Middlewares у правильному порядку
2422
broker.add_middlewares(
2523
StructlogMiddleware(),
2624
RetryTrackerMiddleware(),
2725
PrometheusMetricsMiddleware(),
2826
)
27+
28+
# --- НОВЕ: Ініціалізація планувальника ---
29+
redis_source = RedisScheduleSource(redis_url)
30+
scheduler = TaskiqScheduler(
31+
broker=broker,
32+
sources=[LabelScheduleSource(broker), redis_source],
33+
)

backend/workers/dependencies.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,21 @@
1-
from typing import Annotated
1+
from typing import Annotated, AsyncGenerator
22

3+
from sqlalchemy.ext.asyncio import AsyncSession
34
from taskiq import TaskiqDepends
45

6+
from backend.api.dependencies import async_session_maker
57
from backend.integrations.llm.router import LLMRouter
68
from backend.services.content_generator import ContentGenerator
79
from backend.services.fact_checker import FactChecker
810
from backend.services.style_matcher import StyleMatcher
911

1012

13+
async def get_db_session() -> AsyncGenerator[AsyncSession, None]:
14+
"""Генерує асинхронну сесію БД для FastAPI та Taskiq."""
15+
async with async_session_maker() as session:
16+
yield session
17+
18+
1119
def get_llm_router() -> LLMRouter:
1220
"""Ініціалізує та повертає роутер моделей."""
1321
return LLMRouter()

backend/workers/tasks/__init__.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,13 @@
11
from .generate_draft import generate_draft_task
22
from .ingest_guideline import ingest_guideline_task
33
from .publish_post import publish_post_task
4-
5-
# from .scheduled_post import scheduled_post_task
4+
from .scheduled_post import check_scheduled_posts_task
65
from .vectorize_post import vectorize_published_post_task
76

87
__all__ = [
98
"generate_draft_task",
109
"publish_post_task",
11-
# "scheduled_post_task",
10+
"check_scheduled_posts_task",
1211
"vectorize_published_post_task",
1312
"ingest_guideline_task",
1413
]
Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1,53 @@
1-
# backend/workers/tasks/scheduled_post.py
1+
import structlog
2+
from sqlalchemy.ext.asyncio import AsyncSession
3+
from taskiq import TaskiqDepends
4+
5+
from backend.models.enums import DraftStatus
6+
from backend.models.schemas import DraftUpdate
7+
from backend.repositories.draft_repository import DraftRepository
8+
from backend.workers.broker import broker
9+
from backend.workers.dependencies import get_db_session
10+
from backend.workers.tasks.publish_post import publish_post_task
11+
12+
logger = structlog.get_logger()
13+
14+
15+
@broker.task(task_name="check_scheduled_posts", schedule=[{"cron": "* * * * *"}])
16+
async def check_scheduled_posts_task(
17+
session: AsyncSession = TaskiqDepends(get_db_session), # noqa: B008
18+
) -> None:
19+
"""
20+
Періодична задача, яка перевіряє БД на наявність постів,
21+
час публікації яких настав.
22+
"""
23+
repo = DraftRepository(session)
24+
25+
# Витягуємо всі пости, де status == 'scheduled' і scheduled_at <= now
26+
due_drafts = await repo.get_due_scheduled_drafts()
27+
28+
if not due_drafts:
29+
return
30+
31+
logger.info("found_scheduled_drafts", count=len(due_drafts))
32+
33+
for draft in due_drafts:
34+
try:
35+
# 1. Відправляємо в чергу на публікацію
36+
await publish_post_task.kiq( # type: ignore[call-overload]
37+
post_id=str(draft.id),
38+
platform=draft.platform,
39+
content=draft.content or "",
40+
)
41+
42+
# Оновлення статусу в БД
43+
await repo.update(draft.id, DraftUpdate(status=DraftStatus.PUBLISHED))
44+
logger.info(
45+
"scheduled_draft_sent_to_publish",
46+
draft_id=draft.id,
47+
platform=draft.platform,
48+
)
49+
50+
except Exception as e:
51+
logger.error(
52+
"failed_to_process_scheduled_draft", draft_id=draft.id, error=str(e)
53+
)

slack_app/utils/block_builder.py

Lines changed: 86 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from typing import Any
22

33
from backend.config.lexicon import SLACK_UI
4+
from backend.models.db_models import Draft
45

56

67
def build_draft_card(
@@ -17,7 +18,9 @@ def build_draft_card(
1718
f"{SLACK_UI['draft_ready_header'].format(topic=topic)} | 📢 {platform.upper()}"
1819
)
1920
if not is_valid:
20-
header_text = SLACK_UI["validation_failed_header"].format(platform=platform.upper())
21+
header_text = SLACK_UI["validation_failed_header"].format(
22+
platform=platform.upper()
23+
)
2124

2225
blocks: list[dict[str, Any]] = [
2326
{
@@ -269,49 +272,98 @@ def build_generation_modal(channel_id: str) -> dict[str, Any]:
269272
}
270273

271274

272-
def build_app_home() -> dict[str, Any]:
273-
return {
274-
"type": "home",
275-
"blocks": [
276-
{
277-
"type": "header",
278-
"text": {
279-
"type": "plain_text",
280-
"text": SLACK_UI["home_welcome"],
281-
"emoji": True,
282-
},
275+
def build_app_home(drafts: list[Draft] | None = None) -> dict[str, Any]:
276+
if drafts is None:
277+
drafts = []
278+
279+
blocks: list[dict[str, Any]] = [
280+
{
281+
"type": "header",
282+
"text": {
283+
"type": "plain_text",
284+
"text": SLACK_UI["home_welcome"],
285+
"emoji": True,
283286
},
287+
},
288+
{
289+
"type": "section",
290+
"text": {"type": "mrkdwn", "text": SLACK_UI["home_description"]},
291+
},
292+
{"type": "divider"},
293+
{
294+
"type": "actions",
295+
"elements": [
296+
{
297+
"type": "button",
298+
"text": {
299+
"type": "plain_text",
300+
"text": SLACK_UI["home_btn_create"],
301+
"emoji": True,
302+
},
303+
"style": "primary",
304+
"action_id": "action_open_generation_modal",
305+
},
306+
{
307+
"type": "button",
308+
"text": {
309+
"type": "plain_text",
310+
"text": SLACK_UI["home_btn_upload"],
311+
"emoji": True,
312+
},
313+
"action_id": "action_open_upload_modal",
314+
},
315+
],
316+
},
317+
{"type": "divider"},
318+
{
319+
"type": "header",
320+
"text": {"type": "plain_text", "text": SLACK_UI["home_drafts_header"], "emoji": True},
321+
},
322+
]
323+
324+
if not drafts:
325+
blocks.append(
284326
{
285327
"type": "section",
286-
"text": {"type": "mrkdwn", "text": SLACK_UI["home_description"]},
287-
},
288-
{"type": "divider"},
289-
{
290-
"type": "actions",
291-
"elements": [
292-
{
293-
"type": "button",
294-
"text": {
295-
"type": "plain_text",
296-
"text": SLACK_UI["home_btn_create"],
297-
"emoji": True,
298-
},
299-
"style": "primary",
300-
"action_id": "action_open_generation_modal",
328+
"text": {"type": "mrkdwn", "text": SLACK_UI["home_drafts_empty"]},
329+
}
330+
)
331+
else:
332+
for d in drafts:
333+
status_emoji = "⏳"
334+
if d.status == "published":
335+
status_emoji = "✅"
336+
elif d.status == "scheduled":
337+
status_emoji = "🕒"
338+
elif d.status == "failed":
339+
status_emoji = "❌"
340+
341+
blocks.append(
342+
{
343+
"type": "section",
344+
"text": {
345+
"type": "mrkdwn",
346+
"text": SLACK_UI["home_draft_card_text"].format(
347+
topic=d.topic,
348+
platform=d.platform.upper(),
349+
status_emoji=status_emoji,
350+
status=d.status,
351+
),
301352
},
302-
{
353+
"accessory": {
303354
"type": "button",
304355
"text": {
305356
"type": "plain_text",
306-
"text": SLACK_UI["home_btn_upload"],
357+
"text": SLACK_UI["home_draft_open_btn"],
307358
"emoji": True,
308359
},
309-
"action_id": "action_open_upload_modal",
360+
"value": f"{d.id}|{d.platform}",
361+
"action_id": "action_open_draft_details",
310362
},
311-
],
312-
},
313-
],
314-
}
363+
}
364+
)
365+
366+
return {"type": "home", "blocks": blocks}
315367

316368

317369
def build_upload_modal() -> dict[str, Any]:

0 commit comments

Comments
 (0)