Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 58 additions & 13 deletions core/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@
import logging
import re
from datetime import timedelta
from functools import lru_cache
from typing import Any, Literal, TypedDict, cast
from functools import lru_cache, wraps
from typing import Any, Callable, Literal, TypedDict, cast

from django.conf import settings
from django.db.models import F, Model
Expand All @@ -30,6 +30,7 @@
from core.llm import build_skill_user_prompt, get_skill_definition, openrouter_chat_json
from entities.extraction import run_entity_extraction
from entities.models import EntityMention
from newsletter_maker.telemetry import trace_span
from pipeline.models import (
ReviewQueue,
ReviewReason,
Expand Down Expand Up @@ -98,6 +99,9 @@ class PipelineState(TypedDict, total=False):
status: str


# Call signature of a pipeline node: it receives a PipelineState mapping and
# returns a PipelineState mapping.
PipelineNodeCallable = Callable[[PipelineState], PipelineState]


def _require_pk(instance: Model) -> int:
"""Return a saved model primary key for typed pipeline operations."""

Expand All @@ -122,6 +126,30 @@ def _content_id_from_state(state: PipelineState) -> int:
return content_id


def _trace_pipeline_step(
    step_name: str,
) -> Callable[[PipelineNodeCallable], PipelineNodeCallable]:
    """Build a decorator that runs a LangGraph node inside a named span.

    The span is named ``pipeline.<step_name>``. Its attributes carry the step
    name together with the ``content_id``, ``project_id`` and ``status``
    values read from the incoming state (``None`` when a key is absent).
    """

    span_name = f"pipeline.{step_name}"

    def decorate(func: PipelineNodeCallable) -> PipelineNodeCallable:
        @wraps(func)
        def traced(state: PipelineState) -> PipelineState:
            # Attributes are captured from the state as it enters the node,
            # i.e. before the node has produced its own update.
            span_attributes = {
                "pipeline.step": step_name,
                "content.id": state.get("content_id"),
                "project.id": state.get("project_id"),
                "pipeline.status": state.get("status"),
            }
            with trace_span(span_name, attributes=span_attributes):
                return func(state)

        return cast(PipelineNodeCallable, traced)

    return decorate


@lru_cache(maxsize=1)
def get_ingestion_graph():
"""Build and cache the LangGraph workflow used for content processing.
Expand Down Expand Up @@ -199,18 +227,26 @@ def process_content_pipeline(content_id: int) -> PipelineState:
"""

content = Content.objects.select_related("project").get(pk=content_id)
content.pipeline_state = ContentPipelineState.PROCESSING
content.save(update_fields=["pipeline_state"])
retry_budget = build_retry_budget(PIPELINE_RETRY_SKILLS)
initial_state: PipelineState = {
"content_id": _require_pk(content),
"project_id": _project_pk(content),
"retry_budget_remaining": retry_budget.remaining_retries,
"status": "processing",
}
return cast(PipelineState, get_ingestion_graph().invoke(initial_state))
with trace_span(
"pipeline.process_content",
attributes={
"content.id": _require_pk(content),
"project.id": _project_pk(content),
},
):
content.pipeline_state = ContentPipelineState.PROCESSING
content.save(update_fields=["pipeline_state"])
retry_budget = build_retry_budget(PIPELINE_RETRY_SKILLS)
initial_state: PipelineState = {
"content_id": _require_pk(content),
"project_id": _project_pk(content),
"retry_budget_remaining": retry_budget.remaining_retries,
"status": "processing",
}
return cast(PipelineState, get_ingestion_graph().invoke(initial_state))


@_trace_pipeline_step("deduplicate")
def deduplicate_node(state: PipelineState) -> PipelineState:
"""Detect duplicates before downstream skills consume the content."""

Expand Down Expand Up @@ -263,6 +299,7 @@ def deduplicate_node(state: PipelineState) -> PipelineState:
}


@_trace_pipeline_step("classify")
def classify_node(state: PipelineState) -> PipelineState:
"""Classify the content item and persist the resulting skill output."""

Expand Down Expand Up @@ -312,6 +349,7 @@ def classify_node(state: PipelineState) -> PipelineState:
}


@_trace_pipeline_step("extract_entities")
def extract_entities_node(state: PipelineState) -> PipelineState:
"""Extract tracked-entity mentions before relevance scoring."""

Expand Down Expand Up @@ -348,6 +386,7 @@ def extract_entities_node(state: PipelineState) -> PipelineState:
}


@_trace_pipeline_step("score_relevance")
def relevance_node(state: PipelineState) -> PipelineState:
"""Score content relevance, persist the score, and keep the item active."""

Expand Down Expand Up @@ -392,6 +431,7 @@ def relevance_node(state: PipelineState) -> PipelineState:
}


@_trace_pipeline_step("summarize")
def summarize_node(state: PipelineState) -> PipelineState:
"""Generate and store a newsletter-ready summary for relevant content."""

Expand Down Expand Up @@ -430,6 +470,7 @@ def summarize_node(state: PipelineState) -> PipelineState:
}


@_trace_pipeline_step("archive")
def archive_node(state: PipelineState) -> PipelineState:
"""Mark a low-value content item inactive so it drops out of active flows."""

Expand All @@ -440,6 +481,7 @@ def archive_node(state: PipelineState) -> PipelineState:
return {"status": "archived"}


@_trace_pipeline_step("queue_review")
def queue_review_node(state: PipelineState) -> PipelineState:
"""Create or refresh a manual review item for borderline relevance."""

Expand Down Expand Up @@ -1582,6 +1624,9 @@ def retry_review_queue_item(review_item: ReviewQueue) -> dict[str, object]:
else ContentPipelineState.COMPLETED
)
content.save(update_fields=["pipeline_state"])
else:
content.pipeline_state = ContentPipelineState.AWAITING_REVIEW
content.save(update_fields=["pipeline_state"])
else:
summary = _run_ad_hoc_summarization(content)
result = {
Expand All @@ -1595,7 +1640,7 @@ def retry_review_queue_item(review_item: ReviewQueue) -> dict[str, object]:
)
content.save(update_fields=["pipeline_state"])

if result.get("status") != "awaiting_review":
if result.get("status") in {"completed", "archived"}:
review_item.resolved = True
review_item.resolution = ReviewResolution.RETRIED
review_item.resolved_at = timezone.now()
Expand Down
79 changes: 79 additions & 0 deletions core/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@
from pipeline.models import ReviewQueue
from pipeline.resilience import opened_circuit_breakers, probe_circuit_breaker
from projects.models import Project, ProjectConfig
from trends.models import (
SourceDiversitySnapshot,
TopicCentroidSnapshot,
TopicVelocitySnapshot,
TrendTaskRun,
)

logger = logging.getLogger(__name__)

Expand All @@ -44,11 +50,13 @@
)

__all__ = [
"apply_retention_policies",
"circuit_breaker_health_check",
"recompute_source_quality",
"recompute_authority_scores",
"run_all_source_quality_recomputations",
"run_all_authority_recomputations",
"run_all_retention_policies",
"run_relevance_scoring_skill",
"run_summarization_skill",
"queue_content_skill",
Expand Down Expand Up @@ -84,6 +92,12 @@ def _require_pk(instance: Model) -> int:
return int(pk)


def _retention_cutoff(days: int):
    """Compute the timestamp that lies ``days`` days before the current time."""

    retention_window = timedelta(days=days)
    return timezone.now() - retention_window


@shared_task(name="core.tasks.run_all_authority_recomputations")
def run_all_authority_recomputations():
"""Queue authority recomputation for every project.
Expand Down Expand Up @@ -115,6 +129,19 @@ def run_all_source_quality_recomputations() -> int:
return len(project_ids)


@shared_task(name="core.tasks.run_all_retention_policies")
def run_all_retention_policies() -> int:
    """Fan out retention cleanup to every project and return how many were handled.

    With ``CELERY_TASK_ALWAYS_EAGER`` enabled the cleanup runs inline through a
    direct call; otherwise it is handed to ``_enqueue_task``.
    """

    all_project_pks = list(Project.objects.values_list("id", flat=True))
    for pk in all_project_pks:
        if not settings.CELERY_TASK_ALWAYS_EAGER:
            _enqueue_task(apply_retention_policies, pk)
            continue
        apply_retention_policies(pk)
    return len(all_project_pks)


@shared_task(name="core.tasks.process_content")
def process_content(content_id: int):
"""Run the main AI pipeline for a stored content item."""
Expand Down Expand Up @@ -152,6 +179,58 @@ def retry_pipeline_review_item(review_item_id: int) -> dict[str, object]:
return retry_review_queue_item_from_pipeline(review_item)


@shared_task(name="core.tasks.apply_retention_policies")
def apply_retention_policies(project_id: int) -> dict[str, object]:
    """Purge expired observability rows that belong to a single project.

    Three retention windows are read from settings: one shared by all
    snapshot tables, one for trend task runs, and one for resolved review
    queue items. Rows older than the matching cutoff are deleted.

    Returns a mapping with the project id, the per-category counts reported
    by ``QuerySet.delete()``, and the three cutoffs as ISO-8601 strings.

    Raises ``Project.DoesNotExist`` when no project has the given id.
    """

    project = Project.objects.get(pk=project_id)
    snapshot_cutoff = _retention_cutoff(settings.OBSERVABILITY_SNAPSHOT_RETENTION_DAYS)
    trend_run_cutoff = _retention_cutoff(
        settings.OBSERVABILITY_TREND_TASK_RUN_RETENTION_DAYS
    )
    review_cutoff = _retention_cutoff(
        settings.OBSERVABILITY_REVIEW_QUEUE_RETENTION_DAYS
    )

    # Every snapshot table is pruned with the same filter, so walk them in a
    # fixed order; the labels become keys of the result mapping.
    snapshot_tables = (
        ("topic_centroid_snapshots", TopicCentroidSnapshot),
        ("topic_velocity_snapshots", TopicVelocitySnapshot),
        ("source_diversity_snapshots", SourceDiversitySnapshot),
        ("entity_authority_snapshots", EntityAuthoritySnapshot),
    )
    deleted: dict[str, int] = {}
    for label, snapshot_model in snapshot_tables:
        expired_snapshots = snapshot_model.objects.filter(
            project=project,
            computed_at__lt=snapshot_cutoff,
        )
        deleted[label] = expired_snapshots.delete()[0]

    # A run counts as expired when it finished before the cutoff, or when it
    # has no finish time and started before the cutoff.
    expired_run = Q(finished_at__lt=trend_run_cutoff) | Q(
        finished_at__isnull=True, started_at__lt=trend_run_cutoff
    )
    expired_runs = TrendTaskRun.objects.filter(project=project).filter(expired_run)
    deleted["trend_task_runs"] = expired_runs.delete()[0]

    # Only resolved review items are eligible for removal.
    expired_reviews = ReviewQueue.objects.filter(
        project=project,
        resolved=True,
        resolved_at__lt=review_cutoff,
    )
    deleted["resolved_review_items"] = expired_reviews.delete()[0]

    return {
        "project_id": project_id,
        "deleted": deleted,
        "snapshot_cutoff": snapshot_cutoff.isoformat(),
        "trend_run_cutoff": trend_run_cutoff.isoformat(),
        "review_cutoff": review_cutoff.isoformat(),
    }


@shared_task(name="core.tasks.recompute_source_quality")
def recompute_source_quality(project_id: int) -> dict[str, object]:
"""Recompute source-quality scores used by entity authority scoring."""
Expand Down
Loading
Loading