diff --git a/.env.example b/.env.example index 9df9fcb6..9f0658ce 100644 --- a/.env.example +++ b/.env.example @@ -11,6 +11,7 @@ OPENROUTER_API_KEY= OPENROUTER_API_BASE=https://openrouter.ai/api/v1 OPENROUTER_APP_URL= OPENROUTER_APP_NAME=newsletter-maker + AI_CLASSIFICATION_MODEL=meta-llama/llama-3.1-70b-instruct AI_RELEVANCE_MODEL=qwen/qwen-2.5-72b-instruct AI_SUMMARIZATION_MODEL=google/gemma-3-27b-it @@ -21,14 +22,39 @@ AI_RELEVANCE_REVIEW_THRESHOLD=0.4 AI_RELEVANCE_SUMMARIZE_THRESHOLD=0.7 AI_MAX_NODE_RETRIES=2 AI_REQUEST_TIMEOUT_SECONDS=60 + EMBEDDING_PROVIDER=sentence-transformers EMBEDDING_MODEL=sentence-transformers/all-MiniLM-L6-v2 EMBEDDING_TRUST_REMOTE_CODE=false + OLLAMA_URL=http://localhost:11434 + REDDIT_CLIENT_ID= REDDIT_CLIENT_SECRET= REDDIT_USER_AGENT=newsletter-maker/0.1 + +# Outbound mail provider. Use Resend or Amazon SES. +EMAIL_BACKEND=anymail.backends.resend.EmailBackend +DEFAULT_FROM_EMAIL=onboarding@resend.dev +SERVER_EMAIL=onboarding@resend.dev + +# Resend outbound + inbound +RESEND_API_KEY= +RESEND_FROM_EMAIL=onboarding@resend.dev +RESEND_INBOUND_SECRET= + +# Amazon SES outbound + inbound +# EMAIL_BACKEND=anymail.backends.amazon_ses.EmailBackend +# AWS_ACCESS_KEY_ID= +# AWS_SECRET_ACCESS_KEY= +# AWS_DEFAULT_REGION=us-east-1 + +# Shared webhook basic auth for providers that need it. +# Format must be username:password. +ANYMAIL_WEBHOOK_SECRET= + LOG_LEVEL=INFO + CELERY_TASK_ALWAYS_EAGER=false DJANGO_SUPERUSER_USERNAME=admin @@ -40,5 +66,7 @@ NEWSLETTER_API_USERNAME=admin NEWSLETTER_API_PASSWORD=adminpass DEBUG=True + ALLOWED_HOSTS=localhost,127.0.0.1,newslettermaker.tech + FRONTEND_URL=http://localhost:3000 diff --git a/.vscode/settings.json b/.vscode/settings.json index d55b96fa..33023242 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -1,10 +1,12 @@ { "cSpell.words": [ "ASGI", + "botocore", "buildx", "cbranch", "cfgv", "cstat", + "dateutil", "djlint", "FAVICONS", "Feedly", @@ -33,6 +35,7 @@ "readyz", "Referer", "simplejwt", + "svix", "Unparseable", "unstub", "upserted", diff --git a/README.md b/README.md index d451cbd3..86fb6b76 100644 --- a/README.md +++ b/README.md @@ -29,7 +29,7 @@ Every AI capability is a standalone, documented module following the Claude Skil Seven skills form the core pipeline: | Skill | Description | -|-------|-------------| +| ----- | ----------- | | **Content Classification** | Categorizes raw content (e.g., tutorial, opinion, release notes) and assigns a confidence score. | | **Relevance Scoring** | Evaluates content usefulness using semantic similarity against a reference corpus and LLM judgment. | | **Deduplication** | Compares new content against recent embeddings to group similar topics and pick the best version. | @@ -175,5 +175,3 @@ For the default local bootstrap, `.env` also seeds an `admin` superuser in the c ## License This repository is licensed under the GNU Affero General Public License v3.0 or later. See [LICENSE](LICENSE). - -Based on the current direct dependencies, AGPL is a reasonable fit: the packages in use are permissive or LGPL-compatible licenses such as BSD, MIT, Apache-2.0, and LGPLv3. That said, this is a practical compatibility check, not legal advice, so review it with counsel if you need a formal licensing opinion. diff --git a/core/apps.py b/core/apps.py index c0ce093b..c2c322dd 100644 --- a/core/apps.py +++ b/core/apps.py @@ -4,3 +4,6 @@ class CoreConfig(AppConfig): default_auto_field = "django.db.models.BigAutoField" name = "core" + + def ready(self) -> None: + import core.signals # noqa: F401 diff --git a/core/migrations/0002_newsletter_intake.py b/core/migrations/0002_newsletter_intake.py new file mode 100644 index 00000000..70991ab7 --- /dev/null +++ b/core/migrations/0002_newsletter_intake.py @@ -0,0 +1,83 @@ +from django.db import migrations, models +import django.db.models.deletion + +import core.models + + +class Migration(migrations.Migration): + + dependencies = [ + ("core", "0001_initial"), + ] + + operations = [ + migrations.AddField( + model_name="project", + name="intake_enabled", + field=models.BooleanField(default=False), + ), + migrations.AddField( + model_name="project", + name="intake_token", + field=models.CharField(default=core.models.generate_project_intake_token, editable=False, max_length=64, unique=True), + ), + migrations.AddField( + model_name="content", + name="source_metadata", + field=models.JSONField(blank=True, default=dict), + ), + migrations.CreateModel( + name="IntakeAllowlist", + fields=[ + ("id", models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")), + ("sender_email", models.EmailField(max_length=254)), + ("confirmed_at", models.DateTimeField(blank=True, null=True)), + ("confirmation_token", models.CharField(default=core.models.generate_confirmation_token, max_length=64, unique=True)), + ("created_at", models.DateTimeField(auto_now_add=True)), + ( + "project", + models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name="intake_allowlist", to="core.project"), + ), + ], + options={ + "ordering": ["sender_email"], + }, + ), + migrations.CreateModel( + name="NewsletterIntake", + fields=[ + ("id", models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")), + ("sender_email", models.EmailField(max_length=254)), + ("subject", models.CharField(max_length=512)), + ("received_at", models.DateTimeField(auto_now_add=True)), + ("raw_html", models.TextField(blank=True)), + ("raw_text", models.TextField(blank=True)), + ("message_id", models.CharField(max_length=255, unique=True)), + ( + "status", + models.CharField( + choices=[("pending", "Pending"), ("extracted", "Extracted"), ("failed", "Failed"), ("rejected", "Rejected")], + default="pending", + max_length=16, + ), + ), + ("extraction_result", models.JSONField(blank=True, null=True)), + ("error_message", models.TextField(blank=True)), + ( + "project", + models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name="newsletter_intakes", to="core.project"), + ), + ], + options={ + "ordering": ["-received_at"], + }, + ), + migrations.AddIndex( + model_name="newsletterintake", + index=models.Index(fields=["project", "sender_email", "status"], name="core_newsle_project_2c63fb_idx"), + ), + migrations.AddConstraint( + model_name="intakeallowlist", + constraint=models.UniqueConstraint(fields=("project", "sender_email"), name="core_allowlist_unique_project_sender"), + ), + ] diff --git a/core/models.py b/core/models.py index e23f7dfe..3b260940 100644 --- a/core/models.py +++ b/core/models.py @@ -1,8 +1,18 @@ +import secrets + from django.conf import settings from django.contrib.auth.models import Group from django.db import models +def generate_project_intake_token() -> str: + return secrets.token_hex(16) + + +def generate_confirmation_token() -> str: + return secrets.token_urlsafe(24) + + class EntityType(models.TextChoices): INDIVIDUAL = "individual", "Individual" VENDOR = "vendor", "Vendor" @@ -26,6 +36,13 @@ class SourcePluginName(models.TextChoices): REDDIT = "reddit", "Reddit" +class NewsletterIntakeStatus(models.TextChoices): + PENDING = "pending", "Pending" + EXTRACTED = "extracted", "Extracted" + FAILED = "failed", "Failed" + REJECTED = "rejected", "Rejected" + + class RunStatus(models.TextChoices): RUNNING = "running", "Running" SUCCESS = "success", "Success" @@ -47,6 +64,8 @@ class Project(models.Model): group = models.ForeignKey(Group, on_delete=models.CASCADE, related_name="projects") topic_description = models.TextField() content_retention_days = models.PositiveIntegerField(default=365) + intake_token = models.CharField(max_length=64, unique=True, default=generate_project_intake_token, editable=False) + intake_enabled = models.BooleanField(default=False) created_at = models.DateTimeField(auto_now_add=True) class Meta: @@ -107,6 +126,7 @@ class Content(models.Model): content_text = models.TextField() relevance_score = models.FloatField(null=True, blank=True) embedding_id = models.CharField(max_length=64, blank=True) + source_metadata = models.JSONField(default=dict, blank=True) is_reference = models.BooleanField(default=False) is_active = models.BooleanField(default=True) @@ -123,6 +143,49 @@ def __str__(self) -> str: return self.title +class IntakeAllowlist(models.Model): + project = models.ForeignKey(Project, on_delete=models.CASCADE, related_name="intake_allowlist") + sender_email = models.EmailField() + confirmed_at = models.DateTimeField(null=True, blank=True) + confirmation_token = models.CharField(max_length=64, unique=True, default=generate_confirmation_token) + created_at = models.DateTimeField(auto_now_add=True) + + class Meta: + ordering = ["sender_email"] + constraints = [ + models.UniqueConstraint(fields=["project", "sender_email"], name="core_allowlist_unique_project_sender"), + ] + + def __str__(self) -> str: + return f"{self.sender_email} for {self.project.name}" + + @property + def is_confirmed(self) -> bool: + return self.confirmed_at is not None + + +class NewsletterIntake(models.Model): + project = models.ForeignKey(Project, on_delete=models.CASCADE, related_name="newsletter_intakes") + sender_email = models.EmailField() + subject = models.CharField(max_length=512) + received_at = models.DateTimeField(auto_now_add=True) + raw_html = models.TextField(blank=True) + raw_text = models.TextField(blank=True) + message_id = models.CharField(max_length=255, unique=True) + status = models.CharField(max_length=16, choices=NewsletterIntakeStatus.choices, default=NewsletterIntakeStatus.PENDING) + extraction_result = models.JSONField(null=True, blank=True) + error_message = models.TextField(blank=True) + + class Meta: + ordering = ["-received_at"] + indexes = [ + models.Index(fields=["project", "sender_email", "status"]), + ] + + def __str__(self) -> str: + return f"{self.subject or self.message_id}" + + class SkillResult(models.Model): content = models.ForeignKey(Content, on_delete=models.CASCADE, related_name="skill_results") project = models.ForeignKey(Project, on_delete=models.CASCADE, related_name="skill_results") diff --git a/core/newsletter_extraction.py b/core/newsletter_extraction.py new file mode 100644 index 00000000..87e85113 --- /dev/null +++ b/core/newsletter_extraction.py @@ -0,0 +1,86 @@ +from __future__ import annotations + +import re +from dataclasses import dataclass +from html.parser import HTMLParser + +URL_PATTERN = re.compile(r"https?://[^\s<>'\"]+") + + +@dataclass(slots=True) +class ExtractedNewsletterItem: + url: str + title: str + excerpt: str + position: int + + +class _NewsletterLinkParser(HTMLParser): + def __init__(self) -> None: + super().__init__() + self.links: list[dict[str, str]] = [] + self._active_href: str | None = None + self._active_text: list[str] = [] + + def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]) -> None: + if tag != "a": + return + for name, value in attrs: + if name == "href" and value and value.startswith(("http://", "https://")): + self._active_href = value + self._active_text = [] + return + + def handle_data(self, data: str) -> None: + if self._active_href is not None: + self._active_text.append(data) + + def handle_endtag(self, tag: str) -> None: + if tag != "a" or self._active_href is None: + return + self.links.append( + { + "url": self._active_href, + "title": " ".join(part.strip() for part in self._active_text if part.strip()), + } + ) + self._active_href = None + self._active_text = [] + + +def extract_newsletter_items(*, subject: str, raw_html: str, raw_text: str) -> list[ExtractedNewsletterItem]: + parser = _NewsletterLinkParser() + if raw_html: + parser.feed(raw_html) + + seen_urls: set[str] = set() + extracted_items: list[ExtractedNewsletterItem] = [] + for candidate in parser.links: + url = candidate["url"].strip() + if not url or url in seen_urls: + continue + seen_urls.add(url) + extracted_items.append( + ExtractedNewsletterItem( + url=url, + title=candidate["title"] or subject or url, + excerpt=raw_text[:500].strip(), + position=len(extracted_items) + 1, + ) + ) + + for match in URL_PATTERN.finditer(raw_text): + url = match.group(0).rstrip(".,)") + if url in seen_urls: + continue + seen_urls.add(url) + extracted_items.append( + ExtractedNewsletterItem( + url=url, + title=subject or url, + excerpt=raw_text[:500].strip(), + position=len(extracted_items) + 1, + ) + ) + + return extracted_items diff --git a/core/newsletters.py b/core/newsletters.py new file mode 100644 index 00000000..a44b45fe --- /dev/null +++ b/core/newsletters.py @@ -0,0 +1,257 @@ +from __future__ import annotations + +from email.utils import parseaddr +from html import escape +from html.parser import HTMLParser +from typing import Any, Iterable, cast + +from celery import current_app +from django.conf import settings as django_settings +from django.core.mail import EmailMultiAlternatives +from django.urls import reverse + +from core.models import IntakeAllowlist, NewsletterIntake, Project +from core.newsletter_extraction import extract_newsletter_items +from core.settings_types import CoreSettings + +settings = cast(CoreSettings, django_settings) + +__all__ = ["extract_newsletter_items"] + + +def normalize_sender_email(value: str) -> str: + _, email_address = parseaddr(value) + return email_address.strip().lower() + + +def sanitize_newsletter_html(raw_html: str) -> str: + without_scripts = _strip_script_blocks(raw_html) + parser = _InlineHandlerStrippingParser() + parser.feed(without_scripts) + parser.close() + return parser.get_html() + + +def _strip_script_blocks(raw_html: str) -> str: + sanitized_parts: list[str] = [] + index = 0 + raw_length = len(raw_html) + while index < raw_length: + script_start = _find_script_start(raw_html, index) + if script_start == -1: + sanitized_parts.append(raw_html[index:]) + break + + sanitized_parts.append(raw_html[index:script_start]) + opening_tag_end = _find_tag_end(raw_html, script_start + 1) + if opening_tag_end == -1: + break + + script_end = _find_script_end(raw_html, opening_tag_end + 1) + if script_end == -1: + break + index = script_end + + return "".join(sanitized_parts) + + +def _find_script_start(raw_html: str, start_index: int) -> int: + search_index = start_index + while True: + candidate = raw_html.lower().find("": + search_index = candidate + 1 + continue + closing_tag_end = _find_tag_end(raw_html, candidate + 1) + if closing_tag_end == -1: + return len(raw_html) + return closing_tag_end + 1 + + +def _find_tag_end(raw_html: str, start_index: int) -> int: + quote_char: str | None = None + for index in range(start_index, len(raw_html)): + current_char = raw_html[index] + if quote_char is not None: + if current_char == quote_char: + quote_char = None + continue + if current_char in {'"', "'"}: + quote_char = current_char + continue + if current_char == ">": + return index + return -1 + + +class _InlineHandlerStrippingParser(HTMLParser): + def __init__(self) -> None: + super().__init__(convert_charrefs=False) + self._parts: list[str] = [] + + def get_html(self) -> str: + return "".join(self._parts) + + def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]) -> None: + self._parts.append(self._render_tag(tag, attrs)) + + def handle_startendtag(self, tag: str, attrs: list[tuple[str, str | None]]) -> None: + rendered = self._render_tag(tag, attrs) + if rendered.endswith(">"): + rendered = f"{rendered[:-1]} />" + self._parts.append(rendered) + + def handle_endtag(self, tag: str) -> None: + self._parts.append(f"{tag}>") + + def handle_data(self, data: str) -> None: + self._parts.append(data) + + def handle_entityref(self, name: str) -> None: + self._parts.append(f"&{name};") + + def handle_charref(self, name: str) -> None: + self._parts.append(f"{name};") + + def handle_comment(self, data: str) -> None: + self._parts.append(f"") + + def handle_decl(self, decl: str) -> None: + self._parts.append(f"") + + def unknown_decl(self, data: str) -> None: + self._parts.append(f"") + + @staticmethod + def _render_tag(tag: str, attrs: list[tuple[str, str | None]]) -> str: + rendered_attrs: list[str] = [] + for name, value in attrs: + if name.lower().startswith("on"): + continue + if value is None: + rendered_attrs.append(name) + continue + rendered_attrs.append(f'{name}="{escape(value, quote=True)}"') + attr_suffix = f" {' '.join(rendered_attrs)}" if rendered_attrs else "" + return f"<{tag}{attr_suffix}>" + + +def extract_project_token(recipient: str) -> str | None: + _, email_address = parseaddr(recipient) + local_part = email_address.partition("@")[0] + prefix, separator, token = local_part.partition("+") + if prefix != "intake" or separator != "+" or not token: + return None + return token + + +def send_confirmation_email(*, to_email: str, confirm_url: str, project_name: str) -> None: + subject = f"Confirm newsletter intake for {project_name}" + text_body = ( + "Confirm this sender for newsletter ingestion.\n\n" + f"Confirm sender: {confirm_url}" + ) + html_body = ( + "
Confirm this sender for newsletter ingestion.
" + f'' + ) + + message = EmailMultiAlternatives( + subject=subject, + body=text_body, + from_email=settings.DEFAULT_FROM_EMAIL, + to=[to_email], + ) + message.attach_alternative(html_body, "text/html") + message.send() + + +def build_confirmation_url(token: str) -> str: + base_url = settings.NEWSLETTER_API_BASE_URL.rstrip("/") + return f"{base_url}{reverse('confirm-newsletter-sender', kwargs={'token': token})}" + + +def process_inbound_newsletter( + *, + recipients: Iterable[str], + sender_email: str, + subject: str, + raw_html: str, + raw_text: str, + message_id: str, +) -> dict[str, Any]: + project = _find_intake_project(recipients) + if project is None: + return {"status": "ignored", "reason": "no_matching_project"} + + normalized_sender_email = normalize_sender_email(sender_email) + normalized_message_id = message_id.strip() + if not normalized_sender_email or not normalized_message_id: + return {"status": "ignored", "reason": "missing_sender_or_message_id"} + + defaults = { + "project": project, + "sender_email": normalized_sender_email, + "subject": subject[:512], + "raw_html": sanitize_newsletter_html(raw_html), + "raw_text": raw_text, + } + intake, created = NewsletterIntake.objects.get_or_create( + message_id=normalized_message_id, + defaults=defaults, + ) + if not created: + return {"id": intake.id, "status": intake.status, "duplicate": True} + + allowlist, allowlist_created = IntakeAllowlist.objects.get_or_create( + project=project, + sender_email=normalized_sender_email, + ) + + if allowlist.is_confirmed: + queue_newsletter_intake(intake.id) + return {"id": intake.id, "status": intake.status} + + if allowlist_created: + send_confirmation_email( + to_email=normalized_sender_email, + confirm_url=build_confirmation_url(allowlist.confirmation_token), + project_name=project.name, + ) + + return {"id": intake.id, "status": intake.status, "confirmation_required": True} + + +def queue_newsletter_intake(intake_id: int) -> None: + process_newsletter_intake = current_app.tasks["core.tasks.process_newsletter_intake"] + if settings.CELERY_TASK_ALWAYS_EAGER: + process_newsletter_intake.apply(args=(intake_id,), throw=True) + else: + process_newsletter_intake.delay(intake_id) + + +def _find_intake_project(recipients: Iterable[str]) -> Project | None: + for recipient in recipients: + token = extract_project_token(recipient) + if token is None: + continue + project = Project.objects.filter(intake_token=token, intake_enabled=True).first() + if project is not None: + return project + return None diff --git a/core/serializers.py b/core/serializers.py index 7f56ef29..16850b4e 100644 --- a/core/serializers.py +++ b/core/serializers.py @@ -5,6 +5,8 @@ Content, Entity, IngestionRun, + IntakeAllowlist, + NewsletterIntake, Project, ProjectConfig, ReviewQueue, @@ -48,7 +50,16 @@ class ProjectSerializer(ProjectScopedSerializerMixin, serializers.ModelSerialize class Meta: model = Project - fields = ["id", "name", "group", "topic_description", "content_retention_days", "created_at"] + fields = [ + "id", + "name", + "group", + "topic_description", + "content_retention_days", + "intake_token", + "intake_enabled", + "created_at", + ] read_only_fields = ["id", "created_at"] @@ -103,6 +114,7 @@ class Meta: "content_text", "relevance_score", "embedding_id", + "source_metadata", "is_reference", "is_active", ] @@ -205,3 +217,29 @@ def validate(self, attrs): if project and content and content.project_id != project.id: raise serializers.ValidationError({"content": "Content must belong to the selected project."}) return attrs + + +class IntakeAllowlistSerializer(ProjectScopedSerializerMixin, serializers.ModelSerializer): + class Meta: + model = IntakeAllowlist + fields = ["id", "project", "sender_email", "confirmed_at", "confirmation_token", "created_at"] + read_only_fields = ["id", "project", "confirmation_token", "created_at"] + + +class NewsletterIntakeSerializer(ProjectScopedSerializerMixin, serializers.ModelSerializer): + class Meta: + model = NewsletterIntake + fields = [ + "id", + "project", + "sender_email", + "subject", + "received_at", + "raw_html", + "raw_text", + "message_id", + "status", + "extraction_result", + "error_message", + ] + read_only_fields = ["id", "project", "received_at", "status", "extraction_result", "error_message"] diff --git a/core/settings_types.py b/core/settings_types.py index b06ac4b6..413570ec 100644 --- a/core/settings_types.py +++ b/core/settings_types.py @@ -2,6 +2,9 @@ class CoreSettings(Protocol): + CELERY_TASK_ALWAYS_EAGER: bool + DEFAULT_FROM_EMAIL: str + NEWSLETTER_API_BASE_URL: str QDRANT_URL: str EMBEDDING_MODEL: str EMBEDDING_PROVIDER: str @@ -11,3 +14,4 @@ class CoreSettings(Protocol): OPENROUTER_API_BASE: str OPENROUTER_APP_URL: str OPENROUTER_APP_NAME: str + RESEND_API_KEY: str diff --git a/core/signals.py b/core/signals.py new file mode 100644 index 00000000..8a4327f6 --- /dev/null +++ b/core/signals.py @@ -0,0 +1,38 @@ +from __future__ import annotations + +from anymail.signals import inbound +from django.dispatch import receiver + +from core.newsletters import process_inbound_newsletter + + +def _address_to_string(address) -> str: + if address is None: + return "" + addr_spec = getattr(address, "addr_spec", None) + if isinstance(addr_spec, str): + return addr_spec.strip() + return str(address).strip() + + +@receiver(inbound) +def handle_anymail_inbound(sender, event, esp_name, **kwargs): + message = event.message + + recipients: list[str] = [] + if message.envelope_recipient: + recipients.append(message.envelope_recipient) + recipients.extend( + address.addr_spec + for address in getattr(message, "to", []) + if getattr(address, "addr_spec", "") + ) + + process_inbound_newsletter( + recipients=recipients, + sender_email=message.envelope_sender or _address_to_string(getattr(message, "from_email", None)), + subject=message.subject or "", + raw_html=message.html or "", + raw_text=message.text or "", + message_id=str(message.get("Message-ID", "") or event.event_id or ""), + ) diff --git a/core/tasks.py b/core/tasks.py index eb1568c3..bed6c143 100644 --- a/core/tasks.py +++ b/core/tasks.py @@ -5,7 +5,16 @@ from django.utils import timezone from core.embeddings import upsert_content_embedding -from core.models import Content, IngestionRun, RunStatus, SourceConfig +from core.models import ( + Content, + IngestionRun, + IntakeAllowlist, + NewsletterIntake, + NewsletterIntakeStatus, + RunStatus, + SourceConfig, +) +from core.newsletter_extraction import extract_newsletter_items from core.pipeline import ( RELEVANCE_SKILL_NAME, SUMMARIZATION_SKILL_NAME, @@ -107,12 +116,75 @@ def _ingest_source_config(source_config: SourceConfig) -> tuple[int, int]: published_date=item.published_date, content_text=item.content_text, ) - upsert_content_embedding(content) - if settings.CELERY_TASK_ALWAYS_EAGER: - process_content(content.id) - else: - process_content.delay(content.id) + _schedule_content_processing(content) ingested_count += 1 source_config.last_fetched_at = timezone.now() source_config.save(update_fields=["last_fetched_at"]) return len(fetched_items), ingested_count + + +@shared_task(name="core.tasks.process_newsletter_intake") +def process_newsletter_intake(intake_id: int): + intake = NewsletterIntake.objects.select_related("project").get(pk=intake_id) + + allowlist = IntakeAllowlist.objects.filter( + project=intake.project, + sender_email=intake.sender_email, + confirmed_at__isnull=False, + ).first() + if allowlist is None: + intake.status = NewsletterIntakeStatus.PENDING + intake.error_message = "Sender has not confirmed newsletter intake." + intake.save(update_fields=["status", "error_message"]) + return {"status": intake.status, "items_ingested": 0} + + extracted_items = extract_newsletter_items( + subject=intake.subject, + raw_html=intake.raw_html, + raw_text=intake.raw_text, + ) + ingested_count = 0 + for item in extracted_items: + if Content.objects.filter(project=intake.project, url=item.url).exists(): + continue + content = Content.objects.create( + project=intake.project, + url=item.url, + title=item.title[:512], + author=intake.sender_email[:255], + source_plugin="newsletter", + published_date=timezone.now(), + content_text=item.excerpt or intake.raw_text, + source_metadata={ + "newsletter_intake_id": intake.id, + "sender_email": intake.sender_email, + "position": item.position, + }, + ) + _schedule_content_processing(content) + ingested_count += 1 + + intake.status = NewsletterIntakeStatus.EXTRACTED + intake.error_message = "" + intake.extraction_result = { + "method": "heuristic", + "items": [ + { + "url": item.url, + "title": item.title, + "excerpt": item.excerpt, + "position": item.position, + } + for item in extracted_items + ], + } + intake.save(update_fields=["status", "error_message", "extraction_result"]) + return {"status": intake.status, "items_ingested": ingested_count} + + +def _schedule_content_processing(content: Content) -> None: + upsert_content_embedding(content) + if settings.CELERY_TASK_ALWAYS_EAGER: + process_content(content.id) + else: + process_content.delay(content.id) diff --git a/core/tests/test_newsletters.py b/core/tests/test_newsletters.py new file mode 100644 index 00000000..068f4a02 --- /dev/null +++ b/core/tests/test_newsletters.py @@ -0,0 +1,355 @@ +import json +from base64 import b64encode +from datetime import datetime, timezone +from types import SimpleNamespace + +import pytest +from django.contrib.auth.models import Group +from django.core import mail +from django.urls import reverse +from svix.webhooks import Webhook + +from core.models import Content, IntakeAllowlist, NewsletterIntake, NewsletterIntakeStatus, Project +from core.newsletters import ( + extract_newsletter_items, + sanitize_newsletter_html, + send_confirmation_email, +) +from core.signals import handle_anymail_inbound + +pytestmark = pytest.mark.django_db + + +@pytest.fixture +def project(): + group = Group.objects.create(name="newsletter-team") + return Project.objects.create( + name="Newsletter Project", + group=group, + topic_description="Platform engineering", + intake_enabled=True, + ) + + +def test_sanitize_newsletter_html_removes_scripts_and_inline_handlers(): + sanitized = sanitize_newsletter_html('') + + assert "Read' + ) + + assert "alert(1)" not in sanitized + assert "