diff --git a/backend/reviews/admin.py b/backend/reviews/admin.py
index b5fb828c70..ffc35f397e 100644
--- a/backend/reviews/admin.py
+++ b/backend/reviews/admin.py
@@ -17,6 +17,20 @@
from users.admin_mixins import ConferencePermissionMixin
+def get_accepted_submissions(conference):
+ return (
+ Submission.objects.filter(conference=conference)
+ .filter(
+ Q(pending_status=Submission.STATUS.accepted)
+ | Q(pending_status__isnull=True, status=Submission.STATUS.accepted)
+ | Q(pending_status="", status=Submission.STATUS.accepted)
+ )
+ .select_related("speaker", "type", "audience_level")
+ .prefetch_related("languages")
+ .order_by("id")
+ )
+
+
class AvailableScoreOptionInline(admin.TabularInline):
model = AvailableScoreOption
@@ -366,16 +380,7 @@ def review_shortlist_view(self, request, review_session_id):
return TemplateResponse(request, adapter.shortlist_template, context)
def _get_accepted_submissions(self, conference):
- return (
- Submission.objects.filter(conference=conference)
- .filter(
- Q(pending_status=Submission.STATUS.accepted)
- | Q(pending_status__isnull=True, status=Submission.STATUS.accepted)
- | Q(pending_status="", status=Submission.STATUS.accepted)
- )
- .select_related("speaker", "type", "audience_level")
- .prefetch_related("languages")
- )
+ return get_accepted_submissions(conference)
def review_recap_view(self, request, review_session_id):
review_session = ReviewSession.objects.get(id=review_session_id)
@@ -448,49 +453,51 @@ def review_recap_compute_analysis_view(self, request, review_session_id):
raise PermissionDenied()
conference = review_session.conference
- accepted_submissions = self._get_accepted_submissions(conference)
+ accepted_submissions = list(self._get_accepted_submissions(conference))
force_recompute = request.GET.get("recompute") == "1"
- from reviews.similar_talks import compute_similar_talks, compute_topic_clusters
+ from django.core.cache import cache
- similar_talks = compute_similar_talks(
- accepted_submissions,
- top_n=5,
- conference_id=conference.id,
- force_recompute=force_recompute,
- )
+ from pycon.tasks import check_pending_heavy_processing_work
+ from reviews.cache_keys import get_cache_key
+ from reviews.tasks import compute_recap_analysis
- topic_clusters = compute_topic_clusters(
- accepted_submissions,
- min_topic_size=3,
- conference_id=conference.id,
- force_recompute=force_recompute,
+ combined_cache_key = get_cache_key(
+ "recap_analysis", conference.id, accepted_submissions
)
- # Build submissions list with similar talks, sorted by highest similarity
- submissions_list = sorted(
- [
- {
- "id": s.id,
- "title": str(s.title),
- "type": s.type.name,
- "speaker": s.speaker.display_name if s.speaker else "Unknown",
- "similar": similar_talks.get(s.id, []),
- }
- for s in accepted_submissions
- ],
- key=lambda x: max(
- (item["similarity"] for item in x["similar"]), default=0
- ),
- reverse=True,
- )
+ if not force_recompute:
+ cached_result = cache.get(combined_cache_key)
+ if cached_result is not None:
+ return JsonResponse(cached_result)
- return JsonResponse(
- {
- "submissions_list": submissions_list,
- "topic_clusters": topic_clusters,
- }
- )
+ # Use cache.add as a lock to prevent duplicate task dispatch.
+ # Short TTL so lock auto-expires if the worker is killed before cleanup.
+ computing_key = f"{combined_cache_key}:computing"
+
+ # Check for stale lock from a crashed/finished task
+ existing_task_id = cache.get(computing_key)
+ if existing_task_id:
+ from celery.result import AsyncResult
+
+ if AsyncResult(existing_task_id).state in (
+ "SUCCESS",
+ "FAILURE",
+ "REVOKED",
+ ):
+ cache.delete(computing_key)
+
+ if cache.add(computing_key, "pending", timeout=300):
+ result = compute_recap_analysis.apply_async(
+ args=[conference.id, combined_cache_key],
+ kwargs={"force_recompute": force_recompute},
+ queue="heavy_processing",
+ )
+ # Store task ID so subsequent requests can detect stale locks
+ cache.set(computing_key, result.id, timeout=300)
+ check_pending_heavy_processing_work.delay()
+
+ return JsonResponse({"status": "processing"})
def review_view(self, request, review_session_id, review_item_id):
review_session = ReviewSession.objects.get(id=review_session_id)
diff --git a/backend/reviews/cache_keys.py b/backend/reviews/cache_keys.py
new file mode 100644
index 0000000000..57334cdbaa
--- /dev/null
+++ b/backend/reviews/cache_keys.py
@@ -0,0 +1,21 @@
+import hashlib
+
+
+def get_embedding_text(submission) -> str:
+ """Combine title, elevator_pitch, and abstract into a single string for embedding."""
+ title = str(submission.title) if submission.title else ""
+ elevator_pitch = str(submission.elevator_pitch) if submission.elevator_pitch else ""
+ abstract = (
+ str(submission.abstract)
+ if hasattr(submission, "abstract") and submission.abstract
+ else ""
+ )
+ return f"{title}. {elevator_pitch}. {abstract}"
+
+
+def get_cache_key(prefix: str, conference_id: int, submissions) -> str:
+ """Generate a cache key based on conference and submission content."""
+ content_hash = hashlib.md5()
+ for s in sorted(submissions, key=lambda x: x.id):
+ content_hash.update(f"{s.id}:{get_embedding_text(s)}".encode())
+ return f"{prefix}:conf_{conference_id}:{content_hash.hexdigest()}"
diff --git a/backend/reviews/similar_talks.py b/backend/reviews/similar_talks.py
index 570dc8259b..4cc63ed8f0 100644
--- a/backend/reviews/similar_talks.py
+++ b/backend/reviews/similar_talks.py
@@ -1,5 +1,4 @@
import functools
-import hashlib
import logging
import nltk
@@ -12,6 +11,9 @@
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.metrics.pairwise import cosine_similarity
+from reviews.cache_keys import get_cache_key as _get_cache_key
+from reviews.cache_keys import get_embedding_text
+
logger = logging.getLogger(__name__)
CACHE_TIMEOUT = 60 * 60 * 24 # 24 hours
@@ -222,18 +224,6 @@ def get_embedding_model():
return SentenceTransformer("all-MiniLM-L6-v2", token=False)
-def get_embedding_text(submission) -> str:
- """Combine title, elevator_pitch, and abstract into a single string for embedding."""
- title = str(submission.title) if submission.title else ""
- elevator_pitch = str(submission.elevator_pitch) if submission.elevator_pitch else ""
- abstract = (
- str(submission.abstract)
- if hasattr(submission, "abstract") and submission.abstract
- else ""
- )
- return f"{title}. {elevator_pitch}. {abstract}"
-
-
def _get_submission_languages(submissions) -> set[str]:
"""Extract all unique language codes from submissions."""
language_codes = set()
@@ -245,14 +235,6 @@ def _get_submission_languages(submissions) -> set[str]:
return language_codes or {"en"}
-def _get_cache_key(prefix: str, conference_id: int, submissions) -> str:
- """Generate a cache key based on conference and submission content."""
- content_hash = hashlib.md5()
- for s in sorted(submissions, key=lambda x: x.id):
- content_hash.update(f"{s.id}:{get_embedding_text(s)}".encode())
- return f"{prefix}:conf_{conference_id}:{content_hash.hexdigest()}"
-
-
def compute_similar_talks(
submissions, top_n=5, conference_id=None, force_recompute=False
):
diff --git a/backend/reviews/tasks.py b/backend/reviews/tasks.py
new file mode 100644
index 0000000000..d10eee7b69
--- /dev/null
+++ b/backend/reviews/tasks.py
@@ -0,0 +1,86 @@
+import logging
+
+from pycon.celery import app
+
+logger = logging.getLogger(__name__)
+
+RESULT_CACHE_TTL = 60 * 60 * 24 # 24 hours
+ERROR_CACHE_TTL = 60 * 2 # 2 minutes
+
+
+@app.task
+def compute_recap_analysis(conference_id, combined_cache_key, force_recompute=False):
+ from django.core.cache import cache
+
+ from conferences.models import Conference
+ from reviews.admin import get_accepted_submissions
+ from reviews.similar_talks import (
+ compute_similar_talks,
+ compute_topic_clusters,
+ )
+
+ try:
+ conference = Conference.objects.get(id=conference_id)
+ except Conference.DoesNotExist:
+ logger.error(
+ "Conference %s not found for recap analysis", conference_id
+ )
+ cache.delete(f"{combined_cache_key}:computing")
+ return
+
+ accepted_submissions = list(get_accepted_submissions(conference))
+
+ try:
+        # conference_id=None bypasses those functions' own per-conference caches;
+        # only the combined result is cached here, under combined_cache_key.
+ similar_talks = compute_similar_talks(
+ accepted_submissions,
+ top_n=5,
+ conference_id=None,
+ force_recompute=force_recompute,
+ )
+
+ topic_clusters = compute_topic_clusters(
+ accepted_submissions,
+ min_topic_size=3,
+ conference_id=None,
+ force_recompute=force_recompute,
+ )
+
+ submissions_list = sorted(
+ [
+ {
+ "id": s.id,
+ "title": str(s.title),
+ "type": s.type.name,
+ "speaker": s.speaker.display_name if s.speaker else "Unknown",
+ "similar": similar_talks.get(s.id, []),
+ }
+ for s in accepted_submissions
+ ],
+ key=lambda x: max(
+ (item["similarity"] for item in x["similar"]), default=0
+ ),
+ reverse=True,
+ )
+
+ result = {
+ "submissions_list": submissions_list,
+ "topic_clusters": topic_clusters,
+ }
+
+ cache.set(combined_cache_key, result, RESULT_CACHE_TTL)
+
+ return result
+ except Exception:
+ logger.exception(
+ "Failed to compute recap analysis for conference %s", conference_id
+ )
+ cache.set(
+ combined_cache_key,
+ {"status": "error", "message": "Analysis failed. Please try again."},
+ ERROR_CACHE_TTL,
+ )
+ raise
+ finally:
+ cache.delete(f"{combined_cache_key}:computing")
diff --git a/backend/reviews/templates/reviews-recap.html b/backend/reviews/templates/reviews-recap.html
index 6cf704c797..8c8e2bbe24 100644
--- a/backend/reviews/templates/reviews-recap.html
+++ b/backend/reviews/templates/reviews-recap.html
@@ -570,6 +570,82 @@
🔗 Similar Talks
section.style.display = '';
}
+ var pollTimer = null;
+ var pollStartTime = null;
+ var pollAttempt = 0;
+ var POLL_TIMEOUT = 360000; // 6 minutes – must exceed backend lock TTL (300s)
+
+ function getNextPollInterval() {
+      // Increasing (stepped) backoff: 1s, 2s, 3s, 5s, 10s, then 15s thereafter
+ var intervals = [1000, 2000, 3000, 5000, 10000, 15000];
+ return intervals[Math.min(pollAttempt, intervals.length - 1)];
+ }
+
+ function stopPolling() {
+ if (pollTimer) {
+ clearTimeout(pollTimer);
+ pollTimer = null;
+ }
+ pollStartTime = null;
+ pollAttempt = 0;
+ }
+
+ function showError(message) {
+ stopPolling();
+ loading.style.display = 'none';
+ btn.disabled = false;
+ btn.textContent = 'Compute Topic Clusters & Similar Talks';
+ recomputeBtn.disabled = false;
+ recomputeBtn.textContent = 'Recompute (ignore cache)';
+ errorDiv.textContent = message;
+ errorDiv.style.display = '';
+ }
+
+ function handleResult(data) {
+ if (data.status === 'error') {
+ showError(data.message || 'Analysis failed. Please try again.');
+ recomputeBtn.style.display = '';
+ btn.style.display = 'none';
+ return;
+ }
+
+ loading.style.display = 'none';
+ btn.style.display = 'none';
+ recomputeBtn.style.display = '';
+ recomputeBtn.disabled = false;
+ recomputeBtn.textContent = 'Recompute (ignore cache)';
+
+ renderTopicClusters(data.topic_clusters);
+ renderSimilarTalks(data.submissions_list);
+ }
+
+ function pollForResults() {
+ if (pollStartTime && (Date.now() - pollStartTime) > POLL_TIMEOUT) {
+ showError('Analysis is taking longer than expected. Please try again later.');
+ return;
+ }
+
+ fetch(computeUrl, {
+ headers: { 'X-Requested-With': 'XMLHttpRequest' }
+ })
+ .then(function(response) {
+ if (!response.ok) throw new Error('Server error: ' + response.status);
+ return response.json();
+ })
+ .then(function(data) {
+ if (data.status === 'processing') {
+ pollAttempt++;
+ pollTimer = setTimeout(pollForResults, getNextPollInterval());
+ return;
+ }
+ stopPolling();
+ handleResult(data);
+ })
+ .catch(function(err) {
+ showError('Failed to compute analysis: ' + err.message);
+ });
+ }
+
function fetchAnalysis(recompute) {
var url = recompute ? computeUrl + '?recompute=1' : computeUrl;
var activeBtn = recompute ? recomputeBtn : btn;
@@ -587,14 +663,13 @@ 🔗 Similar Talks
return response.json();
})
.then(function(data) {
- loading.style.display = 'none';
- btn.style.display = 'none';
- recomputeBtn.style.display = '';
- recomputeBtn.disabled = false;
- recomputeBtn.textContent = 'Recompute (ignore cache)';
-
- renderTopicClusters(data.topic_clusters);
- renderSimilarTalks(data.submissions_list);
+ if (data.status === 'processing') {
+ pollStartTime = Date.now();
+ pollAttempt = 0;
+ pollTimer = setTimeout(pollForResults, getNextPollInterval());
+ return;
+ }
+ handleResult(data);
})
.catch(function(err) {
loading.style.display = 'none';
diff --git a/backend/reviews/tests/test_recap.py b/backend/reviews/tests/test_recap.py
index c24b27b968..e2176430dd 100644
--- a/backend/reviews/tests/test_recap.py
+++ b/backend/reviews/tests/test_recap.py
@@ -3,6 +3,7 @@
import pytest
from django.contrib.admin import AdminSite
from django.core.exceptions import PermissionDenied
+from django.test import override_settings
from conferences.tests.factories import ConferenceFactory
from reviews.admin import ReviewSessionAdmin
@@ -182,24 +183,61 @@ def test_recap_view_redirects_when_shortlist_not_visible(rf, mocker):
# --- review_recap_compute_analysis_view tests ---
-def test_compute_analysis_view_returns_submissions_and_clusters(rf, mocker):
+def _mock_analysis_deps(mocker, cache_return=None, computing_task_id=None):
+ """Mock the lazy-imported dependencies used in the compute analysis view."""
+
+ def cache_get_side_effect(key):
+ if ":computing" in key:
+ return computing_task_id
+ return cache_return
+
+ mock_cache_get = mocker.patch(
+ "django.core.cache.cache.get", side_effect=cache_get_side_effect
+ )
+ mock_cache_add = mocker.patch(
+ "django.core.cache.cache.add", return_value=True
+ )
+ mocker.patch("django.core.cache.cache.set")
+ mocker.patch("django.core.cache.cache.delete")
+ mock_task = mocker.patch("reviews.tasks.compute_recap_analysis.apply_async")
+ mock_check = mocker.patch("pycon.tasks.check_pending_heavy_processing_work.delay")
+ return mock_cache_get, mock_cache_add, mock_task, mock_check
+
+
+def test_compute_analysis_view_returns_cached_result(rf, mocker):
user, conference, review_session, submissions = _create_recap_setup()
sub1, sub2 = submissions
- mocker.patch(
- "reviews.similar_talks.compute_similar_talks",
- return_value={
- sub1.id: [{"id": sub2.id, "title": str(sub2.title), "similarity": 75.0}],
- sub2.id: [],
- },
- )
- mocker.patch(
- "reviews.similar_talks.compute_topic_clusters",
- return_value={
- "topics": [{"name": "ML", "count": 2, "keywords": ["ml"], "submissions": []}],
+ cached_data = {
+ "submissions_list": [
+ {
+ "id": sub1.id,
+ "title": str(sub1.title),
+ "type": sub1.type.name,
+ "speaker": sub1.speaker.display_name,
+ "similar": [
+ {"id": sub2.id, "title": str(sub2.title), "similarity": 75.0}
+ ],
+ },
+ {
+ "id": sub2.id,
+ "title": str(sub2.title),
+ "type": sub2.type.name,
+ "speaker": sub2.speaker.display_name,
+ "similar": [],
+ },
+ ],
+ "topic_clusters": {
+ "topics": [
+ {"name": "ML", "count": 2, "keywords": ["ml"], "submissions": []}
+ ],
"outliers": [],
"submission_topics": {},
},
+ }
+
+ mock_cache_get, _, mock_task, _ = _mock_analysis_deps(
+ mocker, cache_return=cached_data
)
request = rf.get("/")
@@ -211,77 +249,105 @@ def test_compute_analysis_view_returns_submissions_and_clusters(rf, mocker):
assert response.status_code == 200
data = json.loads(response.content)
- # Verify submissions_list structure
assert len(data["submissions_list"]) == 2
- # sub1 has higher similarity (75%) so should be first
assert data["submissions_list"][0]["id"] == sub1.id
- assert data["submissions_list"][0]["similar"] == [
- {"id": sub2.id, "title": str(sub2.title), "similarity": 75.0}
- ]
- assert data["submissions_list"][1]["id"] == sub2.id
- assert data["submissions_list"][1]["similar"] == []
-
- # Each submission entry has required fields
- for entry in data["submissions_list"]:
- assert "id" in entry
- assert "title" in entry
- assert "type" in entry
- assert "speaker" in entry
- assert "similar" in entry
-
- # Verify topic_clusters is passed through
assert data["topic_clusters"]["topics"][0]["name"] == "ML"
- assert data["topic_clusters"]["outliers"] == []
+ # Task should NOT have been dispatched since cache was hit
+ mock_task.assert_not_called()
-def test_compute_analysis_view_passes_recompute_flag(rf, mocker):
- mock_similar = mocker.patch(
- "reviews.similar_talks.compute_similar_talks",
- return_value={},
- )
- mock_clusters = mocker.patch(
- "reviews.similar_talks.compute_topic_clusters",
- return_value={"topics": [], "outliers": [], "submission_topics": {}},
- )
+def test_compute_analysis_view_dispatches_task_on_cache_miss(rf, mocker):
user, conference, review_session, submissions = _create_recap_setup()
+ _, _, mock_task, mock_check = _mock_analysis_deps(mocker, cache_return=None)
+
+ request = rf.get("/")
+ request.user = user
+
+ admin = ReviewSessionAdmin(ReviewSession, AdminSite())
+ response = admin.review_recap_compute_analysis_view(request, review_session.id)
+
+ assert response.status_code == 200
+ data = json.loads(response.content)
+ assert data == {"status": "processing"}
+
+ mock_task.assert_called_once()
+ call_kwargs = mock_task.call_args
+ assert call_kwargs[1]["args"][0] == conference.id
+ assert isinstance(call_kwargs[1]["args"][1], str)
+ assert call_kwargs[1]["args"][1].startswith("recap_analysis:conf_")
+ assert call_kwargs[1]["kwargs"] == {"force_recompute": False}
+ assert call_kwargs[1]["queue"] == "heavy_processing"
+
+ mock_check.assert_called_once()
+
+
+def test_compute_analysis_view_dispatches_task_with_recompute(rf, mocker):
+ user, conference, review_session, submissions = _create_recap_setup()
+
+ _, _, mock_task, _ = _mock_analysis_deps(mocker, cache_return=None)
+
request = rf.get("/?recompute=1")
request.user = user
admin = ReviewSessionAdmin(ReviewSession, AdminSite())
- admin.review_recap_compute_analysis_view(request, review_session.id)
+ response = admin.review_recap_compute_analysis_view(request, review_session.id)
- _, kwargs = mock_similar.call_args
- assert kwargs["force_recompute"] is True
+ assert response.status_code == 200
+ data = json.loads(response.content)
+ assert data == {"status": "processing"}
- _, kwargs = mock_clusters.call_args
- assert kwargs["force_recompute"] is True
+ mock_task.assert_called_once()
+ call_kwargs = mock_task.call_args
+ assert call_kwargs[1]["kwargs"]["force_recompute"] is True
-def test_compute_analysis_view_no_recompute_by_default(rf, mocker):
- mock_similar = mocker.patch(
- "reviews.similar_talks.compute_similar_talks",
- return_value={},
- )
- mock_clusters = mocker.patch(
- "reviews.similar_talks.compute_topic_clusters",
- return_value={"topics": [], "outliers": [], "submission_topics": {}},
+def test_compute_analysis_view_recompute_skips_cache(rf, mocker):
+ user, conference, review_session, submissions = _create_recap_setup()
+
+ cached_data = {"submissions_list": [], "topic_clusters": {"topics": []}}
+ mock_cache_get, _, mock_task, _ = _mock_analysis_deps(
+ mocker, cache_return=cached_data
)
+ request = rf.get("/?recompute=1")
+ request.user = user
+
+ admin = ReviewSessionAdmin(ReviewSession, AdminSite())
+ response = admin.review_recap_compute_analysis_view(request, review_session.id)
+
+ data = json.loads(response.content)
+ assert data == {"status": "processing"}
+
+ # With recompute=1, cache.get should only be called for the computing key,
+ # not for the result cache
+ for call in mock_cache_get.call_args_list:
+ assert ":computing" in call.args[0]
+ mock_task.assert_called_once()
+
+
+def test_compute_analysis_view_skips_dispatch_when_already_computing(rf, mocker):
user, conference, review_session, submissions = _create_recap_setup()
+ mock_cache_get, mock_cache_add, mock_task, mock_check = _mock_analysis_deps(
+ mocker, cache_return=None
+ )
+ # Simulate lock already held — cache.add returns False
+ mock_cache_add.return_value = False
+
request = rf.get("/")
request.user = user
admin = ReviewSessionAdmin(ReviewSession, AdminSite())
- admin.review_recap_compute_analysis_view(request, review_session.id)
+ response = admin.review_recap_compute_analysis_view(request, review_session.id)
- _, kwargs = mock_similar.call_args
- assert kwargs["force_recompute"] is False
+ data = json.loads(response.content)
+ assert data == {"status": "processing"}
- _, kwargs = mock_clusters.call_args
- assert kwargs["force_recompute"] is False
+ # Task should NOT be dispatched since lock was already held
+ mock_task.assert_not_called()
+ mock_check.assert_not_called()
def test_compute_analysis_view_permission_denied_for_non_reviewer(rf):
@@ -317,3 +383,199 @@ def test_compute_analysis_view_permission_denied_when_shortlist_not_visible(rf):
with pytest.raises(PermissionDenied):
admin.review_recap_compute_analysis_view(request, review_session.id)
+
+
+# --- compute_recap_analysis task tests ---
+
+
+LOCMEM_CACHE = {
+ "default": {
+ "BACKEND": "django.core.cache.backends.locmem.LocMemCache",
+ "LOCATION": "test-recap-analysis",
+ }
+}
+
+
+@pytest.mark.django_db
+@override_settings(CACHES=LOCMEM_CACHE)
+def test_task_populates_cache_with_results(mocker):
+ from django.core.cache import cache
+
+ from reviews.tasks import compute_recap_analysis
+
+ user, conference, review_session, submissions = _create_recap_setup()
+ sub1, sub2 = submissions
+
+ mocker.patch(
+ "reviews.similar_talks.compute_similar_talks",
+ return_value={
+ sub1.id: [{"id": sub2.id, "title": str(sub2.title), "similarity": 75.0}],
+ sub2.id: [],
+ },
+ )
+ mocker.patch(
+ "reviews.similar_talks.compute_topic_clusters",
+ return_value={
+ "topics": [
+ {"name": "ML", "count": 2, "keywords": ["ml"], "submissions": []}
+ ],
+ "outliers": [],
+ "submission_topics": {},
+ },
+ )
+
+ cache_key = "recap_analysis:conf_test:integration"
+ # Set computing lock to verify it gets cleaned up
+ cache.set(f"{cache_key}:computing", True)
+
+ result = compute_recap_analysis(conference.id, cache_key)
+
+ assert len(result["submissions_list"]) == 2
+ assert result["submissions_list"][0]["id"] == sub1.id
+ assert result["submissions_list"][0]["similar"][0]["similarity"] == 75.0
+ assert result["topic_clusters"]["topics"][0]["name"] == "ML"
+
+ # Verify cache was populated
+ cached = cache.get(cache_key)
+ assert cached == result
+
+ # Verify computing lock was cleaned up
+ assert cache.get(f"{cache_key}:computing") is None
+
+
+@pytest.mark.django_db
+@override_settings(CACHES=LOCMEM_CACHE)
+def test_task_caches_error_on_failure(mocker):
+ from django.core.cache import cache
+
+ from reviews.tasks import compute_recap_analysis
+
+ user, conference, review_session, submissions = _create_recap_setup()
+
+ mocker.patch(
+ "reviews.similar_talks.compute_similar_talks",
+ side_effect=RuntimeError("ML model failed"),
+ )
+
+ cache_key = "recap_analysis:conf_test:error"
+ cache.set(f"{cache_key}:computing", True)
+
+ with pytest.raises(RuntimeError, match="ML model failed"):
+ compute_recap_analysis(conference.id, cache_key)
+
+ # Verify error was cached
+ cached = cache.get(cache_key)
+ assert cached["status"] == "error"
+ assert "failed" in cached["message"].lower()
+
+ # Verify computing lock was cleaned up
+ assert cache.get(f"{cache_key}:computing") is None
+
+
+def test_task_handles_missing_conference(mocker):
+ from reviews.tasks import compute_recap_analysis
+
+ mock_similar = mocker.patch("reviews.similar_talks.compute_similar_talks")
+
+ result = compute_recap_analysis(999999, "recap_analysis:conf_999999:key")
+
+ assert result is None
+ mock_similar.assert_not_called()
+
+
+@pytest.mark.django_db
+@override_settings(CACHES=LOCMEM_CACHE)
+def test_task_handles_missing_conference_cleans_up_lock(mocker):
+ from django.core.cache import cache
+
+ from reviews.tasks import compute_recap_analysis
+
+ cache_key = "recap_analysis:conf_999999:key"
+ computing_key = f"{cache_key}:computing"
+ cache.set(computing_key, "some-task-id")
+
+ mocker.patch("reviews.similar_talks.compute_similar_talks")
+
+ result = compute_recap_analysis(999999, cache_key)
+
+ assert result is None
+ # Verify computing lock was cleaned up on DoesNotExist
+ assert cache.get(computing_key) is None
+
+
+def test_compute_analysis_view_clears_stale_lock_and_dispatches(rf, mocker):
+ user, conference, review_session, submissions = _create_recap_setup()
+
+ def cache_get_side_effect(key):
+ if ":computing" in key:
+ return "stale-task-id-123"
+ return None
+
+ mocker.patch("django.core.cache.cache.get", side_effect=cache_get_side_effect)
+ mocker.patch("django.core.cache.cache.add", return_value=True)
+ mocker.patch("django.core.cache.cache.set")
+ mock_cache_delete = mocker.patch("django.core.cache.cache.delete")
+ mock_task = mocker.patch("reviews.tasks.compute_recap_analysis.apply_async")
+ mock_check = mocker.patch("pycon.tasks.check_pending_heavy_processing_work.delay")
+
+ # Mock AsyncResult to report task as finished
+ mock_async_result_cls = mocker.patch("celery.result.AsyncResult")
+ mock_async_result_cls.return_value.state = "SUCCESS"
+
+ request = rf.get("/")
+ request.user = user
+
+ admin = ReviewSessionAdmin(ReviewSession, AdminSite())
+ response = admin.review_recap_compute_analysis_view(request, review_session.id)
+
+ data = json.loads(response.content)
+ assert data == {"status": "processing"}
+
+ # Stale lock should have been deleted
+ mock_cache_delete.assert_called()
+ # New task should have been dispatched
+ mock_task.assert_called_once()
+ mock_check.assert_called_once()
+
+
+def test_compute_analysis_view_keeps_active_task_lock(rf, mocker):
+ user, conference, review_session, submissions = _create_recap_setup()
+
+ def cache_get_side_effect(key):
+ if ":computing" in key:
+ return "active-task-id-456"
+ return None
+
+ mocker.patch("django.core.cache.cache.get", side_effect=cache_get_side_effect)
+ mocker.patch("django.core.cache.cache.add", return_value=False)
+ mocker.patch("django.core.cache.cache.set")
+ mock_cache_delete = mocker.patch("django.core.cache.cache.delete")
+ mock_task = mocker.patch("reviews.tasks.compute_recap_analysis.apply_async")
+ mock_check = mocker.patch("pycon.tasks.check_pending_heavy_processing_work.delay")
+
+ # Mock AsyncResult to report task as still running
+ mock_async_result_cls = mocker.patch("celery.result.AsyncResult")
+ mock_async_result_cls.return_value.state = "STARTED"
+
+ request = rf.get("/")
+ request.user = user
+
+ admin = ReviewSessionAdmin(ReviewSession, AdminSite())
+ response = admin.review_recap_compute_analysis_view(request, review_session.id)
+
+ data = json.loads(response.content)
+ assert data == {"status": "processing"}
+
+ # Lock should NOT have been deleted (task still active)
+ mock_cache_delete.assert_not_called()
+ # No new task should be dispatched
+ mock_task.assert_not_called()
+ mock_check.assert_not_called()
+
+
+def test_error_cache_ttl_is_shorter_than_result_ttl():
+ from reviews.tasks import ERROR_CACHE_TTL, RESULT_CACHE_TTL
+
+ assert ERROR_CACHE_TTL == 120 # 2 minutes
+ assert RESULT_CACHE_TTL == 86400 # 24 hours
+ assert ERROR_CACHE_TTL < RESULT_CACHE_TTL