diff --git a/.env.example b/.env.example index 97c0b6ab..991d1d85 100644 --- a/.env.example +++ b/.env.example @@ -11,6 +11,16 @@ OPENROUTER_API_KEY= OPENROUTER_API_BASE=https://openrouter.ai/api/v1 OPENROUTER_APP_URL= OPENROUTER_APP_NAME=newsletter-maker +AI_CLASSIFICATION_MODEL=meta-llama/llama-3.1-70b-instruct +AI_RELEVANCE_MODEL=qwen/qwen-2.5-72b-instruct +AI_SUMMARIZATION_MODEL=google/gemma-3-27b-it +AI_CLASSIFICATION_REVIEW_THRESHOLD=0.6 +AI_RELEVANCE_LOW_THRESHOLD=0.5 +AI_RELEVANCE_HIGH_THRESHOLD=0.85 +AI_RELEVANCE_REVIEW_THRESHOLD=0.4 +AI_RELEVANCE_SUMMARIZE_THRESHOLD=0.7 +AI_MAX_NODE_RETRIES=2 +AI_REQUEST_TIMEOUT_SECONDS=60 EMBEDDING_PROVIDER=sentence-transformers EMBEDDING_MODEL=sentence-transformers/all-MiniLM-L6-v2 EMBEDDING_TRUST_REMOTE_CODE=false diff --git a/.vscode/settings.json b/.vscode/settings.json index e1b74788..55016634 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -18,6 +18,7 @@ "noinput", "nomic", "OLLAMA", + "ormsgpack", "PRAW", "psycopg", "pylint", @@ -28,6 +29,8 @@ "readyz", "Referer", "upserted", - "upvote" + "upvote", + "uritemplate", + "xxhash" ] } diff --git a/celerybeat-schedule b/celerybeat-schedule index 03a92990..1089d88f 100644 Binary files a/celerybeat-schedule and b/celerybeat-schedule differ diff --git a/core/llm.py b/core/llm.py new file mode 100644 index 00000000..2bf545f3 --- /dev/null +++ b/core/llm.py @@ -0,0 +1,71 @@ +from __future__ import annotations + +import json +import re +import time +from dataclasses import dataclass +from typing import Any + +import httpx +from django.conf import settings + +JSON_OBJECT_PATTERN = re.compile(r"\{.*\}", re.DOTALL) + + +@dataclass(slots=True) +class OpenRouterJSONResponse: + payload: dict[str, Any] + model: str + latency_ms: int + + +def openrouter_chat_json(*, model: str, system_prompt: str, user_prompt: str) -> OpenRouterJSONResponse: + if not settings.OPENROUTER_API_KEY: + raise RuntimeError("OPENROUTER_API_KEY must be configured for OpenRouter chat completions.") + + headers = { + "Authorization": f"Bearer {settings.OPENROUTER_API_KEY}", + "Content-Type": "application/json", + } + if settings.OPENROUTER_APP_URL: + headers["HTTP-Referer"] = settings.OPENROUTER_APP_URL + if settings.OPENROUTER_APP_NAME: + headers["X-OpenRouter-Title"] = settings.OPENROUTER_APP_NAME + + started_at = time.perf_counter() + response = httpx.post( + f"{settings.OPENROUTER_API_BASE.rstrip('/')}/chat/completions", + headers=headers, + json={ + "model": model, + "temperature": 0, + "response_format": {"type": "json_object"}, + "messages": [ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": user_prompt}, + ], + }, + timeout=settings.AI_REQUEST_TIMEOUT_SECONDS, + ) + latency_ms = int((time.perf_counter() - started_at) * 1000) + response.raise_for_status() + + message_content = response.json()["choices"][0]["message"]["content"] + return OpenRouterJSONResponse( + payload=_extract_json_object(message_content), + model=model, + latency_ms=latency_ms, + ) + + +def _extract_json_object(message_content: str) -> dict[str, Any]: + try: + payload = json.loads(message_content) + except json.JSONDecodeError: + match = JSON_OBJECT_PATTERN.search(message_content) + if not match: + raise ValueError("Model response did not contain a JSON object.") + payload = json.loads(match.group(0)) + if not isinstance(payload, dict): + raise ValueError("Model response JSON must be an object.") + return payload diff --git a/core/pipeline.py b/core/pipeline.py new file mode 100644 index 00000000..521a94db --- /dev/null +++ b/core/pipeline.py @@ -0,0 +1,388 @@ +from __future__ import annotations + +import logging +import re +from functools import lru_cache +from typing import Any, Literal, TypedDict + +from django.conf import settings +from langgraph.graph import END, StateGraph + +from core.embeddings import build_content_embedding_text, embed_text, get_reference_similarity +from core.llm import openrouter_chat_json +from core.models import Content, ReviewQueue, ReviewReason, SkillResult, SkillStatus + +logger = logging.getLogger(__name__) + +CLASSIFICATION_SKILL_NAME = "content_classification" +RELEVANCE_SKILL_NAME = "relevance_scoring" +SUMMARIZATION_SKILL_NAME = "summarization" + +CONTENT_TYPES = ( + "technical_article", + "tutorial", + "opinion", + "product_announcement", + "event", + "release_notes", + "other", +) + + +class PipelineState(TypedDict, total=False): + content_id: int + tenant_id: int + classification: dict[str, Any] | None + relevance: dict[str, Any] | None + summary: dict[str, Any] | None + status: str + + +@lru_cache(maxsize=1) +def get_ingestion_graph(): + graph = StateGraph(PipelineState) + graph.add_node("classify", classify_node) + graph.add_node("score_relevance", relevance_node) + graph.add_node("summarize", summarize_node) + graph.add_node("archive", archive_node) + graph.add_node("queue_review", queue_review_node) + graph.set_entry_point("classify") + graph.add_edge("classify", "score_relevance") + graph.add_conditional_edges( + "score_relevance", + route_by_relevance, + { + "relevant": "summarize", + "borderline": "queue_review", + "irrelevant": "archive", + }, + ) + graph.add_edge("summarize", END) + graph.add_edge("archive", END) + graph.add_edge("queue_review", END) + return graph.compile() + + +def process_content_pipeline(content_id: int) -> PipelineState: + content = Content.objects.select_related("tenant").get(pk=content_id) + initial_state: PipelineState = { + "content_id": content.id, + "tenant_id": content.tenant_id, + "status": "processing", + } + return get_ingestion_graph().invoke(initial_state) + + +def classify_node(state: PipelineState) -> PipelineState: + content = _get_content(state) + classification = _execute_with_retries(CLASSIFICATION_SKILL_NAME, lambda: run_content_classification(content)) + content.content_type = classification["content_type"] + content.save(update_fields=["content_type"]) + _create_skill_result( + content, + skill_name=CLASSIFICATION_SKILL_NAME, + status=SkillStatus.COMPLETED, + result_data=classification, + model_used=classification["model_used"], + latency_ms=classification["latency_ms"], + confidence=classification["confidence"], + ) + if classification["confidence"] < settings.AI_CLASSIFICATION_REVIEW_THRESHOLD: + _upsert_review_queue_item( + content, + reason=ReviewReason.LOW_CONFIDENCE_CLASSIFICATION, + confidence=float(classification["confidence"]), + ) + return {"classification": classification} + + +def relevance_node(state: PipelineState) -> PipelineState: + content = _get_content(state) + relevance = _execute_with_retries(RELEVANCE_SKILL_NAME, lambda: run_relevance_scoring(content)) + content.relevance_score = relevance["relevance_score"] + content.is_active = True + content.save(update_fields=["relevance_score", "is_active"]) + _create_skill_result( + content, + skill_name=RELEVANCE_SKILL_NAME, + status=SkillStatus.COMPLETED, + result_data=relevance, + model_used=relevance["model_used"], + latency_ms=relevance["latency_ms"], + confidence=relevance["relevance_score"], + ) + return {"relevance": relevance} + + +def summarize_node(state: PipelineState) -> PipelineState: + content = _get_content(state) + summary = _execute_with_retries(SUMMARIZATION_SKILL_NAME, lambda: run_summarization(content)) + _create_skill_result( + content, + skill_name=SUMMARIZATION_SKILL_NAME, + status=SkillStatus.COMPLETED, + result_data=summary, + model_used=summary["model_used"], + latency_ms=summary["latency_ms"], + ) + return {"summary": summary, "status": "completed"} + + +def archive_node(state: PipelineState) -> PipelineState: + content = _get_content(state) + content.is_active = False + content.save(update_fields=["is_active"]) + return {"status": "archived"} + + +def queue_review_node(state: PipelineState) -> PipelineState: + content = _get_content(state) + relevance = state.get("relevance") or {} + _upsert_review_queue_item( + content, + reason=ReviewReason.BORDERLINE_RELEVANCE, + confidence=float(relevance.get("relevance_score", settings.AI_RELEVANCE_REVIEW_THRESHOLD)), + ) + content.is_active = True + content.save(update_fields=["is_active"]) + return {"status": "review"} + + +def route_by_relevance(state: PipelineState) -> Literal["relevant", "borderline", "irrelevant"]: + relevance = state.get("relevance") or {} + score = float(relevance.get("relevance_score", 0.0)) + if score >= settings.AI_RELEVANCE_SUMMARIZE_THRESHOLD: + return "relevant" + if score < settings.AI_RELEVANCE_REVIEW_THRESHOLD: + return "irrelevant" + return "borderline" + + +def run_content_classification(content: Content) -> dict[str, Any]: + if settings.OPENROUTER_API_KEY: + try: + response = openrouter_chat_json( + model=settings.AI_CLASSIFICATION_MODEL, + system_prompt=( + "You classify newsletter content into one of these categories: " + "technical_article, tutorial, opinion, product_announcement, event, release_notes, other. " + "Return JSON with content_type, confidence, and explanation." + ), + user_prompt=f"Title: {content.title}\nURL: {content.url}\n\nContent:\n{content.content_text[:5000]}", + ) + payload = response.payload + content_type = str(payload.get("content_type", "other")) + if content_type not in CONTENT_TYPES: + content_type = "other" + confidence = _clamp_score(payload.get("confidence", 0.5)) + return { + "content_type": content_type, + "confidence": confidence, + "explanation": str(payload.get("explanation", "LLM-based classification.")), + "model_used": response.model, + "latency_ms": response.latency_ms, + } + except Exception: + logger.exception( + "Classification model call failed; falling back to heuristic classifier", + extra={"content_id": content.id}, + ) + return _heuristic_classification(content) + + +def run_relevance_scoring(content: Content) -> dict[str, Any]: + vector = embed_text(build_content_embedding_text(content)) + similarity = float(get_reference_similarity(content.tenant_id, vector)) + if similarity >= settings.AI_RELEVANCE_HIGH_THRESHOLD or similarity < settings.AI_RELEVANCE_LOW_THRESHOLD: + return { + "relevance_score": similarity, + "explanation": f"Reference corpus similarity score is {similarity:.2f}; no LLM adjudication was required.", + "used_llm": False, + "model_used": f"embedding:{settings.EMBEDDING_MODEL}", + "latency_ms": 0, + } + + if settings.OPENROUTER_API_KEY: + try: + response = openrouter_chat_json( + model=settings.AI_RELEVANCE_MODEL, + system_prompt=( + "You score how relevant a candidate article is for a newsletter topic. " + "Return JSON with relevance_score between 0 and 1, explanation, and used_llm=true." + ), + user_prompt=( + f"Newsletter topic: {content.tenant.topic_description}\n" + f"Reference similarity score: {similarity:.3f}\n" + f"Title: {content.title}\n" + f"Content:\n{content.content_text[:5000]}" + ), + ) + payload = response.payload + return { + "relevance_score": _clamp_score(payload.get("relevance_score", similarity)), + "explanation": str(payload.get("explanation", "LLM-based relevance adjudication.")), + "used_llm": True, + "model_used": response.model, + "latency_ms": response.latency_ms, + } + except Exception: + logger.exception( + "Relevance model call failed; falling back to heuristic relevance", + extra={"content_id": content.id}, + ) + + return { + "relevance_score": similarity, + "explanation": ( + f"Borderline reference similarity of {similarity:.2f} against the tenant baseline for " + f"'{content.tenant.topic_description}'." + ), + "used_llm": False, + "model_used": f"embedding:{settings.EMBEDDING_MODEL}", + "latency_ms": 0, + } + + +def run_summarization(content: Content) -> dict[str, Any]: + if settings.OPENROUTER_API_KEY: + try: + response = openrouter_chat_json( + model=settings.AI_SUMMARIZATION_MODEL, + system_prompt=( + "You write concise newsletter-ready summaries. Return JSON with a single key named summary." + ), + user_prompt=( + f"Newsletter topic: {content.tenant.topic_description}\n" + f"Title: {content.title}\n" + f"Content:\n{content.content_text[:5000]}" + ), + ) + return { + "summary": _normalize_summary(str(response.payload.get("summary", "")), content), + "model_used": response.model, + "latency_ms": response.latency_ms, + } + except Exception: + logger.exception( + "Summarization model call failed; falling back to heuristic summary", + extra={"content_id": content.id}, + ) + return { + "summary": _heuristic_summary(content), + "model_used": "heuristic", + "latency_ms": 0, + } + + +def _execute_with_retries(skill_name: str, fn): + last_exc: Exception | None = None + for attempt in range(settings.AI_MAX_NODE_RETRIES + 1): + try: + return fn() + except Exception as exc: # pragma: no cover + last_exc = exc + logger.exception( + "Skill execution failed", + extra={"skill_name": skill_name, "attempt": attempt + 1}, + ) + assert last_exc is not None + raise last_exc + + +def _heuristic_classification(content: Content) -> dict[str, Any]: + text = f"{content.title}\n{content.content_text}".lower() + keyword_sets = { + "release_notes": ("release notes", "changelog", "version", "released"), + "tutorial": ("tutorial", "how to", "guide", "walkthrough", "step-by-step"), + "product_announcement": ("announcing", "launch", "launched", "available now", "introducing"), + "event": ("conference", "summit", "meetup", "webinar", "event"), + "opinion": ("opinion", "why i", "lessons learned", "thoughts", "editorial"), + "technical_article": ("architecture", "engineering", "platform", "infrastructure", "devops", "kubernetes"), + } + best_type = "other" + best_score = 0 + for content_type, keywords in keyword_sets.items(): + score = sum(text.count(keyword) for keyword in keywords) + if score > best_score: + best_type = content_type + best_score = score + confidence = 0.45 if best_type == "other" else min(0.95, 0.55 + (best_score * 0.1)) + return { + "content_type": best_type, + "confidence": confidence, + "explanation": "Keyword heuristic based on title and body text.", + "model_used": "heuristic", + "latency_ms": 0, + } + + +def _heuristic_summary(content: Content) -> str: + sentences = [segment.strip() for segment in re.split(r"(?<=[.!?])\s+", content.content_text.strip()) if segment.strip()] + if not sentences: + return f"{content.title}: no summary was available from the source content." + summary = " ".join(sentences[:2]) + if len(summary) > 400: + summary = summary[:397].rstrip() + "..." + return _normalize_summary(summary, content) + + +def _normalize_summary(summary: str, content: Content) -> str: + normalized = summary.strip() + if normalized: + return normalized + return f"{content.title}: summary generation returned no content." + + +def _clamp_score(value: Any) -> float: + try: + score = float(value) + except (TypeError, ValueError): + score = 0.0 + return max(0.0, min(1.0, score)) + + +def _get_content(state: PipelineState) -> Content: + return Content.objects.select_related("tenant").get(pk=state["content_id"]) + + +def _upsert_review_queue_item(content: Content, *, reason: ReviewReason, confidence: float) -> ReviewQueue: + existing = ReviewQueue.objects.filter(content=content, reason=reason, resolved=False).first() + if existing: + existing.confidence = confidence + existing.save(update_fields=["confidence"]) + return existing + return ReviewQueue.objects.create( + tenant=content.tenant, + content=content, + reason=reason, + confidence=confidence, + ) + + +def _create_skill_result( + content: Content, + *, + skill_name: str, + status: SkillStatus, + result_data: dict[str, Any] | None = None, + error_message: str = "", + model_used: str = "", + latency_ms: int | None = None, + confidence: float | None = None, +) -> SkillResult: + previous = SkillResult.objects.filter(content=content, skill_name=skill_name, superseded_by__isnull=True).first() + skill_result = SkillResult.objects.create( + content=content, + tenant=content.tenant, + skill_name=skill_name, + status=status, + result_data=result_data, + error_message=error_message, + model_used=model_used, + latency_ms=latency_ms, + confidence=confidence, + ) + if previous: + previous.superseded_by = skill_result + previous.save(update_fields=["superseded_by"]) + return skill_result diff --git a/core/static/core/favicon.ico b/core/static/core/favicon.ico index 02d0115b..6eabf355 100644 Binary files a/core/static/core/favicon.ico and b/core/static/core/favicon.ico differ diff --git a/core/static/core/logo.png b/core/static/core/logo.png new file mode 100644 index 00000000..159c9fdf Binary files /dev/null and b/core/static/core/logo.png differ diff --git a/core/static/core/logo.svg b/core/static/core/logo.svg deleted file mode 100644 index 0c20a100..00000000 --- a/core/static/core/logo.svg +++ /dev/null @@ -1 +0,0 @@ - diff --git a/core/tasks.py b/core/tasks.py index 6fb22e2a..5fc8e945 100644 --- a/core/tasks.py +++ b/core/tasks.py @@ -6,6 +6,7 @@ from core.embeddings import upsert_content_embedding from core.models import Content, IngestionRun, RunStatus, SourceConfig +from core.pipeline import process_content_pipeline from core.plugins import get_plugin_for_source_config logger = logging.getLogger(__name__) @@ -48,6 +49,11 @@ def run_all_ingestions(): return len(source_config_ids) +@shared_task(name="core.tasks.process_content") +def process_content(content_id: int): + return process_content_pipeline(content_id) + + def _ingest_source_config(source_config: SourceConfig) -> tuple[int, int]: plugin = get_plugin_for_source_config(source_config) fetched_items = plugin.fetch_new_content(source_config.last_fetched_at) @@ -66,6 +72,10 @@ def _ingest_source_config(source_config: SourceConfig) -> tuple[int, int]: content_text=item.content_text, ) upsert_content_embedding(content) + if settings.CELERY_TASK_ALWAYS_EAGER: + process_content(content.id) + else: + process_content.delay(content.id) ingested_count += 1 source_config.last_fetched_at = timezone.now() source_config.save(update_fields=["last_fetched_at"]) diff --git a/core/tests/test_pipeline.py b/core/tests/test_pipeline.py new file mode 100644 index 00000000..196ccbb1 --- /dev/null +++ b/core/tests/test_pipeline.py @@ -0,0 +1,174 @@ +from types import SimpleNamespace + +import pytest + +from core.models import Content, ReviewQueue, ReviewReason, SkillResult, Tenant +from core.pipeline import CLASSIFICATION_SKILL_NAME, RELEVANCE_SKILL_NAME, SUMMARIZATION_SKILL_NAME +from core.tasks import process_content + +pytestmark = pytest.mark.django_db + + +@pytest.fixture +def pipeline_context(django_user_model): + user = django_user_model.objects.create_user(username="pipeline-owner", password="testpass123") + tenant = Tenant.objects.create(name="Pipeline Tenant", user=user, topic_description="Platform engineering") + content = Content.objects.create( + tenant=tenant, + url="https://example.com/article", + title="Kubernetes Release Notes", + author="Editor", + source_plugin="rss", + published_date="2026-04-26T00:00:00Z", + content_text="This article covers a new Kubernetes release and what changed for platform teams.", + embedding_id="emb_123", + ) + return SimpleNamespace(user=user, tenant=tenant, content=content) + + +def test_process_content_runs_full_pipeline_for_relevant_content(pipeline_context, mocker): + mocker.patch( + "core.pipeline.run_content_classification", + return_value={ + "content_type": "release_notes", + "confidence": 0.9, + "explanation": "High confidence classification.", + "model_used": "heuristic", + "latency_ms": 0, + }, + ) + mocker.patch( + "core.pipeline.run_relevance_scoring", + return_value={ + "relevance_score": 0.92, + "explanation": "Very close to the tenant reference corpus.", + "used_llm": False, + "model_used": "embedding:test", + "latency_ms": 0, + }, + ) + mocker.patch( + "core.pipeline.run_summarization", + return_value={ + "summary": "A concise summary for the editor.", + "model_used": "heuristic", + "latency_ms": 0, + }, + ) + + result = process_content(pipeline_context.content.id) + + pipeline_context.content.refresh_from_db() + assert result["status"] == "completed" + assert pipeline_context.content.content_type == "release_notes" + assert pipeline_context.content.relevance_score == pytest.approx(0.92) + assert pipeline_context.content.is_active is True + assert SkillResult.objects.filter(content=pipeline_context.content, skill_name=CLASSIFICATION_SKILL_NAME).count() == 1 + assert SkillResult.objects.filter(content=pipeline_context.content, skill_name=RELEVANCE_SKILL_NAME).count() == 1 + assert SkillResult.objects.filter(content=pipeline_context.content, skill_name=SUMMARIZATION_SKILL_NAME).count() == 1 + assert ReviewQueue.objects.filter(content=pipeline_context.content).count() == 0 + + +def test_process_content_queues_borderline_items_for_review(pipeline_context, mocker): + mocker.patch( + "core.pipeline.run_content_classification", + return_value={ + "content_type": "technical_article", + "confidence": 0.9, + "explanation": "High confidence classification.", + "model_used": "heuristic", + "latency_ms": 0, + }, + ) + mocker.patch( + "core.pipeline.run_relevance_scoring", + return_value={ + "relevance_score": 0.55, + "explanation": "Borderline similarity to the tenant baseline.", + "used_llm": False, + "model_used": "embedding:test", + "latency_ms": 0, + }, + ) + summarize_mock = mocker.patch("core.pipeline.run_summarization") + + result = process_content(pipeline_context.content.id) + + pipeline_context.content.refresh_from_db() + assert result["status"] == "review" + assert pipeline_context.content.is_active is True + summarize_mock.assert_not_called() + review_item = ReviewQueue.objects.get(content=pipeline_context.content, reason=ReviewReason.BORDERLINE_RELEVANCE) + assert review_item.confidence == pytest.approx(0.55) + + +def test_process_content_archives_irrelevant_items(pipeline_context, mocker): + mocker.patch( + "core.pipeline.run_content_classification", + return_value={ + "content_type": "other", + "confidence": 0.7, + "explanation": "Low signal classification.", + "model_used": "heuristic", + "latency_ms": 0, + }, + ) + mocker.patch( + "core.pipeline.run_relevance_scoring", + return_value={ + "relevance_score": 0.2, + "explanation": "Far from the tenant reference corpus.", + "used_llm": False, + "model_used": "embedding:test", + "latency_ms": 0, + }, + ) + summarize_mock = mocker.patch("core.pipeline.run_summarization") + + result = process_content(pipeline_context.content.id) + + pipeline_context.content.refresh_from_db() + assert result["status"] == "archived" + assert pipeline_context.content.is_active is False + summarize_mock.assert_not_called() + assert ReviewQueue.objects.filter(content=pipeline_context.content, reason=ReviewReason.BORDERLINE_RELEVANCE).count() == 0 + + +def test_process_content_adds_review_item_for_low_confidence_classification(pipeline_context, mocker): + mocker.patch( + "core.pipeline.run_content_classification", + return_value={ + "content_type": "other", + "confidence": 0.3, + "explanation": "Ambiguous content.", + "model_used": "heuristic", + "latency_ms": 0, + }, + ) + mocker.patch( + "core.pipeline.run_relevance_scoring", + return_value={ + "relevance_score": 0.9, + "explanation": "Close to the tenant baseline.", + "used_llm": False, + "model_used": "embedding:test", + "latency_ms": 0, + }, + ) + mocker.patch( + "core.pipeline.run_summarization", + return_value={ + "summary": "Summary present even though classification confidence was low.", + "model_used": "heuristic", + "latency_ms": 0, + }, + ) + + result = process_content(pipeline_context.content.id) + + assert result["status"] == "completed" + review_item = ReviewQueue.objects.get( + content=pipeline_context.content, + reason=ReviewReason.LOW_CONFIDENCE_CLASSIFICATION, + ) + assert review_item.confidence == pytest.approx(0.3) diff --git a/core/tests/test_tasks.py b/core/tests/test_tasks.py index df7a53ea..02de1c52 100644 --- a/core/tests/test_tasks.py +++ b/core/tests/test_tasks.py @@ -24,6 +24,7 @@ def source_plugin_context(django_user_model): def test_run_ingestion_creates_content_from_rss_entries(source_plugin_context, mocker): upsert_embedding_mock = mocker.patch("core.tasks.upsert_content_embedding") + process_content_delay_mock = mocker.patch("core.tasks.process_content.delay") parse_mock = mocker.patch("core.plugins.rss.feedparser.parse") source_config = SourceConfig.objects.create( tenant=source_plugin_context.tenant, @@ -50,12 +51,14 @@ def test_run_ingestion_creates_content_from_rss_entries(source_plugin_context, m assert content.tenant == source_plugin_context.tenant assert content.entity == source_plugin_context.entity upsert_embedding_mock.assert_called_once_with(content) + process_content_delay_mock.assert_called_once_with(content.id) assert SourceConfig.objects.get(pk=source_config.id).last_fetched_at is not None ingestion_run = IngestionRun.objects.get(tenant=source_plugin_context.tenant, plugin_name=SourcePluginName.RSS) assert ingestion_run.status == RunStatus.SUCCESS def test_run_ingestion_skips_duplicate_urls(source_plugin_context, mocker): upsert_embedding_mock = mocker.patch("core.tasks.upsert_content_embedding") + process_content_delay_mock = mocker.patch("core.tasks.process_content.delay") parse_mock = mocker.patch("core.plugins.rss.feedparser.parse") source_config = SourceConfig.objects.create( tenant=source_plugin_context.tenant, @@ -89,10 +92,12 @@ def test_run_ingestion_skips_duplicate_urls(source_plugin_context, mocker): assert result["items_fetched"] == 1 assert result["items_ingested"] == 0 upsert_embedding_mock.assert_not_called() + process_content_delay_mock.assert_not_called() assert Content.objects.filter(url="https://example.com/post-1").count() == 1 def test_run_ingestion_creates_content_from_reddit_posts(source_plugin_context, mocker): upsert_embedding_mock = mocker.patch("core.tasks.upsert_content_embedding") + process_content_delay_mock = mocker.patch("core.tasks.process_content.delay") reddit_mock = mocker.patch("core.plugins.reddit.praw.Reddit") source_config = SourceConfig.objects.create( tenant=source_plugin_context.tenant, @@ -117,6 +122,7 @@ def test_run_ingestion_creates_content_from_reddit_posts(source_plugin_context, assert result["items_ingested"] == 1 content = Content.objects.get(title="Reddit Post") upsert_embedding_mock.assert_called_once_with(content) + process_content_delay_mock.assert_called_once_with(content.id) assert content.source_plugin == SourcePluginName.REDDIT assert content.entity is None diff --git a/newsletter_maker/settings/ai.py b/newsletter_maker/settings/ai.py index 8a89beba..9d30ee98 100644 --- a/newsletter_maker/settings/ai.py +++ b/newsletter_maker/settings/ai.py @@ -2,11 +2,35 @@ from .base import env_bool + +def env_float(name: str, default: float) -> float: + value = os.getenv(name) + if value is None: + return default + return float(value) + + +def env_int(name: str, default: int) -> int: + value = os.getenv(name) + if value is None: + return default + return int(value) + QDRANT_URL = os.getenv("QDRANT_URL", "http://qdrant:6333") OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY", "") OPENROUTER_API_BASE = os.getenv("OPENROUTER_API_BASE", "https://openrouter.ai/api/v1") OPENROUTER_APP_URL = os.getenv("OPENROUTER_APP_URL", "") OPENROUTER_APP_NAME = os.getenv("OPENROUTER_APP_NAME", "newsletter-maker") +AI_CLASSIFICATION_MODEL = os.getenv("AI_CLASSIFICATION_MODEL", "meta-llama/llama-3.1-70b-instruct") +AI_RELEVANCE_MODEL = os.getenv("AI_RELEVANCE_MODEL", "qwen/qwen-2.5-72b-instruct") +AI_SUMMARIZATION_MODEL = os.getenv("AI_SUMMARIZATION_MODEL", "google/gemma-3-27b-it") +AI_CLASSIFICATION_REVIEW_THRESHOLD = env_float("AI_CLASSIFICATION_REVIEW_THRESHOLD", default=0.6) +AI_RELEVANCE_LOW_THRESHOLD = env_float("AI_RELEVANCE_LOW_THRESHOLD", default=0.5) +AI_RELEVANCE_HIGH_THRESHOLD = env_float("AI_RELEVANCE_HIGH_THRESHOLD", default=0.85) +AI_RELEVANCE_REVIEW_THRESHOLD = env_float("AI_RELEVANCE_REVIEW_THRESHOLD", default=0.4) +AI_RELEVANCE_SUMMARIZE_THRESHOLD = env_float("AI_RELEVANCE_SUMMARIZE_THRESHOLD", default=0.7) +AI_MAX_NODE_RETRIES = env_int("AI_MAX_NODE_RETRIES", default=2) +AI_REQUEST_TIMEOUT_SECONDS = env_float("AI_REQUEST_TIMEOUT_SECONDS", default=60.0) EMBEDDING_PROVIDER = os.getenv("EMBEDDING_PROVIDER", "sentence-transformers") EMBEDDING_MODEL = os.getenv("EMBEDDING_MODEL", "sentence-transformers/all-MiniLM-L6-v2") EMBEDDING_TRUST_REMOTE_CODE = env_bool("EMBEDDING_TRUST_REMOTE_CODE", default=False) diff --git a/newsletter_maker/settings/base.py b/newsletter_maker/settings/base.py index 37280aaa..14e78c15 100644 --- a/newsletter_maker/settings/base.py +++ b/newsletter_maker/settings/base.py @@ -153,7 +153,7 @@ def env_list(name: str, default: str = "") -> list[str]: "href": lambda request: static("core/favicon.ico"), }, ], - "SITE_ICON": lambda request: static("core/logo.svg"), + "SITE_ICON": lambda request: static("core/logo.png"), "SITE_SYMBOL": "newsletter", } diff --git a/requirements.txt b/requirements.txt index 95bf1b90..6fe1d3ac 100644 --- a/requirements.txt +++ b/requirements.txt @@ -19,9 +19,17 @@ httpx==0.28.1 identify==2.6.19 inflection==0.5.1 jsbeautifier==1.15.4 +langchain-core==1.3.2 +langchain-protocol==0.0.12 +langgraph-checkpoint==4.0.2 +langgraph-prebuilt==1.0.11 +langgraph-sdk==0.3.13 +langgraph==1.1.9 +langsmith==0.7.36 librt==0.9.0 mypy==1.20.2 nodeenv==1.10.0 +ormsgpack==1.12.2 praw==7.8.1 pre-commit==4.6.0 psycopg[binary]==3.3.3 @@ -32,9 +40,12 @@ pytest-mock==3.15.1 pytest==9.0.3 python-dotenv==1.2.2 qdrant-client==1.17.1 +requests-toolbelt==1.0.0 ruff==0.15.12 sentence-transformers==5.4.1 structlog==25.5.0 types-pyyaml==6.0.12.20260408 uritemplate==4.2.0 +uuid-utils==0.14.1 watchdog==6.0.0 +xxhash==3.7.0 diff --git a/skills/content_classification/SKILL.md b/skills/content_classification/SKILL.md new file mode 100644 index 00000000..3622de7f --- /dev/null +++ b/skills/content_classification/SKILL.md @@ -0,0 +1,17 @@ +--- +name: content_classification +input: title, content_text, url +output: content_type, confidence, explanation +--- + +Classify newsletter content into one of these categories: + +- technical_article +- tutorial +- opinion +- product_announcement +- event +- release_notes +- other + +Return structured JSON with `content_type`, `confidence`, and `explanation`. diff --git a/skills/relevance_scoring/SKILL.md b/skills/relevance_scoring/SKILL.md new file mode 100644 index 00000000..192f80e2 --- /dev/null +++ b/skills/relevance_scoring/SKILL.md @@ -0,0 +1,13 @@ +--- +name: relevance_scoring +input: content_embedding, tenant_id +output: relevance_score, explanation, used_llm +--- + +Score how relevant a piece of content is for a tenant using reference-corpus similarity first. + +- Similarity >= 0.85: use the similarity score directly. +- Similarity < 0.5: use the similarity score directly. +- Similarity between 0.5 and 0.85: use an LLM for nuanced judgment when available. + +Return structured JSON with `relevance_score`, `explanation`, and `used_llm`. diff --git a/skills/summarization/SKILL.md b/skills/summarization/SKILL.md new file mode 100644 index 00000000..f8a117a7 --- /dev/null +++ b/skills/summarization/SKILL.md @@ -0,0 +1,9 @@ +--- +name: summarization +input: title, content_text, newsletter_topic +output: summary +--- + +Write a concise 2-3 sentence newsletter-ready summary for content that has already been judged relevant. + +Return structured JSON with a single `summary` field.