diff --git a/core/models.py b/core/models.py index 2b1bd85a..dc7d9b50 100644 --- a/core/models.py +++ b/core/models.py @@ -26,7 +26,10 @@ from pipeline.models import SkillResult as _SkillResult from pipeline.models import SkillStatus as _SkillStatus from projects.models import Project as _Project +from trends.models import ContentClusterMembership as _ContentClusterMembership from trends.models import TopicCentroidSnapshot as _TopicCentroidSnapshot +from trends.models import TopicCluster as _TopicCluster +from trends.models import TopicVelocitySnapshot as _TopicVelocitySnapshot Project = _Project ReviewQueue = _ReviewQueue @@ -34,10 +37,14 @@ ReviewResolution = _ReviewResolution SkillResult = _SkillResult SkillStatus = _SkillStatus +ContentClusterMembership = _ContentClusterMembership +TopicCluster = _TopicCluster TopicCentroidSnapshot = _TopicCentroidSnapshot +TopicVelocitySnapshot = _TopicVelocitySnapshot __all__ = [ "Content", + "ContentClusterMembership", "Entity", "EntityAuthoritySnapshot", "EntityCandidate", @@ -53,6 +60,9 @@ "NewsletterIntakeStatus", "Project", "RunStatus", + "TopicCluster", + "TopicCentroidSnapshot", + "TopicVelocitySnapshot", "UserFeedback", ] diff --git a/core/tasks.py b/core/tasks.py index 3be63815..f7f59b7b 100644 --- a/core/tasks.py +++ b/core/tasks.py @@ -49,9 +49,13 @@ from newsletters.tasks import process_newsletter_intake from trends.tasks import ( TOPIC_CENTROID_MIN_UPVOTES, + assign_content_to_topic_cluster, queue_topic_centroid_recompute, recompute_topic_centroid, + recompute_topic_clusters, + recompute_topic_velocity, run_all_topic_centroid_recomputations, + run_all_topic_cluster_recomputations, ) _COMPAT_TASK_EXPORTS = { @@ -61,15 +65,25 @@ ), "run_all_ingestions": ("ingestion.tasks", "run_all_ingestions"), "run_ingestion": ("ingestion.tasks", "run_ingestion"), + "assign_content_to_topic_cluster": ( + "trends.tasks", + "assign_content_to_topic_cluster", + ), "TOPIC_CENTROID_MIN_UPVOTES": ( "trends.tasks", "TOPIC_CENTROID_MIN_UPVOTES", ), + "recompute_topic_clusters": ("trends.tasks", "recompute_topic_clusters"), "queue_topic_centroid_recompute": ( "trends.tasks", "queue_topic_centroid_recompute", ), "recompute_topic_centroid": ("trends.tasks", "recompute_topic_centroid"), + "recompute_topic_velocity": ("trends.tasks", "recompute_topic_velocity"), + "run_all_topic_cluster_recomputations": ( + "trends.tasks", + "run_all_topic_cluster_recomputations", + ), "run_all_topic_centroid_recomputations": ( "trends.tasks", "run_all_topic_centroid_recomputations", @@ -79,12 +93,16 @@ __all__ = [ "process_newsletter_intake", "run_all_ingestions", + "assign_content_to_topic_cluster", "run_ingestion", "TOPIC_CENTROID_MIN_UPVOTES", "queue_topic_centroid_recompute", "recompute_authority_scores", + "recompute_topic_clusters", "recompute_topic_centroid", + "recompute_topic_velocity", "run_all_authority_recomputations", + "run_all_topic_cluster_recomputations", "run_all_topic_centroid_recomputations", "run_relevance_scoring_skill", "run_summarization_skill", diff --git a/core/tests/test_api.py b/core/tests/test_api.py index f8474db1..03918b1a 100644 --- a/core/tests/test_api.py +++ b/core/tests/test_api.py @@ -8,6 +8,7 @@ from core.models import ( Content, + ContentClusterMembership, Entity, EntityAuthoritySnapshot, EntityCandidate, @@ -24,6 +25,8 @@ SkillResult, SkillStatus, TopicCentroidSnapshot, + TopicCluster, + TopicVelocitySnapshot, UserFeedback, ) from projects.model_support import SourcePluginName @@ -540,6 +543,115 @@ def test_topic_centroid_summary_action_returns_latest_snapshot_and_averages(self self.assertAlmostEqual(response.json()["avg_drift_from_previous"], 0.2) self.assertAlmostEqual(response.json()["avg_drift_from_week_ago"], 0.3) + def test_topic_cluster_list_returns_current_velocity_annotation(self): + cluster = TopicCluster.objects.create( + project=self.owner_project, + first_seen_at="2026-04-22T00:00:00Z", + last_seen_at="2026-04-24T00:00:00Z", + is_active=True, + member_count=3, + dominant_entity=self.owner_entity, + ) + TopicVelocitySnapshot.objects.create( + cluster=cluster, + project=self.owner_project, + window_count=4, + trailing_mean=1.5, + trailing_stddev=0.5, + z_score=3.0, + velocity_score=1.0, + ) + + response = self.client.get( + reverse( + "v1:project-topic-cluster-list", + kwargs={"project_id": self.owner_project.id}, + ), + {"ordering": "-velocity_score"}, + ) + + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(len(response.json()), 1) + self.assertEqual(response.json()[0]["id"], cluster.id) + self.assertEqual(response.json()[0]["member_count"], 3) + self.assertEqual( + response.json()[0]["dominant_entity"]["id"], self.owner_entity.id + ) + self.assertAlmostEqual(response.json()[0]["velocity_score"], 1.0) + self.assertAlmostEqual(response.json()[0]["z_score"], 3.0) + self.assertEqual(response.json()[0]["window_count"], 4) + + def test_topic_cluster_detail_and_velocity_history_action_return_memberships(self): + cluster = TopicCluster.objects.create( + project=self.owner_project, + first_seen_at="2026-04-22T00:00:00Z", + last_seen_at="2026-04-24T00:00:00Z", + is_active=True, + member_count=1, + dominant_entity=self.owner_entity, + ) + ContentClusterMembership.objects.create( + content=self.owner_content, + cluster=cluster, + project=self.owner_project, + similarity=0.92, + ) + first_snapshot = TopicVelocitySnapshot.objects.create( + cluster=cluster, + project=self.owner_project, + window_count=2, + trailing_mean=1.0, + trailing_stddev=0.2, + z_score=1.5, + velocity_score=0.75, + ) + second_snapshot = TopicVelocitySnapshot.objects.create( + cluster=cluster, + project=self.owner_project, + window_count=3, + trailing_mean=1.0, + trailing_stddev=0.3, + z_score=3.0, + velocity_score=1.0, + ) + TopicVelocitySnapshot.objects.filter(pk=first_snapshot.pk).update( + computed_at="2026-04-23T00:00:00Z" + ) + TopicVelocitySnapshot.objects.filter(pk=second_snapshot.pk).update( + computed_at="2026-04-24T00:00:00Z" + ) + + detail_response = self.client.get( + reverse( + "v1:project-topic-cluster-detail", + kwargs={"project_id": self.owner_project.id, "pk": cluster.id}, + ) + ) + history_response = self.client.get( + reverse( + "v1:project-topic-cluster-velocity-history", + kwargs={"project_id": self.owner_project.id, "pk": cluster.id}, + ), + {"limit": 1}, + ) + + self.assertEqual(detail_response.status_code, status.HTTP_200_OK) + self.assertEqual(detail_response.json()["id"], cluster.id) + self.assertEqual(len(detail_response.json()["memberships"]), 1) + self.assertEqual( + detail_response.json()["memberships"][0]["content"]["id"], + self.owner_content.id, + ) + self.assertEqual(len(detail_response.json()["velocity_history"]), 2) + self.assertEqual( + detail_response.json()["velocity_history"][0]["id"], + second_snapshot.id, + ) + + self.assertEqual(history_response.status_code, status.HTTP_200_OK) + self.assertEqual(len(history_response.json()), 1) + self.assertEqual(history_response.json()[0]["id"], second_snapshot.id) + def test_content_detail_includes_duplicate_state(self): canonical = self.owner_content canonical.canonical_url = "https://example.com/owner" diff --git a/core/tests/test_tasks.py b/core/tests/test_tasks.py index b57e7cf3..e44119d3 100644 --- a/core/tests/test_tasks.py +++ b/core/tests/test_tasks.py @@ -1,10 +1,11 @@ -from datetime import datetime, timezone +from datetime import datetime, timedelta, timezone from types import SimpleNamespace import pytest from core.models import ( Content, + ContentClusterMembership, Entity, EntityAuthoritySnapshot, EntityMention, @@ -14,6 +15,8 @@ RunStatus, SkillStatus, TopicCentroidSnapshot, + TopicCluster, + TopicVelocitySnapshot, UserFeedback, ) from core.pipeline import RELEVANCE_SKILL_NAME, SUMMARIZATION_SKILL_NAME @@ -29,9 +32,13 @@ from projects.models import Project, ProjectConfig, SourceConfig from trends.tasks import ( TOPIC_CENTROID_MIN_UPVOTES, + assign_content_to_topic_cluster, queue_topic_centroid_recompute, recompute_topic_centroid, + recompute_topic_clusters, + recompute_topic_velocity, run_all_topic_centroid_recomputations, + run_all_topic_cluster_recomputations, ) pytestmark = pytest.mark.django_db @@ -445,6 +452,23 @@ def test_run_all_topic_centroid_recomputations_executes_inline_when_eager( delay_mock.assert_not_called() +def test_run_all_topic_cluster_recomputations_enqueues_all_projects( + source_plugin_context, mocker +): + delay_mock = mocker.patch("trends.tasks.recompute_topic_clusters.delay") + other_project = Project.objects.create( + name="Other Cluster Project", + topic_description="Security", + ) + + enqueued_count = run_all_topic_cluster_recomputations() + + assert enqueued_count == 2 + delay_mock.assert_any_call(source_plugin_context.project.id) + delay_mock.assert_any_call(other_project.id) + assert delay_mock.call_count == 2 + + def test_recompute_authority_scores_updates_entities_and_creates_snapshots( source_plugin_context, mocker ): @@ -706,6 +730,209 @@ def test_recompute_topic_centroid_disables_centroid_below_minimum_upvotes( assert snapshot.drift_from_previous is None +def test_recompute_topic_clusters_groups_recent_similar_content( + source_plugin_context, mocker +): + project = source_plugin_context.project + second_entity = Entity.objects.create( + project=project, + name="Secondary Entity", + type="vendor", + ) + vector_lookup = { + "Trend 1": [1.0, 0.0], + "Trend 2": [0.99, 0.01], + "Trend 3": [0.98, 0.02], + "Trend 4": [0.97, 0.03], + "Outlier": [0.0, 1.0], + } + mocker.patch( + "trends.tasks.embed_text", + side_effect=lambda text: vector_lookup[text.split("\n\n", 1)[0]], + ) + delay_mock = mocker.patch("trends.tasks.recompute_topic_velocity.delay") + + clustered_contents = [] + for index in range(4): + content = Content.objects.create( + project=project, + entity=source_plugin_context.entity, + url=f"https://example.com/trend-{index}", + title=f"Trend {index + 1}", + author="Author", + source_plugin=SourcePluginName.RSS, + published_date=f"2026-04-2{index}T12:00:00Z", + content_text="Clusterable trend content", + ) + clustered_contents.append(content) + EntityMention.objects.create( + project=project, + content=content, + entity=source_plugin_context.entity, + role=EntityMentionRole.SUBJECT, + ) + outlier = Content.objects.create( + project=project, + entity=second_entity, + url="https://example.com/outlier", + title="Outlier", + author="Author", + source_plugin=SourcePluginName.RSS, + published_date="2026-04-24T12:00:00Z", + content_text="Outlier trend content", + ) + + result = recompute_topic_clusters(project.id) + + cluster = TopicCluster.objects.get(project=project, is_active=True) + memberships = list(cluster.memberships.values_list("content_id", flat=True)) + + assert result["contents_considered"] == 5 + assert result["clusters_updated"] == 1 + assert cluster.member_count == 4 + assert cluster.dominant_entity == source_plugin_context.entity + assert set(memberships) == {content.id for content in clustered_contents} + assert outlier.id not in memberships + delay_mock.assert_called_once_with(project.id) + + +def test_assign_content_to_topic_cluster_adds_similar_content_to_existing_cluster( + source_plugin_context, mocker +): + project = source_plugin_context.project + vector_lookup = { + "Cluster 1": [1.0, 0.0], + "Cluster 2": [0.99, 0.01], + "Cluster 3": [0.98, 0.02], + "Candidate": [0.97, 0.03], + } + mocker.patch( + "trends.tasks.embed_text", + side_effect=lambda text: vector_lookup[text.split("\n\n", 1)[0]], + ) + + existing_contents = [] + for index in range(3): + content = Content.objects.create( + project=project, + entity=source_plugin_context.entity, + url=f"https://example.com/cluster-{index}", + title=f"Cluster {index + 1}", + author="Author", + source_plugin=SourcePluginName.RSS, + published_date=f"2026-04-2{index}T12:00:00Z", + content_text="Existing cluster content", + ) + existing_contents.append(content) + cluster = TopicCluster.objects.create( + project=project, + first_seen_at=datetime(2026, 4, 20, 12, 0, tzinfo=timezone.utc), + last_seen_at=datetime(2026, 4, 22, 12, 0, tzinfo=timezone.utc), + is_active=True, + member_count=3, + dominant_entity=source_plugin_context.entity, + ) + ContentClusterMembership.objects.bulk_create( + [ + ContentClusterMembership( + content=content, + cluster=cluster, + project=project, + similarity=0.9, + ) + for content in existing_contents + ] + ) + candidate = Content.objects.create( + project=project, + entity=source_plugin_context.entity, + url="https://example.com/candidate", + title="Candidate", + author="Author", + source_plugin=SourcePluginName.RSS, + published_date="2026-04-24T12:00:00Z", + content_text="New similar cluster content", + ) + + result = assign_content_to_topic_cluster(candidate.id) + + cluster.refresh_from_db() + membership = ContentClusterMembership.objects.get(content=candidate) + assert result["assigned"] is True + assert result["cluster_id"] == cluster.id + assert membership.cluster == cluster + assert cluster.member_count == 4 + assert cluster.is_active is True + + +def test_recompute_topic_velocity_detects_synthetic_burst( + source_plugin_context, mocker +): + project = source_plugin_context.project + fixed_now = datetime(2026, 4, 30, 12, 0, tzinfo=timezone.utc) + mocker.patch("trends.tasks.timezone.now", return_value=fixed_now) + cluster = TopicCluster.objects.create( + project=project, + first_seen_at=fixed_now - timedelta(days=8), + last_seen_at=fixed_now, + is_active=True, + member_count=11, + dominant_entity=source_plugin_context.entity, + ) + + membership_rows = [] + for offset in range(1, 8): + content = Content.objects.create( + project=project, + entity=source_plugin_context.entity, + url=f"https://example.com/baseline-{offset}", + title=f"Baseline {offset}", + author="Author", + source_plugin=SourcePluginName.RSS, + published_date=fixed_now - timedelta(days=offset, hours=1), + content_text="Baseline trend content", + ) + membership_rows.append( + ContentClusterMembership( + content=content, + cluster=cluster, + project=project, + similarity=0.9, + ) + ) + for index in range(4): + content = Content.objects.create( + project=project, + entity=source_plugin_context.entity, + url=f"https://example.com/burst-{index}", + title=f"Burst {index}", + author="Author", + source_plugin=SourcePluginName.RSS, + published_date=fixed_now - timedelta(hours=index + 1), + content_text="Burst trend content", + ) + membership_rows.append( + ContentClusterMembership( + content=content, + cluster=cluster, + project=project, + similarity=0.95, + ) + ) + ContentClusterMembership.objects.bulk_create(membership_rows) + + result = recompute_topic_velocity(project.id) + + snapshot = TopicVelocitySnapshot.objects.get(cluster=cluster) + assert result["clusters_evaluated"] == 1 + assert result["snapshots_created"] == 1 + assert snapshot.window_count == 4 + assert snapshot.trailing_mean == pytest.approx(1.0) + assert snapshot.trailing_stddev == pytest.approx(0.0) + assert snapshot.z_score == pytest.approx(3.0) + assert snapshot.velocity_score == pytest.approx(1.0) + + def test_run_ingestion_marks_failure_when_plugin_errors(source_plugin_context, mocker): parse_mock = mocker.patch("core.plugins.rss.feedparser.parse") source_config = SourceConfig.objects.create( diff --git a/ingestion/tasks.py b/ingestion/tasks.py index e972e7a1..4b956752 100644 --- a/ingestion/tasks.py +++ b/ingestion/tasks.py @@ -135,9 +135,14 @@ def _match_entity_for_item(plugin, item): def _schedule_content_processing(content: Content) -> None: """Ensure a content row is embedded before it enters the AI pipeline.""" - from core.tasks import process_content, upsert_content_embedding + from core.tasks import ( + assign_content_to_topic_cluster, + process_content, + upsert_content_embedding, + ) upsert_content_embedding(content) + assign_content_to_topic_cluster(content.id) if settings.CELERY_TASK_ALWAYS_EAGER: process_content(content.id) else: diff --git a/newsletter_maker/settings/celery.py b/newsletter_maker/settings/celery.py index 40f5e9a8..875ee3cc 100644 --- a/newsletter_maker/settings/celery.py +++ b/newsletter_maker/settings/celery.py @@ -6,8 +6,8 @@ REDIS_URL = os.getenv("REDIS_URL", "redis://redis:6379/0") -# Celery: these settings point workers at Redis and keep the recurring -# ingestion job on its 6-hour beat schedule. +# Celery: these settings point workers at Redis and keep recurring +# ingestion and trend-analysis jobs on their beat schedules. CELERY_BROKER_URL = REDIS_URL CELERY_RESULT_BACKEND = REDIS_URL CELERY_TASK_ALWAYS_EAGER = env_bool("CELERY_TASK_ALWAYS_EAGER", default=False) @@ -26,6 +26,10 @@ "task": "core.tasks.run_all_topic_centroid_recomputations", "schedule": crontab(hour=3, minute=0), }, + "run-all-topic-cluster-recomputations-nightly": { + "task": "core.tasks.run_all_topic_cluster_recomputations", + "schedule": crontab(hour=4, minute=0), + }, } __all__ = [ diff --git a/newsletters/tasks.py b/newsletters/tasks.py index 2e2f4609..eeb32961 100644 --- a/newsletters/tasks.py +++ b/newsletters/tasks.py @@ -85,9 +85,14 @@ def process_newsletter_intake(intake_id: int): def _schedule_content_processing(content: Content) -> None: """Ensure a content row is embedded before it enters the AI pipeline.""" - from core.tasks import process_content, upsert_content_embedding + from core.tasks import ( + assign_content_to_topic_cluster, + process_content, + upsert_content_embedding, + ) upsert_content_embedding(content) + assign_content_to_topic_cluster(content.id) if settings.CELERY_TASK_ALWAYS_EAGER: process_content(content.id) else: diff --git a/trends/api.py b/trends/api.py index dd287d02..66aebd57 100644 --- a/trends/api.py +++ b/trends/api.py @@ -1,9 +1,10 @@ """Trends-domain API viewsets kept under the existing nested project routes.""" -from django.db.models import Avg, Count, Q -from drf_spectacular.utils import extend_schema -from rest_framework import viewsets +from django.db.models import Avg, Count, OuterRef, Prefetch, Q, Subquery +from drf_spectacular.utils import OpenApiParameter, extend_schema +from rest_framework import serializers, viewsets from rest_framework.decorators import action +from rest_framework.filters import OrderingFilter from rest_framework.response import Response from core.api import ( @@ -12,12 +13,135 @@ build_crud_action_overrides, document_project_owned_viewset, ) -from core.permissions import IsProjectContributor -from trends.models import TopicCentroidSnapshot +from core.permissions import IsProjectContributor, IsProjectMember +from trends.models import ( + ContentClusterMembership, + TopicCentroidSnapshot, + TopicCluster, + TopicVelocitySnapshot, +) from trends.serializers import ( + TopicClusterDetailSerializer, + TopicClusterSerializer, TopicCentroidObservabilitySummarySerializer, TopicCentroidSnapshotSerializer, + TopicVelocitySnapshotSerializer, +) + + +@document_project_owned_viewset( + resource_plural="topic clusters", + resource_singular="topic cluster", + create_description="Topic clusters are pipeline-managed analysis rows and are exposed read-only for trend exploration.", + tag="Trend Analysis", + action_overrides=build_crud_action_overrides( + TopicClusterSerializer, + resource_plural="topic clusters for the selected project", + resource_singular="topic cluster", + ), ) +class TopicClusterViewSet(ProjectOwnedQuerysetMixin, viewsets.ReadOnlyModelViewSet): + """Inspect a project's current topic clusters and velocity history.""" + + serializer_class = TopicClusterSerializer + filter_backends = [OrderingFilter] + ordering_fields = [ + "velocity_score", + "member_count", + "last_seen_at", + "first_seen_at", + ] + ordering = ["-velocity_score", "-last_seen_at"] + queryset = TopicCluster.objects.select_related("project", "dominant_entity") + + def get_queryset(self): + """Annotate clusters with the latest persisted velocity metrics.""" + + latest_snapshot_queryset = TopicVelocitySnapshot.objects.filter( + cluster_id=OuterRef("pk") + ).order_by("-computed_at") + queryset = ( + super() + .get_queryset() + .annotate( + velocity_score=Subquery( + latest_snapshot_queryset.values("velocity_score")[:1] + ), + z_score=Subquery(latest_snapshot_queryset.values("z_score")[:1]), + window_count=Subquery( + latest_snapshot_queryset.values("window_count")[:1] + ), + velocity_computed_at=Subquery( + latest_snapshot_queryset.values("computed_at")[:1] + ), + ) + ) + if self.action == "retrieve": + queryset = queryset.prefetch_related( + Prefetch( + "memberships", + queryset=ContentClusterMembership.objects.select_related( + "content" + ).order_by("-similarity", "-assigned_at"), + ), + Prefetch( + "velocity_snapshots", + queryset=TopicVelocitySnapshot.objects.order_by("-computed_at"), + ), + ) + return queryset + + def get_serializer_class(self): + """Return the detail serializer for cluster drill-down responses.""" + + if self.action == "retrieve": + return TopicClusterDetailSerializer + return super().get_serializer_class() + + def get_permissions(self): + """Allow project members to inspect trend clusters.""" + + return [IsProjectMember()] + + @extend_schema( + summary="List velocity history", + description=( + "Return persisted velocity snapshots for one topic cluster. " + "Use the optional limit query parameter to cap the number of snapshots returned." + ), + parameters=[ + OpenApiParameter( + name="limit", + type=int, + location=OpenApiParameter.QUERY, + description="Maximum number of velocity snapshots to return.", + required=False, + ) + ], + request=None, + responses={ + 200: TopicVelocitySnapshotSerializer(many=True), + 403: AUTHENTICATION_REQUIRED_RESPONSE, + }, + tags=["Trend Analysis"], + ) + @action(detail=True, methods=["get"], url_path="velocity_history") + def velocity_history(self, request, *args, **kwargs): + """Return recent velocity snapshots for the selected topic cluster.""" + + cluster = self.get_object() + snapshots = cluster.velocity_snapshots.order_by("-computed_at") + limit_param = request.query_params.get("limit") + if limit_param: + try: + limit = max(1, min(int(limit_param), 100)) + except ValueError as exc: + raise serializers.ValidationError( + {"limit": "Limit must be an integer between 1 and 100."} + ) from exc + snapshots = snapshots[:limit] + serializer = TopicVelocitySnapshotSerializer(snapshots, many=True) + return Response(serializer.data) @document_project_owned_viewset( diff --git a/trends/api_urls.py b/trends/api_urls.py index 8dc3309a..6b5c17a1 100644 --- a/trends/api_urls.py +++ b/trends/api_urls.py @@ -2,12 +2,17 @@ from rest_framework_nested.routers import NestedSimpleRouter -from trends.api import TopicCentroidSnapshotViewSet +from trends.api import TopicCentroidSnapshotViewSet, TopicClusterViewSet def register_project_routes(project_router: NestedSimpleRouter) -> None: """Register nested trend observability endpoints.""" + project_router.register( + r"clusters", + TopicClusterViewSet, + basename="project-topic-cluster", + ) project_router.register( r"topic-centroid-snapshots", TopicCentroidSnapshotViewSet, diff --git a/trends/migrations/0002_topic_cluster_models.py b/trends/migrations/0002_topic_cluster_models.py new file mode 100644 index 00000000..9fca4809 --- /dev/null +++ b/trends/migrations/0002_topic_cluster_models.py @@ -0,0 +1,181 @@ +import django.db.models.deletion +import uuid + +from django.db import migrations, models + + +class Migration(migrations.Migration): + dependencies = [ + ("content", "0001_initial"), + ("entities", "0002_alter_entitycandidate_first_seen_in_and_more"), + ("projects", "0005_alter_sourceconfig_plugin_name"), + ("trends", "0001_initial"), + ] + + operations = [ + migrations.CreateModel( + name="TopicCluster", + fields=[ + ( + "id", + models.BigAutoField( + auto_created=True, + primary_key=True, + serialize=False, + verbose_name="ID", + ), + ), + ( + "centroid_vector_id", + models.UUIDField(default=uuid.uuid4, unique=True), + ), + ("label", models.CharField(blank=True, max_length=255)), + ("first_seen_at", models.DateTimeField()), + ("last_seen_at", models.DateTimeField()), + ("is_active", models.BooleanField(default=True)), + ("member_count", models.PositiveIntegerField(default=0)), + ( + "dominant_entity", + models.ForeignKey( + blank=True, + null=True, + on_delete=django.db.models.deletion.SET_NULL, + related_name="dominant_topic_clusters", + to="entities.entity", + ), + ), + ( + "project", + models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, + related_name="topic_clusters", + to="projects.project", + ), + ), + ], + options={ + "ordering": ["-last_seen_at", "id"], + "db_table": "core_topiccluster", + "indexes": [ + models.Index( + fields=["project", "-last_seen_at"], + name="core_topicc_project_ff9533_idx", + ), + models.Index( + fields=["project", "is_active", "-last_seen_at"], + name="core_topicc_project_f8f19c_idx", + ), + ], + }, + ), + migrations.CreateModel( + name="TopicVelocitySnapshot", + fields=[ + ( + "id", + models.BigAutoField( + auto_created=True, + primary_key=True, + serialize=False, + verbose_name="ID", + ), + ), + ("computed_at", models.DateTimeField(auto_now_add=True)), + ("window_count", models.PositiveIntegerField()), + ("trailing_mean", models.FloatField()), + ("trailing_stddev", models.FloatField()), + ("z_score", models.FloatField()), + ("velocity_score", models.FloatField()), + ( + "cluster", + models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, + related_name="velocity_snapshots", + to="trends.topiccluster", + ), + ), + ( + "project", + models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, + related_name="topic_velocity_snapshots", + to="projects.project", + ), + ), + ], + options={ + "ordering": ["-computed_at", "id"], + "db_table": "core_topicvelocitysnapshot", + "indexes": [ + models.Index( + fields=["project", "-computed_at"], + name="core_topicv_project_d00a67_idx", + ), + models.Index( + fields=["cluster", "-computed_at"], + name="core_topicv_cluster_257d0d_idx", + ), + ], + }, + ), + migrations.CreateModel( + name="ContentClusterMembership", + fields=[ + ( + "id", + models.BigAutoField( + auto_created=True, + primary_key=True, + serialize=False, + verbose_name="ID", + ), + ), + ("similarity", models.FloatField()), + ("assigned_at", models.DateTimeField(auto_now_add=True)), + ( + "cluster", + models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, + related_name="memberships", + to="trends.topiccluster", + ), + ), + ( + "content", + models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, + related_name="cluster_memberships", + to="content.content", + ), + ), + ( + "project", + models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, + related_name="content_cluster_memberships", + to="projects.project", + ), + ), + ], + options={ + "ordering": ["-assigned_at", "id"], + "db_table": "core_contentclustermembership", + "indexes": [ + models.Index( + fields=["cluster", "-assigned_at"], + name="core_conten_cluster_a1e0e7_idx", + ), + models.Index( + fields=["project", "content"], + name="core_conten_project_7bf892_idx", + ), + ], + "constraints": [ + models.UniqueConstraint( + fields=("content", "cluster"), + name="core_contentcluster_unique_content_cluster", + ), + ], + }, + ), + ] diff --git a/trends/models.py b/trends/models.py index 0c169049..aebc59ab 100644 --- a/trends/models.py +++ b/trends/models.py @@ -1,5 +1,7 @@ """Trends-domain models for project observability and discovery workflows.""" +import uuid + from django.db import models @@ -36,3 +38,108 @@ class Meta: def __str__(self) -> str: return f"Topic centroid snapshot for {self.project.name}" + + +class TopicCluster(models.Model): + """Represent one project-scoped topic cluster over recent active content.""" + + project = models.ForeignKey( + "projects.Project", + on_delete=models.CASCADE, + related_name="topic_clusters", + ) + centroid_vector_id = models.UUIDField(default=uuid.uuid4, unique=True) + label = models.CharField(max_length=255, blank=True) + first_seen_at = models.DateTimeField() + last_seen_at = models.DateTimeField() + is_active = models.BooleanField(default=True) + member_count = models.PositiveIntegerField(default=0) + dominant_entity = models.ForeignKey( + "entities.Entity", + null=True, + blank=True, + on_delete=models.SET_NULL, + related_name="dominant_topic_clusters", + ) + + class Meta: + ordering = ["-last_seen_at", "id"] + db_table = "core_topiccluster" + indexes = [ + models.Index(fields=["project", "-last_seen_at"]), + models.Index(fields=["project", "is_active", "-last_seen_at"]), + ] + + def __str__(self) -> str: + return self.label or f"Topic cluster {self.pk}" + + +class TopicVelocitySnapshot(models.Model): + """Capture one computed velocity reading for an active topic cluster.""" + + cluster = models.ForeignKey( + TopicCluster, + on_delete=models.CASCADE, + related_name="velocity_snapshots", + ) + project = models.ForeignKey( + "projects.Project", + on_delete=models.CASCADE, + related_name="topic_velocity_snapshots", + ) + computed_at = models.DateTimeField(auto_now_add=True) + window_count = models.PositiveIntegerField() + trailing_mean = models.FloatField() + trailing_stddev = models.FloatField() + z_score = models.FloatField() + velocity_score = models.FloatField() + + class Meta: + ordering = ["-computed_at", "id"] + db_table = "core_topicvelocitysnapshot" + indexes = [ + models.Index(fields=["project", "-computed_at"]), + models.Index(fields=["cluster", "-computed_at"]), + ] + + def __str__(self) -> str: + return f"Velocity snapshot for cluster {self.cluster_id}" + + +class ContentClusterMembership(models.Model): + """Record one content item's current assignment to a topic cluster.""" + + content = models.ForeignKey( + "content.Content", + on_delete=models.CASCADE, + related_name="cluster_memberships", + ) + cluster = models.ForeignKey( + TopicCluster, + on_delete=models.CASCADE, + related_name="memberships", + ) + project = models.ForeignKey( + "projects.Project", + on_delete=models.CASCADE, + related_name="content_cluster_memberships", + ) + similarity = models.FloatField() + assigned_at = models.DateTimeField(auto_now_add=True) + + class Meta: + ordering = ["-assigned_at", "id"] + db_table = "core_contentclustermembership" + constraints = [ + models.UniqueConstraint( + fields=["content", "cluster"], + name="core_contentcluster_unique_content_cluster", + ) + ] + indexes = [ + models.Index(fields=["cluster", "-assigned_at"]), + models.Index(fields=["project", "content"]), + ] + + def __str__(self) -> str: + return f"Content {self.content_id} in cluster {self.cluster_id}" diff --git a/trends/serializers.py b/trends/serializers.py index 4e441488..5469e775 100644 --- a/trends/serializers.py +++ b/trends/serializers.py @@ -2,7 +2,106 @@ from rest_framework import serializers -from trends.models import TopicCentroidSnapshot +from trends.models import ( + ContentClusterMembership, + TopicCentroidSnapshot, + TopicCluster, + TopicVelocitySnapshot, +) + + +class TopicClusterEntitySerializer(serializers.Serializer): + """Serialize the dominant entity summary for a topic cluster.""" + + id = serializers.IntegerField() + name = serializers.CharField() + type = serializers.CharField() + + +class TopicClusterContentSummarySerializer(serializers.Serializer): + """Serialize the content fields surfaced on cluster detail responses.""" + + id = serializers.IntegerField() + url = serializers.URLField() + title = serializers.CharField() + published_date = serializers.DateTimeField() + source_plugin = serializers.CharField() + + +class ContentClusterMembershipSerializer(serializers.ModelSerializer): + """Serialize one content membership within a topic cluster.""" + + content = TopicClusterContentSummarySerializer(read_only=True) + + class Meta: + model = ContentClusterMembership + fields = ["id", "content", "similarity", "assigned_at"] + read_only_fields = fields + + +class TopicVelocitySnapshotSerializer(serializers.ModelSerializer): + """Serialize one persisted topic velocity snapshot.""" + + class Meta: + model = TopicVelocitySnapshot + fields = [ + "id", + "cluster", + "project", + "computed_at", + "window_count", + "trailing_mean", + "trailing_stddev", + "z_score", + "velocity_score", + ] + read_only_fields = fields + + +class TopicClusterSerializer(serializers.ModelSerializer): + """Serialize one topic cluster with its current velocity rollup.""" + + dominant_entity = TopicClusterEntitySerializer(read_only=True) + velocity_score = serializers.FloatField(read_only=True, allow_null=True) + z_score = serializers.FloatField(read_only=True, allow_null=True) + window_count = serializers.IntegerField(read_only=True, allow_null=True) + velocity_computed_at = serializers.DateTimeField(read_only=True, allow_null=True) + + class Meta: + model = TopicCluster + fields = [ + "id", + "project", + "centroid_vector_id", + "label", + "first_seen_at", + "last_seen_at", + "is_active", + "member_count", + "dominant_entity", + "velocity_score", + "z_score", + "window_count", + "velocity_computed_at", + ] + read_only_fields = fields + + +class TopicClusterDetailSerializer(TopicClusterSerializer): + """Serialize one topic cluster with memberships and snapshot history.""" + + memberships = ContentClusterMembershipSerializer(many=True, read_only=True) + velocity_history = TopicVelocitySnapshotSerializer( + many=True, + read_only=True, + source="velocity_snapshots", + ) + + class Meta(TopicClusterSerializer.Meta): + fields = TopicClusterSerializer.Meta.fields + [ + "memberships", + "velocity_history", + ] class TopicCentroidSnapshotSerializer(serializers.ModelSerializer): diff --git a/trends/tasks.py b/trends/tasks.py index ae111569..3b4ad902 100644 --- a/trends/tasks.py +++ b/trends/tasks.py @@ -1,5 +1,6 @@ -"""Celery tasks and helpers for trends-domain centroid recomputation.""" +"""Celery tasks and helpers for trends-domain centroid and cluster recomputation.""" +from collections import Counter import math from datetime import datetime, timedelta from typing import Protocol, cast @@ -7,6 +8,7 @@ from celery import shared_task from django.conf import settings from django.core.cache import cache +from django.db import transaction from django.db.models import Model from django.utils import timezone @@ -16,16 +18,27 @@ embed_text, upsert_topic_centroid, ) -from core.models import FeedbackType, UserFeedback +from content.models import Content, FeedbackType, UserFeedback +from entities.models import Entity, EntityMention from projects.models import Project -from .models import TopicCentroidSnapshot +from .models import ( + ContentClusterMembership, + TopicCentroidSnapshot, + TopicCluster, + TopicVelocitySnapshot, +) TOPIC_CENTROID_LOOKBACK_DAYS = 90 TOPIC_CENTROID_MIN_UPVOTES = 10 TOPIC_CENTROID_DOWNVOTE_WEIGHT = 0.25 TOPIC_CENTROID_DEBOUNCE_SECONDS = 60 * 5 TOPIC_CENTROID_DECAY_TAU_DAYS = 45 +TOPIC_CLUSTER_LOOKBACK_DAYS = 14 +TOPIC_CLUSTER_SIMILARITY_THRESHOLD = 0.85 +TOPIC_CLUSTER_MIN_MEMBERS = 3 +TOPIC_VELOCITY_TRAILING_DAYS = 7 +TOPIC_VELOCITY_EMA_ALPHA = 0.5 class DelayedTask(Protocol): @@ -220,6 +233,216 @@ def queue_topic_centroid_recompute(project_id: int) -> bool: return True +@shared_task(name="core.tasks.run_all_topic_cluster_recomputations") +def run_all_topic_cluster_recomputations() -> int: + """Queue topic-cluster recomputation for every project.""" + + project_ids = list(Project.objects.values_list("id", flat=True)) + for project_id in project_ids: + if settings.CELERY_TASK_ALWAYS_EAGER: + recompute_topic_clusters(project_id) + else: + _enqueue_task(recompute_topic_clusters, project_id) + return len(project_ids) + + +@shared_task(name="core.tasks.recompute_topic_clusters") +def recompute_topic_clusters(project_id: int) -> dict[str, int]: + """Rebuild recent topic clusters for one project from active content.""" + + now = timezone.now() + window_start = now - timedelta(days=TOPIC_CLUSTER_LOOKBACK_DAYS) + recent_contents = list( + Content.objects.filter( + project_id=project_id, + is_active=True, + published_date__gte=window_start, + ) + .select_related("entity") + .only( + "id", + "project_id", + "title", + "content_text", + "published_date", + "entity_id", + ) + .order_by("published_date", "id") + ) + vector_cache: dict[int, list[float]] = {} + cluster_groups = _build_cluster_groups(recent_contents, vector_cache) + clusters_updated = _sync_topic_clusters(project_id, cluster_groups) + if settings.CELERY_TASK_ALWAYS_EAGER: + recompute_topic_velocity(project_id) + else: + _enqueue_task(recompute_topic_velocity, project_id) + return { + "project_id": project_id, + "contents_considered": len(recent_contents), + "clusters_updated": clusters_updated, + } + + +@shared_task(name="core.tasks.recompute_topic_velocity") +def recompute_topic_velocity(project_id: int) -> dict[str, int]: + """Persist one fresh velocity snapshot for each active project cluster.""" + + computed_at = timezone.now() + clusters = list( + TopicCluster.objects.filter( + project_id=project_id, + is_active=True, + member_count__gte=TOPIC_CLUSTER_MIN_MEMBERS, + ) + .prefetch_related("memberships__content", "velocity_snapshots") + .order_by("id") + ) + snapshot_count = 0 + for cluster in clusters: + memberships = list(cluster.memberships.all()) + published_dates = [ + membership.content.published_date + for membership in memberships + if membership.content.published_date is not None + ] + if len(published_dates) < TOPIC_CLUSTER_MIN_MEMBERS: + continue + window_count = _count_within_window( + published_dates, + start=computed_at - timedelta(days=1), + end=computed_at, + ) + trailing_counts = [ + _count_within_window( + published_dates, + start=computed_at - timedelta(days=offset + 1), + end=computed_at - timedelta(days=offset), + ) + for offset in range(1, TOPIC_VELOCITY_TRAILING_DAYS + 1) + ] + trailing_mean = sum(trailing_counts) / len(trailing_counts) + trailing_stddev = _population_stddev(trailing_counts, trailing_mean) + z_score = _capped_z_score(window_count, trailing_mean, trailing_stddev) + normalized_score = (z_score + 3.0) / 6.0 + previous_snapshot = cluster.velocity_snapshots.first() + velocity_score = _smooth_velocity_score( + normalized_score, + previous_snapshot.velocity_score if previous_snapshot is not None else None, + ) + _create_topic_velocity_snapshot( + cluster=cluster, + computed_at=computed_at, + window_count=window_count, + trailing_mean=trailing_mean, + trailing_stddev=trailing_stddev, + z_score=z_score, + velocity_score=velocity_score, + ) + snapshot_count += 1 + return { + "project_id": project_id, + "clusters_evaluated": len(clusters), + "snapshots_created": snapshot_count, + } + + +@shared_task(name="core.tasks.assign_content_to_topic_cluster") +def assign_content_to_topic_cluster(content_id: int) -> dict[str, object]: + """Assign one content item to the nearest active cluster when it fits.""" + + content = ( + Content.objects.filter(pk=content_id) + .select_related("entity") + .only( + "id", + "project_id", + "title", + "content_text", + "published_date", + "entity_id", + "is_active", + ) + .first() + ) + if content is None: + return {"content_id": content_id, "assigned": False, "reason": "missing"} + + memberships = ContentClusterMembership.objects.filter( + project_id=content.project_id, + content_id=content.id, + ) + if not content.is_active or content.published_date < timezone.now() - timedelta( + days=TOPIC_CLUSTER_LOOKBACK_DAYS + ): + removed_cluster_ids = list(memberships.values_list("cluster_id", flat=True)) + memberships.delete() + for cluster_id in removed_cluster_ids: + _refresh_cluster_rollup(cluster_id) + return {"content_id": content_id, "assigned": False, "reason": "outside_window"} + + active_clusters = list( + TopicCluster.objects.filter(project_id=content.project_id, is_active=True) + .prefetch_related("memberships__content") + .only( + "id", + "project_id", + "first_seen_at", + "last_seen_at", + "is_active", + "member_count", + "dominant_entity_id", + ) + ) + if not active_clusters: + return {"content_id": content_id, "assigned": False, "reason": "no_clusters"} + + vector_cache: dict[int, list[float]] = {} + content_vector = _content_vector(content, vector_cache) + best_cluster: TopicCluster | None = None + best_similarity = -1.0 + for cluster in active_clusters: + member_contents = [ + membership.content + for membership in cluster.memberships.all() + if membership.content_id != content.id + ] + centroid_vector = _cluster_centroid_for_contents(member_contents, vector_cache) + if not centroid_vector: + continue + similarity = _cosine_similarity(content_vector, centroid_vector) + if similarity > best_similarity: + best_similarity = similarity + best_cluster = cluster + + removed_cluster_ids = list(memberships.values_list("cluster_id", flat=True)) + memberships.delete() + if best_cluster is None or best_similarity < TOPIC_CLUSTER_SIMILARITY_THRESHOLD: + for cluster_id in removed_cluster_ids: + _refresh_cluster_rollup(cluster_id) + return { + "content_id": content_id, + "assigned": False, + "similarity": best_similarity, + } + + ContentClusterMembership.objects.create( + content=content, + cluster=best_cluster, + project_id=content.project_id, + similarity=best_similarity, + ) + cluster_ids_to_refresh = set(removed_cluster_ids) + cluster_ids_to_refresh.add(best_cluster.id) + for cluster_id in cluster_ids_to_refresh: + _refresh_cluster_rollup(cluster_id) + return { + "content_id": content_id, + "assigned": True, + "cluster_id": best_cluster.id, + "similarity": best_similarity, + } + + def _feedback_decay_weight(created_at: datetime, now: datetime) -> float: """Return the EMA-style decay weight for one feedback event.""" @@ -331,3 +554,345 @@ def _topic_centroid_debounce_key(project_id: int) -> str: """Return the cache key used to debounce centroid recomputations.""" return f"topic-centroid-recompute:{project_id}" + + +def _build_cluster_groups( + contents: list[Content], + vector_cache: dict[int, list[float]], +) -> list[dict[str, object]]: + """Group recent contents into provisional clusters by centroid similarity.""" + + groups: list[dict[str, object]] = [] + for content in contents: + content_vector = _content_vector(content, vector_cache) + best_group: dict[str, object] | None = None + best_similarity = -1.0 + for group in groups: + similarity = _cosine_similarity( + content_vector, + cast(list[float], group["centroid_vector"]), + ) + if similarity > best_similarity: + best_similarity = similarity + best_group = group + if best_group is None or best_similarity < TOPIC_CLUSTER_SIMILARITY_THRESHOLD: + groups.append( + { + "contents": [content], + "centroid_vector": content_vector, + } + ) + continue + cast(list[Content], best_group["contents"]).append(content) + best_group["centroid_vector"] = _cluster_centroid_for_contents( + cast(list[Content], best_group["contents"]), + vector_cache, + ) + + return [ + group + for group in groups + if len(cast(list[Content], group["contents"])) >= TOPIC_CLUSTER_MIN_MEMBERS + ] + + +def _sync_topic_clusters( + project_id: int, + cluster_groups: list[dict[str, object]], +) -> int: + """Persist the current cluster rebuild and retire stale active clusters.""" + + existing_clusters = list( + TopicCluster.objects.filter(project_id=project_id) + .prefetch_related("memberships") + .order_by("id") + ) + existing_memberships = { + cluster.id: set(cluster.memberships.values_list("content_id", flat=True)) + for cluster in existing_clusters + } + matched_cluster_ids: set[int] = set() + clusters_updated = 0 + + with transaction.atomic(): + for group in cluster_groups: + contents = cast(list[Content], group["contents"]) + member_ids = {content.id for content in contents} + cluster = _match_existing_cluster( + existing_clusters, + existing_memberships, + member_ids, + matched_cluster_ids, + ) + if cluster is None: + cluster = TopicCluster.objects.create( + project_id=project_id, + first_seen_at=min(content.published_date for content in contents), + last_seen_at=max(content.published_date for content in contents), + is_active=True, + member_count=len(contents), + dominant_entity=_resolve_dominant_entity(contents), + ) + existing_clusters.append(cluster) + else: + matched_cluster_ids.add(cluster.id) + cluster.first_seen_at = min( + cluster.first_seen_at, + min(content.published_date for content in contents), + ) + cluster.last_seen_at = max( + content.published_date for content in contents + ) + cluster.is_active = True + cluster.member_count = len(contents) + cluster.dominant_entity = _resolve_dominant_entity(contents) + cluster.save( + update_fields=[ + "first_seen_at", + "last_seen_at", + "is_active", + "member_count", + "dominant_entity", + ] + ) + + matched_cluster_ids.add(cluster.id) + ContentClusterMembership.objects.filter(cluster=cluster).delete() + centroid_vector = _cluster_centroid_for_contents(contents, {}) + ContentClusterMembership.objects.bulk_create( + [ + ContentClusterMembership( + content=content, + cluster=cluster, + project_id=project_id, + similarity=_cosine_similarity( + _content_vector(content, {}), + centroid_vector, + ), + ) + for content in contents + ] + ) + clusters_updated += 1 + + for cluster in existing_clusters: + if cluster.id in matched_cluster_ids: + continue + ContentClusterMembership.objects.filter(cluster=cluster).delete() + if cluster.is_active or cluster.member_count != 0: + cluster.is_active = False + cluster.member_count = 0 + cluster.dominant_entity = None + cluster.save( + update_fields=["is_active", "member_count", "dominant_entity"] + ) + + return clusters_updated + + +def _match_existing_cluster( + existing_clusters: list[TopicCluster], + existing_memberships: dict[int, set[int]], + member_ids: set[int], + matched_cluster_ids: set[int], +) -> TopicCluster | None: + """Reuse the existing cluster with the strongest member overlap.""" + + best_cluster: TopicCluster | None = None + best_overlap_count = 0 + best_overlap_ratio = 0.0 + for cluster in existing_clusters: + if cluster.id in matched_cluster_ids: + continue + prior_member_ids = existing_memberships.get(cluster.id, set()) + overlap_count = len(prior_member_ids & member_ids) + if overlap_count <= 0: + continue + overlap_ratio = overlap_count / len(prior_member_ids | member_ids) + if overlap_count > best_overlap_count or ( + overlap_count == best_overlap_count and overlap_ratio > best_overlap_ratio + ): + best_cluster = cluster + best_overlap_count = overlap_count + best_overlap_ratio = overlap_ratio + return best_cluster + + +def _refresh_cluster_rollup(cluster_id: int) -> None: + """Refresh member counts and dominant entity after one incremental update.""" + + cluster = TopicCluster.objects.filter(pk=cluster_id).first() + if cluster is None: + return + memberships = list( + ContentClusterMembership.objects.filter(cluster_id=cluster_id) + .select_related("content", "content__entity") + .order_by("content__published_date", "content_id") + ) + if not memberships: + cluster.is_active = False + cluster.member_count = 0 + cluster.dominant_entity = None + cluster.save(update_fields=["is_active", "member_count", "dominant_entity"]) + return + + contents = [membership.content for membership in memberships] + cluster.is_active = True + cluster.member_count = len(contents) + cluster.last_seen_at = max(content.published_date for content in contents) + cluster.first_seen_at = min( + cluster.first_seen_at, min(content.published_date for content in contents) + ) + cluster.dominant_entity = _resolve_dominant_entity(contents) + cluster.save( + update_fields=[ + "is_active", + "member_count", + "last_seen_at", + "first_seen_at", + "dominant_entity", + ] + ) + + +def _resolve_dominant_entity(contents: list[Content]) -> Entity | None: + """Return the most frequently referenced entity across clustered content.""" + + if not contents: + return None + content_ids = [content.id for content in contents] + entity_counts: Counter[int] = Counter( + content.entity_id for content in contents if content.entity_id is not None + ) + entity_counts.update( + entity_id + for entity_id in EntityMention.objects.filter( + content_id__in=content_ids + ).values_list("entity_id", flat=True) + ) + if not entity_counts: + return None + dominant_entity_id, _ = max( + entity_counts.items(), key=lambda item: (item[1], -item[0]) + ) + return Entity.objects.filter(pk=dominant_entity_id).first() + + +def _content_vector( + content: Content, + vector_cache: dict[int, list[float]], +) -> list[float]: + """Return one content embedding, caching repeated lookups within a task.""" + + vector = vector_cache.get(content.id) + if vector is not None: + return vector + vector = embed_text(build_content_embedding_text(content)) + vector_cache[content.id] = vector + return vector + + +def _cluster_centroid_for_contents( + contents: list[Content], + vector_cache: dict[int, list[float]], +) -> list[float]: + """Average and normalize the content vectors for one cluster.""" + + vectors = [_content_vector(content, vector_cache) for content in contents] + if not vectors: + return [] + dimension = len(vectors[0]) + mean_vector = [0.0] * dimension + for vector in vectors: + for index, value in enumerate(vector): + mean_vector[index] += float(value) + return _normalize_vector([value / len(vectors) for value in mean_vector]) + + +def _count_within_window( + published_dates: list[datetime], + *, + start: datetime, + end: datetime, +) -> int: + """Count published dates inside one half-open time window.""" + + return sum(1 for published_at in published_dates if start <= published_at < end) + + +def _population_stddev(values: list[int], mean: float) -> float: + """Return the population standard deviation for integer daily counts.""" + + if not values: + return 0.0 + variance = sum((value - mean) ** 2 for value in values) / len(values) + return math.sqrt(variance) + + +def _capped_z_score( + window_count: int, trailing_mean: float, trailing_stddev: float +) -> float: + """Return the capped z-score for one cluster's latest daily activity.""" + + if trailing_stddev <= 0: + if window_count > trailing_mean: + return 3.0 + if window_count < trailing_mean: + return -3.0 + return 0.0 + raw_z_score = (window_count - trailing_mean) / trailing_stddev + return max(-3.0, min(3.0, raw_z_score)) + + +def _smooth_velocity_score(raw_score: float, previous_score: float | None) -> float: + """Apply a short EMA so single-day spikes do not dominate ranking.""" + + if previous_score is None: + return raw_score + return (TOPIC_VELOCITY_EMA_ALPHA * raw_score) + ( + (1.0 - TOPIC_VELOCITY_EMA_ALPHA) * previous_score + ) + + +def _create_topic_velocity_snapshot( + *, + cluster: TopicCluster, + computed_at: datetime, + window_count: int, + trailing_mean: float, + trailing_stddev: float, + z_score: float, + velocity_score: float, +) -> TopicVelocitySnapshot: + """Persist one velocity snapshot while preserving a controlled timestamp.""" + + snapshot = TopicVelocitySnapshot.objects.create( + cluster=cluster, + project_id=cluster.project_id, + window_count=window_count, + trailing_mean=trailing_mean, + trailing_stddev=trailing_stddev, + z_score=z_score, + velocity_score=velocity_score, + ) + if snapshot.computed_at != computed_at: + TopicVelocitySnapshot.objects.filter(pk=snapshot.pk).update( + computed_at=computed_at + ) + snapshot.computed_at = computed_at + return snapshot + + +def _cosine_similarity(left: list[float], right: list[float]) -> float: + """Return cosine similarity between two normalized-compatible vectors.""" + + if not left or not right or len(left) != len(right): + return -1.0 + left_norm = math.sqrt(sum(value * value for value in left)) + right_norm = math.sqrt(sum(value * value for value in right)) + if left_norm <= 0 or right_norm <= 0: + return -1.0 + cosine_similarity = sum( + left_value * right_value for left_value, right_value in zip(left, right) + ) / (left_norm * right_norm) + return max(-1.0, min(1.0, cosine_similarity))