diff --git a/core/tests/test_permissions.py b/core/tests/test_permissions.py index 2b9b8fd7..3b027163 100644 --- a/core/tests/test_permissions.py +++ b/core/tests/test_permissions.py @@ -263,88 +263,95 @@ def test_reader_is_denied_contributor_and_admin_endpoints(self): self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) def test_member_can_use_contributor_endpoints_but_not_admin_only_ones(self): - _typed_client(self.client).force_authenticate(self.member_user) + with patch("entities.extraction.queue_entity_identity_enrichment"): + _typed_client(self.client).force_authenticate(self.member_user) - review_queue_response = self.client.get( - reverse( - "v1:project-review-queue-list", - kwargs={"project_id": _require_pk(self.project)}, + review_queue_response = self.client.get( + reverse( + "v1:project-review-queue-list", + kwargs={"project_id": _require_pk(self.project)}, + ) ) - ) - self.assertEqual(review_queue_response.status_code, status.HTTP_200_OK) + self.assertEqual(review_queue_response.status_code, status.HTTP_200_OK) - source_config_response = self.client.post( - reverse( - "v1:project-source-config-list", - kwargs={"project_id": _require_pk(self.project)}, - ), - { - "plugin_name": SourcePluginName.RSS, - "config": {"feed_url": "https://example.com/feed.xml"}, - "is_active": True, - }, - format="json", - ) - self.assertEqual(source_config_response.status_code, status.HTTP_201_CREATED) + source_config_response = self.client.post( + reverse( + "v1:project-source-config-list", + kwargs={"project_id": _require_pk(self.project)}, + ), + { + "plugin_name": SourcePluginName.RSS, + "config": {"feed_url": "https://example.com/feed.xml"}, + "is_active": True, + }, + format="json", + ) + self.assertEqual( + source_config_response.status_code, status.HTTP_201_CREATED + ) - topic_summary_response = self.client.get( - reverse( - "v1:project-topic-centroid-snapshot-summary", - kwargs={"project_id": _require_pk(self.project)}, + topic_summary_response = 
self.client.get( + reverse( + "v1:project-topic-centroid-snapshot-summary", + kwargs={"project_id": _require_pk(self.project)}, + ) ) - ) - self.assertEqual(topic_summary_response.status_code, status.HTTP_200_OK) + self.assertEqual(topic_summary_response.status_code, status.HTTP_200_OK) - accept_candidate_response = self.client.post( - reverse( - "v1:project-entity-candidate-accept", - kwargs={ - "project_id": _require_pk(self.project), - "pk": _require_pk(self.entity_candidate), - }, - ), - format="json", - ) - self.assertEqual(accept_candidate_response.status_code, status.HTTP_200_OK) + accept_candidate_response = self.client.post( + reverse( + "v1:project-entity-candidate-accept", + kwargs={ + "project_id": _require_pk(self.project), + "pk": _require_pk(self.entity_candidate), + }, + ), + format="json", + ) + self.assertEqual(accept_candidate_response.status_code, status.HTTP_200_OK) - delete_own_feedback_response = self.client.delete( - reverse( - "v1:project-feedback-detail", - kwargs={ - "project_id": _require_pk(self.project), - "pk": _require_pk(self.member_feedback), - }, + delete_own_feedback_response = self.client.delete( + reverse( + "v1:project-feedback-detail", + kwargs={ + "project_id": _require_pk(self.project), + "pk": _require_pk(self.member_feedback), + }, + ) + ) + self.assertEqual( + delete_own_feedback_response.status_code, status.HTTP_204_NO_CONTENT ) - ) - self.assertEqual( - delete_own_feedback_response.status_code, status.HTTP_204_NO_CONTENT - ) - update_project_response = self.client.patch( - reverse("v1:project-detail", kwargs={"id": _require_pk(self.project)}), - {"name": "Member Update"}, - format="json", - ) - self.assertEqual(update_project_response.status_code, status.HTTP_403_FORBIDDEN) + update_project_response = self.client.patch( + reverse("v1:project-detail", kwargs={"id": _require_pk(self.project)}), + {"name": "Member Update"}, + format="json", + ) + self.assertEqual( + update_project_response.status_code, 
status.HTTP_403_FORBIDDEN + ) - list_credentials_response = self.client.get( - reverse( - "v1:project-bluesky-credentials-list", - kwargs={"project_id": _require_pk(self.project)}, + list_credentials_response = self.client.get( + reverse( + "v1:project-bluesky-credentials-list", + kwargs={"project_id": _require_pk(self.project)}, + ) + ) + self.assertEqual( + list_credentials_response.status_code, status.HTTP_403_FORBIDDEN ) - ) - self.assertEqual( - list_credentials_response.status_code, status.HTTP_403_FORBIDDEN - ) - rotate_token_response = self.client.post( - reverse( - "v1:project-rotate-intake-token", - kwargs={"id": _require_pk(self.project)}, - ), - format="json", - ) - self.assertEqual(rotate_token_response.status_code, status.HTTP_403_FORBIDDEN) + rotate_token_response = self.client.post( + reverse( + "v1:project-rotate-intake-token", + kwargs={"id": _require_pk(self.project)}, + ), + format="json", + ) + self.assertEqual( + rotate_token_response.status_code, status.HTTP_403_FORBIDDEN + ) def test_member_cannot_delete_other_users_feedback(self): _typed_client(self.client).force_authenticate(self.member_user) diff --git a/entities/admin.py b/entities/admin.py index a461bcfb..7c76629c 100644 --- a/entities/admin.py +++ b/entities/admin.py @@ -174,7 +174,9 @@ class EntityCandidateAdmin(admin.ModelAdmin): "project", "suggested_type", "occurrence_count", + "cluster_key", "status", + "auto_promotion_blocked_reason", "merged_into", "first_seen_in", "created_at", @@ -196,7 +198,7 @@ def accept_selected_candidates(self, request, queryset): for candidate in queryset.select_related("project"): if candidate.status == EntityCandidateStatus.ACCEPTED: continue - accept_entity_candidate(candidate) + accept_entity_candidate(candidate, schedule_enrichment=True) accepted_count += 1 self.message_user( request, @@ -238,7 +240,11 @@ def merge_into_existing_entities( if matching_entities.count() != 1: unresolved_names.append(candidate.name) continue - 
merge_entity_candidate(candidate, matching_entities.get()) + merge_entity_candidate( + candidate, + matching_entities.get(), + schedule_enrichment=True, + ) merged_count += 1 if merged_count: diff --git a/entities/api.py b/entities/api.py index 34202bdf..ebc4c949 100644 --- a/entities/api.py +++ b/entities/api.py @@ -13,17 +13,17 @@ build_crud_action_overrides, document_project_owned_viewset, ) -from entities.extraction import ( - accept_entity_candidate, - merge_entity_candidate, - reject_entity_candidate, -) from core.permissions import ( IsProjectAdmin, IsProjectContributor, IsProjectMember, IsProjectMemberWritable, ) +from entities.extraction import ( + accept_entity_candidate, + merge_entity_candidate, + reject_entity_candidate, +) from entities.models import Entity, EntityCandidate, EntityMention from entities.serializers import ( EntityAuthoritySnapshotSerializer, @@ -55,6 +55,7 @@ class EntityViewSet(ProjectOwnedQuerysetMixin, viewsets.ModelViewSet): queryset = ( Entity.objects.select_related("project") .annotate(mention_count=Count("mentions", distinct=True)) + .prefetch_related("identity_claims") .prefetch_related( Prefetch( "mentions", @@ -180,7 +181,7 @@ def accept(self, request, *args, **kwargs): """Accept an entity candidate and return its updated representation.""" candidate = self.get_object() - accept_entity_candidate(candidate) + accept_entity_candidate(candidate, schedule_enrichment=True) candidate.refresh_from_db() serializer = self.get_serializer(candidate) return Response(serializer.data) @@ -226,7 +227,11 @@ def merge(self, request, *args, **kwargs): context=self.get_serializer_context(), ) serializer.is_valid(raise_exception=True) - merge_entity_candidate(candidate, serializer.validated_data["merged_into"]) + merge_entity_candidate( + candidate, + serializer.validated_data["merged_into"], + schedule_enrichment=True, + ) candidate.refresh_from_db() response_serializer = self.get_serializer(candidate) return Response(response_serializer.data) 
diff --git a/entities/extraction.py b/entities/extraction.py index 486839fe..d1e98273 100644 --- a/entities/extraction.py +++ b/entities/extraction.py @@ -18,13 +18,22 @@ from entities.models import ( Entity, EntityCandidate, + EntityCandidateEvidence, EntityCandidateStatus, + EntityIdentityClaim, EntityMention, EntityMentionRole, EntityMentionSentiment, EntityType, + IdentitySurface, ) from pipeline.models import SkillResult, SkillStatus +from projects.model_support import ( + SourcePluginName, + normalize_bluesky_handle, + normalize_linkedin_url, + normalize_mastodon_handle, +) ENTITY_EXTRACTION_SKILL_NAME = "entity_extraction" ENTITY_RETRIEVAL_LIMIT = 8 @@ -229,6 +238,11 @@ def persist_entity_candidates( "status": EntityCandidateStatus.PENDING, }, ) + evidence_created = _record_candidate_evidence( + candidate, + content, + candidate_name=name, + ) if not created: update_fields: list[str] = [] suggested_type = candidate_payload.get( @@ -241,7 +255,7 @@ def persist_entity_candidates( if candidate.first_seen_in is None: candidate.first_seen_in = content update_fields.append("first_seen_in") - if not is_rerun: + if evidence_created and not is_rerun: candidate.occurrence_count += 1 update_fields.append("occurrence_count") if update_fields: @@ -251,7 +265,11 @@ def persist_entity_candidates( @transaction.atomic -def accept_entity_candidate(candidate: EntityCandidate) -> Entity: +def accept_entity_candidate( + candidate: EntityCandidate, + *, + schedule_enrichment: bool = False, +) -> Entity: """Accept a candidate, create the tracked entity, and backfill recent mentions.""" entity, _ = Entity.objects.get_or_create( @@ -263,19 +281,46 @@ def accept_entity_candidate(candidate: EntityCandidate) -> Entity: ) candidate.status = EntityCandidateStatus.ACCEPTED candidate.merged_into = entity - candidate.save(update_fields=["status", "merged_into", "updated_at"]) + candidate.auto_promotion_blocked_reason = "" + candidate.save( + update_fields=[ + "status", + "merged_into", + 
"auto_promotion_blocked_reason", + "updated_at", + ] + ) backfill_entity_mentions(entity, candidate_name=candidate.name) + _sync_identity_claims_from_candidate(entity, candidate) + if schedule_enrichment: + queue_entity_identity_enrichment(_require_pk(entity)) return entity @transaction.atomic -def merge_entity_candidate(candidate: EntityCandidate, entity: Entity) -> Entity: +def merge_entity_candidate( + candidate: EntityCandidate, + entity: Entity, + *, + schedule_enrichment: bool = False, +) -> Entity: """Merge a candidate into an existing tracked entity and backfill mentions.""" candidate.status = EntityCandidateStatus.MERGED candidate.merged_into = entity - candidate.save(update_fields=["status", "merged_into", "updated_at"]) + candidate.auto_promotion_blocked_reason = "" + candidate.save( + update_fields=[ + "status", + "merged_into", + "auto_promotion_blocked_reason", + "updated_at", + ] + ) backfill_entity_mentions(entity, candidate_name=candidate.name) + _sync_identity_claims_from_candidate(entity, candidate) + if schedule_enrichment: + queue_entity_identity_enrichment(_require_pk(entity)) return entity @@ -283,7 +328,21 @@ def reject_entity_candidate(candidate: EntityCandidate) -> None: """Reject an extracted candidate without creating a tracked entity.""" candidate.status = EntityCandidateStatus.REJECTED - candidate.save(update_fields=["status", "updated_at"]) + candidate.auto_promotion_blocked_reason = "" + candidate.save( + update_fields=["status", "auto_promotion_blocked_reason", "updated_at"] + ) + + +def queue_entity_identity_enrichment(entity_id: int) -> None: + """Queue the asynchronous identity-enrichment pass for a promoted entity.""" + + from entities.tasks import enrich_entity_identity + + if settings.CELERY_TASK_ALWAYS_EAGER: + enrich_entity_identity(entity_id) + return + enrich_entity_identity.delay(entity_id) def backfill_entity_mentions( @@ -706,11 +765,142 @@ def _serialize_candidate(candidate: EntityCandidate) -> dict[str, Any]: "id": 
_require_pk(candidate), "name": candidate.name, "suggested_type": candidate.suggested_type, + "cluster_key": candidate.cluster_key, "occurrence_count": candidate.occurrence_count, "status": candidate.status, } +def _record_candidate_evidence( + candidate: EntityCandidate, + content: Content, + *, + candidate_name: str, +) -> bool: + """Store one content-backed evidence row for a discovered candidate.""" + + identity_surface, claim_url = _candidate_identity_hint(content, candidate_name) + _evidence, created = EntityCandidateEvidence.objects.update_or_create( + candidate=candidate, + content=content, + defaults={ + "project": content.project, + "source_plugin": content.source_plugin, + "context_excerpt": _candidate_context_excerpt(content, candidate_name), + "identity_surface": identity_surface, + "claim_url": claim_url, + }, + ) + return created + + +def _candidate_context_excerpt(content: Content, candidate_name: str) -> str: + """Capture a short context window around the candidate occurrence.""" + + combined_text = "\n".join( + part + for part in [ + content.author or "", + content.title or "", + content.content_text or "", + ] + if part + ) + if not combined_text: + return "" + match = re.search(re.escape(candidate_name), combined_text, re.IGNORECASE) + if match is None: + return combined_text[:240].strip() + start = max(0, match.start() - 90) + end = min(len(combined_text), match.end() + 150) + return combined_text[start:end].strip() + + +def _candidate_identity_hint(content: Content, candidate_name: str) -> tuple[str, str]: + """Extract one verified identity hint from plugin metadata when available.""" + + metadata = content.source_metadata or {} + if not _candidate_matches_author(content.author, candidate_name): + return "", "" + + if content.source_plugin == SourcePluginName.BLUESKY: + author_handle = normalize_bluesky_handle(str(metadata.get("author_handle", ""))) + if author_handle: + return IdentitySurface.BLUESKY, 
f"https://bsky.app/profile/{author_handle}" + + if content.source_plugin == SourcePluginName.LINKEDIN: + author_profile_url = normalize_linkedin_url( + str(metadata.get("author_profile_url", "")) + ) + if author_profile_url: + return IdentitySurface.LINKEDIN, author_profile_url + + if content.source_plugin == SourcePluginName.MASTODON: + author_acct = normalize_mastodon_handle( + str(metadata.get("author_acct", "")), + instance_url=str(metadata.get("instance_url", "")), + ) + if author_acct and "@" in author_acct: + username, host = author_acct.split("@", 1) + return IdentitySurface.MASTODON, f"https://{host}/@{username}" + + website_url = _candidate_website_hint(content) + if website_url: + return IdentitySurface.WEBSITE, website_url + return "", "" + + +def _candidate_matches_author(author: str, candidate_name: str) -> bool: + """Return whether the candidate name appears to be the content author.""" + + normalized_author = _normalize_name(author or "") + normalized_candidate = _normalize_name(candidate_name) + if not normalized_author or not normalized_candidate: + return False + return ( + normalized_author == normalized_candidate + or normalized_candidate in normalized_author + or normalized_author in normalized_candidate + ) + + +def _candidate_website_hint(content: Content) -> str: + """Return a website claim when the content URL appears to be canonical.""" + + candidate_url = (content.canonical_url or content.url or "").strip() + if not candidate_url: + return "" + parsed_url = urlsplit(candidate_url) + if not parsed_url.scheme or not parsed_url.netloc: + return "" + return f"{parsed_url.scheme.lower()}://{parsed_url.netloc.lower()}" + + +def _sync_identity_claims_from_candidate( + entity: Entity, candidate: EntityCandidate +) -> None: + """Copy verified candidate identity hints onto the promoted entity.""" + + claim_rows = candidate.evidence.exclude(identity_surface="").exclude(claim_url="") + for evidence in claim_rows: + defaults = { + "verified": True, + 
"verified_at": timezone.now(), + "verification_method": "candidate_evidence", + } + claim, created = EntityIdentityClaim.objects.get_or_create( + entity=entity, + surface=evidence.identity_surface, + claim_url=evidence.claim_url, + defaults=defaults, + ) + if not created and not claim.verified: + claim.verified = True + claim.verified_at = timezone.now() + claim.verification_method = "candidate_evidence" + claim.save(update_fields=["verified", "verified_at", "verification_method"]) + + def _normalize_role(value: Any) -> str: """Normalize free-form role strings into the supported enum values.""" diff --git a/entities/migrations/0003_entitycandidate_auto_promotion_blocked_reason_and_more.py b/entities/migrations/0003_entitycandidate_auto_promotion_blocked_reason_and_more.py new file mode 100644 index 00000000..23709aee --- /dev/null +++ b/entities/migrations/0003_entitycandidate_auto_promotion_blocked_reason_and_more.py @@ -0,0 +1,156 @@ +# Generated by Django 6.0.4 on 2026-05-02 15:32 + +import django.db.models.deletion +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("content", "0002_content_newsletter_promotion"), + ("entities", "0002_alter_entitycandidate_first_seen_in_and_more"), + ("projects", "0007_alter_sourceconfig_plugin_name"), + ] + + operations = [ + migrations.AddField( + model_name="entitycandidate", + name="auto_promotion_blocked_reason", + field=models.CharField(blank=True, max_length=128), + ), + migrations.AddField( + model_name="entitycandidate", + name="cluster_key", + field=models.CharField(blank=True, db_index=True, max_length=64), + ), + migrations.AddField( + model_name="entitycandidate", + name="contextual_embedding_id", + field=models.UUIDField(blank=True, null=True), + ), + migrations.CreateModel( + name="EntityCandidateEvidence", + fields=[ + ( + "id", + models.BigAutoField( + auto_created=True, + primary_key=True, + serialize=False, + verbose_name="ID", + ), + ), + 
("source_plugin", models.CharField(max_length=64)), + ("context_excerpt", models.TextField(blank=True)), + ( + "identity_surface", + models.CharField( + blank=True, + choices=[ + ("github", "GitHub"), + ("linkedin", "LinkedIn"), + ("bluesky", "Bluesky"), + ("mastodon", "Mastodon"), + ("website", "Website"), + ], + max_length=32, + ), + ), + ("claim_url", models.URLField(blank=True)), + ("created_at", models.DateTimeField(auto_now_add=True)), + ( + "candidate", + models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, + related_name="evidence", + to="entities.entitycandidate", + ), + ), + ( + "content", + models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, + related_name="entity_candidate_evidence", + to="content.content", + ), + ), + ( + "project", + models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, + related_name="entity_candidate_evidence", + to="projects.project", + ), + ), + ], + options={ + "db_table": "core_entitycandidateevidence", + "ordering": ["-created_at"], + "indexes": [ + models.Index( + fields=["candidate", "source_plugin"], + name="core_entitycand_candidate_source_idx", + ), + models.Index( + fields=["project", "created_at"], + name="core_entitycand_project_created_idx", + ), + ], + "constraints": [ + models.UniqueConstraint( + fields=("candidate", "content"), + name="core_entitycandidateevidence_unique_candidate_content", + ) + ], + }, + ), + migrations.CreateModel( + name="EntityIdentityClaim", + fields=[ + ( + "id", + models.BigAutoField( + auto_created=True, + primary_key=True, + serialize=False, + verbose_name="ID", + ), + ), + ( + "surface", + models.CharField( + choices=[ + ("github", "GitHub"), + ("linkedin", "LinkedIn"), + ("bluesky", "Bluesky"), + ("mastodon", "Mastodon"), + ("website", "Website"), + ], + max_length=32, + ), + ), + ("claim_url", models.URLField()), + ("verified", models.BooleanField(default=False)), + ("verified_at", models.DateTimeField(blank=True, null=True)), + 
("verification_method", models.CharField(blank=True, max_length=64)), + ( + "entity", + models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, + related_name="identity_claims", + to="entities.entity", + ), + ), + ], + options={ + "db_table": "core_entityidentityclaim", + "ordering": ["surface", "claim_url"], + "constraints": [ + models.UniqueConstraint( + fields=("entity", "surface", "claim_url"), + name="core_entityidentityclaim_unique_entity_surface_url", + ) + ], + }, + ), + ] diff --git a/entities/models.py b/entities/models.py index 479c5781..7f2abaae 100644 --- a/entities/models.py +++ b/entities/models.py @@ -39,6 +39,16 @@ class EntityCandidateStatus(models.TextChoices): MERGED = "merged", "Merged" +class IdentitySurface(models.TextChoices): + """Supported identity surfaces that can back an entity claim.""" + + GITHUB = "github", "GitHub" + LINKEDIN = "linkedin", "LinkedIn" + BLUESKY = "bluesky", "Bluesky" + MASTODON = "mastodon", "Mastodon" + WEBSITE = "website", "Website" + + class Entity(models.Model): """Represents a person, vendor, or organization tracked inside a project.""" @@ -70,6 +80,34 @@ def __str__(self) -> str: return self.name +class EntityIdentityClaim(models.Model): + """Stores one resolved external identity claim for a tracked entity.""" + + entity = models.ForeignKey( + Entity, + on_delete=models.CASCADE, + related_name="identity_claims", + ) + surface = models.CharField(max_length=32, choices=IdentitySurface.choices) + claim_url = models.URLField() + verified = models.BooleanField(default=False) + verified_at = models.DateTimeField(null=True, blank=True) + verification_method = models.CharField(max_length=64, blank=True) + + class Meta: + ordering = ["surface", "claim_url"] + db_table = "core_entityidentityclaim" + constraints = [ + models.UniqueConstraint( + fields=["entity", "surface", "claim_url"], + name="core_entityidentityclaim_unique_entity_surface_url", + ) + ] + + def __str__(self) -> str: + return 
f"{self.entity.name} on {self.surface}" + + class EntityAuthoritySnapshot(models.Model): """Captures one authority-score recomputation for a tracked entity.""" @@ -169,6 +207,9 @@ class EntityCandidate(models.Model): related_name="entity_candidates", ) occurrence_count = models.IntegerField(default=1) + cluster_key = models.CharField(max_length=64, blank=True, db_index=True) + auto_promotion_blocked_reason = models.CharField(max_length=128, blank=True) + contextual_embedding_id = models.UUIDField(null=True, blank=True) status = models.CharField( max_length=16, choices=EntityCandidateStatus.choices, @@ -202,3 +243,55 @@ class Meta: def __str__(self) -> str: return self.name + + +class EntityCandidateEvidence(models.Model): + """Captures one content-backed occurrence and identity hint for a candidate.""" + + candidate = models.ForeignKey( + EntityCandidate, + on_delete=models.CASCADE, + related_name="evidence", + ) + project = models.ForeignKey( + "projects.Project", + on_delete=models.CASCADE, + related_name="entity_candidate_evidence", + ) + content = models.ForeignKey( + "content.Content", + on_delete=models.CASCADE, + related_name="entity_candidate_evidence", + ) + source_plugin = models.CharField(max_length=64) + context_excerpt = models.TextField(blank=True) + identity_surface = models.CharField( + max_length=32, + choices=IdentitySurface.choices, + blank=True, + ) + claim_url = models.URLField(blank=True) + created_at = models.DateTimeField(auto_now_add=True) + + class Meta: + ordering = ["-created_at"] + db_table = "core_entitycandidateevidence" + constraints = [ + models.UniqueConstraint( + fields=["candidate", "content"], + name="core_entitycandidateevidence_unique_candidate_content", + ) + ] + indexes = [ + models.Index( + fields=["candidate", "source_plugin"], + name="core_entitycand_candidate_source_idx", + ), + models.Index( + fields=["project", "created_at"], + name="core_entitycand_project_created_idx", + ), + ] + + def __str__(self) -> str: + return 
f"{self.candidate.name} in {self.content.title}" diff --git a/entities/serializers.py b/entities/serializers.py index 33e6bf03..1f9f0a17 100644 --- a/entities/serializers.py +++ b/entities/serializers.py @@ -7,13 +7,31 @@ Entity, EntityAuthoritySnapshot, EntityCandidate, + EntityIdentityClaim, EntityMention, ) +class EntityIdentityClaimSerializer(serializers.ModelSerializer): + """Serialize one verified external identity claim for a tracked entity.""" + + class Meta: + model = EntityIdentityClaim + fields = [ + "id", + "surface", + "claim_url", + "verified", + "verified_at", + "verification_method", + ] + read_only_fields = fields + + class EntitySerializer(ProjectScopedSerializerMixin, serializers.ModelSerializer): """Serialize tracked entities for a project.""" + identity_claims = EntityIdentityClaimSerializer(many=True, read_only=True) mention_count = serializers.IntegerField(read_only=True) latest_mentions = serializers.SerializerMethodField() @@ -32,6 +50,7 @@ class Meta: "bluesky_handle", "mastodon_handle", "twitter_handle", + "identity_claims", "mention_count", "latest_mentions", "created_at", @@ -107,6 +126,8 @@ class Meta: "first_seen_in", "first_seen_title", "occurrence_count", + "cluster_key", + "auto_promotion_blocked_reason", "status", "merged_into", "merged_into_name", diff --git a/entities/tasks.py b/entities/tasks.py new file mode 100644 index 00000000..19564dc5 --- /dev/null +++ b/entities/tasks.py @@ -0,0 +1,885 @@ +"""Celery tasks and helpers for automated entity discovery.""" + +from __future__ import annotations + +import html +import logging +import re +from collections import defaultdict +from dataclasses import dataclass +from difflib import SequenceMatcher +from hashlib import sha1 +from itertools import combinations +from math import sqrt +from typing import Any, Protocol, cast +from urllib.parse import urlsplit +from uuid import NAMESPACE_URL, uuid5 + +import httpx +from atproto import Client +from celery import shared_task +from 
django.conf import settings +from django.db.models import Model +from django.utils import timezone +from django.utils.html import strip_tags +from django.utils.text import slugify +from mastodon import Mastodon + +from core.embeddings import build_entity_embedding_text, embed_text +from core.llm import openrouter_chat_json +from entities.extraction import ( + _normalize_name, + accept_entity_candidate, + merge_entity_candidate, +) +from entities.models import ( + Entity, + EntityCandidate, + EntityCandidateStatus, + EntityIdentityClaim, + IdentitySurface, +) +from ingestion.plugins.bluesky import PUBLIC_APPVIEW_BASE_URL +from ingestion.plugins.mastodon import MastodonSourcePlugin +from projects.model_support import ( + normalize_bluesky_handle, + normalize_linkedin_url, + normalize_mastodon_handle, +) +from projects.models import Project + +logger = logging.getLogger(__name__) + +AUTO_PROMOTION_MIN_OCCURRENCES = 5 +AUTO_PROMOTION_MIN_DISTINCT_SOURCES = 2 +AUTO_PROMOTION_MIN_CONFIDENCE = 0.85 +ENTITY_MATCH_SIMILARITY_THRESHOLD = 0.92 +CANDIDATE_CLUSTER_SIMILARITY_THRESHOLD = 0.78 +MAX_CANDIDATE_CONTEXTS = 5 +IDENTITY_CONFLICT_FIELDS = { + "github_url", + "linkedin_url", + "website_url", + "bluesky_handle", + "mastodon_handle", +} +GITHUB_API_BASE_URL = "https://api.github.com" +HTML_TITLE_PATTERN = re.compile( + r"(?P<title>.*?)", re.IGNORECASE | re.DOTALL +) + + +@dataclass(slots=True) +class IdentityProbeResult: + """Normalized result from probing one external identity surface.""" + + claim_url: str + verification_method: str + field_updates: dict[str, str] + + +class DelayedTask(Protocol): + """Protocol for Celery tasks that can run eagerly or via ``delay``.""" + + def __call__(self, *args: object, **kwargs: object) -> object: + pass + + def delay(self, *args: object, **kwargs: object) -> object: + pass + + +def _enqueue_task(task: object, *args: object) -> None: + """Dispatch a Celery task through a typed ``delay`` seam.""" + + cast(DelayedTask, 
task).delay(*args) + + +def _require_pk(instance: Model) -> int: + """Return a saved model primary key as an ``int``.""" + + instance_pk = instance.pk + if instance_pk is None: + raise ValueError(f"{instance.__class__.__name__} must be saved first.") + return int(instance_pk) + + +@shared_task(name="entities.tasks.run_all_entity_candidate_clustering") +def run_all_entity_candidate_clustering() -> int: + """Queue candidate clustering for every project.""" + + project_ids = list(Project.objects.values_list("id", flat=True)) + for project_id in project_ids: + if settings.CELERY_TASK_ALWAYS_EAGER: + cluster_entity_candidates(project_id) + else: + _enqueue_task(cluster_entity_candidates, project_id) + return len(project_ids) + + +@shared_task(name="entities.tasks.run_all_entity_candidate_auto_promotions") +def run_all_entity_candidate_auto_promotions() -> int: + """Queue automated candidate promotion checks for every project.""" + + project_ids = list(Project.objects.values_list("id", flat=True)) + for project_id in project_ids: + if settings.CELERY_TASK_ALWAYS_EAGER: + auto_promote_entity_candidates(project_id) + else: + _enqueue_task(auto_promote_entity_candidates, project_id) + return len(project_ids) + + +@shared_task(name="entities.tasks.cluster_entity_candidates") +def cluster_entity_candidates(project_id: int) -> dict[str, int]: + """Assign stable cluster keys to pending candidates inside one project.""" + + candidates = list( + EntityCandidate.objects.filter( + project_id=project_id, + status=EntityCandidateStatus.PENDING, + ) + .select_related("first_seen_in") + .prefetch_related("evidence") + .order_by("name") + ) + if not candidates: + return { + "project_id": project_id, + "clusters_created": 0, + "candidates_updated": 0, + } + + texts = { + _require_pk(candidate): _candidate_embedding_text(candidate) + for candidate in candidates + } + vectors = _candidate_vectors(texts) + groups = _candidate_groups(candidates, texts, vectors) + + candidates_updated = 0 + 
for group in groups: + cluster_key = _cluster_key_for_group(group) + for candidate in group: + contextual_embedding_id = uuid5( + NAMESPACE_URL, + f"entity-candidate:{_require_pk(candidate)}:{texts[_require_pk(candidate)]}", + ) + update_fields: list[str] = [] + if candidate.cluster_key != cluster_key: + candidate.cluster_key = cluster_key + update_fields.append("cluster_key") + if candidate.contextual_embedding_id != contextual_embedding_id: + candidate.contextual_embedding_id = contextual_embedding_id + update_fields.append("contextual_embedding_id") + if update_fields: + candidate.save(update_fields=update_fields + ["updated_at"]) + candidates_updated += 1 + + return { + "project_id": project_id, + "clusters_created": len(groups), + "candidates_updated": candidates_updated, + } + + +@shared_task(name="entities.tasks.auto_promote_entity_candidates") +def auto_promote_entity_candidates(project_id: int) -> dict[str, int]: + """Promote or merge pending candidates that meet the WP2 thresholds.""" + + candidates = list( + EntityCandidate.objects.filter( + project_id=project_id, + status=EntityCandidateStatus.PENDING, + ) + .select_related("project", "first_seen_in") + .prefetch_related("evidence") + .order_by("-occurrence_count", "name") + ) + entities = list(Entity.objects.filter(project_id=project_id).order_by("name")) + if not candidates: + return { + "project_id": project_id, + "promoted": 0, + "merged": 0, + "blocked": 0, + } + + promoted = 0 + merged = 0 + blocked = 0 + for candidate in candidates: + exact_match = _exact_matching_entity(candidate, entities) + if exact_match is not None: + merge_entity_candidate(candidate, exact_match, schedule_enrichment=True) + merged += 1 + continue + + blocked_reason = _candidate_blocked_reason(candidate) + if not blocked_reason: + best_match, similarity = _best_existing_entity_match(candidate, entities) + if ( + best_match is not None + and similarity >= ENTITY_MATCH_SIMILARITY_THRESHOLD + ): + blocked_reason = 
"matches_existing_entity" + + if blocked_reason: + blocked += 1 + if candidate.auto_promotion_blocked_reason != blocked_reason: + candidate.auto_promotion_blocked_reason = blocked_reason + candidate.save( + update_fields=["auto_promotion_blocked_reason", "updated_at"] + ) + continue + + entity = accept_entity_candidate(candidate, schedule_enrichment=True) + entities.append(entity) + promoted += 1 + + return { + "project_id": project_id, + "promoted": promoted, + "merged": merged, + "blocked": blocked, + } + + +@shared_task(name="entities.tasks.enrich_entity_identity") +def enrich_entity_identity(entity_id: int) -> dict[str, Any]: + """Project verified identity claims back onto the tracked entity fields.""" + + entity = ( + Entity.objects.filter(pk=entity_id) + .prefetch_related("identity_claims") + .select_related("project") + .get() + ) + claims = list( + entity.identity_claims.filter(verified=True).order_by("surface", "claim_url") + ) + if not claims: + return {"entity_id": entity_id, "claims_considered": 0, "fields_updated": 0} + + fields_updated = 0 + updated_fields: set[str] = set() + for claim in claims: + try: + probe_result = _probe_identity_claim(entity, claim) + except Exception: + logger.exception( + "Failed to probe identity claim id=%s for entity id=%s surface=%s", + _require_pk(claim), + entity_id, + claim.surface, + ) + continue + + _update_claim_from_probe(claim, probe_result) + for field_name, field_value in probe_result.field_updates.items(): + if not field_name or not field_value: + continue + if field_name in IDENTITY_CONFLICT_FIELDS and _claim_conflicts( + entity, field_name, field_value + ): + logger.warning( + "Skipping conflicting identity claim for entity id=%s field=%s value=%s", + entity_id, + field_name, + field_value, + ) + continue + if field_name in {"website_url", "description"} and getattr( + entity, field_name + ): + continue + if _entity_field_matches(entity, field_name, field_value): + continue + setattr(entity, field_name, 
field_value) + updated_fields.add(field_name) + fields_updated += 1 + + if updated_fields: + entity.save(update_fields=sorted(updated_fields)) + + return { + "entity_id": entity_id, + "claims_considered": len(claims), + "fields_updated": fields_updated, + } + + +def _probe_identity_claim( + entity: Entity, + claim: EntityIdentityClaim, +) -> IdentityProbeResult: + """Probe one identity claim and return canonicalized field updates.""" + + if claim.surface == IdentitySurface.GITHUB: + return _probe_github_claim(claim) + if claim.surface == IdentitySurface.BLUESKY: + return _probe_bluesky_claim(claim) + if claim.surface == IdentitySurface.MASTODON: + return _probe_mastodon_claim(claim) + if claim.surface == IdentitySurface.LINKEDIN: + return _probe_linkedin_claim(claim) + if claim.surface == IdentitySurface.WEBSITE: + return _probe_website_claim(claim) + return _default_probe_result_from_claim(claim) + + +def _probe_github_claim(claim: EntityIdentityClaim) -> IdentityProbeResult: + """Probe the GitHub users API for one stored profile claim.""" + + github_login = _github_login_from_claim_url(claim.claim_url) + if not github_login: + return _default_probe_result_from_claim(claim) + response = httpx.get( + f"{GITHUB_API_BASE_URL}/users/{github_login}", + headers={ + "Accept": "application/vnd.github+json", + "User-Agent": settings.OPENROUTER_APP_NAME or "newsletter-maker", + }, + timeout=20, + ) + response.raise_for_status() + payload = response.json() + if not isinstance(payload, dict): + raise RuntimeError("GitHub profile probe must return a JSON object.") + + field_updates = { + "github_url": str(payload.get("html_url") or claim.claim_url) + .strip() + .rstrip("/"), + } + blog_url = str(payload.get("blog") or "").strip() + if blog_url: + field_updates["website_url"] = blog_url.rstrip("/") + description = _github_description(payload) + if description: + field_updates["description"] = description + return IdentityProbeResult( + claim_url=field_updates["github_url"], + 
verification_method="github_api", + field_updates=field_updates, + ) + + +def _probe_bluesky_claim(claim: EntityIdentityClaim) -> IdentityProbeResult: + """Probe the Bluesky AppView profile API for one stored handle claim.""" + + handle = _bluesky_handle_from_claim(claim.claim_url) + if not handle: + return _default_probe_result_from_claim(claim) + + client = Client(base_url=PUBLIC_APPVIEW_BASE_URL) + profile = client.app.bsky.actor.get_profile({"actor": handle}) + canonical_handle = normalize_bluesky_handle( + str(getattr(profile, "handle", "") or handle) + ) + field_updates = {"bluesky_handle": canonical_handle} + description = str(getattr(profile, "description", "") or "").strip() + if description: + field_updates["description"] = description + return IdentityProbeResult( + claim_url=f"https://bsky.app/profile/{canonical_handle}", + verification_method="bluesky_appview", + field_updates=field_updates, + ) + + +def _probe_mastodon_claim(claim: EntityIdentityClaim) -> IdentityProbeResult: + """Probe the Mastodon account lookup API for one stored profile claim.""" + + canonical_handle = _mastodon_handle_from_claim(claim.claim_url) + if not canonical_handle: + return _default_probe_result_from_claim(claim) + parsed_claim = urlsplit(claim.claim_url) + instance_url = f"{parsed_claim.scheme or 'https'}://{parsed_claim.netloc}" + client = Mastodon(api_base_url=instance_url) + account = client.account_lookup(canonical_handle) + verified_handle = MastodonSourcePlugin._account_acct( + account, + instance_url=instance_url, + ) + account_url = str(MastodonSourcePlugin._nested_value(account, "url") or "").strip() + note_html = str(MastodonSourcePlugin._nested_value(account, "note") or "").strip() + + field_updates = {"mastodon_handle": verified_handle} + description = html.unescape(strip_tags(note_html)).strip() + if description: + field_updates["description"] = description + return IdentityProbeResult( + claim_url=account_url or claim.claim_url, + 
verification_method="mastodon_account_lookup", + field_updates=field_updates, + ) + + +def _probe_linkedin_claim(claim: EntityIdentityClaim) -> IdentityProbeResult: + """Probe one LinkedIn public profile URL and canonicalize redirects.""" + + response = httpx.get(claim.claim_url, follow_redirects=True, timeout=20) + response.raise_for_status() + canonical_url = normalize_linkedin_url(str(response.url)) + field_updates = {"linkedin_url": canonical_url} + description = _html_title(response.text) + if description: + field_updates["description"] = description + return IdentityProbeResult( + claim_url=canonical_url, + verification_method="linkedin_profile_url", + field_updates=field_updates, + ) + + +def _probe_website_claim(claim: EntityIdentityClaim) -> IdentityProbeResult: + """Probe one website claim and normalize the final origin URL.""" + + response = httpx.get(claim.claim_url, follow_redirects=True, timeout=20) + response.raise_for_status() + final_url = str(response.url).rstrip("/") + parsed_url = urlsplit(final_url) + canonical_origin = f"{parsed_url.scheme.lower()}://{parsed_url.netloc.lower()}" + return IdentityProbeResult( + claim_url=canonical_origin, + verification_method="website_http_probe", + field_updates={"website_url": canonical_origin}, + ) + + +def _default_probe_result_from_claim(claim: EntityIdentityClaim) -> IdentityProbeResult: + """Return the current claim mapping when no richer live probe exists.""" + + field_name, field_value = _entity_field_value_from_claim(claim) + field_updates = {field_name: field_value} if field_name and field_value else {} + return IdentityProbeResult( + claim_url=claim.claim_url, + verification_method=claim.verification_method or "candidate_evidence", + field_updates=field_updates, + ) + + +def _update_claim_from_probe( + claim: EntityIdentityClaim, + probe_result: IdentityProbeResult, +) -> None: + """Persist canonical claim details after a successful live probe.""" + + update_fields: list[str] = [] + if 
claim.claim_url != probe_result.claim_url: + claim.claim_url = probe_result.claim_url + update_fields.append("claim_url") + if claim.verification_method != probe_result.verification_method: + claim.verification_method = probe_result.verification_method + update_fields.append("verification_method") + if not claim.verified: + claim.verified = True + update_fields.append("verified") + if claim.verified_at is None: + claim.verified_at = timezone.now() + update_fields.append("verified_at") + if update_fields: + claim.save(update_fields=update_fields) + + +def _entity_field_matches(entity: Entity, field_name: str, field_value: str) -> bool: + """Return whether the entity already stores the normalized field value.""" + + existing_value = getattr(entity, field_name) + if field_name in IDENTITY_CONFLICT_FIELDS: + return _normalized_field_value( + field_name, existing_value + ) == _normalized_field_value(field_name, field_value) + return str(existing_value).strip() == str(field_value).strip() + + +def _github_login_from_claim_url(claim_url: str) -> str: + """Extract the GitHub login from a stored claim URL.""" + + path_parts = [part for part in urlsplit(claim_url).path.split("/") if part] + if not path_parts: + return "" + return path_parts[0] + + +def _github_description(payload: dict[str, Any]) -> str: + """Build a short description from the GitHub profile payload.""" + + description_parts = [ + str(payload.get("name") or "").strip(), + str(payload.get("bio") or "").strip(), + str(payload.get("company") or "").strip(), + str(payload.get("location") or "").strip(), + ] + return " - ".join(part for part in description_parts if part)[:500] + + +def _html_title(response_text: str) -> str: + """Extract a readable page title from HTML when one is present.""" + + match = HTML_TITLE_PATTERN.search(response_text or "") + if match is None: + return "" + return html.unescape(strip_tags(match.group("title"))).strip()[:500] + + +def _candidate_vectors(texts: dict[int, str]) -> 
dict[int, list[float]]: + """Embed candidate cluster texts when the embedding backend is available.""" + + vectors: dict[int, list[float]] = {} + for candidate_id, text in texts.items(): + try: + vectors[candidate_id] = embed_text(text) + except Exception: + logger.exception( + "Failed to embed entity candidate id=%s for clustering", candidate_id + ) + vectors[candidate_id] = [] + return vectors + + +def _candidate_groups( + candidates: list[EntityCandidate], + texts: dict[int, str], + vectors: dict[int, list[float]], +) -> list[list[EntityCandidate]]: + """Build candidate groups from name and context similarity.""" + + parents = { + _require_pk(candidate): _require_pk(candidate) for candidate in candidates + } + + def find(candidate_id: int) -> int: + parent_id = parents[candidate_id] + if parent_id != candidate_id: + parents[candidate_id] = find(parent_id) + return parents[candidate_id] + + def union(first_id: int, second_id: int) -> None: + first_root = find(first_id) + second_root = find(second_id) + if first_root != second_root: + parents[second_root] = first_root + + for first_candidate, second_candidate in combinations(candidates, 2): + first_id = _require_pk(first_candidate) + second_id = _require_pk(second_candidate) + similarity = _candidate_pair_similarity( + first_candidate, + second_candidate, + texts[first_id], + texts[second_id], + vectors.get(first_id, []), + vectors.get(second_id, []), + ) + if similarity >= CANDIDATE_CLUSTER_SIMILARITY_THRESHOLD: + union(first_id, second_id) + + grouped_candidates: dict[int, list[EntityCandidate]] = defaultdict(list) + for candidate in candidates: + grouped_candidates[find(_require_pk(candidate))].append(candidate) + return [ + sorted(group, key=lambda row: (-row.occurrence_count, row.name.casefold())) + for group in grouped_candidates.values() + ] + + +def _candidate_pair_similarity( + first_candidate: EntityCandidate, + second_candidate: EntityCandidate, + first_text: str, + second_text: str, + first_vector: 
list[float], + second_vector: list[float], +) -> float: + """Return the strongest available similarity signal for two candidates.""" + + normalized_first = _normalize_name(first_candidate.name) + normalized_second = _normalize_name(second_candidate.name) + name_similarity = SequenceMatcher(None, normalized_first, normalized_second).ratio() + token_similarity = _token_similarity(normalized_first, normalized_second) + vector_similarity = _cosine_similarity(first_vector, second_vector) + context_similarity = SequenceMatcher(None, first_text, second_text).ratio() + return max(name_similarity, token_similarity, vector_similarity, context_similarity) + + +def _token_similarity(first_value: str, second_value: str) -> float: + """Return a Jaccard-like overlap score for normalized token sets.""" + + first_tokens = {token for token in first_value.split() if token} + second_tokens = {token for token in second_value.split() if token} + if not first_tokens or not second_tokens: + return 0.0 + intersection = len(first_tokens & second_tokens) + union = len(first_tokens | second_tokens) + return intersection / union if union else 0.0 + + +def _cosine_similarity(first_vector: list[float], second_vector: list[float]) -> float: + """Return cosine similarity for two dense vectors.""" + + if not first_vector or not second_vector or len(first_vector) != len(second_vector): + return 0.0 + numerator = sum( + first * second for first, second in zip(first_vector, second_vector) + ) + first_magnitude = sqrt(sum(value * value for value in first_vector)) + second_magnitude = sqrt(sum(value * value for value in second_vector)) + if not first_magnitude or not second_magnitude: + return 0.0 + return numerator / (first_magnitude * second_magnitude) + + +def _cluster_key_for_group(group: list[EntityCandidate]) -> str: + """Build a stable cluster label for a candidate group.""" + + primary_name = group[0].name + name_slug = slugify(primary_name)[:40] or "candidate" + digest = sha1( + 
",".join(str(_require_pk(candidate)) for candidate in group).encode("utf-8") + ).hexdigest()[:8] + return f"{name_slug}-{digest}"[:64] + + +def _candidate_embedding_text(candidate: EntityCandidate) -> str: + """Build the combined name, identity, and context text for one candidate.""" + + evidence_rows = list(candidate.evidence.all()[:MAX_CANDIDATE_CONTEXTS]) + claim_urls = [row.claim_url for row in evidence_rows if row.claim_url] + context_lines = [ + row.context_excerpt for row in evidence_rows if row.context_excerpt + ] + return "\n\n".join( + part + for part in [ + candidate.name, + candidate.suggested_type, + *claim_urls, + *context_lines, + ] + if part + ) + + +def _candidate_blocked_reason(candidate: EntityCandidate) -> str: + """Return the first auto-promotion blocker for a candidate.""" + + if candidate.occurrence_count < AUTO_PROMOTION_MIN_OCCURRENCES: + return "needs_more_occurrences" + distinct_sources = ( + candidate.evidence.values_list("source_plugin", flat=True).distinct().count() + ) + if distinct_sources < AUTO_PROMOTION_MIN_DISTINCT_SOURCES: + return "needs_more_source_diversity" + if not _candidate_has_verified_identity(candidate): + return "missing_verified_identity" + if _candidate_disambiguation_confidence(candidate) < AUTO_PROMOTION_MIN_CONFIDENCE: + return "low_disambiguation_confidence" + return "" + + +def _candidate_has_verified_identity(candidate: EntityCandidate) -> bool: + """Return whether the candidate has at least one verified identity hint.""" + + return ( + candidate.evidence.exclude(identity_surface="").exclude(claim_url="").exists() + ) + + +def _candidate_disambiguation_confidence(candidate: EntityCandidate) -> float: + """Score how likely the candidate refers to one stable real-world entity.""" + + evidence_rows = list(candidate.evidence.all()[:MAX_CANDIDATE_CONTEXTS]) + if not evidence_rows: + return 0.0 + if not settings.OPENROUTER_API_KEY: + return _heuristic_candidate_confidence(candidate, evidence_rows) + + try: + 
response = openrouter_chat_json( + model=settings.AI_CLASSIFICATION_MODEL, + system_prompt=( + "You validate whether an extracted entity candidate refers to one " + "stable real-world entity that is safe to auto-promote. " + "Return JSON with confidence (0 to 1) and explanation." + ), + user_prompt=_candidate_disambiguation_prompt(candidate, evidence_rows), + ) + except Exception: + logger.exception( + "Falling back to heuristic candidate confidence for candidate id=%s", + _require_pk(candidate), + ) + return _heuristic_candidate_confidence(candidate, evidence_rows) + return _clamp_unit_interval(response.payload.get("confidence", 0.0)) + + +def _candidate_disambiguation_prompt( + candidate: EntityCandidate, + evidence_rows: list[Any], +) -> str: + """Build the JSON-oriented prompt for candidate promotion confidence.""" + + evidence_payload = [ + { + "source_plugin": evidence.source_plugin, + "context_excerpt": evidence.context_excerpt, + "identity_surface": evidence.identity_surface, + "claim_url": evidence.claim_url, + } + for evidence in evidence_rows + ] + return ( + f"candidate_name:\n{candidate.name}\n\n" + f"suggested_type:\n{candidate.suggested_type}\n\n" + f"occurrence_count:\n{candidate.occurrence_count}\n\n" + f"evidence:\n{evidence_payload}\n\n" + "Return only a JSON object with fields: confidence, explanation" + ) + + +def _heuristic_candidate_confidence( + candidate: EntityCandidate, evidence_rows: list[Any] +) -> float: + """Score candidate stability when LLM-backed disambiguation is unavailable.""" + + distinct_sources = len({row.source_plugin for row in evidence_rows}) + has_identity = any(row.identity_surface and row.claim_url for row in evidence_rows) + title_case_name = ( + len([token for token in candidate.name.split() if token[:1].isupper()]) >= 2 + ) + score = 0.2 + score += min(candidate.occurrence_count, 8) * 0.05 + score += min(distinct_sources, 3) * 0.15 + if has_identity: + score += 0.25 + if title_case_name: + score += 0.1 + return 
min(score, 0.95) + + +def _best_existing_entity_match( + candidate: EntityCandidate, entities: list[Entity] +) -> tuple[Entity | None, float]: + """Find the closest existing entity match for a pending candidate.""" + + if not entities: + return None, 0.0 + try: + candidate_vector = embed_text(_candidate_embedding_text(candidate)) + except Exception: + logger.exception( + "Failed to embed entity candidate id=%s for entity matching", + _require_pk(candidate), + ) + return None, 0.0 + + best_entity: Entity | None = None + best_similarity = 0.0 + for entity in entities: + try: + entity_vector = embed_text(build_entity_embedding_text(entity)) + except Exception: + logger.exception( + "Failed to embed entity id=%s while matching entity candidate id=%s", + _require_pk(entity), + _require_pk(candidate), + ) + continue + similarity = _cosine_similarity(candidate_vector, entity_vector) + if similarity > best_similarity: + best_similarity = similarity + best_entity = entity + return best_entity, best_similarity + + +def _exact_matching_entity( + candidate: EntityCandidate, entities: list[Entity] +) -> Entity | None: + """Return a case-insensitive exact-name match for the pending candidate.""" + + normalized_candidate = _normalize_name(candidate.name) + for entity in entities: + if _normalize_name(entity.name) == normalized_candidate: + return entity + return None + + +def _entity_field_value_from_claim(claim: EntityIdentityClaim) -> tuple[str, str]: + """Map a verified identity claim to the corresponding entity field value.""" + + if claim.surface == IdentitySurface.GITHUB: + return "github_url", claim.claim_url.rstrip("/") + if claim.surface == IdentitySurface.LINKEDIN: + return "linkedin_url", normalize_linkedin_url(claim.claim_url) + if claim.surface == IdentitySurface.WEBSITE: + return "website_url", claim.claim_url.rstrip("/") + if claim.surface == IdentitySurface.BLUESKY: + handle = _bluesky_handle_from_claim(claim.claim_url) + return "bluesky_handle", handle + if 
claim.surface == IdentitySurface.MASTODON: + handle = _mastodon_handle_from_claim(claim.claim_url) + return "mastodon_handle", handle + return "", "" + + +def _claim_conflicts(entity: Entity, field_name: str, field_value: str) -> bool: + """Return whether another entity in the project already owns the same claim.""" + + for other_entity in Entity.objects.filter(project=entity.project).exclude( + pk=entity.pk + ): + other_value = getattr(other_entity, field_name) + if _normalized_field_value(field_name, other_value) == _normalized_field_value( + field_name, field_value + ): + return True + return False + + +def _normalized_field_value(field_name: str, value: str) -> str: + """Normalize entity identity fields for conflict detection.""" + + if field_name == "linkedin_url": + return normalize_linkedin_url(value) + if field_name == "bluesky_handle": + return normalize_bluesky_handle(value) + if field_name == "mastodon_handle": + return normalize_mastodon_handle(value) + return value.strip().rstrip("/").casefold() + + +def _bluesky_handle_from_claim(claim_url: str) -> str: + """Extract a Bluesky handle from a canonical claim URL.""" + + path_parts = [part for part in urlsplit(claim_url).path.split("/") if part] + if len(path_parts) >= 2 and path_parts[0] == "profile": + return normalize_bluesky_handle(path_parts[1]) + return "" + + +def _mastodon_handle_from_claim(claim_url: str) -> str: + """Extract a Mastodon handle from a canonical profile URL.""" + + parsed_url = urlsplit(claim_url) + host = parsed_url.netloc.lower() + path_parts = [part for part in parsed_url.path.split("/") if part] + if not host or not path_parts: + return "" + username = path_parts[-1].removeprefix("@") + return normalize_mastodon_handle(f"{username}@{host}") + + +def _clamp_unit_interval(value: object) -> float: + """Clamp arbitrary values into the closed unit interval.""" + + if isinstance(value, bool): + numeric_value = 1.0 if value else 0.0 + elif isinstance(value, (int, float, str)): + try: 
+ numeric_value = float(value) + except ValueError: + numeric_value = 0.0 + else: + numeric_value = 0.0 + return max(0.0, min(1.0, numeric_value)) diff --git a/entities/tests/test_admin.py b/entities/tests/test_admin.py index 5769c7ed..4f56bb9b 100644 --- a/entities/tests/test_admin.py +++ b/entities/tests/test_admin.py @@ -150,6 +150,7 @@ def test_entity_authority_snapshot_admin_helpers_render_expected_values( def test_accept_selected_entity_candidates_creates_entity_and_backfills_mentions( source_admin_context, mocker ): + mocker.patch("entities.extraction.queue_entity_identity_enrichment") content = Content.objects.create( project=source_admin_context.project, url="https://example.com/river-labs-launch", @@ -212,6 +213,7 @@ def test_reject_selected_entity_candidates_marks_candidates_rejected( def test_merge_selected_entity_candidates_uses_existing_same_name_entity( source_admin_context, mocker ): + mocker.patch("entities.extraction.queue_entity_identity_enrichment") content = Content.objects.create( project=source_admin_context.project, url="https://example.com/acme-merge", diff --git a/entities/tests/test_api.py b/entities/tests/test_api.py index a25e7148..0e3d6999 100644 --- a/entities/tests/test_api.py +++ b/entities/tests/test_api.py @@ -1,4 +1,5 @@ from typing import Any, cast +from unittest.mock import patch from django.contrib.auth import get_user_model from django.db.models import Model @@ -12,6 +13,7 @@ EntityAuthoritySnapshot, EntityCandidate, EntityCandidateStatus, + EntityIdentityClaim, EntityMention, ) from projects.models import Project, ProjectMembership, ProjectRole @@ -146,6 +148,27 @@ def test_entity_list_includes_recent_mentions(self): self.owner_content.title, ) + def test_entity_list_includes_identity_claims(self): + EntityIdentityClaim.objects.create( + entity=self.owner_entity, + surface="linkedin", + claim_url="https://www.linkedin.com/company/owner-entity", + verified=True, + verification_method="candidate_evidence", + ) + + response = 
self.client.get( + reverse( + "v1:project-entity-list", + kwargs={"project_id": _require_pk(self.owner_project)}, + ) + ) + + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual( + response.json()[0]["identity_claims"][0]["surface"], "linkedin" + ) + def test_entity_mentions_action_returns_full_mention_history(self): first_mention = EntityMention.objects.create( project=self.owner_project, @@ -266,6 +289,8 @@ def test_entity_candidate_list_is_scoped_to_request_user_project(self): name="Owner Candidate", suggested_type="vendor", first_seen_in=self.owner_content, + cluster_key="owner-candidate-abcd1234", + auto_promotion_blocked_reason="needs_more_occurrences", ) EntityCandidate.objects.create( project=self.other_project, @@ -284,25 +309,31 @@ def test_entity_candidate_list_is_scoped_to_request_user_project(self): self.assertEqual(response.status_code, status.HTTP_200_OK) self.assertEqual(len(response.json()), 1) self.assertEqual(response.json()[0]["id"], _require_pk(owner_candidate)) + self.assertEqual(response.json()[0]["cluster_key"], owner_candidate.cluster_key) + self.assertEqual( + response.json()[0]["auto_promotion_blocked_reason"], + owner_candidate.auto_promotion_blocked_reason, + ) def test_entity_candidate_accept_action_returns_updated_candidate(self): - candidate = EntityCandidate.objects.create( - project=self.owner_project, - name="River Labs", - suggested_type="vendor", - first_seen_in=self.owner_content, - ) + with patch("entities.extraction.queue_entity_identity_enrichment"): + candidate = EntityCandidate.objects.create( + project=self.owner_project, + name="River Labs", + suggested_type="vendor", + first_seen_in=self.owner_content, + ) - response = self.client.post( - reverse( - "v1:project-entity-candidate-accept", - kwargs={ - "project_id": _require_pk(self.owner_project), - "pk": _require_pk(candidate), - }, - ), - format="json", - ) + response = self.client.post( + reverse( + "v1:project-entity-candidate-accept", + 
kwargs={ + "project_id": _require_pk(self.owner_project), + "pk": _require_pk(candidate), + }, + ), + format="json", + ) candidate.refresh_from_db() self.assertEqual(response.status_code, status.HTTP_200_OK) @@ -358,29 +389,28 @@ def test_entity_candidate_merge_rejects_cross_project_entity(self): self.assert_standardized_validation_error(response.json(), "merged_into") def test_entity_candidate_merge_action_returns_updated_candidate(self): - candidate = EntityCandidate.objects.create( - project=self.owner_project, - name="Owner Entity Alias", - suggested_type="vendor", - first_seen_in=self.owner_content, - ) + with patch("entities.extraction.queue_entity_identity_enrichment"): + candidate = EntityCandidate.objects.create( + project=self.owner_project, + name="Owner Entity Alias", + suggested_type="vendor", + first_seen_in=self.owner_content, + ) - response = self.client.post( - reverse( - "v1:project-entity-candidate-merge", - kwargs={ - "project_id": _require_pk(self.owner_project), - "pk": _require_pk(candidate), - }, - ), - {"merged_into": _require_pk(self.owner_entity)}, - format="json", - ) + response = self.client.post( + reverse( + "v1:project-entity-candidate-merge", + kwargs={ + "project_id": _require_pk(self.owner_project), + "pk": _require_pk(candidate), + }, + ), + {"merged_into": _require_pk(self.owner_entity)}, + format="json", + ) candidate.refresh_from_db() self.assertEqual(response.status_code, status.HTTP_200_OK) self.assertEqual(candidate.status, EntityCandidateStatus.MERGED) self.assertEqual(candidate.merged_into, self.owner_entity) - self.assertEqual( - response.json()["merged_into"], _require_pk(self.owner_entity) - ) + self.assertEqual(response.json()["merged_into"], _require_pk(self.owner_entity)) diff --git a/entities/tests/test_extraction.py b/entities/tests/test_extraction.py new file mode 100644 index 00000000..250a1d25 --- /dev/null +++ b/entities/tests/test_extraction.py @@ -0,0 +1,62 @@ +from types import SimpleNamespace + +import 
pytest +from django.utils import timezone + +from content.models import Content +from entities.extraction import run_entity_extraction +from entities.models import EntityCandidate, EntityCandidateEvidence +from projects.model_support import SourcePluginName +from projects.models import Project + +pytestmark = pytest.mark.django_db + + +@pytest.fixture +def extraction_context(): + project = Project.objects.create( + name="Extraction Project", + topic_description="Infra", + ) + content = Content.objects.create( + project=project, + url="https://www.linkedin.com/posts/river-labs-post", + title="weekly infrastructure notes", + author="River Labs", + source_plugin=SourcePluginName.LINKEDIN, + published_date=timezone.now(), + content_text="River Labs shared its latest platform roadmap.", + source_metadata={ + "author_profile_url": "https://www.linkedin.com/company/river-labs/", + }, + ) + return SimpleNamespace(project=project, content=content) + + +def test_run_entity_extraction_persists_linkedin_candidate_identity_evidence( + extraction_context, + mocker, + settings, +): + settings.OPENROUTER_API_KEY = "" + mocker.patch( + "entities.extraction.search_similar_entities_for_content", + return_value=[], + ) + + result = run_entity_extraction(extraction_context.content) + + candidate = EntityCandidate.objects.get( + project=extraction_context.project, + name="River Labs", + ) + evidence = EntityCandidateEvidence.objects.get(candidate=candidate) + + assert any( + candidate_payload["name"] == "River Labs" + for candidate_payload in result["candidate_entities"] + ) + assert evidence.source_plugin == SourcePluginName.LINKEDIN + assert evidence.identity_surface == "linkedin" + assert evidence.claim_url == "https://www.linkedin.com/company/river-labs" + assert "River Labs" in evidence.context_excerpt diff --git a/entities/tests/test_tasks.py b/entities/tests/test_tasks.py new file mode 100644 index 00000000..ed202cc3 --- /dev/null +++ b/entities/tests/test_tasks.py @@ -0,0 +1,270 
# @@  (NOTE(review): closing marker of the hunk header that starts on the preceding source line)
from types import SimpleNamespace

import pytest
from django.utils import timezone

from content.models import Content
from entities.models import (
    Entity,
    EntityCandidate,
    EntityCandidateEvidence,
    EntityCandidateStatus,
    EntityIdentityClaim,
)
from entities.tasks import (
    IdentityProbeResult,
    auto_promote_entity_candidates,
    cluster_entity_candidates,
    enrich_entity_identity,
)
from projects.model_support import SourcePluginName
from projects.models import Project

pytestmark = pytest.mark.django_db


@pytest.fixture
def entity_discovery_context():
    """Create a bare project and expose it as ``context.project``."""
    discovery_project = Project.objects.create(
        name="Discovery Project", topic_description="Infra"
    )
    return SimpleNamespace(project=discovery_project)


def _content(project: Project, *, suffix: str, source_plugin: str) -> Content:
    """Create a ``Content`` row for ``project`` authored by "River Labs".

    Args:
        project: Project the content row belongs to.
        suffix: Token interpolated into the URL, title and body text.
        source_plugin: Value stored in the row's ``source_plugin`` field.

    Returns:
        The newly created ``Content`` instance.
    """
    row_fields = {
        "url": f"https://example.com/{suffix}",
        "title": f"{suffix} title",
        "author": "River Labs",
        "source_plugin": source_plugin,
        "published_date": timezone.now(),
        "content_text": f"{suffix} content for River Labs.",
    }
    return Content.objects.create(project=project, **row_fields)


def test_cluster_entity_candidates_groups_similar_candidates(
    entity_discovery_context,
    mocker,
):
    """Cluster three candidates and check the two near-identical names share a key.

    ``entities.tasks.embed_text`` is stubbed so "River Labs" and "River Lab"
    receive nearly parallel vectors while "Acme Security" is orthogonal.
    """
    project = entity_discovery_context.project

    # Created in this order: River Labs, River Lab, Acme Security.
    river_labs, river_lab, acme_security = (
        EntityCandidate.objects.create(
            project=project,
            name=candidate_name,
            suggested_type="vendor",
            occurrence_count=occurrences,
        )
        for candidate_name, occurrences in (
            ("River Labs", 3),
            ("River Lab", 2),
            ("Acme Security", 2),
        )
    )

    def _stub_embedding(text):
        # Only the part before the first blank line is used as the lookup key;
        # anything unknown gets a neutral vector. Fresh lists are built per call.
        leading_section = text.split("\n\n")[0]
        vectors_by_name = {
            "River Labs": [1.0, 0.0],
            "River Lab": [0.98, 0.02],
            "Acme Security": [0.0, 1.0],
        }
        return vectors_by_name.get(leading_section, [0.5, 0.5])

    mocker.patch("entities.tasks.embed_text", side_effect=_stub_embedding)

    result = cluster_entity_candidates(project.id)

    for row in (river_labs, river_lab, acme_security):
        row.refresh_from_db()

    assert result["clusters_created"] == 2
    assert river_labs.cluster_key == river_lab.cluster_key
    assert river_labs.cluster_key != acme_security.cluster_key
    assert river_labs.contextual_embedding_id is not None


def test_auto_promote_entity_candidates_promotes_verified_multi_source_candidate(
    entity_discovery_context,
    settings,
    mocker,
):
    """Promote a candidate backed by five evidence rows from two source plugins.

    The disambiguation confidence and the identity probe are both stubbed; the
    test then checks the candidate is accepted and merged into a new entity
    that carries the LinkedIn URL and a verified LinkedIn identity claim.
    """
    project = entity_discovery_context.project
    linkedin_profile = "https://www.linkedin.com/company/river-labs"

    settings.CELERY_TASK_ALWAYS_EAGER = True
    candidate = EntityCandidate.objects.create(
        project=project,
        name="River Labs",
        suggested_type="vendor",
        occurrence_count=5,
    )

    # Alternating plugins: three LinkedIn items and two RSS items.
    plugin_order = (
        SourcePluginName.LINKEDIN,
        SourcePluginName.RSS,
        SourcePluginName.LINKEDIN,
        SourcePluginName.RSS,
        SourcePluginName.LINKEDIN,
    )
    contents = [
        _content(project, suffix=f"item-{position}", source_plugin=plugin)
        for position, plugin in enumerate(plugin_order, start=1)
    ]

    for item in contents:
        # Only LinkedIn-sourced evidence carries an identity surface and claim URL.
        from_linkedin = item.source_plugin == SourcePluginName.LINKEDIN
        EntityCandidateEvidence.objects.create(
            candidate=candidate,
            project=project,
            content=item,
            source_plugin=item.source_plugin,
            context_excerpt=f"{candidate.name} context from {item.source_plugin}",
            identity_surface="linkedin" if from_linkedin else "",
            claim_url=linkedin_profile if from_linkedin else "",
        )

    mocker.patch(
        "entities.tasks._candidate_disambiguation_confidence", return_value=0.9
    )
    probe_outcome = IdentityProbeResult(
        claim_url=linkedin_profile,
        verification_method="linkedin_profile_url",
        field_updates={"linkedin_url": linkedin_profile},
    )
    mocker.patch("entities.tasks._probe_identity_claim", return_value=probe_outcome)

    result = auto_promote_entity_candidates(project.id)

    candidate.refresh_from_db()
    promoted_entity = Entity.objects.get(project=project, name="River Labs")
    linkedin_claim = EntityIdentityClaim.objects.get(
        entity=promoted_entity, surface="linkedin"
    )

    assert result["promoted"] == 1
    assert candidate.status == EntityCandidateStatus.ACCEPTED
    assert candidate.merged_into == promoted_entity
    assert promoted_entity.linkedin_url == "https://www.linkedin.com/company/river-labs"
    assert linkedin_claim.verified is True


def test_enrich_entity_identity_uses_github_profile_probe(
    entity_discovery_context,
    mocker,
):
    """Enrich an entity from a stubbed GitHub profile response.

    ``entities.tasks.httpx.get`` is replaced with a mock whose ``json()``
    returns a fixed profile; the test checks the entity's GitHub URL, website
    URL and description, and that the claim's verification method becomes
    "github_api".
    """
    entity = Entity.objects.create(
        project=entity_discovery_context.project,
        name="River Labs",
        type="vendor",
    )
    claim = EntityIdentityClaim.objects.create(
        entity=entity,
        surface="github",
        claim_url="https://github.com/river-labs",
        verified=True,
        verification_method="candidate_evidence",
    )

    profile_payload = {
        "html_url": "https://github.com/river-labs",
        "blog": "https://riverlabs.dev",
        "name": "River Labs",
        "bio": "Cloud infrastructure tooling.",
        "company": "River Labs",
        "location": "Remote",
    }
    fake_response = mocker.Mock()
    fake_response.raise_for_status.return_value = None
    fake_response.json.return_value = profile_payload
    mocker.patch("entities.tasks.httpx.get", return_value=fake_response)

    result = enrich_entity_identity(entity.id)

    for row in (entity, claim):
        row.refresh_from_db()

    assert result["claims_considered"] == 1
    assert entity.github_url == "https://github.com/river-labs"
    assert entity.website_url == "https://riverlabs.dev"
    assert "Cloud infrastructure tooling." in entity.description
    assert claim.verification_method == "github_api"


def test_enrich_entity_identity_uses_bluesky_profile_probe(
    entity_discovery_context,
    mocker,
):
    """Enrich an entity from a stubbed Bluesky profile lookup.

    ``entities.tasks.Client`` is patched to return a mock whose
    ``app.bsky.actor.get_profile`` yields a fixed handle and description; the
    test checks both land on the entity and that the claim's verification
    method becomes "bluesky_appview".
    """
    entity = Entity.objects.create(
        project=entity_discovery_context.project,
        name="River Labs",
        type="vendor",
    )
    claim = EntityIdentityClaim.objects.create(
        entity=entity,
        surface="bluesky",
        claim_url="https://bsky.app/profile/riverlabs.bsky.social",
        verified=True,
        verification_method="candidate_evidence",
    )

    stub_profile = SimpleNamespace(
        handle="riverlabs.bsky.social",
        description="Infra notes from River Labs.",
    )
    fake_client = mocker.Mock()
    fake_client.app.bsky.actor.get_profile.return_value = stub_profile
    mocker.patch("entities.tasks.Client", return_value=fake_client)

    result = enrich_entity_identity(entity.id)

    for row in (entity, claim):
        row.refresh_from_db()

    assert result["claims_considered"] == 1
    assert entity.bluesky_handle == "riverlabs.bsky.social"
    assert entity.description == "Infra notes from River Labs."
    assert claim.verification_method == "bluesky_appview"


def test_auto_promote_entity_candidates_blocks_single_source_candidates(
    entity_discovery_context,
    mocker,
):
    """Check that a candidate seen only via RSS is blocked from auto-promotion.

    Five evidence rows are created, all from the RSS plugin; with the
    disambiguation confidence stubbed, the candidate must stay pending with
    the "needs_more_source_diversity" block reason.
    """
    project = entity_discovery_context.project
    candidate = EntityCandidate.objects.create(
        project=project,
        name="Single Source Vendor",
        suggested_type="vendor",
        occurrence_count=5,
    )

    for ordinal in range(5):
        rss_item = _content(
            project,
            suffix=f"single-{ordinal}",
            source_plugin=SourcePluginName.RSS,
        )
        EntityCandidateEvidence.objects.create(
            candidate=candidate,
            project=project,
            content=rss_item,
            source_plugin=rss_item.source_plugin,
            context_excerpt=f"Single Source Vendor context {ordinal}",
        )

    mocker.patch(
        "entities.tasks._candidate_disambiguation_confidence", return_value=0.9
    )

    result = auto_promote_entity_candidates(project.id)

    candidate.refresh_from_db()

    assert result["blocked"] == 1
    assert candidate.status == EntityCandidateStatus.PENDING
    assert candidate.auto_promotion_blocked_reason == "needs_more_source_diversity"


# NOTE(review): the remaining text on this source line is patch content for a
# different file and is cut off at the end of the visible range. It is preserved
# verbatim below, as comments, and deliberately not interpreted or rewritten.
# diff --git a/newsletter_maker/settings/celery.py b/newsletter_maker/settings/celery.py
# index ef7a6dc0..d7e4df40 100644
# --- a/newsletter_maker/settings/celery.py
# +++ b/newsletter_maker/settings/celery.py
# @@ -35,6 +35,14 @@
#  "task": "core.tasks.run_all_topic_cluster_recomputations",
#  "schedule": crontab(hour=4, minute=0),
#  },
# + "run-all-entity-candidate-clustering-nightly": {
# + "task": "entities.tasks.run_all_entity_candidate_clustering",
# + "schedule": crontab(hour=4, minute=30),
# + },
# + "run-all-entity-candidate-auto-promotions-nightly": {
# + "task": "entities.tasks.run_all_entity_candidate_auto_promotions",
# + "schedule": crontab(hour=4, minute=45),
# + },
#  }
#
#  __all__ = [