diff --git a/.env.example b/.env.example index 991d1d85..9c6cfdbe 100644 --- a/.env.example +++ b/.env.example @@ -34,3 +34,7 @@ CELERY_TASK_ALWAYS_EAGER=false DJANGO_SUPERUSER_USERNAME=admin DJANGO_SUPERUSER_EMAIL=admin@example.com DJANGO_SUPERUSER_PASSWORD=adminpass + +NEWSLETTER_API_BASE_URL=http://127.0.0.1:8080 +NEWSLETTER_API_USERNAME=admin +NEWSLETTER_API_PASSWORD=adminpass diff --git a/.github/dependabot.yml b/.github/dependabot.yml index b8b83eca..e474455f 100644 --- a/.github/dependabot.yml +++ b/.github/dependabot.yml @@ -10,6 +10,16 @@ updates: patterns: - "*" + - package-ecosystem: "npm" + directory: "/frontend" + schedule: + interval: "weekly" + open-pull-requests-limit: 5 + groups: + frontend-dependencies: + patterns: + - "*" + - package-ecosystem: "github-actions" directory: "/" schedule: diff --git a/.github/workflows/codeql.yml b/.github/workflows/codeql.yml index d4c379b2..4a03ddee 100644 --- a/.github/workflows/codeql.yml +++ b/.github/workflows/codeql.yml @@ -22,7 +22,7 @@ jobs: strategy: fail-fast: false matrix: - language: [python, actions] + language: [python, javascript-typescript, actions] steps: - name: Checkout repository diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index 8cbb3439..d5b5c402 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -28,13 +28,18 @@ jobs: cache: pip cache-dependency-path: requirements.txt + - name: Set up Node.js + uses: actions/setup-node@v5 + with: + node-version: "22" + cache: npm + cache-dependency-path: frontend/package-lock.json + - name: Install just uses: extractions/setup-just@v4 - name: Install dependencies - run: | - python -m pip install --upgrade pip - python -m pip install -r requirements.txt + run: just install - name: Install pre-commit hooks run: pre-commit install --install-hooks diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index b0003e78..cc504a15 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -28,13 +28,18 @@ jobs: cache: pip cache-dependency-path: requirements.txt + - name: Set up Node.js + uses: actions/setup-node@v5 + with: + node-version: "22" + cache: npm + cache-dependency-path: frontend/package-lock.json + - name: Install just uses: extractions/setup-just@v4 - name: Install dependencies - run: | - python -m pip install --upgrade pip - python -m pip install -r requirements.txt + run: just install - name: Run tests run: just test diff --git a/.gitignore b/.gitignore index 639cdcc9..187dccf9 100644 --- a/.gitignore +++ b/.gitignore @@ -6,8 +6,11 @@ __pycache__/ .venv/ venv/ .env +frontend/.env.local celerybeat-schedule* db.sqlite3 staticfiles/ +frontend/.next/ +frontend/node_modules/ docs/ diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index a2512cf8..c11fce88 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -13,3 +13,8 @@ repos: entry: just lint language: system pass_filenames: false + - id: just-test + name: just test + entry: just test + language: system + pass_filenames: false diff --git a/celerybeat-schedule b/celerybeat-schedule index 1089d88f..023c1aed 100644 Binary files a/celerybeat-schedule and b/celerybeat-schedule differ diff --git a/core/admin.py b/core/admin.py index 8128070d..33b66f8a 100644 --- a/core/admin.py +++ b/core/admin.py @@ -1,36 +1,44 @@ -from django.contrib import admin +import json + +from django.contrib import admin, messages +from django.db.models import Avg +from django.utils import timezone from django.utils.html import format_html +from django.utils.safestring import mark_safe from import_export.admin import ExportActionMixin +from unfold.admin import ModelAdmin from core.models import ( - Content, - Entity, - IngestionRun, - ReviewQueue, - SkillResult, - SourceConfig, - Tenant, - TenantConfig, - UserFeedback, + Content, + Entity, + IngestionRun, + ReviewQueue, + SkillResult, + SourceConfig, + Tenant, + TenantConfig, + UserFeedback, ) +from core.plugins import get_plugin_for_source_config, validate_plugin_config +from core.tasks import process_content @admin.register(Tenant) class TenantAdmin(ExportActionMixin, admin.ModelAdmin): - list_display = ("name", "user", "content_retention_days", "created_at") + list_display = ("name", "user", "content_retention_days", "created_at") - # Better navigation - date_hierarchy = "created_at" - list_filter = ("created_at",) + # Better navigation + date_hierarchy = "created_at" + list_filter = ("created_at",) - # Faster searching - search_fields = ("name", "user__username", "user__email") + # Faster searching + search_fields = ("name", "user__username", "user__email") - # Performance for large user lists - autocomplete_fields = ("user",) + # Performance for large user lists + autocomplete_fields = ("user",) - # Quick editing - list_editable = ("content_retention_days",) + # Quick editing + list_editable = ("content_retention_days",) @admin.register(TenantConfig) @@ -40,59 +48,576 @@ class TenantConfigAdmin(admin.ModelAdmin): @admin.register(Entity) class EntityAdmin(admin.ModelAdmin): - # Replace 'authority_score' with your new method name - list_display = ("name", "tenant", "type", "colored_score", "created_at") - - @admin.display(description="Authority Score", ordering="authority_score") - def colored_score(self, obj): - # Choose a color based on the value - if obj.authority_score >= 80: - color = "green" - elif obj.authority_score >= 50: - color = "orange" - else: - color = "red" + # Replace 'authority_score' with your new method name + list_display = ("name", "tenant", "type", "colored_score", "created_at") - return format_html( - '{}', - color, - obj.authority_score, + @admin.display(description="Authority Score", ordering="authority_score") + def colored_score(self, obj): + # Choose a color based on the value + if obj.authority_score >= 80: + color = "green" + elif obj.authority_score >= 50: + color = "orange" + else: + color = "red" + + return format_html( + '{}', + color, + obj.authority_score, + ) + + +class HighValueFilter(admin.SimpleListFilter): + title = 'Content Value' + parameter_name = 'value_tier' + + def lookups(self, request, model_admin): + return ( + ('high_value', '🔥 High Value (Score > 80 & Reference)'), ) + def queryset(self, request, queryset): + if self.value() == 'high_value': + return queryset.filter(relevance_score__gt=80, is_reference=True) + return queryset + @admin.register(Content) class ContentAdmin(admin.ModelAdmin): - list_display = ("title", "tenant", "source_plugin", "published_date", "relevance_score", "is_reference", "is_active") - list_filter = ("tenant", "source_plugin", "is_reference", "is_active") - search_fields = ("title", "author", "url") + list_display = ( + "display_relevance", + "is_active", + "is_reference", + "preview_content", + "source_plugin", + "tenant", + "title", + "view_trace", + ) + list_editable = ("is_reference", "is_active") + list_filter = ( + HighValueFilter, + ("tenant", admin.RelatedOnlyFieldListFilter), + "source_plugin", + "is_active" + ) + search_fields = ("title", "author", "url") + actions = ["generate_newsletter_ideas"] + + @admin.display(description="Preview") + def preview_content(self, obj): + """Adds a 'Quick Look' icon that shows the description in an Unfold modal.""" + if not obj.description: + return "-" + return format_html( + '🔍 View', + obj.description[:500] + ) + + @admin.display(description="AI Trace") + def view_trace(self, obj): + """Link to the latest external trace when present, otherwise the internal skill run history.""" + from urllib.parse import urlencode + + from django.conf import settings + from django.urls import reverse + + latest_skill_result = obj.skill_results.filter( + superseded_by__isnull=True, + ).order_by("-created_at").first() + if latest_skill_result is None: + return "-" + + result_data = latest_skill_result.result_data or {} + trace_sections = [result_data] + for section_name in ("trace", "langsmith", "langfuse", "observability", "telemetry"): + section = result_data.get(section_name) + if isinstance(section, dict): + trace_sections.append(section) + + trace_url = "" + trace_id = "" + for section in trace_sections: + for key in ("trace_url", "traceUrl", "langsmith_run_url", "langfuse_trace_url"): + value = section.get(key) + if isinstance(value, str) and value: + trace_url = value + break + if trace_url: + break + for key in ("trace_id", "traceId", "run_id", "runId", "langsmith_run_id", "langfuse_trace_id"): + value = section.get(key) + if isinstance(value, str) and value: + trace_id = value + break + + if not trace_url and trace_id and getattr(settings, "AI_TRACE_URL_TEMPLATE", ""): + trace_url = settings.AI_TRACE_URL_TEMPLATE.format( + content_id=obj.id, + run_id=trace_id, + skill_name=latest_skill_result.skill_name, + skill_result_id=latest_skill_result.id, + tenant_id=obj.tenant_id, + trace_id=trace_id, + ) + + if trace_url: + link_label = "📈 Trace" + link_title = f"Open external trace for {latest_skill_result.skill_name}" + else: + trace_url = "{}?{}".format( + reverse("admin:core_skillresult_changelist"), + urlencode({"content__id__exact": obj.id}), + ) + link_label = "🧠 Skill runs" + link_title = f"Open persisted skill runs for {obj.title}" + + return format_html( + '{}', + trace_url, + link_title, + link_label, + ) + + @admin.display(description="Score") + def display_relevance(self, obj): + if obj.relevance_score is None: + return "-" + color = "green" if obj.relevance_score > 75 else "orange" if obj.relevance_score > 40 else "red" + return format_html('{}%', color, obj.relevance_score) + + def changelist_view(self, request, extra_context=None): + queryset = self.get_queryset(request) + metrics = queryset.aggregate(avg_score=Avg('relevance_score')) + + extra_context = extra_context or {} + extra_context["dashboard_stats"] = [ + { + "title": "Avg Relevance", + "value": f"{metrics['avg_score'] or 0:.1f}%", + "icon": "insights", + "color": "success" if (metrics['avg_score'] or 0) > 70 else "warning", + }, + { + "title": "Total Filtered", + "value": queryset.count(), + "icon": "inventory_2", + }, + ] + + return super().changelist_view(request, extra_context=extra_context) + + @admin.action(description="Generate Ideas for Newsletter") + def generate_newsletter_ideas(self, request, queryset): + content_ids = list(queryset.values_list("id", flat=True)) + for content_id in content_ids: + process_content.delay(content_id) + self.message_user( + request, + f"Successfully queued the pipeline for {len(content_ids)} items.", + messages.SUCCESS, + ) @admin.register(SkillResult) -class SkillResultAdmin(admin.ModelAdmin): - list_display = ("skill_name", "content", "tenant", "status", "model_used", "created_at") - list_filter = ("status", "skill_name", "tenant") - search_fields = ("skill_name", "content__title", "model_used") +class SkillResultAdmin(ModelAdmin): + list_display = ( + "skill_name", + "get_content_link", + "display_status", + "display_performance", + "preview_json", + "is_current", + "model_used", + "created_at", + ) + list_filter = ("status", "skill_name", "tenant", "model_used") + search_fields = ("skill_name", "content__title", "model_used", "error_message") + actions = ["retry_selected_skills"] + readonly_fields = ("pretty_result_data", "latency_ms", "created_at", "superseded_by") + fieldsets = ( + ("Execution Details", { + "fields": ("skill_name", "content", "tenant", "status", "model_used") + }), + ("AI Output", { + "fields": ("pretty_result_data", "error_message"), + }), + ("Performance Metrics", { + "fields": ("latency_ms", "confidence", "created_at", "superseded_by"), + }), + ) + + @admin.action(description="Retry Selected Skills") + def retry_selected_skills(self, request, queryset): + """Resets status to PENDING and clears errors for retry by the worker.""" + updated = queryset.update(status="PENDING", error_message="") + self.message_user( + request, + f"Successfully reset {updated} skills to PENDING for retry.", + messages.SUCCESS + ) + + @admin.display(description="Result Preview") + def preview_json(self, obj): + """Link that triggers Unfold's detail view (can be opened in side-panel).""" + if not obj.result_data: + return "-" + return format_html( + '🔍 Preview', + f"{obj.pk}/change/" + ) + + @admin.display(description="Content") + def get_content_link(self, obj): + return obj.content.title[:30] + "..." if obj.content.title else "Untitled" + + @admin.display(description="Status") + def display_status(self, obj): + colors = {"COMPLETED": "green", "FAILED": "red", "PENDING": "orange"} + color = colors.get(obj.status, "gray") + return format_html( + '● {}', + color, obj.status + ) + + @admin.display(description="Perf / Conf") + def display_performance(self, obj): + latency = f"{obj.latency_ms}ms" if obj.latency_ms else "-" + conf = f"{int(obj.confidence * 100)}%" if obj.confidence is not None else "-" + return f"{latency} / {conf}" + + @admin.display(description="Current", boolean=True) + def is_current(self, obj): + return obj.superseded_by is None + + @admin.display(description="Result Data JSON") + def pretty_result_data(self, obj): + if not obj.result_data: + return "No data available" + formatted_json = json.dumps(obj.result_data, indent=4) + return mark_safe( + f'
'
+            f'{formatted_json}'
+            f'
' + ) + + def changelist_view(self, request, extra_context=None): + qs = self.get_queryset(request) + extra_context = extra_context or {} + metrics = qs.aggregate(avg_lat=Avg('latency_ms')) + avg_latency = metrics['avg_lat'] or 0 + failure_count = qs.filter(status='FAILED').count() + total_count = qs.count() or 1 + + extra_context["dashboard_stats"] = [ + { + "title": "Avg Latency", + "value": f"{avg_latency:.0f}ms", + "icon": "timer", + "color": "warning" if avg_latency > 2000 else "success", + }, + { + "title": "Failure Rate", + "value": f"{(failure_count / total_count) * 100:.1f}%", + "icon": "error", + "color": "danger" if failure_count > 0 else "success", + }, + ] + return super().changelist_view(request, extra_context=extra_context) @admin.register(UserFeedback) -class UserFeedbackAdmin(admin.ModelAdmin): - list_display = ("content", "tenant", "user", "feedback_type", "created_at") - list_filter = ("feedback_type", "tenant") +class UserFeedbackAdmin(ModelAdmin): + list_display = ( + "display_feedback", + "get_content_title", + "get_ai_score", + "tenant", + "user", + "created_at" + ) + list_filter = ("feedback_type", ("tenant", admin.RelatedOnlyFieldListFilter)) + search_fields = ("content__title", "user__email", "user__username") + + @admin.display(description="Type") + def display_feedback(self, obj): + if obj.feedback_type == "UPVOTE": + return format_html('👍') + return format_html('👎') + + @admin.display(description="Content Title") + def get_content_title(self, obj): + return obj.content.title[:50] + "..." + + @admin.display(description="AI Score") + def get_ai_score(self, obj): + """Displays the original AI score to compare with user feedback.""" + score = obj.content.relevance_score + if score is None: + return "-" + color = "green" if score > 75 else "red" if score < 40 else "orange" + return format_html('{}%', color, score) + + def changelist_view(self, request, extra_context=None): + qs = self.get_queryset(request) + extra_context = extra_context or {} + upvotes = qs.filter(feedback_type="UPVOTE").count() + total = qs.count() or 1 + approval_rate = (upvotes / total) * 100 + + extra_context["dashboard_stats"] = [ + { + "title": "Approval Rate", + "value": f"{approval_rate:.1f}%", + "icon": "thumb_up", + "color": "success" if approval_rate > 80 else "warning", + }, + { + "title": "Total Feedback", + "value": total, + "icon": "forum", + }, + ] + return super().changelist_view(request, extra_context=extra_context) @admin.register(IngestionRun) -class IngestionRunAdmin(admin.ModelAdmin): - list_display = ("plugin_name", "tenant", "status", "items_fetched", "items_ingested", "started_at") - list_filter = ("plugin_name", "status", "tenant") +class IngestionRunAdmin(ModelAdmin): + list_display = ( + "plugin_name", + "tenant", + "display_status", + "display_efficiency", + "display_duration", + "started_at", + ) + list_filter = ("plugin_name", "status", ("tenant", admin.RelatedOnlyFieldListFilter)) + search_fields = ("plugin_name", "error_message", "tenant__name") + readonly_fields = ("display_duration", "started_at", "completed_at") + fieldsets = ( + ("Run Info", { + "fields": ("plugin_name", "tenant", "status") + }), + ("Data Metrics", { + "fields": ("items_fetched", "items_ingested", "display_efficiency") + }), + ("Timing", { + "fields": ("started_at", "completed_at", "display_duration") + }), + ("Logs", { + "fields": ("error_message",), + "classes": ("collapse",) + }), + ) + + @admin.display(description="Status") + def display_status(self, obj): + colors = {"COMPLETED": "success", "FAILED": "danger", "RUNNING": "info"} + return format_html( + '{}', + colors.get(obj.status, "warning"), + obj.status + ) + + @admin.display(description="Efficiency (Ingested/Fetched)") + def display_efficiency(self, obj): + if obj.items_fetched == 0: + return "0/0" + percent = (obj.items_ingested / obj.items_fetched) * 100 + color = "green" if percent > 90 else "orange" if percent > 50 else "red" + return format_html( + '{} / {} ({:.0f}%)', + obj.items_ingested, obj.items_fetched, color, percent + ) + + @admin.display(description="Duration") + def display_duration(self, obj): + if not obj.completed_at: + return "In Progress..." + duration = obj.completed_at - obj.started_at + seconds = duration.total_seconds() + return f"{int(seconds // 60)}m {int(seconds % 60)}s" + + def changelist_view(self, request, extra_context=None): + qs = self.get_queryset(request) + extra_context = extra_context or {} + total_runs = qs.count() + failed_runs = qs.filter(status="FAILED").count() + total_ingested = sum(qs.values_list('items_ingested', flat=True)) + + extra_context["dashboard_stats"] = [ + { + "title": "Total Content Ingested", + "value": f"{total_ingested:,}", + "icon": "cloud_download", + }, + { + "title": "Success Rate", + "value": f"{((total_runs - failed_runs) / (total_runs or 1)) * 100:.1f}%", + "icon": "check_circle", + "color": "success" if failed_runs == 0 else "warning", + }, + ] + return super().changelist_view(request, extra_context=extra_context) @admin.register(SourceConfig) -class SourceConfigAdmin(admin.ModelAdmin): - list_display = ("plugin_name", "tenant", "is_active", "last_fetched_at") - list_filter = ("plugin_name", "is_active", "tenant") +class SourceConfigAdmin(ModelAdmin): + list_display = ( + "plugin_name", + "tenant", + "display_health", + "is_active", + "last_fetched_at", + ) + list_filter = ("is_active", "plugin_name", ("tenant", admin.RelatedOnlyFieldListFilter)) + list_editable = ("is_active",) + search_fields = ("plugin_name", "tenant__name") + actions = ["test_source_connection"] + readonly_fields = ("last_fetched_at", "pretty_config") + fieldsets = ( + ("Core Settings", { + "fields": ("plugin_name", "tenant", "is_active") + }), + ("Configuration", { + "fields": ("pretty_config", "config"), + }), + ("Activity", { + "fields": ("last_fetched_at",), + }), + ) + + @admin.display(description="Status") + def display_health(self, obj): + if not obj.is_active: + return format_html('● Paused') + + if obj.last_fetched_at: + hours_since = (timezone.now() - obj.last_fetched_at).total_seconds() / 3600 + if hours_since > 24: + return format_html('● Stale') + return format_html('● Healthy') + + return format_html('● Never Run') + + @admin.display(description="Config Preview") + def pretty_config(self, obj): + """Displays the JSON config in a readable format.""" + if not obj.config: + return "Empty" + formatted_json = json.dumps(obj.config, indent=4) + return mark_safe(f'
{formatted_json}
') + + @admin.action(description="Test Source Connectivity") + def test_source_connection(self, request, queryset): + """ + Custom action to trigger a dry-run fetch for the selected sources. + """ + healthy_sources = [] + failed_sources = [] + + for source_config in queryset.select_related("tenant"): + try: + source_config.config = validate_plugin_config( + source_config.plugin_name, + source_config.config, + ) + plugin = get_plugin_for_source_config(source_config) + if not plugin.health_check(): + raise RuntimeError("Health check returned an unhealthy status.") + except Exception as exc: + failed_sources.append(f"{source_config}: {exc}") + else: + healthy_sources.append(str(source_config)) + + if healthy_sources: + self.message_user( + request, + f"Connectivity check passed for {len(healthy_sources)} source(s).", + messages.SUCCESS, + ) + + if failed_sources: + self.message_user( + request, + "Connectivity check failed for: " + "; ".join(failed_sources), + messages.ERROR, + ) + + def changelist_view(self, request, extra_context=None): + qs = self.get_queryset(request) + extra_context = extra_context or {} + active_count = qs.filter(is_active=True).count() + total_count = qs.count() or 1 + + extra_context["dashboard_stats"] = [ + { + "title": "Active Sources", + "value": f"{active_count} / {total_count}", + "icon": "settings_input_component", + "color": "success" if active_count == total_count else "warning", + }, + { + "title": "Plugin Variety", + "value": qs.values('plugin_name').distinct().count(), + "icon": "extension", + }, + ] + return super().changelist_view(request, extra_context=extra_context) @admin.register(ReviewQueue) -class ReviewQueueAdmin(admin.ModelAdmin): - list_display = ("content", "tenant", "reason", "confidence", "resolved", "created_at") - list_filter = ("reason", "resolved", "tenant") +class ReviewQueueAdmin(ModelAdmin): + list_display = ( + "get_content_title", + "tenant", + "reason", + "display_confidence", + "resolved", + "resolution", + "created_at", + ) + list_filter = ("resolved", "reason", ("tenant", admin.RelatedOnlyFieldListFilter)) + list_editable = ("resolved", "resolution") + actions = ["mark_as_approved", "mark_as_rejected"] + + @admin.display(description="Content") + def get_content_title(self, obj): + return obj.content.title[:50] + "..." + + @admin.display(description="Confidence") + def display_confidence(self, obj): + color = "red" if obj.confidence < 0.3 else "orange" if obj.confidence < 0.6 else "green" + return format_html('{:.0f}%', color, obj.confidence * 100) + + @admin.action(description="Approve selected items") + def mark_as_approved(self, request, queryset): + queryset.update(resolved=True, resolution="APPROVED") + self.message_user(request, "Selected items approved.", messages.SUCCESS) + + @admin.action(description="Reject selected items") + def mark_as_rejected(self, request, queryset): + queryset.update(resolved=True, resolution="REJECTED") + self.message_user(request, "Selected items rejected.", messages.WARNING) + + def changelist_view(self, request, extra_context=None): + qs = self.get_queryset(request) + extra_context = extra_context or {} + pending_count = qs.filter(resolved=False).count() + avg_conf = qs.aggregate(avg_confidence=Avg("confidence"))["avg_confidence"] or 0 + + extra_context["dashboard_stats"] = [ + { + "title": "Pending Review", + "value": pending_count, + "icon": "pending_actions", + "color": "danger" if pending_count > 10 else "success", + }, + { + "title": "Avg Confidence", + "value": f"{avg_conf * 100:.0f}%", + "icon": "psychology", + }, + ] + return super().changelist_view(request, extra_context=extra_context) diff --git a/core/api.py b/core/api.py index 46782db9..71caa405 100644 --- a/core/api.py +++ b/core/api.py @@ -8,8 +8,10 @@ extend_schema_view, inline_serializer, ) -from rest_framework import serializers, viewsets +from rest_framework import serializers, status, viewsets +from rest_framework.decorators import action from rest_framework.exceptions import NotFound +from rest_framework.response import Response from core.models import ( Content, @@ -22,6 +24,13 @@ TenantConfig, UserFeedback, ) +from core.pipeline import ( + CLASSIFICATION_SKILL_NAME, + RELATED_CONTENT_SKILL_NAME, + RELEVANCE_SKILL_NAME, + SUMMARIZATION_SKILL_NAME, + execute_ad_hoc_skill, +) from core.serializers import ( ContentSerializer, EntitySerializer, @@ -33,6 +42,7 @@ TenantSerializer, UserFeedbackSerializer, ) +from core.tasks import queue_content_skill TENANT_ID_PARAMETER = OpenApiParameter( name="tenant_id", @@ -41,6 +51,16 @@ description="The unique ID of the tenant that owns this nested resource.", ) +SKILL_NAME_PARAMETER = OpenApiParameter( + name="skill_name", + type=str, + location=OpenApiParameter.PATH, + description=( + "The skill to run for this content item. Supported values: " + "content_classification, relevance_scoring, summarization, find_related." + ), +) + TENANT_CREATE_REQUEST_EXAMPLE = OpenApiExample( "Create Tenant Request", value={ @@ -498,6 +518,49 @@ class ContentViewSet(TenantOwnedQuerysetMixin, viewsets.ModelViewSet): serializer_class = ContentSerializer queryset = Content.objects.select_related("tenant", "entity") + @extend_schema( + summary="Run content skill", + description=( + "Run one ad hoc skill for the selected content item and persist the outcome as a SkillResult. " + "Supported skill names are content_classification, relevance_scoring, summarization, and find_related." + ), + tags=["AI Processing"], + parameters=[TENANT_ID_PARAMETER, SKILL_NAME_PARAMETER], + request=None, + responses={ + 201: SkillResultSerializer, + 202: SkillResultSerializer, + 403: AUTHENTICATION_REQUIRED_RESPONSE, + }, + ) + @action(detail=True, methods=["post"], url_path=r"skills/(?P[^/.]+)") + def run_skill(self, request, *args, **kwargs): + skill_name = str(kwargs["skill_name"]) + if skill_name not in { + CLASSIFICATION_SKILL_NAME, + RELEVANCE_SKILL_NAME, + SUMMARIZATION_SKILL_NAME, + RELATED_CONTENT_SKILL_NAME, + }: + raise serializers.ValidationError( + { + "skill_name": ( + "Unsupported skill. Choose one of: content_classification, relevance_scoring, " + "summarization, find_related." + ) + } + ) + + content = self.get_object() + if skill_name in {RELEVANCE_SKILL_NAME, SUMMARIZATION_SKILL_NAME}: + skill_result = queue_content_skill(content, skill_name) + serializer = SkillResultSerializer(skill_result, context=self.get_serializer_context()) + return Response(serializer.data, status=status.HTTP_202_ACCEPTED) + + skill_result = execute_ad_hoc_skill(content, skill_name) + serializer = SkillResultSerializer(skill_result, context=self.get_serializer_context()) + return Response(serializer.data, status=status.HTTP_201_CREATED) + @document_tenant_owned_viewset( resource_plural="skill results", diff --git a/core/pipeline.py b/core/pipeline.py index 521a94db..611b081a 100644 --- a/core/pipeline.py +++ b/core/pipeline.py @@ -8,7 +8,7 @@ from django.conf import settings from langgraph.graph import END, StateGraph -from core.embeddings import build_content_embedding_text, embed_text, get_reference_similarity +from core.embeddings import build_content_embedding_text, embed_text, get_reference_similarity, search_similar_content from core.llm import openrouter_chat_json from core.models import Content, ReviewQueue, ReviewReason, SkillResult, SkillStatus @@ -17,6 +17,8 @@ CLASSIFICATION_SKILL_NAME = "content_classification" RELEVANCE_SKILL_NAME = "relevance_scoring" SUMMARIZATION_SKILL_NAME = "summarization" +RELATED_CONTENT_SKILL_NAME = "find_related" +ASYNC_AD_HOC_SKILL_NAMES = frozenset({RELEVANCE_SKILL_NAME, SUMMARIZATION_SKILL_NAME}) CONTENT_TYPES = ( "technical_article", @@ -274,6 +276,173 @@ def run_summarization(content: Content) -> dict[str, Any]: } +def execute_ad_hoc_skill(content: Content, skill_name: str) -> SkillResult: + if skill_name == CLASSIFICATION_SKILL_NAME: + return _execute_ad_hoc_classification(content) + if skill_name == RELEVANCE_SKILL_NAME: + return _execute_ad_hoc_relevance(content) + if skill_name == SUMMARIZATION_SKILL_NAME: + return _execute_ad_hoc_summarization(content) + if skill_name == RELATED_CONTENT_SKILL_NAME: + return _execute_ad_hoc_related_content(content) + raise ValueError(f"Unsupported skill name: {skill_name}") + + +def create_pending_skill_result(content: Content, skill_name: str) -> SkillResult: + if skill_name not in ASYNC_AD_HOC_SKILL_NAMES: + raise ValueError(f"Unsupported async skill name: {skill_name}") + return _create_skill_result( + content, + skill_name=skill_name, + status=SkillStatus.PENDING, + ) + + +def execute_background_skill_result(skill_result_id: int, skill_name: str) -> SkillResult: + skill_result = SkillResult.objects.select_related("content", "content__tenant").get(pk=skill_result_id) + if skill_result.skill_name != skill_name: + raise ValueError( + f"Skill result {skill_result.id} is for {skill_result.skill_name}, not {skill_name}." + ) + + _update_skill_result(skill_result, status=SkillStatus.RUNNING, error_message="") + + try: + if skill_name == RELEVANCE_SKILL_NAME: + relevance, relevance_score = _run_ad_hoc_relevance(skill_result.content) + return _update_skill_result( + skill_result, + status=SkillStatus.COMPLETED, + result_data=relevance, + model_used=relevance["model_used"], + latency_ms=relevance["latency_ms"], + confidence=relevance_score, + error_message="", + ) + if skill_name == SUMMARIZATION_SKILL_NAME: + summary = _run_ad_hoc_summarization(skill_result.content) + return _update_skill_result( + skill_result, + status=SkillStatus.COMPLETED, + result_data=summary, + model_used=summary["model_used"], + latency_ms=summary["latency_ms"], + error_message="", + ) + except Exception as exc: + return _update_skill_result( + skill_result, + status=SkillStatus.FAILED, + result_data=None, + model_used="", + latency_ms=None, + confidence=None, + error_message=str(exc), + ) + + raise ValueError(f"Unsupported async skill name: {skill_name}") + + +def _execute_ad_hoc_classification(content: Content) -> SkillResult: + try: + classification = _execute_with_retries(CLASSIFICATION_SKILL_NAME, lambda: run_content_classification(content)) + content.content_type = classification["content_type"] + content.save(update_fields=["content_type"]) + if classification["confidence"] < settings.AI_CLASSIFICATION_REVIEW_THRESHOLD: + _upsert_review_queue_item( + content, + reason=ReviewReason.LOW_CONFIDENCE_CLASSIFICATION, + confidence=float(classification["confidence"]), + ) + return _create_skill_result( + content, + skill_name=CLASSIFICATION_SKILL_NAME, + status=SkillStatus.COMPLETED, + result_data=classification, + model_used=classification["model_used"], + latency_ms=classification["latency_ms"], + confidence=classification["confidence"], + ) + except Exception as exc: + return _create_failed_skill_result(content, skill_name=CLASSIFICATION_SKILL_NAME, error_message=str(exc)) + + +def _execute_ad_hoc_relevance(content: Content) -> SkillResult: + try: + relevance, relevance_score = _run_ad_hoc_relevance(content) + return _create_skill_result( + content, + skill_name=RELEVANCE_SKILL_NAME, + status=SkillStatus.COMPLETED, + result_data=relevance, + model_used=relevance["model_used"], + latency_ms=relevance["latency_ms"], + confidence=relevance_score, + ) + except Exception as exc: + return _create_failed_skill_result(content, skill_name=RELEVANCE_SKILL_NAME, error_message=str(exc)) + + +def _execute_ad_hoc_summarization(content: Content) -> SkillResult: + try: + summary = _run_ad_hoc_summarization(content) + return _create_skill_result( + content, + skill_name=SUMMARIZATION_SKILL_NAME, + status=SkillStatus.COMPLETED, + result_data=summary, + model_used=summary["model_used"], + latency_ms=summary["latency_ms"], + ) + except Exception as exc: + return _create_failed_skill_result(content, skill_name=SUMMARIZATION_SKILL_NAME, error_message=str(exc)) + + +def _execute_ad_hoc_related_content(content: Content) -> SkillResult: + try: + matches = search_similar_content(content, limit=5, is_reference=False) + related_items = [_serialize_related_match(match) for match in matches] + top_score = max((item["score"] for item in related_items), default=None) + return _create_skill_result( + content, + skill_name=RELATED_CONTENT_SKILL_NAME, + status=SkillStatus.COMPLETED, + result_data={ + "related_items": related_items, + "limit": 5, + }, + model_used=f"embedding:{settings.EMBEDDING_MODEL}", + latency_ms=0, + confidence=top_score, + ) + except Exception as exc: + return _create_failed_skill_result(content, skill_name=RELATED_CONTENT_SKILL_NAME, error_message=str(exc)) + + +def _run_ad_hoc_relevance(content: Content) -> tuple[dict[str, Any], float]: + relevance = _execute_with_retries(RELEVANCE_SKILL_NAME, lambda: run_relevance_scoring(content)) + relevance_score = float(relevance["relevance_score"]) + content.relevance_score = relevance_score + content.is_active = relevance_score >= settings.AI_RELEVANCE_REVIEW_THRESHOLD + content.save(update_fields=["relevance_score", "is_active"]) + if settings.AI_RELEVANCE_REVIEW_THRESHOLD <= relevance_score < settings.AI_RELEVANCE_SUMMARIZE_THRESHOLD: + _upsert_review_queue_item( + content, + reason=ReviewReason.BORDERLINE_RELEVANCE, + confidence=relevance_score, + ) + return relevance, relevance_score + + +def _run_ad_hoc_summarization(content: Content) -> dict[str, Any]: + if (content.relevance_score or 0.0) < settings.AI_RELEVANCE_SUMMARIZE_THRESHOLD: + raise ValueError( + "Summarization requires relevance_score >= " + f"{settings.AI_RELEVANCE_SUMMARIZE_THRESHOLD:.2f}. Run relevance scoring first or review the content." + ) + return _execute_with_retries(SUMMARIZATION_SKILL_NAME, lambda: run_summarization(content)) + + def _execute_with_retries(skill_name: str, fn): last_exc: Exception | None = None for attempt in range(settings.AI_MAX_NODE_RETRIES + 1): @@ -289,6 +458,18 @@ def _execute_with_retries(skill_name: str, fn): raise last_exc +def _serialize_related_match(match: Any) -> dict[str, Any]: + payload = dict(getattr(match, "payload", {}) or {}) + return { + "content_id": payload.get("content_id"), + "title": payload.get("title"), + "url": payload.get("url"), + "published_date": payload.get("published_date"), + "source_plugin": payload.get("source_plugin"), + "score": float(getattr(match, "score", 0.0)), + } + + def _heuristic_classification(content: Content) -> dict[str, Any]: text = f"{content.title}\n{content.content_text}".lower() keyword_sets = { @@ -386,3 +567,42 @@ def _create_skill_result( previous.superseded_by = skill_result previous.save(update_fields=["superseded_by"]) return skill_result + + +def _create_failed_skill_result(content: Content, *, skill_name: str, error_message: str) -> SkillResult: + return _create_skill_result( + content, + skill_name=skill_name, + status=SkillStatus.FAILED, + result_data=None, + error_message=error_message, + ) + + +def _update_skill_result( + skill_result: SkillResult, + *, + status: SkillStatus, + result_data: dict[str, Any] | None = None, + error_message: str = "", + model_used: str = "", + latency_ms: int | None = None, + confidence: float | None = None, +) -> SkillResult: + skill_result.status = status + skill_result.result_data = result_data + skill_result.error_message = error_message + skill_result.model_used = model_used + skill_result.latency_ms = latency_ms + skill_result.confidence = confidence + skill_result.save( + update_fields=[ + "status", + "result_data", + "error_message", + "model_used", + "latency_ms", + "confidence", + ] + ) + return skill_result diff --git a/core/tasks.py b/core/tasks.py index 5fc8e945..1ed8cb6a 100644 --- a/core/tasks.py +++ b/core/tasks.py @@ -6,7 +6,13 @@ from core.embeddings import upsert_content_embedding from core.models import Content, IngestionRun, RunStatus, SourceConfig -from core.pipeline import process_content_pipeline +from core.pipeline import ( + RELEVANCE_SKILL_NAME, + SUMMARIZATION_SKILL_NAME, + create_pending_skill_result, + execute_background_skill_result, + process_content_pipeline, +) from core.plugins import get_plugin_for_source_config logger = logging.getLogger(__name__) @@ -54,6 +60,36 @@ def process_content(content_id: int): return process_content_pipeline(content_id) +@shared_task(name="core.tasks.run_relevance_scoring_skill", ignore_result=True) +def run_relevance_scoring_skill(skill_result_id: int): + return execute_background_skill_result(skill_result_id, RELEVANCE_SKILL_NAME) + + +@shared_task(name="core.tasks.run_summarization_skill", ignore_result=True) +def run_summarization_skill(skill_result_id: int): + return execute_background_skill_result(skill_result_id, SUMMARIZATION_SKILL_NAME) + + +def queue_content_skill(content: Content, skill_name: str): + skill_result = create_pending_skill_result(content, skill_name) + + if skill_name == RELEVANCE_SKILL_NAME: + if settings.CELERY_TASK_ALWAYS_EAGER: + run_relevance_scoring_skill(skill_result.id) + else: + run_relevance_scoring_skill.delay(skill_result.id) + elif skill_name == SUMMARIZATION_SKILL_NAME: + if settings.CELERY_TASK_ALWAYS_EAGER: + run_summarization_skill(skill_result.id) + else: + run_summarization_skill.delay(skill_result.id) + else: + raise ValueError(f"Unsupported async skill name: {skill_name}") + + skill_result.refresh_from_db() + return skill_result + + def _ingest_source_config(source_config: SourceConfig) -> tuple[int, int]: plugin = get_plugin_for_source_config(source_config) fetched_items = plugin.fetch_new_content(source_config.last_fetched_at) diff --git a/core/tests/test_admin.py b/core/tests/test_admin.py new file mode 100644 index 00000000..31756aec --- /dev/null +++ b/core/tests/test_admin.py @@ -0,0 +1,106 @@ +from types import SimpleNamespace +from unittest.mock import ANY + +import pytest +from django.contrib import messages +from django.contrib.admin.sites import AdminSite +from django.utils import timezone + +from core.admin import ReviewQueueAdmin, SourceConfigAdmin +from core.models import Content, ReviewQueue, ReviewReason, SourceConfig, SourcePluginName, Tenant + +pytestmark = pytest.mark.django_db + + +@pytest.fixture +def source_admin_context(django_user_model): + user = django_user_model.objects.create_user(username="admin-owner", password="testpass123") + tenant = Tenant.objects.create(name="Admin Tenant", user=user, topic_description="Infra") + return SimpleNamespace(user=user, tenant=tenant) + + +def test_test_source_connection_reports_success(source_admin_context, mocker): + source_config = SourceConfig.objects.create( + tenant=source_admin_context.tenant, + plugin_name=SourcePluginName.RSS, + config={"feed_url": "https://example.com/feed.xml"}, + ) + plugin = mocker.Mock() + plugin.health_check.return_value = True + validate_mock = mocker.patch( + "core.admin.validate_plugin_config", + return_value={"feed_url": "https://example.com/feed.xml"}, + ) + get_plugin_mock = mocker.patch("core.admin.get_plugin_for_source_config", return_value=plugin) + admin_instance = SourceConfigAdmin(SourceConfig, AdminSite()) + admin_instance.message_user = mocker.Mock() + + admin_instance.test_source_connection( + request=SimpleNamespace(), + queryset=SourceConfig.objects.filter(pk=source_config.pk), + ) + + validate_mock.assert_called_once_with(SourcePluginName.RSS, {"feed_url": "https://example.com/feed.xml"}) + get_plugin_mock.assert_called_once() + plugin.health_check.assert_called_once_with() + admin_instance.message_user.assert_called_once_with( + ANY, + "Connectivity check passed for 1 source(s).", + messages.SUCCESS, + ) + + +def test_test_source_connection_reports_failures(source_admin_context, mocker): + source_config = SourceConfig.objects.create( + tenant=source_admin_context.tenant, + plugin_name=SourcePluginName.RSS, + config={"feed_url": "https://example.com/feed.xml"}, + ) + mocker.patch( + "core.admin.validate_plugin_config", + side_effect=ValueError("Missing required config field: feed_url"), + ) + admin_instance = SourceConfigAdmin(SourceConfig, AdminSite()) + admin_instance.message_user = mocker.Mock() + + admin_instance.test_source_connection( + request=SimpleNamespace(), + queryset=SourceConfig.objects.filter(pk=source_config.pk), + ) + + admin_instance.message_user.assert_called_once_with( + ANY, + "Connectivity check failed for: rss source for Admin Tenant: Missing required config field: feed_url", + messages.ERROR, + ) + + +def test_review_queue_changelist_view_builds_dashboard_stats(source_admin_context, mocker): + content = Content.objects.create( + tenant=source_admin_context.tenant, + url="https://example.com/review-item", + title="Review Item", + author="Reviewer", + source_plugin=SourcePluginName.RSS, + published_date=timezone.now(), + content_text="Review queue content", + ) + ReviewQueue.objects.create( + tenant=source_admin_context.tenant, + content=content, + reason=ReviewReason.BORDERLINE_RELEVANCE, + confidence=0.42, + resolved=False, + ) + admin_instance = ReviewQueueAdmin(ReviewQueue, AdminSite()) + mocker.patch.object(admin_instance, "get_queryset", return_value=ReviewQueue.objects.all()) + super_changelist_view = mocker.patch( + "core.admin.ModelAdmin.changelist_view", + side_effect=lambda request, extra_context=None: extra_context, + ) + + response = admin_instance.changelist_view(request=SimpleNamespace()) + + super_changelist_view.assert_called_once() + assert response["dashboard_stats"][0]["value"] == 1 + assert response["dashboard_stats"][1]["value"] == "42%" diff --git a/core/tests/test_api.py b/core/tests/test_api.py index 33ed6e20..5e8e1349 100644 --- a/core/tests/test_api.py +++ b/core/tests/test_api.py @@ -1,3 +1,6 @@ +from types import SimpleNamespace +from unittest.mock import patch + from django.contrib.auth import get_user_model from django.urls import reverse from rest_framework import status @@ -185,6 +188,71 @@ def test_content_create_uses_tenant_from_url(self): created_content = Content.objects.get(title="New Content") self.assertEqual(created_content.tenant, self.owner_tenant) + @patch("core.tasks.run_relevance_scoring_skill.delay") + def test_content_skill_action_queues_relevance_scoring(self, run_relevance_scoring_delay_mock): + + response = self.client.post( + f"/api/v1/tenants/{self.owner_tenant.id}/contents/{self.owner_content.id}/skills/relevance_scoring/", + format="json", + ) + + self.assertEqual(response.status_code, status.HTTP_202_ACCEPTED) + pending_result = SkillResult.objects.get( + content=self.owner_content, + skill_name="relevance_scoring", + superseded_by__isnull=True, + ) + run_relevance_scoring_delay_mock.assert_called_once_with(pending_result.id) + self.owner_content.refresh_from_db() + self.assertIsNone(self.owner_content.relevance_score) + self.assertEqual(response.json()["skill_name"], "relevance_scoring") + self.assertEqual(response.json()["status"], SkillStatus.PENDING) + + @patch("core.tasks.run_summarization_skill.delay") + def test_content_skill_action_queues_summarization(self, run_summarization_delay_mock): + self.owner_content.relevance_score = 0.25 + self.owner_content.save(update_fields=["relevance_score"]) + + response = self.client.post( + f"/api/v1/tenants/{self.owner_tenant.id}/contents/{self.owner_content.id}/skills/summarization/", + format="json", + ) + + self.assertEqual(response.status_code, status.HTTP_202_ACCEPTED) + pending_result = SkillResult.objects.get( + content=self.owner_content, + skill_name="summarization", + superseded_by__isnull=True, + ) + run_summarization_delay_mock.assert_called_once_with(pending_result.id) + self.assertEqual(response.json()["skill_name"], "summarization") + self.assertEqual(response.json()["status"], SkillStatus.PENDING) + + @patch("core.pipeline.search_similar_content") + def test_content_skill_action_runs_find_related(self, search_similar_content_mock): + search_similar_content_mock.return_value = [ + SimpleNamespace( + score=0.91, + payload={ + "content_id": self.other_content.id, + "title": self.other_content.title, + "url": self.other_content.url, + "published_date": self.other_content.published_date, + "source_plugin": self.other_content.source_plugin, + }, + ) + ] + + response = self.client.post( + f"/api/v1/tenants/{self.owner_tenant.id}/contents/{self.owner_content.id}/skills/find_related/", + format="json", + ) + + self.assertEqual(response.status_code, status.HTTP_201_CREATED) + self.assertEqual(response.json()["skill_name"], "find_related") + self.assertEqual(response.json()["status"], SkillStatus.COMPLETED) + self.assertEqual(response.json()["result_data"]["related_items"][0]["content_id"], self.other_content.id) + def test_authenticated_nested_list_endpoints_smoke(self): list_endpoints = [ reverse("v1:tenant-config-list", kwargs={"tenant_id": self.owner_tenant.id}), diff --git a/core/tests/test_tasks.py b/core/tests/test_tasks.py index 02de1c52..95b91423 100644 --- a/core/tests/test_tasks.py +++ b/core/tests/test_tasks.py @@ -3,8 +3,24 @@ import pytest -from core.models import Content, Entity, IngestionRun, RunStatus, SourceConfig, SourcePluginName, Tenant -from core.tasks import run_all_ingestions, run_ingestion +from core.models import ( + Content, + Entity, + IngestionRun, + RunStatus, + SkillStatus, + SourceConfig, + SourcePluginName, + Tenant, +) +from core.pipeline import RELEVANCE_SKILL_NAME, SUMMARIZATION_SKILL_NAME +from core.tasks import ( + queue_content_skill, + run_all_ingestions, + run_ingestion, + run_relevance_scoring_skill, + run_summarization_skill, +) pytestmark = pytest.mark.django_db @@ -167,3 +183,83 @@ def test_run_ingestion_marks_failure_when_plugin_errors(source_plugin_context, m ingestion_run = IngestionRun.objects.get(tenant=source_plugin_context.tenant, plugin_name=SourcePluginName.RSS) assert ingestion_run.status == RunStatus.FAILED assert ingestion_run.error_message == "feed unavailable" + + +def test_queue_content_skill_enqueues_relevance_task(source_plugin_context, mocker): + content = Content.objects.create( + tenant=source_plugin_context.tenant, + entity=source_plugin_context.entity, + url="https://example.com/manual-content", + title="Manual Content", + author="Author", + source_plugin=SourcePluginName.RSS, + published_date="2026-04-20T12:00:00Z", + content_text="Manual content body", + ) + delay_mock = mocker.patch("core.tasks.run_relevance_scoring_skill.delay") + + skill_result = queue_content_skill(content, RELEVANCE_SKILL_NAME) + + assert skill_result.status == SkillStatus.PENDING + delay_mock.assert_called_once_with(skill_result.id) + + +def test_run_relevance_scoring_skill_updates_pending_result(source_plugin_context, mocker): + content = Content.objects.create( + tenant=source_plugin_context.tenant, + entity=source_plugin_context.entity, + url="https://example.com/relevance-content", + title="Relevance Content", + author="Author", + source_plugin=SourcePluginName.RSS, + published_date="2026-04-20T12:00:00Z", + content_text="Manual content body", + ) + mocker.patch( + "core.pipeline.run_relevance_scoring", + return_value={ + "relevance_score": 0.82, + "explanation": "Strong match for the tenant topic.", + "used_llm": False, + "model_used": "embedding:test", + "latency_ms": 0, + }, + ) + delay_mock = mocker.patch("core.tasks.run_relevance_scoring_skill.delay") + + pending_result = queue_content_skill(content, RELEVANCE_SKILL_NAME) + delay_mock.assert_called_once_with(pending_result.id) + + result = run_relevance_scoring_skill(pending_result.id) + + content.refresh_from_db() + pending_result.refresh_from_db() + assert result.status == SkillStatus.COMPLETED + assert pending_result.status == SkillStatus.COMPLETED + assert content.relevance_score == pytest.approx(0.82) + assert content.is_active is True + + +def test_run_summarization_skill_marks_result_failed_when_relevance_is_too_low(source_plugin_context, mocker): + content = Content.objects.create( + tenant=source_plugin_context.tenant, + entity=source_plugin_context.entity, + url="https://example.com/summary-content", + title="Summary Content", + author="Author", + source_plugin=SourcePluginName.RSS, + published_date="2026-04-20T12:00:00Z", + content_text="Manual content body", + relevance_score=0.25, + ) + delay_mock = mocker.patch("core.tasks.run_summarization_skill.delay") + + pending_result = queue_content_skill(content, SUMMARIZATION_SKILL_NAME) + delay_mock.assert_called_once_with(pending_result.id) + + result = run_summarization_skill(pending_result.id) + + pending_result.refresh_from_db() + assert result.status == SkillStatus.FAILED + assert pending_result.status == SkillStatus.FAILED + assert "Summarization requires relevance_score" in pending_result.error_message diff --git a/docker-compose.yml b/docker-compose.yml index 464a834c..60f3056d 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -98,7 +98,26 @@ services: volumes: - ./docker/nginx/default.conf:/etc/nginx/conf.d/default.conf:ro + frontend: + image: node:22-alpine + working_dir: /app/frontend + command: ["sh", "-lc", "npm install && npm run dev -- --hostname 0.0.0.0 --port 3000"] + env_file: + - .env + environment: + NEWSLETTER_API_BASE_URL: http://nginx + NEXT_TELEMETRY_DISABLED: "1" + depends_on: + nginx: + condition: service_started + ports: + - "3000:3000" + volumes: + - ./frontend:/app/frontend + - frontend_node_modules:/app/frontend/node_modules + volumes: postgres_data: redis_data: qdrant_data: + frontend_node_modules: diff --git a/frontend/.env.example b/frontend/.env.example new file mode 100644 index 00000000..47ec6d04 --- /dev/null +++ b/frontend/.env.example @@ -0,0 +1,5 @@ +# Copy this file to .env.local when running the Next.js app outside Docker. +NEWSLETTER_API_BASE_URL=http://127.0.0.1:8080 +NEWSLETTER_API_USERNAME=admin +NEWSLETTER_API_PASSWORD=adminpass +NEXT_TELEMETRY_DISABLED=1 diff --git a/frontend/.eslintrc b/frontend/.eslintrc new file mode 100644 index 00000000..e69de29b diff --git a/frontend/.prettierignore b/frontend/.prettierignore new file mode 100644 index 00000000..8fe21205 --- /dev/null +++ b/frontend/.prettierignore @@ -0,0 +1,5 @@ +.next +node_modules +package-lock.json +next-env.d.ts +tsconfig.tsbuildinfo diff --git a/frontend/.prettierrc b/frontend/.prettierrc new file mode 100644 index 00000000..cce9d3c0 --- /dev/null +++ b/frontend/.prettierrc @@ -0,0 +1,3 @@ +{ + "semi": false +} diff --git a/frontend/app/admin/health/page.tsx b/frontend/app/admin/health/page.tsx new file mode 100644 index 00000000..b88264e8 --- /dev/null +++ b/frontend/app/admin/health/page.tsx @@ -0,0 +1,157 @@ +import { AppShell } from "@/components/app-shell" +import { StatusBadge } from "@/components/status-badge" +import { + getTenantIngestionRuns, + getTenants, + getTenantSourceConfigs, +} from "@/lib/api" +import type { HealthStatus } from "@/lib/types" +import { formatDate, healthTone, selectTenant } from "@/lib/view-helpers" + +type HealthPageProps = { + searchParams: Promise> +} + +const panelClass = + "rounded-3xl border border-[#1f2b27]/12 bg-[rgba(255,250,244,0.86)] p-5 shadow-[0_24px_60px_rgba(35,30,22,0.12)] backdrop-blur-xl" +const emptyStateClass = + "rounded-[18px] bg-[#1f2b27]/6 px-4 py-4 text-sm leading-6 text-[#5d6d67]" +const metaRowClass = "mt-2 flex flex-wrap gap-2 text-sm text-[#5d6d67]" + +function deriveSourceStatus( + isActive: boolean, + latestRunStatus: string | null, + lastFetchedAt: string | null, +): HealthStatus { + if (!isActive) { + return "idle" + } + if (latestRunStatus === "failed") { + return "failing" + } + if (latestRunStatus === "running") { + return "degraded" + } + if (!lastFetchedAt) { + return "degraded" + } + return "healthy" +} + +export default async function HealthPage({ searchParams }: HealthPageProps) { + const resolvedSearchParams = await searchParams + const tenants = await getTenants() + const selectedTenant = selectTenant(tenants, resolvedSearchParams) + + if (!selectedTenant) { + return ( + +
+ Create a tenant first in Django admin. +
+
+ ) + } + + const [sourceConfigs, ingestionRuns] = await Promise.all([ + getTenantSourceConfigs(selectedTenant.id), + getTenantIngestionRuns(selectedTenant.id), + ]) + + const latestRunByPlugin = new Map() + for (const ingestionRun of ingestionRuns) { + if (!latestRunByPlugin.has(ingestionRun.plugin_name)) { + latestRunByPlugin.set(ingestionRun.plugin_name, ingestionRun) + } + } + + return ( + +
+
+ + + + + + + + + + + + + {sourceConfigs.length === 0 ? ( + + + + ) : null} + {sourceConfigs.map((sourceConfig) => { + const latestRun = + latestRunByPlugin.get(sourceConfig.plugin_name) ?? null + const status = deriveSourceStatus( + sourceConfig.is_active, + latestRun?.status ?? null, + sourceConfig.last_fetched_at, + ) + return ( + + + + + + + + + ) + })} + +
SourceStatusLast fetchLatest runItemsErrors
+
+ No source configurations exist for this tenant yet. +
+
+ + {sourceConfig.plugin_name} + +
+ Config #{sourceConfig.id} + + {sourceConfig.is_active ? "active" : "disabled"} + +
+
+ + {status} + + + {formatDate(sourceConfig.last_fetched_at)} + + {latestRun + ? `${latestRun.status} at ${formatDate(latestRun.started_at)}` + : "No runs yet"} + + {latestRun + ? `${latestRun.items_ingested}/${latestRun.items_fetched}` + : "0/0"} + + {latestRun?.error_message || "-"} +
+
+
+
+ ) +} diff --git a/frontend/app/admin/sources/page.tsx b/frontend/app/admin/sources/page.tsx new file mode 100644 index 00000000..82d2c54c --- /dev/null +++ b/frontend/app/admin/sources/page.tsx @@ -0,0 +1,233 @@ +import { AppShell } from "@/components/app-shell" +import { StatusBadge } from "@/components/status-badge" +import { + getTenantIngestionRuns, + getTenants, + getTenantSourceConfigs, +} from "@/lib/api" +import { + formatDate, + getErrorMessage, + getSuccessMessage, + selectTenant, +} from "@/lib/view-helpers" + +type SourcesPageProps = { + searchParams: Promise> +} + +const panelClass = + "rounded-3xl border border-[#1f2b27]/12 bg-[rgba(255,250,244,0.86)] p-5 shadow-[0_24px_60px_rgba(35,30,22,0.12)] backdrop-blur-xl" +const eyebrowClass = "m-0 text-[0.78rem] uppercase tracking-[0.12em] opacity-70" +const emptyStateClass = + "rounded-[18px] bg-[#1f2b27]/6 px-4 py-4 text-sm leading-6 text-[#5d6d67]" +const errorBannerClass = + "rounded-[18px] bg-[#c55f4d]/14 px-4 py-4 text-sm leading-6 text-[#7c3023]" +const metaRowClass = "flex flex-wrap gap-2 text-sm text-[#5d6d67]" +const inputClass = + "w-full rounded-2xl border border-[#1f2b27]/12 bg-white/70 px-4 py-3 text-[#1f2b27] outline-none transition focus:border-[#156f68]/40 focus:ring-2 focus:ring-[#156f68]/15" +const labelClass = "grid gap-2" +const labelTextClass = "text-sm font-medium text-[#1f2b27]" +const primaryButtonClass = + "inline-flex min-h-11 items-center justify-center rounded-full bg-[linear-gradient(135deg,#156f68,#1d8d83)] px-4 py-3 text-sm font-medium text-white transition hover:brightness-105 disabled:cursor-not-allowed disabled:opacity-50" + +export default async function SourcesPage({ searchParams }: SourcesPageProps) { + const resolvedSearchParams = await searchParams + const tenants = await getTenants() + const selectedTenant = selectTenant(tenants, resolvedSearchParams) + + if (!selectedTenant) { + return ( + +
+ Create a tenant first in Django admin. +
+
+ ) + } + + const [sourceConfigs, ingestionRuns] = await Promise.all([ + getTenantSourceConfigs(selectedTenant.id), + getTenantIngestionRuns(selectedTenant.id), + ]) + const latestRunByPlugin = new Map() + for (const ingestionRun of ingestionRuns) { + if (!latestRunByPlugin.has(ingestionRun.plugin_name)) { + latestRunByPlugin.set(ingestionRun.plugin_name, ingestionRun) + } + } + + const errorMessage = getErrorMessage(resolvedSearchParams) + const successMessage = getSuccessMessage(resolvedSearchParams) + + return ( + + {errorMessage ? ( +
{errorMessage}
+ ) : null} + {successMessage ? ( +
{successMessage}
+ ) : null} + +
+
+

Add source

+
+ + + +