From 9b811550651d08b8e036a4b25aa298aa4a6b5dfa Mon Sep 17 00:00:00 2001 From: Patrick Arminio Date: Mon, 9 Feb 2026 15:43:56 +0100 Subject: [PATCH 1/6] Move ML recap analysis to heavy-processing Celery worker Offload compute_similar_talks and compute_topic_clusters to the heavy_processing Celery queue to prevent OOM in Gunicorn web workers. The admin view now checks cache first, dispatches a Celery task on miss, and returns {"status": "processing"} for the frontend to poll. --- backend/reviews/admin.py | 54 +++---- backend/reviews/tasks.py | 72 +++++++++ backend/reviews/templates/reviews-recap.html | 78 +++++++++- backend/reviews/tests/test_recap.py | 145 +++++++++++-------- 4 files changed, 246 insertions(+), 103 deletions(-) create mode 100644 backend/reviews/tasks.py diff --git a/backend/reviews/admin.py b/backend/reviews/admin.py index b5fb828c70..df641ba776 100644 --- a/backend/reviews/admin.py +++ b/backend/reviews/admin.py @@ -448,49 +448,33 @@ def review_recap_compute_analysis_view(self, request, review_session_id): raise PermissionDenied() conference = review_session.conference - accepted_submissions = self._get_accepted_submissions(conference) + accepted_submissions = list(self._get_accepted_submissions(conference)) force_recompute = request.GET.get("recompute") == "1" - from reviews.similar_talks import compute_similar_talks, compute_topic_clusters + from django.core.cache import cache - similar_talks = compute_similar_talks( - accepted_submissions, - top_n=5, - conference_id=conference.id, - force_recompute=force_recompute, - ) + from pycon.tasks import check_pending_heavy_processing_work + from reviews.similar_talks import _get_cache_key + from reviews.tasks import compute_recap_analysis - topic_clusters = compute_topic_clusters( - accepted_submissions, - min_topic_size=3, - conference_id=conference.id, - force_recompute=force_recompute, + combined_cache_key = _get_cache_key( + "recap_analysis", conference.id, accepted_submissions ) - # Build submissions list with similar talks, sorted by highest similarity - submissions_list = sorted( - [ - { - "id": s.id, - "title": str(s.title), - "type": s.type.name, - "speaker": s.speaker.display_name if s.speaker else "Unknown", - "similar": similar_talks.get(s.id, []), - } - for s in accepted_submissions - ], - key=lambda x: max( - (item["similarity"] for item in x["similar"]), default=0 - ), - reverse=True, - ) + if not force_recompute: + cached_result = cache.get(combined_cache_key) + if cached_result is not None: + return JsonResponse(cached_result) - return JsonResponse( - { - "submissions_list": submissions_list, - "topic_clusters": topic_clusters, - } + # Dispatch the Celery task to the heavy_processing queue + compute_recap_analysis.apply_async( + args=[conference.id], + kwargs={"force_recompute": force_recompute}, + queue="heavy_processing", ) + check_pending_heavy_processing_work.delay() + + return JsonResponse({"status": "processing"}) def review_view(self, request, review_session_id, review_item_id): review_session = ReviewSession.objects.get(id=review_session_id) diff --git a/backend/reviews/tasks.py b/backend/reviews/tasks.py new file mode 100644 index 0000000000..79262e59fe --- /dev/null +++ b/backend/reviews/tasks.py @@ -0,0 +1,72 @@ +import logging + +from pycon.celery import app + +logger = logging.getLogger(__name__) + + +@app.task +def compute_recap_analysis(conference_id, force_recompute=False): + from django.core.cache import cache + from django.db.models import Q + + from reviews.similar_talks import ( + _get_cache_key, + compute_similar_talks, + compute_topic_clusters, + ) + from submissions.models import Submission + + accepted_submissions = list( + Submission.objects.filter(conference_id=conference_id) + .filter( + Q(pending_status=Submission.STATUS.accepted) + | Q(pending_status__isnull=True, status=Submission.STATUS.accepted) + | Q(pending_status="", status=Submission.STATUS.accepted) + ) + .select_related("speaker", "type", "audience_level") + .prefetch_related("languages") + ) + + similar_talks = compute_similar_talks( + accepted_submissions, + top_n=5, + conference_id=conference_id, + force_recompute=force_recompute, + ) + + topic_clusters = compute_topic_clusters( + accepted_submissions, + min_topic_size=3, + conference_id=conference_id, + force_recompute=force_recompute, + ) + + submissions_list = sorted( + [ + { + "id": s.id, + "title": str(s.title), + "type": s.type.name, + "speaker": s.speaker.display_name if s.speaker else "Unknown", + "similar": similar_talks.get(s.id, []), + } + for s in accepted_submissions + ], + key=lambda x: max( + (item["similarity"] for item in x["similar"]), default=0 + ), + reverse=True, + ) + + result = { + "submissions_list": submissions_list, + "topic_clusters": topic_clusters, + } + + combined_cache_key = _get_cache_key( + "recap_analysis", conference_id, accepted_submissions + ) + cache.set(combined_cache_key, result, 60 * 60 * 24) + + return result diff --git a/backend/reviews/templates/reviews-recap.html b/backend/reviews/templates/reviews-recap.html index 6cf704c797..9ae937f2d2 100644 --- a/backend/reviews/templates/reviews-recap.html +++ b/backend/reviews/templates/reviews-recap.html @@ -570,6 +570,70 @@

🔗 Similar Talks

section.style.display = ''; } + var pollTimer = null; + var pollStartTime = null; + var POLL_INTERVAL = 3000; + var POLL_TIMEOUT = 120000; + + function stopPolling() { + if (pollTimer) { + clearTimeout(pollTimer); + pollTimer = null; + } + pollStartTime = null; + } + + function handleResult(data, recompute) { + loading.style.display = 'none'; + btn.style.display = 'none'; + recomputeBtn.style.display = ''; + recomputeBtn.disabled = false; + recomputeBtn.textContent = 'Recompute (ignore cache)'; + + renderTopicClusters(data.topic_clusters); + renderSimilarTalks(data.submissions_list); + } + + function pollForResults(recompute) { + if (pollStartTime && (Date.now() - pollStartTime) > POLL_TIMEOUT) { + stopPolling(); + loading.style.display = 'none'; + var activeBtn = recompute ? recomputeBtn : btn; + activeBtn.disabled = false; + activeBtn.textContent = recompute ? 'Recompute (ignore cache)' : 'Compute Topic Clusters & Similar Talks'; + errorDiv.textContent = 'Analysis is taking longer than expected. Please try again later.'; + errorDiv.style.display = ''; + return; + } + + var url = recompute ? computeUrl + '?recompute=1' : computeUrl; + + fetch(url, { + headers: { 'X-Requested-With': 'XMLHttpRequest' } + }) + .then(function(response) { + if (!response.ok) throw new Error('Server error: ' + response.status); + return response.json(); + }) + .then(function(data) { + if (data.status === 'processing') { + pollTimer = setTimeout(function() { pollForResults(false); }, POLL_INTERVAL); + return; + } + stopPolling(); + handleResult(data, recompute); + }) + .catch(function(err) { + stopPolling(); + loading.style.display = 'none'; + var activeBtn = recompute ? recomputeBtn : btn; + activeBtn.disabled = false; + activeBtn.textContent = recompute ? 'Recompute (ignore cache)' : 'Compute Topic Clusters & Similar Talks'; + errorDiv.textContent = 'Failed to compute analysis: ' + err.message; + errorDiv.style.display = ''; + }); + } + function fetchAnalysis(recompute) { var url = recompute ? computeUrl + '?recompute=1' : computeUrl; var activeBtn = recompute ? recomputeBtn : btn; @@ -587,14 +651,12 @@

🔗 Similar Talks

return response.json(); }) .then(function(data) { - loading.style.display = 'none'; - btn.style.display = 'none'; - recomputeBtn.style.display = ''; - recomputeBtn.disabled = false; - recomputeBtn.textContent = 'Recompute (ignore cache)'; - - renderTopicClusters(data.topic_clusters); - renderSimilarTalks(data.submissions_list); + if (data.status === 'processing') { + pollStartTime = Date.now(); + pollTimer = setTimeout(function() { pollForResults(false); }, POLL_INTERVAL); + return; + } + handleResult(data, recompute); }) .catch(function(err) { loading.style.display = 'none'; diff --git a/backend/reviews/tests/test_recap.py b/backend/reviews/tests/test_recap.py index c24b27b968..eab1774d60 100644 --- a/backend/reviews/tests/test_recap.py +++ b/backend/reviews/tests/test_recap.py @@ -182,25 +182,51 @@ def test_recap_view_redirects_when_shortlist_not_visible(rf, mocker): # --- review_recap_compute_analysis_view tests --- -def test_compute_analysis_view_returns_submissions_and_clusters(rf, mocker): +def _mock_analysis_deps(mocker, cache_return=None): + """Mock the lazy-imported dependencies used in the compute analysis view.""" + mock_cache = mocker.patch("django.core.cache.cache.get", return_value=cache_return) + mock_task = mocker.patch("reviews.tasks.compute_recap_analysis.apply_async") + mock_check = mocker.patch("pycon.tasks.check_pending_heavy_processing_work.delay") + mocker.patch( + "reviews.similar_talks._get_cache_key", + return_value="recap_analysis:conf_test:abc123", + ) + return mock_cache, mock_task, mock_check + + +def test_compute_analysis_view_returns_cached_result(rf, mocker): user, conference, review_session, submissions = _create_recap_setup() sub1, sub2 = submissions - mocker.patch( - "reviews.similar_talks.compute_similar_talks", - return_value={ - sub1.id: [{"id": sub2.id, "title": str(sub2.title), "similarity": 75.0}], - sub2.id: [], - }, - ) - mocker.patch( - "reviews.similar_talks.compute_topic_clusters", - return_value={ - "topics": [{"name": "ML", "count": 2, "keywords": ["ml"], "submissions": []}], + cached_data = { + "submissions_list": [ + { + "id": sub1.id, + "title": str(sub1.title), + "type": sub1.type.name, + "speaker": sub1.speaker.display_name, + "similar": [ + {"id": sub2.id, "title": str(sub2.title), "similarity": 75.0} + ], + }, + { + "id": sub2.id, + "title": str(sub2.title), + "type": sub2.type.name, + "speaker": sub2.speaker.display_name, + "similar": [], + }, + ], + "topic_clusters": { + "topics": [ + {"name": "ML", "count": 2, "keywords": ["ml"], "submissions": []} + ], "outliers": [], "submission_topics": {}, }, - ) + } + + mock_cache, mock_task, _ = _mock_analysis_deps(mocker, cache_return=cached_data) request = rf.get("/") request.user = user @@ -211,77 +237,76 @@ def test_compute_analysis_view_returns_submissions_and_clusters(rf, mocker): assert response.status_code == 200 data = json.loads(response.content) - # Verify submissions_list structure assert len(data["submissions_list"]) == 2 - # sub1 has higher similarity (75%) so should be first assert data["submissions_list"][0]["id"] == sub1.id - assert data["submissions_list"][0]["similar"] == [ - {"id": sub2.id, "title": str(sub2.title), "similarity": 75.0} - ] - assert data["submissions_list"][1]["id"] == sub2.id - assert data["submissions_list"][1]["similar"] == [] - - # Each submission entry has required fields - for entry in data["submissions_list"]: - assert "id" in entry - assert "title" in entry - assert "type" in entry - assert "speaker" in entry - assert "similar" in entry - - # Verify topic_clusters is passed through assert data["topic_clusters"]["topics"][0]["name"] == "ML" - assert data["topic_clusters"]["outliers"] == [] + # Task should NOT have been dispatched since cache was hit + mock_task.assert_not_called() -def test_compute_analysis_view_passes_recompute_flag(rf, mocker): - mock_similar = mocker.patch( - "reviews.similar_talks.compute_similar_talks", - return_value={}, - ) - mock_clusters = mocker.patch( - "reviews.similar_talks.compute_topic_clusters", - return_value={"topics": [], "outliers": [], "submission_topics": {}}, + +def test_compute_analysis_view_dispatches_task_on_cache_miss(rf, mocker): + user, conference, review_session, submissions = _create_recap_setup() + + _, mock_task, mock_check = _mock_analysis_deps(mocker, cache_return=None) + + request = rf.get("/") + request.user = user + + admin = ReviewSessionAdmin(ReviewSession, AdminSite()) + response = admin.review_recap_compute_analysis_view(request, review_session.id) + + assert response.status_code == 200 + data = json.loads(response.content) + assert data == {"status": "processing"} + + mock_task.assert_called_once_with( + args=[conference.id], + kwargs={"force_recompute": False}, + queue="heavy_processing", ) + mock_check.assert_called_once() + + +def test_compute_analysis_view_dispatches_task_with_recompute(rf, mocker): user, conference, review_session, submissions = _create_recap_setup() + _, mock_task, _ = _mock_analysis_deps(mocker, cache_return=None) + request = rf.get("/?recompute=1") request.user = user admin = ReviewSessionAdmin(ReviewSession, AdminSite()) - admin.review_recap_compute_analysis_view(request, review_session.id) - - _, kwargs = mock_similar.call_args - assert kwargs["force_recompute"] is True + response = admin.review_recap_compute_analysis_view(request, review_session.id) - _, kwargs = mock_clusters.call_args - assert kwargs["force_recompute"] is True + assert response.status_code == 200 + data = json.loads(response.content) + assert data == {"status": "processing"} + mock_task.assert_called_once() + call_kwargs = mock_task.call_args + assert call_kwargs[1]["kwargs"]["force_recompute"] is True -def test_compute_analysis_view_no_recompute_by_default(rf, mocker): - mock_similar = mocker.patch( - "reviews.similar_talks.compute_similar_talks", - return_value={}, - ) - mock_clusters = mocker.patch( - "reviews.similar_talks.compute_topic_clusters", - return_value={"topics": [], "outliers": [], "submission_topics": {}}, - ) +def test_compute_analysis_view_recompute_skips_cache(rf, mocker): user, conference, review_session, submissions = _create_recap_setup() - request = rf.get("/") + cached_data = {"submissions_list": [], "topic_clusters": {"topics": []}} + mock_cache, mock_task, _ = _mock_analysis_deps(mocker, cache_return=cached_data) + + request = rf.get("/?recompute=1") request.user = user admin = ReviewSessionAdmin(ReviewSession, AdminSite()) - admin.review_recap_compute_analysis_view(request, review_session.id) + response = admin.review_recap_compute_analysis_view(request, review_session.id) - _, kwargs = mock_similar.call_args - assert kwargs["force_recompute"] is False + data = json.loads(response.content) + assert data == {"status": "processing"} - _, kwargs = mock_clusters.call_args - assert kwargs["force_recompute"] is False + # Cache should NOT have been checked when recompute=1 + mock_cache.assert_not_called() + mock_task.assert_called_once() def test_compute_analysis_view_permission_denied_for_non_reviewer(rf): From 8d656651758e42a2a424f7be03f27f891d46bb32 Mon Sep 17 00:00:00 2001 From: Patrick Arminio Date: Mon, 9 Feb 2026 16:03:11 +0100 Subject: [PATCH 2/6] Address review feedback: deduplicate query, add error handling, exponential backoff - Extract get_accepted_submissions() to remove duplicated query between admin.py and tasks.py - Add try/except in Celery task that caches error state so the frontend can show the actual failure instead of a timeout - Frontend handles {"status": "error"} responses from cached error state - Replace fixed 3s polling with exponential backoff (1s, 2s, 3s, 5s...) --- backend/reviews/admin.py | 24 +++-- backend/reviews/tasks.py | 101 ++++++++++--------- backend/reviews/templates/reviews-recap.html | 61 ++++++----- 3 files changed, 104 insertions(+), 82 deletions(-) diff --git a/backend/reviews/admin.py b/backend/reviews/admin.py index df641ba776..3701de9ad2 100644 --- a/backend/reviews/admin.py +++ b/backend/reviews/admin.py @@ -17,6 +17,19 @@ from users.admin_mixins import ConferencePermissionMixin +def get_accepted_submissions(conference): + return ( + Submission.objects.filter(conference=conference) + .filter( + Q(pending_status=Submission.STATUS.accepted) + | Q(pending_status__isnull=True, status=Submission.STATUS.accepted) + | Q(pending_status="", status=Submission.STATUS.accepted) + ) + .select_related("speaker", "type", "audience_level") + .prefetch_related("languages") + ) + + class AvailableScoreOptionInline(admin.TabularInline): model = AvailableScoreOption @@ -366,16 +379,7 @@ def review_shortlist_view(self, request, review_session_id): return TemplateResponse(request, adapter.shortlist_template, context) def _get_accepted_submissions(self, conference): - return ( - Submission.objects.filter(conference=conference) - .filter( - Q(pending_status=Submission.STATUS.accepted) - | Q(pending_status__isnull=True, status=Submission.STATUS.accepted) - | Q(pending_status="", status=Submission.STATUS.accepted) - ) - .select_related("speaker", "type", "audience_level") - .prefetch_related("languages") - ) + return get_accepted_submissions(conference) def review_recap_view(self, request, review_session_id): review_session = ReviewSession.objects.get(id=review_session_id) diff --git a/backend/reviews/tasks.py b/backend/reviews/tasks.py index 79262e59fe..d8f38f6192 100644 --- a/backend/reviews/tasks.py +++ b/backend/reviews/tasks.py @@ -8,65 +8,70 @@ @app.task def compute_recap_analysis(conference_id, force_recompute=False): from django.core.cache import cache - from django.db.models import Q + from reviews.admin import get_accepted_submissions from reviews.similar_talks import ( _get_cache_key, compute_similar_talks, compute_topic_clusters, ) - from submissions.models import Submission - accepted_submissions = list( - Submission.objects.filter(conference_id=conference_id) - .filter( - Q(pending_status=Submission.STATUS.accepted) - | Q(pending_status__isnull=True, status=Submission.STATUS.accepted) - | Q(pending_status="", status=Submission.STATUS.accepted) - ) - .select_related("speaker", "type", "audience_level") - .prefetch_related("languages") - ) + from conferences.models import Conference - similar_talks = compute_similar_talks( - accepted_submissions, - top_n=5, - conference_id=conference_id, - force_recompute=force_recompute, - ) + conference = Conference.objects.get(id=conference_id) + accepted_submissions = list(get_accepted_submissions(conference)) - topic_clusters = compute_topic_clusters( - accepted_submissions, - min_topic_size=3, - conference_id=conference_id, - force_recompute=force_recompute, + combined_cache_key = _get_cache_key( + "recap_analysis", conference_id, accepted_submissions ) - submissions_list = sorted( - [ - { - "id": s.id, - "title": str(s.title), - "type": s.type.name, - "speaker": s.speaker.display_name if s.speaker else "Unknown", - "similar": similar_talks.get(s.id, []), - } - for s in accepted_submissions - ], - key=lambda x: max( - (item["similarity"] for item in x["similar"]), default=0 - ), - reverse=True, - ) + try: + similar_talks = compute_similar_talks( + accepted_submissions, + top_n=5, + conference_id=conference_id, + force_recompute=force_recompute, + ) + + topic_clusters = compute_topic_clusters( + accepted_submissions, + min_topic_size=3, + conference_id=conference_id, + force_recompute=force_recompute, + ) - result = { - "submissions_list": submissions_list, - "topic_clusters": topic_clusters, - } + submissions_list = sorted( + [ + { + "id": s.id, + "title": str(s.title), + "type": s.type.name, + "speaker": s.speaker.display_name if s.speaker else "Unknown", + "similar": similar_talks.get(s.id, []), + } + for s in accepted_submissions + ], + key=lambda x: max( + (item["similarity"] for item in x["similar"]), default=0 + ), + reverse=True, + ) - combined_cache_key = _get_cache_key( - "recap_analysis", conference_id, accepted_submissions - ) - cache.set(combined_cache_key, result, 60 * 60 * 24) + result = { + "submissions_list": submissions_list, + "topic_clusters": topic_clusters, + } + + cache.set(combined_cache_key, result, 60 * 60 * 24) - return result + return result + except Exception: + logger.exception( + "Failed to compute recap analysis for conference %s", conference_id + ) + cache.set( + combined_cache_key, + {"status": "error", "message": "Analysis failed. Please try again."}, + 60 * 5, + ) + raise diff --git a/backend/reviews/templates/reviews-recap.html b/backend/reviews/templates/reviews-recap.html index 9ae937f2d2..ee5c388f4a 100644 --- a/backend/reviews/templates/reviews-recap.html +++ b/backend/reviews/templates/reviews-recap.html @@ -572,18 +572,43 @@

🔗 Similar Talks

var pollTimer = null; var pollStartTime = null; - var POLL_INTERVAL = 3000; + var pollAttempt = 0; var POLL_TIMEOUT = 120000; + function getNextPollInterval() { + // Exponential backoff: 1s, 2s, 3s, 5s, 5s, 5s... + var intervals = [1000, 2000, 3000, 5000]; + return intervals[Math.min(pollAttempt, intervals.length - 1)]; + } + function stopPolling() { if (pollTimer) { clearTimeout(pollTimer); pollTimer = null; } pollStartTime = null; + pollAttempt = 0; + } + + function showError(message) { + stopPolling(); + loading.style.display = 'none'; + btn.disabled = false; + btn.textContent = 'Compute Topic Clusters & Similar Talks'; + recomputeBtn.disabled = false; + recomputeBtn.textContent = 'Recompute (ignore cache)'; + errorDiv.textContent = message; + errorDiv.style.display = ''; } - function handleResult(data, recompute) { + function handleResult(data) { + if (data.status === 'error') { + showError(data.message || 'Analysis failed. Please try again.'); + recomputeBtn.style.display = ''; + btn.style.display = 'none'; + return; + } + loading.style.display = 'none'; btn.style.display = 'none'; recomputeBtn.style.display = ''; @@ -594,21 +619,13 @@

🔗 Similar Talks

renderSimilarTalks(data.submissions_list); } - function pollForResults(recompute) { + function pollForResults() { if (pollStartTime && (Date.now() - pollStartTime) > POLL_TIMEOUT) { - stopPolling(); - loading.style.display = 'none'; - var activeBtn = recompute ? recomputeBtn : btn; - activeBtn.disabled = false; - activeBtn.textContent = recompute ? 'Recompute (ignore cache)' : 'Compute Topic Clusters & Similar Talks'; - errorDiv.textContent = 'Analysis is taking longer than expected. Please try again later.'; - errorDiv.style.display = ''; + showError('Analysis is taking longer than expected. Please try again later.'); return; } - var url = recompute ? computeUrl + '?recompute=1' : computeUrl; - - fetch(url, { + fetch(computeUrl, { headers: { 'X-Requested-With': 'XMLHttpRequest' } }) .then(function(response) { @@ -617,20 +634,15 @@

🔗 Similar Talks

}) .then(function(data) { if (data.status === 'processing') { - pollTimer = setTimeout(function() { pollForResults(false); }, POLL_INTERVAL); + pollAttempt++; + pollTimer = setTimeout(pollForResults, getNextPollInterval()); return; } stopPolling(); - handleResult(data, recompute); + handleResult(data); }) .catch(function(err) { - stopPolling(); - loading.style.display = 'none'; - var activeBtn = recompute ? recomputeBtn : btn; - activeBtn.disabled = false; - activeBtn.textContent = recompute ? 'Recompute (ignore cache)' : 'Compute Topic Clusters & Similar Talks'; - errorDiv.textContent = 'Failed to compute analysis: ' + err.message; - errorDiv.style.display = ''; + showError('Failed to compute analysis: ' + err.message); }); } @@ -653,10 +665,11 @@

🔗 Similar Talks

.then(function(data) { if (data.status === 'processing') { pollStartTime = Date.now(); - pollTimer = setTimeout(function() { pollForResults(false); }, POLL_INTERVAL); + pollAttempt = 0; + pollTimer = setTimeout(pollForResults, getNextPollInterval()); return; } - handleResult(data, recompute); + handleResult(data); }) .catch(function(err) { loading.style.display = 'none'; From bb945b298416fccbcf40834e2aa696413dad87fa Mon Sep 17 00:00:00 2001 From: Patrick Arminio Date: Mon, 9 Feb 2026 16:34:43 +0100 Subject: [PATCH 3/6] Address second review: stampede lock, cache key consistency, error handling, tests - Add cache.add lock to prevent duplicate task dispatch on concurrent requests - Pass combined_cache_key from view to task to avoid key mismatch from race conditions between dispatch and execution - Handle Conference.DoesNotExist in task for deleted conferences - Clean up computing lock in finally block - Align frontend poll timeout (3min) with error cache TTL (2min) - Add integration tests: task cache population, error caching, missing conference - Add stampede prevention test (cache.add returns False) --- backend/reviews/admin.py | 16 +- backend/reviews/tasks.py | 21 +-- backend/reviews/templates/reviews-recap.html | 2 +- backend/reviews/tests/test_recap.py | 152 +++++++++++++++++-- 4 files changed, 165 insertions(+), 26 deletions(-) diff --git a/backend/reviews/admin.py b/backend/reviews/admin.py index 3701de9ad2..fca9ca003b 100644 --- a/backend/reviews/admin.py +++ b/backend/reviews/admin.py @@ -470,13 +470,15 @@ def review_recap_compute_analysis_view(self, request, review_session_id): if cached_result is not None: return JsonResponse(cached_result) - # Dispatch the Celery task to the heavy_processing queue - compute_recap_analysis.apply_async( - args=[conference.id], - kwargs={"force_recompute": force_recompute}, - queue="heavy_processing", - ) - check_pending_heavy_processing_work.delay() + # Use cache.add as a lock to prevent duplicate task dispatch + computing_key = f"{combined_cache_key}:computing" + if cache.add(computing_key, True, timeout=600): + compute_recap_analysis.apply_async( + args=[conference.id, combined_cache_key], + kwargs={"force_recompute": force_recompute}, + queue="heavy_processing", + ) + check_pending_heavy_processing_work.delay() return JsonResponse({"status": "processing"}) diff --git a/backend/reviews/tasks.py b/backend/reviews/tasks.py index d8f38f6192..d2bf1cdcab 100644 --- a/backend/reviews/tasks.py +++ b/backend/reviews/tasks.py @@ -6,25 +6,26 @@ @app.task -def compute_recap_analysis(conference_id, force_recompute=False): +def compute_recap_analysis(conference_id, combined_cache_key, force_recompute=False): from django.core.cache import cache + from conferences.models import Conference from reviews.admin import get_accepted_submissions from reviews.similar_talks import ( - _get_cache_key, compute_similar_talks, compute_topic_clusters, ) - from conferences.models import Conference + try: + conference = Conference.objects.get(id=conference_id) + except Conference.DoesNotExist: + logger.error( + "Conference %s not found for recap analysis", conference_id + ) + return - conference = Conference.objects.get(id=conference_id) accepted_submissions = list(get_accepted_submissions(conference)) - combined_cache_key = _get_cache_key( - "recap_analysis", conference_id, accepted_submissions - ) - try: similar_talks = compute_similar_talks( accepted_submissions, @@ -72,6 +73,8 @@ def compute_recap_analysis(conference_id, force_recompute=False): cache.set( combined_cache_key, {"status": "error", "message": "Analysis failed. Please try again."}, - 60 * 5, + 60 * 2, ) raise + finally: + cache.delete(f"{combined_cache_key}:computing") diff --git a/backend/reviews/templates/reviews-recap.html b/backend/reviews/templates/reviews-recap.html index ee5c388f4a..ac363aa827 100644 --- a/backend/reviews/templates/reviews-recap.html +++ b/backend/reviews/templates/reviews-recap.html @@ -573,7 +573,7 @@

🔗 Similar Talks

var pollTimer = null; var pollStartTime = null; var pollAttempt = 0; - var POLL_TIMEOUT = 120000; + var POLL_TIMEOUT = 180000; function getNextPollInterval() { // Exponential backoff: 1s, 2s, 3s, 5s, 5s, 5s... diff --git a/backend/reviews/tests/test_recap.py b/backend/reviews/tests/test_recap.py index eab1774d60..d476fa2087 100644 --- a/backend/reviews/tests/test_recap.py +++ b/backend/reviews/tests/test_recap.py @@ -3,6 +3,7 @@ import pytest from django.contrib.admin import AdminSite from django.core.exceptions import PermissionDenied +from django.test import override_settings from conferences.tests.factories import ConferenceFactory from reviews.admin import ReviewSessionAdmin @@ -182,16 +183,24 @@ def test_recap_view_redirects_when_shortlist_not_visible(rf, mocker): # --- review_recap_compute_analysis_view tests --- +FAKE_CACHE_KEY = "recap_analysis:conf_test:abc123" + + def _mock_analysis_deps(mocker, cache_return=None): """Mock the lazy-imported dependencies used in the compute analysis view.""" - mock_cache = mocker.patch("django.core.cache.cache.get", return_value=cache_return) + mock_cache_get = mocker.patch( + "django.core.cache.cache.get", return_value=cache_return + ) + mock_cache_add = mocker.patch( + "django.core.cache.cache.add", return_value=True + ) mock_task = mocker.patch("reviews.tasks.compute_recap_analysis.apply_async") mock_check = mocker.patch("pycon.tasks.check_pending_heavy_processing_work.delay") mocker.patch( "reviews.similar_talks._get_cache_key", - return_value="recap_analysis:conf_test:abc123", + return_value=FAKE_CACHE_KEY, ) - return mock_cache, mock_task, mock_check + return mock_cache_get, mock_cache_add, mock_task, mock_check def test_compute_analysis_view_returns_cached_result(rf, mocker): @@ -226,7 +235,9 @@ def test_compute_analysis_view_returns_cached_result(rf, mocker): }, } - mock_cache, mock_task, _ = _mock_analysis_deps(mocker, cache_return=cached_data) + mock_cache_get, _, mock_task, _ = _mock_analysis_deps( + mocker, cache_return=cached_data + ) request = rf.get("/") request.user = user @@ -248,7 +259,7 @@ def test_compute_analysis_view_returns_cached_result(rf, mocker): def test_compute_analysis_view_dispatches_task_on_cache_miss(rf, mocker): user, conference, review_session, submissions = _create_recap_setup() - _, mock_task, mock_check = _mock_analysis_deps(mocker, cache_return=None) + _, _, mock_task, mock_check = _mock_analysis_deps(mocker, cache_return=None) request = rf.get("/") request.user = user @@ -261,7 +272,7 @@ def test_compute_analysis_view_dispatches_task_on_cache_miss(rf, mocker): assert data == {"status": "processing"} mock_task.assert_called_once_with( - args=[conference.id], + args=[conference.id, FAKE_CACHE_KEY], kwargs={"force_recompute": False}, queue="heavy_processing", ) @@ -272,7 +283,7 @@ def test_compute_analysis_view_dispatches_task_on_cache_miss(rf, mocker): def test_compute_analysis_view_dispatches_task_with_recompute(rf, mocker): user, conference, review_session, submissions = _create_recap_setup() - _, mock_task, _ = _mock_analysis_deps(mocker, cache_return=None) + _, _, mock_task, _ = _mock_analysis_deps(mocker, cache_return=None) request = rf.get("/?recompute=1") request.user = user @@ -293,7 +304,9 @@ def test_compute_analysis_view_recompute_skips_cache(rf, mocker): user, conference, review_session, submissions = _create_recap_setup() cached_data = {"submissions_list": [], "topic_clusters": {"topics": []}} - mock_cache, mock_task, _ = _mock_analysis_deps(mocker, cache_return=cached_data) + mock_cache_get, _, mock_task, _ = _mock_analysis_deps( + mocker, cache_return=cached_data + ) request = rf.get("/?recompute=1") request.user = user @@ -305,10 +318,33 @@ def test_compute_analysis_view_recompute_skips_cache(rf, mocker): assert data == {"status": "processing"} # Cache should NOT have been checked when recompute=1 - mock_cache.assert_not_called() + mock_cache_get.assert_not_called() mock_task.assert_called_once() +def test_compute_analysis_view_skips_dispatch_when_already_computing(rf, mocker): + user, conference, review_session, submissions = _create_recap_setup() + + mock_cache_get, mock_cache_add, mock_task, mock_check = _mock_analysis_deps( + mocker, cache_return=None + ) + # Simulate lock already held — cache.add returns False + mock_cache_add.return_value = False + + request = rf.get("/") + request.user = user + + admin = ReviewSessionAdmin(ReviewSession, AdminSite()) + response = admin.review_recap_compute_analysis_view(request, review_session.id) + + data = json.loads(response.content) + assert data == {"status": "processing"} + + # Task should NOT be dispatched since lock was already held + mock_task.assert_not_called() + mock_check.assert_not_called() + + def test_compute_analysis_view_permission_denied_for_non_reviewer(rf): user = UserFactory(is_staff=True, is_superuser=False) conference = ConferenceFactory() @@ -342,3 +378,101 @@ def test_compute_analysis_view_permission_denied_when_shortlist_not_visible(rf): with pytest.raises(PermissionDenied): admin.review_recap_compute_analysis_view(request, review_session.id) + + +# --- compute_recap_analysis task tests --- + + +LOCMEM_CACHE = { + "default": { + "BACKEND": "django.core.cache.backends.locmem.LocMemCache", + "LOCATION": "test-recap-analysis", + } +} + + +@pytest.mark.django_db +@override_settings(CACHES=LOCMEM_CACHE) +def test_task_populates_cache_with_results(mocker): + from django.core.cache import cache + + from reviews.tasks import compute_recap_analysis + + user, conference, review_session, submissions = _create_recap_setup() + sub1, sub2 = submissions + + mocker.patch( + "reviews.similar_talks.compute_similar_talks", + return_value={ + sub1.id: [{"id": sub2.id, "title": str(sub2.title), "similarity": 75.0}], + sub2.id: [], + }, + ) + mocker.patch( + "reviews.similar_talks.compute_topic_clusters", + return_value={ + "topics": [ + {"name": "ML", "count": 2, "keywords": ["ml"], "submissions": []} + ], + "outliers": [], + "submission_topics": {}, + }, + ) + + cache_key = "recap_analysis:conf_test:integration" + # Set computing lock to verify it gets cleaned up + cache.set(f"{cache_key}:computing", True) + + result = compute_recap_analysis(conference.id, cache_key) + + assert len(result["submissions_list"]) == 2 + assert result["submissions_list"][0]["id"] == sub1.id + assert result["submissions_list"][0]["similar"][0]["similarity"] == 75.0 + assert result["topic_clusters"]["topics"][0]["name"] == "ML" + + # Verify cache was populated + cached = cache.get(cache_key) + assert cached == result + + # Verify computing lock was cleaned up + assert cache.get(f"{cache_key}:computing") is None + + +@pytest.mark.django_db +@override_settings(CACHES=LOCMEM_CACHE) +def test_task_caches_error_on_failure(mocker): + from django.core.cache import cache + + from reviews.tasks import compute_recap_analysis + + user, conference, review_session, submissions = _create_recap_setup() + + mocker.patch( + "reviews.similar_talks.compute_similar_talks", + side_effect=RuntimeError("ML model failed"), + ) + + cache_key = "recap_analysis:conf_test:error" + cache.set(f"{cache_key}:computing", True) + + with pytest.raises(RuntimeError, match="ML model failed"): + compute_recap_analysis(conference.id, cache_key) + + # Verify error was cached + cached = cache.get(cache_key) + assert cached["status"] == "error" + assert "failed" in cached["message"].lower() + + # Verify computing lock was cleaned up + assert cache.get(f"{cache_key}:computing") is None + + +def test_task_handles_missing_conference(mocker): + from reviews.tasks import compute_recap_analysis + + mock_similar = mocker.patch("reviews.similar_talks.compute_similar_talks") + + result = compute_recap_analysis(999999, "recap_analysis:conf_999999:key") + + assert result is None + mock_similar.assert_not_called() From 5a2fab16aa671378f08e86364500e0024a6d377f Mon Sep 17 00:00:00 2001 From: Patrick Arminio Date: Mon, 9 Feb 2026 17:14:57 +0100 Subject: [PATCH 4/6] Reduce computing lock TTL from 600s to 300s for auto-expiry safety --- backend/reviews/admin.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/backend/reviews/admin.py b/backend/reviews/admin.py index fca9ca003b..0fa703f884 100644 --- a/backend/reviews/admin.py +++ b/backend/reviews/admin.py @@ -470,9 +470,10 @@ def review_recap_compute_analysis_view(self, request, review_session_id): if cached_result is not None: return JsonResponse(cached_result) - # Use cache.add as a lock to prevent duplicate task dispatch + # Use cache.add as a lock to prevent duplicate task dispatch. + # Short TTL so lock auto-expires if the worker is killed before cleanup. computing_key = f"{combined_cache_key}:computing" - if cache.add(computing_key, True, timeout=600): + if cache.add(computing_key, True, timeout=300): compute_recap_analysis.apply_async( args=[conference.id, combined_cache_key], kwargs={"force_recompute": force_recompute}, From a72ef20d1729863194c056ad63de5c069544f302 Mon Sep 17 00:00:00 2001 From: Patrick Arminio Date: Mon, 9 Feb 2026 17:20:29 +0100 Subject: [PATCH 5/6] Extract cache_keys module, remove redundant caching, unmock key generation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Move get_cache_key and get_embedding_text to reviews/cache_keys.py (lightweight module with no ML imports) so admin view can import without triggering torch/sentence-transformers loading - Skip individual function caching in task (conference_id=None) since the combined result is cached; avoids triple cache storage - Remove _get_cache_key mock from tests — real hash computation now exercises the cache key generation logic --- backend/reviews/admin.py | 4 ++-- backend/reviews/cache_keys.py | 21 +++++++++++++++++++++ backend/reviews/similar_talks.py | 24 +++--------------------- backend/reviews/tasks.py | 6 ++++-- backend/reviews/tests/test_recap.py | 19 +++++++------------ 5 files changed, 37 insertions(+), 37 deletions(-) create mode 100644 backend/reviews/cache_keys.py diff --git a/backend/reviews/admin.py b/backend/reviews/admin.py index 0fa703f884..6346295a31 100644 --- a/backend/reviews/admin.py +++ b/backend/reviews/admin.py @@ -458,10 +458,10 @@ def review_recap_compute_analysis_view(self, request, review_session_id): from django.core.cache import cache from pycon.tasks import check_pending_heavy_processing_work - from reviews.similar_talks import _get_cache_key + from reviews.cache_keys import get_cache_key from reviews.tasks import compute_recap_analysis - combined_cache_key = _get_cache_key( + combined_cache_key = get_cache_key( "recap_analysis", conference.id, accepted_submissions ) diff --git a/backend/reviews/cache_keys.py b/backend/reviews/cache_keys.py new file mode 100644 index 0000000000..57334cdbaa --- /dev/null +++ b/backend/reviews/cache_keys.py @@ -0,0 +1,21 @@ +import hashlib + + +def get_embedding_text(submission) -> str: + """Combine title, elevator_pitch, and abstract into a single string for embedding.""" + title = str(submission.title) if submission.title else "" + elevator_pitch = str(submission.elevator_pitch) if submission.elevator_pitch else "" + abstract = ( + str(submission.abstract) + if hasattr(submission, "abstract") and submission.abstract + else "" + ) + return f"{title}. {elevator_pitch}. {abstract}" + + +def get_cache_key(prefix: str, conference_id: int, submissions) -> str: + """Generate a cache key based on conference and submission content.""" + content_hash = hashlib.md5() + for s in sorted(submissions, key=lambda x: x.id): + content_hash.update(f"{s.id}:{get_embedding_text(s)}".encode()) + return f"{prefix}:conf_{conference_id}:{content_hash.hexdigest()}" diff --git a/backend/reviews/similar_talks.py b/backend/reviews/similar_talks.py index 570dc8259b..4cc63ed8f0 100644 --- a/backend/reviews/similar_talks.py +++ b/backend/reviews/similar_talks.py @@ -1,5 +1,4 @@ import functools -import hashlib import logging import nltk @@ -12,6 +11,9 @@ from sklearn.feature_extraction.text import CountVectorizer from sklearn.metrics.pairwise import cosine_similarity +from reviews.cache_keys import get_cache_key as _get_cache_key +from reviews.cache_keys import get_embedding_text + logger = logging.getLogger(__name__) CACHE_TIMEOUT = 60 * 60 * 24 # 24 hours @@ -222,18 +224,6 @@ def get_embedding_model(): return SentenceTransformer("all-MiniLM-L6-v2", token=False) -def get_embedding_text(submission) -> str: - """Combine title, elevator_pitch, and abstract into a single string for embedding.""" - title = str(submission.title) if submission.title else "" - elevator_pitch = str(submission.elevator_pitch) if submission.elevator_pitch else "" - abstract = ( - str(submission.abstract) - if hasattr(submission, "abstract") and submission.abstract - else "" - ) - return f"{title}. {elevator_pitch}. {abstract}" - - def _get_submission_languages(submissions) -> set[str]: """Extract all unique language codes from submissions.""" language_codes = set() @@ -245,14 +235,6 @@ def _get_submission_languages(submissions) -> set[str]: return language_codes or {"en"} -def _get_cache_key(prefix: str, conference_id: int, submissions) -> str: - """Generate a cache key based on conference and submission content.""" - content_hash = hashlib.md5() - for s in sorted(submissions, key=lambda x: x.id): - content_hash.update(f"{s.id}:{get_embedding_text(s)}".encode()) - return f"{prefix}:conf_{conference_id}:{content_hash.hexdigest()}" - - def compute_similar_talks( submissions, top_n=5, conference_id=None, force_recompute=False ): diff --git a/backend/reviews/tasks.py b/backend/reviews/tasks.py index d2bf1cdcab..ff5fc8d3bd 100644 --- a/backend/reviews/tasks.py +++ b/backend/reviews/tasks.py @@ -27,17 +27,19 @@ def compute_recap_analysis(conference_id, combined_cache_key, force_recompute=Fa accepted_submissions = list(get_accepted_submissions(conference)) try: + # Pass conference_id=None to skip individual function caching; + # the combined result is cached under combined_cache_key instead. similar_talks = compute_similar_talks( accepted_submissions, top_n=5, - conference_id=conference_id, + conference_id=None, force_recompute=force_recompute, ) topic_clusters = compute_topic_clusters( accepted_submissions, min_topic_size=3, - conference_id=conference_id, + conference_id=None, force_recompute=force_recompute, ) diff --git a/backend/reviews/tests/test_recap.py b/backend/reviews/tests/test_recap.py index d476fa2087..99163c445f 100644 --- a/backend/reviews/tests/test_recap.py +++ b/backend/reviews/tests/test_recap.py @@ -183,9 +183,6 @@ def test_recap_view_redirects_when_shortlist_not_visible(rf, mocker): # --- review_recap_compute_analysis_view tests --- -FAKE_CACHE_KEY = "recap_analysis:conf_test:abc123" - - def _mock_analysis_deps(mocker, cache_return=None): """Mock the lazy-imported dependencies used in the compute analysis view.""" mock_cache_get = mocker.patch( @@ -196,10 +193,6 @@ def _mock_analysis_deps(mocker, cache_return=None): ) mock_task = mocker.patch("reviews.tasks.compute_recap_analysis.apply_async") mock_check = mocker.patch("pycon.tasks.check_pending_heavy_processing_work.delay") - mocker.patch( - "reviews.similar_talks._get_cache_key", - return_value=FAKE_CACHE_KEY, - ) return mock_cache_get, mock_cache_add, mock_task, mock_check @@ -271,11 +264,13 @@ def test_compute_analysis_view_dispatches_task_on_cache_miss(rf, mocker): data = json.loads(response.content) assert data == {"status": "processing"} - mock_task.assert_called_once_with( - args=[conference.id, FAKE_CACHE_KEY], - kwargs={"force_recompute": False}, - queue="heavy_processing", - ) + mock_task.assert_called_once() + call_kwargs = mock_task.call_args + assert call_kwargs[1]["args"][0] == conference.id + assert isinstance(call_kwargs[1]["args"][1], str) + assert call_kwargs[1]["args"][1].startswith("recap_analysis:conf_") + assert call_kwargs[1]["kwargs"] == {"force_recompute": False} + assert call_kwargs[1]["queue"] == "heavy_processing" mock_check.assert_called_once() From 77c48b703aeec18395f42b408c84eaac9868fc56 Mon Sep 17 00:00:00 2001 From: Patrick Arminio Date: Mon, 9 Feb 2026 19:34:35 +0100 Subject: [PATCH 6/6] Address review: stale lock detection, DoesNotExist cleanup, timeout alignment - Store Celery task ID in computing lock and check AsyncResult state to detect stale locks from crashed workers before dispatching duplicates - Clean up computing lock on Conference.DoesNotExist early return - Increase frontend poll timeout to 360s to exceed backend 300s lock TTL - Extend polling backoff intervals to 15s for large conferences - Add .order_by("id") to get_accepted_submissions for consistent ordering - Extract RESULT_CACHE_TTL / ERROR_CACHE_TTL constants - Add tests for stale lock detection, active lock preservation, and DoesNotExist lock cleanup --- backend/reviews/admin.py | 20 +++- backend/reviews/tasks.py | 8 +- backend/reviews/templates/reviews-recap.html | 6 +- backend/reviews/tests/test_recap.py | 116 ++++++++++++++++++- 4 files changed, 139 insertions(+), 11 deletions(-) diff --git a/backend/reviews/admin.py b/backend/reviews/admin.py index 6346295a31..ffc35f397e 100644 --- a/backend/reviews/admin.py +++ b/backend/reviews/admin.py @@ -27,6 +27,7 @@ def get_accepted_submissions(conference): ) .select_related("speaker", "type", "audience_level") .prefetch_related("languages") + .order_by("id") ) @@ -473,12 +474,27 @@ def review_recap_compute_analysis_view(self, request, review_session_id): # Use cache.add as a lock to prevent duplicate task dispatch. # Short TTL so lock auto-expires if the worker is killed before cleanup. computing_key = f"{combined_cache_key}:computing" - if cache.add(computing_key, True, timeout=300): - compute_recap_analysis.apply_async( + + # Check for stale lock from a crashed/finished task + existing_task_id = cache.get(computing_key) + if existing_task_id: + from celery.result import AsyncResult + + if AsyncResult(existing_task_id).state in ( + "SUCCESS", + "FAILURE", + "REVOKED", + ): + cache.delete(computing_key) + + if cache.add(computing_key, "pending", timeout=300): + result = compute_recap_analysis.apply_async( args=[conference.id, combined_cache_key], kwargs={"force_recompute": force_recompute}, queue="heavy_processing", ) + # Store task ID so subsequent requests can detect stale locks + cache.set(computing_key, result.id, timeout=300) check_pending_heavy_processing_work.delay() return JsonResponse({"status": "processing"}) diff --git a/backend/reviews/tasks.py b/backend/reviews/tasks.py index ff5fc8d3bd..d10eee7b69 100644 --- a/backend/reviews/tasks.py +++ b/backend/reviews/tasks.py @@ -4,6 +4,9 @@ logger = logging.getLogger(__name__) +RESULT_CACHE_TTL = 60 * 60 * 24 # 24 hours +ERROR_CACHE_TTL = 60 * 2 # 2 minutes + @app.task def compute_recap_analysis(conference_id, combined_cache_key, force_recompute=False): @@ -22,6 +25,7 @@ def compute_recap_analysis(conference_id, combined_cache_key, force_recompute=Fa logger.error( "Conference %s not found for recap analysis", conference_id ) + cache.delete(f"{combined_cache_key}:computing") return accepted_submissions = list(get_accepted_submissions(conference)) @@ -65,7 +69,7 @@ def compute_recap_analysis(conference_id, combined_cache_key, force_recompute=Fa "topic_clusters": topic_clusters, } - cache.set(combined_cache_key, result, 60 * 60 * 24) + cache.set(combined_cache_key, result, RESULT_CACHE_TTL) return result except Exception: @@ -75,7 +79,7 @@ def compute_recap_analysis(conference_id, combined_cache_key, force_recompute=Fa cache.set( combined_cache_key, {"status": "error", "message": "Analysis failed. Please try again."}, - 60 * 2, + ERROR_CACHE_TTL, ) raise finally: diff --git a/backend/reviews/templates/reviews-recap.html b/backend/reviews/templates/reviews-recap.html index ac363aa827..8c8e2bbe24 100644 --- a/backend/reviews/templates/reviews-recap.html +++ b/backend/reviews/templates/reviews-recap.html @@ -573,11 +573,11 @@

🔗 Similar Talks

var pollTimer = null; var pollStartTime = null; var pollAttempt = 0; - var POLL_TIMEOUT = 180000; + var POLL_TIMEOUT = 360000; // 6 minutes – must exceed backend lock TTL (300s) function getNextPollInterval() { - // Exponential backoff: 1s, 2s, 3s, 5s, 5s, 5s... - var intervals = [1000, 2000, 3000, 5000]; + // Exponential backoff: 1s, 2s, 3s, 5s, 10s, 15s, 15s... + var intervals = [1000, 2000, 3000, 5000, 10000, 15000]; return intervals[Math.min(pollAttempt, intervals.length - 1)]; } diff --git a/backend/reviews/tests/test_recap.py b/backend/reviews/tests/test_recap.py index 99163c445f..e2176430dd 100644 --- a/backend/reviews/tests/test_recap.py +++ b/backend/reviews/tests/test_recap.py @@ -183,14 +183,22 @@ def test_recap_view_redirects_when_shortlist_not_visible(rf, mocker): # --- review_recap_compute_analysis_view tests --- -def _mock_analysis_deps(mocker, cache_return=None): +def _mock_analysis_deps(mocker, cache_return=None, computing_task_id=None): """Mock the lazy-imported dependencies used in the compute analysis view.""" + + def cache_get_side_effect(key): + if ":computing" in key: + return computing_task_id + return cache_return + mock_cache_get = mocker.patch( - "django.core.cache.cache.get", return_value=cache_return + "django.core.cache.cache.get", side_effect=cache_get_side_effect ) mock_cache_add = mocker.patch( "django.core.cache.cache.add", return_value=True ) + mocker.patch("django.core.cache.cache.set") + mocker.patch("django.core.cache.cache.delete") mock_task = mocker.patch("reviews.tasks.compute_recap_analysis.apply_async") mock_check = mocker.patch("pycon.tasks.check_pending_heavy_processing_work.delay") return mock_cache_get, mock_cache_add, mock_task, mock_check @@ -312,8 +320,10 @@ def test_compute_analysis_view_recompute_skips_cache(rf, mocker): data = json.loads(response.content) assert data == {"status": "processing"} - # Cache should NOT have been checked when recompute=1 - mock_cache_get.assert_not_called() + # With recompute=1, cache.get should only be called for the computing key, + # not for the result cache + for call in mock_cache_get.call_args_list: + assert ":computing" in call.args[0] mock_task.assert_called_once() @@ -471,3 +481,101 @@ def test_task_handles_missing_conference(mocker): assert result is None mock_similar.assert_not_called() + + +@pytest.mark.django_db +@override_settings(CACHES=LOCMEM_CACHE) +def test_task_handles_missing_conference_cleans_up_lock(mocker): + from django.core.cache import cache + + from reviews.tasks import compute_recap_analysis + + cache_key = "recap_analysis:conf_999999:key" + computing_key = f"{cache_key}:computing" + cache.set(computing_key, "some-task-id") + + mocker.patch("reviews.similar_talks.compute_similar_talks") + + result = compute_recap_analysis(999999, cache_key) + + assert result is None + # Verify computing lock was cleaned up on DoesNotExist + assert cache.get(computing_key) is None + + +def test_compute_analysis_view_clears_stale_lock_and_dispatches(rf, mocker): + user, conference, review_session, submissions = _create_recap_setup() + + def cache_get_side_effect(key): + if ":computing" in key: + return "stale-task-id-123" + return None + + mocker.patch("django.core.cache.cache.get", side_effect=cache_get_side_effect) + mocker.patch("django.core.cache.cache.add", return_value=True) + mocker.patch("django.core.cache.cache.set") + mock_cache_delete = mocker.patch("django.core.cache.cache.delete") + mock_task = mocker.patch("reviews.tasks.compute_recap_analysis.apply_async") + mock_check = mocker.patch("pycon.tasks.check_pending_heavy_processing_work.delay") + + # Mock AsyncResult to report task as finished + mock_async_result_cls = mocker.patch("celery.result.AsyncResult") + mock_async_result_cls.return_value.state = "SUCCESS" + + request = rf.get("/") + request.user = user + + admin = ReviewSessionAdmin(ReviewSession, AdminSite()) + response = admin.review_recap_compute_analysis_view(request, review_session.id) + + data = json.loads(response.content) + assert data == {"status": "processing"} + + # Stale lock should have been deleted + mock_cache_delete.assert_called() + # New task should have been dispatched + mock_task.assert_called_once() + mock_check.assert_called_once() + + +def test_compute_analysis_view_keeps_active_task_lock(rf, mocker): + user, conference, review_session, submissions = _create_recap_setup() + + def cache_get_side_effect(key): + if ":computing" in key: + return "active-task-id-456" + return None + + mocker.patch("django.core.cache.cache.get", side_effect=cache_get_side_effect) + mocker.patch("django.core.cache.cache.add", return_value=False) + mocker.patch("django.core.cache.cache.set") + mock_cache_delete = mocker.patch("django.core.cache.cache.delete") + mock_task = mocker.patch("reviews.tasks.compute_recap_analysis.apply_async") + mock_check = mocker.patch("pycon.tasks.check_pending_heavy_processing_work.delay") + + # Mock AsyncResult to report task as still running + mock_async_result_cls = mocker.patch("celery.result.AsyncResult") + mock_async_result_cls.return_value.state = "STARTED" + + request = rf.get("/") + request.user = user + + admin = ReviewSessionAdmin(ReviewSession, AdminSite()) + response = admin.review_recap_compute_analysis_view(request, review_session.id) + + data = json.loads(response.content) + assert data == {"status": "processing"} + + # Lock should NOT have been deleted (task still active) + mock_cache_delete.assert_not_called() + # No new task should be dispatched + mock_task.assert_not_called() + mock_check.assert_not_called() + + +def test_error_cache_ttl_is_shorter_than_result_ttl(): + from reviews.tasks import ERROR_CACHE_TTL, RESULT_CACHE_TTL + + assert ERROR_CACHE_TTL == 120 # 2 minutes + assert RESULT_CACHE_TTL == 86400 # 24 hours + assert ERROR_CACHE_TTL < RESULT_CACHE_TTL