99 changes: 53 additions & 46 deletions backend/reviews/admin.py
@@ -17,6 +17,20 @@
from users.admin_mixins import ConferencePermissionMixin


+def get_accepted_submissions(conference):
+    return (
+        Submission.objects.filter(conference=conference)
+        .filter(
+            Q(pending_status=Submission.STATUS.accepted)
+            | Q(pending_status__isnull=True, status=Submission.STATUS.accepted)
+            | Q(pending_status="", status=Submission.STATUS.accepted)
+        )
+        .select_related("speaker", "type", "audience_level")
+        .prefetch_related("languages")
+        .order_by("id")
+    )
+
+
class AvailableScoreOptionInline(admin.TabularInline):
    model = AvailableScoreOption

@@ -366,16 +380,7 @@ def review_shortlist_view(self, request, review_session_id):
        return TemplateResponse(request, adapter.shortlist_template, context)

    def _get_accepted_submissions(self, conference):
-        return (
-            Submission.objects.filter(conference=conference)
-            .filter(
-                Q(pending_status=Submission.STATUS.accepted)
-                | Q(pending_status__isnull=True, status=Submission.STATUS.accepted)
-                | Q(pending_status="", status=Submission.STATUS.accepted)
-            )
-            .select_related("speaker", "type", "audience_level")
-            .prefetch_related("languages")
-        )
+        return get_accepted_submissions(conference)

    def review_recap_view(self, request, review_session_id):
        review_session = ReviewSession.objects.get(id=review_session_id)
@@ -448,49 +453,51 @@ def review_recap_compute_analysis_view(self, request, review_session_id):
            raise PermissionDenied()

        conference = review_session.conference
-        accepted_submissions = self._get_accepted_submissions(conference)
+        accepted_submissions = list(self._get_accepted_submissions(conference))
        force_recompute = request.GET.get("recompute") == "1"

-        from reviews.similar_talks import compute_similar_talks, compute_topic_clusters
+        from django.core.cache import cache

-        similar_talks = compute_similar_talks(
-            accepted_submissions,
-            top_n=5,
-            conference_id=conference.id,
-            force_recompute=force_recompute,
-        )
+        from pycon.tasks import check_pending_heavy_processing_work
+        from reviews.cache_keys import get_cache_key
+        from reviews.tasks import compute_recap_analysis

-        topic_clusters = compute_topic_clusters(
-            accepted_submissions,
-            min_topic_size=3,
-            conference_id=conference.id,
-            force_recompute=force_recompute,
+        combined_cache_key = get_cache_key(
+            "recap_analysis", conference.id, accepted_submissions
        )

-        # Build submissions list with similar talks, sorted by highest similarity
-        submissions_list = sorted(
-            [
-                {
-                    "id": s.id,
-                    "title": str(s.title),
-                    "type": s.type.name,
-                    "speaker": s.speaker.display_name if s.speaker else "Unknown",
-                    "similar": similar_talks.get(s.id, []),
-                }
-                for s in accepted_submissions
-            ],
-            key=lambda x: max(
-                (item["similarity"] for item in x["similar"]), default=0
-            ),
-            reverse=True,
-        )
+        if not force_recompute:
+            cached_result = cache.get(combined_cache_key)
+            if cached_result is not None:
+                return JsonResponse(cached_result)

-        return JsonResponse(
-            {
-                "submissions_list": submissions_list,
-                "topic_clusters": topic_clusters,
-            }
-        )
+        # Use cache.add as a lock to prevent duplicate task dispatch.
+        # Short TTL so lock auto-expires if the worker is killed before cleanup.
+        computing_key = f"{combined_cache_key}:computing"
+
+        # Check for stale lock from a crashed/finished task
+        existing_task_id = cache.get(computing_key)
+        if existing_task_id:
+            from celery.result import AsyncResult
+
+            if AsyncResult(existing_task_id).state in (
+                "SUCCESS",
+                "FAILURE",
+                "REVOKED",
+            ):
+                cache.delete(computing_key)
+
+        if cache.add(computing_key, "pending", timeout=300):
+            result = compute_recap_analysis.apply_async(
+                args=[conference.id, combined_cache_key],
+                kwargs={"force_recompute": force_recompute},
+                queue="heavy_processing",
+            )
+            # Store task ID so subsequent requests can detect stale locks
+            cache.set(computing_key, result.id, timeout=300)
+            check_pending_heavy_processing_work.delay()
+
+        return JsonResponse({"status": "processing"})

    def review_view(self, request, review_session_id, review_item_id):
        review_session = ReviewSession.objects.get(id=review_session_id)
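The view above never computes anything inline: it either serves a finished payload from the cache or dispatches at most one Celery task and tells the client to poll. A minimal sketch of that dispatch-once handshake, assuming a shared cache backend with an atomic add() (e.g. Redis or Memcached); get_or_dispatch and slow-task arguments here are hypothetical names, not part of the PR:

# Sketch of the dispatch-once pattern used by the view above.
from django.core.cache import cache

LOCK_TTL = 300  # seconds; mirrors the timeout=300 used in the view

def get_or_dispatch(cache_key, task, *task_args):
    result = cache.get(cache_key)
    if result is not None:
        return result  # computed earlier; serve straight from cache

    lock_key = f"{cache_key}:computing"
    # add() succeeds only for the first caller, so concurrent requests
    # dispatch at most one task per lock-TTL window.
    if cache.add(lock_key, "pending", timeout=LOCK_TTL):
        async_result = task.apply_async(args=list(task_args))
        # Replace the placeholder with the task ID for stale-lock detection.
        cache.set(lock_key, async_result.id, timeout=LOCK_TTL)

    return {"status": "processing"}  # caller is expected to poll again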
21 changes: 21 additions & 0 deletions backend/reviews/cache_keys.py
@@ -0,0 +1,21 @@
import hashlib


def get_embedding_text(submission) -> str:
    """Combine title, elevator_pitch, and abstract into a single string for embedding."""
    title = str(submission.title) if submission.title else ""
    elevator_pitch = str(submission.elevator_pitch) if submission.elevator_pitch else ""
    abstract = (
        str(submission.abstract)
        if hasattr(submission, "abstract") and submission.abstract
        else ""
    )
    return f"{title}. {elevator_pitch}. {abstract}"


def get_cache_key(prefix: str, conference_id: int, submissions) -> str:
    """Generate a cache key based on conference and submission content."""
    content_hash = hashlib.md5()
    for s in sorted(submissions, key=lambda x: x.id):
        content_hash.update(f"{s.id}:{get_embedding_text(s)}".encode())
    return f"{prefix}:conf_{conference_id}:{content_hash.hexdigest()}"
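Because the digest covers each submission's id and embedding text, reordering submissions does not change the key, while editing any accepted talk's title, elevator pitch, or abstract does, so stale analyses simply stop being addressed. A quick illustration, using SimpleNamespace stand-ins instead of real Submission instances:

# Illustration with hypothetical stand-in objects; real callers pass
# Submission instances with title/elevator_pitch/abstract fields.
from types import SimpleNamespace

from reviews.cache_keys import get_cache_key

a = SimpleNamespace(id=1, title="Async Django", elevator_pitch="", abstract="...")
b = SimpleNamespace(id=2, title="Typed Python", elevator_pitch="", abstract="...")

# Order-insensitive: submissions are sorted by id before hashing.
assert get_cache_key("recap_analysis", 42, [a, b]) == get_cache_key(
    "recap_analysis", 42, [b, a]
)

# Content-sensitive: any text change produces a different digest.
old_key = get_cache_key("recap_analysis", 42, [a, b])
a.title = "Async Django, Revisited"
assert get_cache_key("recap_analysis", 42, [a, b]) != old_key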
24 changes: 3 additions & 21 deletions backend/reviews/similar_talks.py
@@ -1,5 +1,4 @@
import functools
-import hashlib
import logging

import nltk

@@ -12,6 +11,9 @@
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.metrics.pairwise import cosine_similarity

+from reviews.cache_keys import get_cache_key as _get_cache_key
+from reviews.cache_keys import get_embedding_text
+
logger = logging.getLogger(__name__)

CACHE_TIMEOUT = 60 * 60 * 24  # 24 hours
@@ -222,18 +224,6 @@ def get_embedding_model():
    return SentenceTransformer("all-MiniLM-L6-v2", token=False)


-def get_embedding_text(submission) -> str:
-    """Combine title, elevator_pitch, and abstract into a single string for embedding."""
-    title = str(submission.title) if submission.title else ""
-    elevator_pitch = str(submission.elevator_pitch) if submission.elevator_pitch else ""
-    abstract = (
-        str(submission.abstract)
-        if hasattr(submission, "abstract") and submission.abstract
-        else ""
-    )
-    return f"{title}. {elevator_pitch}. {abstract}"


def _get_submission_languages(submissions) -> set[str]:
    """Extract all unique language codes from submissions."""
    language_codes = set()
@@ -245,14 +235,6 @@ def _get_submission_languages(submissions) -> set[str]:
    return language_codes or {"en"}


-def _get_cache_key(prefix: str, conference_id: int, submissions) -> str:
-    """Generate a cache key based on conference and submission content."""
-    content_hash = hashlib.md5()
-    for s in sorted(submissions, key=lambda x: x.id):
-        content_hash.update(f"{s.id}:{get_embedding_text(s)}".encode())
-    return f"{prefix}:conf_{conference_id}:{content_hash.hexdigest()}"


def compute_similar_talks(
    submissions, top_n=5, conference_id=None, force_recompute=False
):
86 changes: 86 additions & 0 deletions backend/reviews/tasks.py
@@ -0,0 +1,86 @@
import logging

from pycon.celery import app

logger = logging.getLogger(__name__)

RESULT_CACHE_TTL = 60 * 60 * 24  # 24 hours
ERROR_CACHE_TTL = 60 * 2  # 2 minutes


@app.task
def compute_recap_analysis(conference_id, combined_cache_key, force_recompute=False):
    from django.core.cache import cache

    from conferences.models import Conference
    from reviews.admin import get_accepted_submissions
    from reviews.similar_talks import (
        compute_similar_talks,
        compute_topic_clusters,
    )

    try:
        conference = Conference.objects.get(id=conference_id)
    except Conference.DoesNotExist:
        logger.error(
            "Conference %s not found for recap analysis", conference_id
        )
        cache.delete(f"{combined_cache_key}:computing")
        return

    accepted_submissions = list(get_accepted_submissions(conference))

    try:
        # Pass conference_id=None to skip individual function caching;
        # the combined result is cached under combined_cache_key instead.
        similar_talks = compute_similar_talks(
            accepted_submissions,
            top_n=5,
            conference_id=None,
            force_recompute=force_recompute,
        )

        topic_clusters = compute_topic_clusters(
            accepted_submissions,
            min_topic_size=3,
            conference_id=None,
            force_recompute=force_recompute,
        )

        submissions_list = sorted(
            [
                {
                    "id": s.id,
                    "title": str(s.title),
                    "type": s.type.name,
                    "speaker": s.speaker.display_name if s.speaker else "Unknown",
                    "similar": similar_talks.get(s.id, []),
                }
                for s in accepted_submissions
            ],
            key=lambda x: max(
                (item["similarity"] for item in x["similar"]), default=0
            ),
            reverse=True,
        )

        result = {
            "submissions_list": submissions_list,
            "topic_clusters": topic_clusters,
        }

        cache.set(combined_cache_key, result, RESULT_CACHE_TTL)

        return result
    except Exception:
        logger.exception(
            "Failed to compute recap analysis for conference %s", conference_id
        )
        cache.set(
            combined_cache_key,
            {"status": "error", "message": "Analysis failed. Please try again."},
            ERROR_CACHE_TTL,
        )
        raise
    finally:
        cache.delete(f"{combined_cache_key}:computing")
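Taken together with the view, a combined_cache_key is always in one of four states: nothing cached, computing (lock present), an error marker that expires after two minutes, or a full payload cached for 24 hours; the finally block guarantees the lock is cleared however the task exits. A sketch of how a caller might classify those states (recap_state is a hypothetical helper, not part of the PR):

from django.core.cache import cache

def recap_state(combined_cache_key):
    # Sketch: interprets the cache states produced by the task above.
    result = cache.get(combined_cache_key)
    if result is None:
        if cache.get(f"{combined_cache_key}:computing"):
            return "processing"  # task dispatched but not finished
        return "idle"  # nothing cached, nothing running
    if isinstance(result, dict) and result.get("status") == "error":
        return "failed"  # error marker, expires after ERROR_CACHE_TTL
    return "ready"  # full payload, cached for RESULT_CACHE_TTL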
91 changes: 83 additions & 8 deletions backend/reviews/templates/reviews-recap.html
@@ -570,6 +570,82 @@ <h2 class="recap-section-title">🔗 Similar Talks</h2>
    section.style.display = '';
}

+var pollTimer = null;
+var pollStartTime = null;
+var pollAttempt = 0;
+var POLL_TIMEOUT = 360000; // 6 minutes – must exceed backend lock TTL (300s)
+
+function getNextPollInterval() {
+    // Exponential backoff: 1s, 2s, 3s, 5s, 10s, 15s, 15s...
+    var intervals = [1000, 2000, 3000, 5000, 10000, 15000];
+    return intervals[Math.min(pollAttempt, intervals.length - 1)];
+}
+
+function stopPolling() {
+    if (pollTimer) {
+        clearTimeout(pollTimer);
+        pollTimer = null;
+    }
+    pollStartTime = null;
+    pollAttempt = 0;
+}
+
+function showError(message) {
+    stopPolling();
+    loading.style.display = 'none';
+    btn.disabled = false;
+    btn.textContent = 'Compute Topic Clusters & Similar Talks';
+    recomputeBtn.disabled = false;
+    recomputeBtn.textContent = 'Recompute (ignore cache)';
+    errorDiv.textContent = message;
+    errorDiv.style.display = '';
+}
+
+function handleResult(data) {
+    if (data.status === 'error') {
+        showError(data.message || 'Analysis failed. Please try again.');
+        recomputeBtn.style.display = '';
+        btn.style.display = 'none';
+        return;
+    }
+
+    loading.style.display = 'none';
+    btn.style.display = 'none';
+    recomputeBtn.style.display = '';
+    recomputeBtn.disabled = false;
+    recomputeBtn.textContent = 'Recompute (ignore cache)';
+
+    renderTopicClusters(data.topic_clusters);
+    renderSimilarTalks(data.submissions_list);
+}
+
+function pollForResults() {
+    if (pollStartTime && (Date.now() - pollStartTime) > POLL_TIMEOUT) {
+        showError('Analysis is taking longer than expected. Please try again later.');
+        return;
+    }
+
+    fetch(computeUrl, {
+        headers: { 'X-Requested-With': 'XMLHttpRequest' }
+    })
+        .then(function(response) {
+            if (!response.ok) throw new Error('Server error: ' + response.status);
+            return response.json();
+        })
+        .then(function(data) {
+            if (data.status === 'processing') {
+                pollAttempt++;
+                pollTimer = setTimeout(pollForResults, getNextPollInterval());
+                return;
+            }
+            stopPolling();
+            handleResult(data);
+        })
+        .catch(function(err) {
+            showError('Failed to compute analysis: ' + err.message);
+        });
+}
+
function fetchAnalysis(recompute) {
    var url = recompute ? computeUrl + '?recompute=1' : computeUrl;
    var activeBtn = recompute ? recomputeBtn : btn;
@@ -587,14 +663,13 @@ <h2 class="recap-section-title">🔗 Similar Talks</h2>
            return response.json();
        })
        .then(function(data) {
-            loading.style.display = 'none';
-            btn.style.display = 'none';
-            recomputeBtn.style.display = '';
-            recomputeBtn.disabled = false;
-            recomputeBtn.textContent = 'Recompute (ignore cache)';
-
-            renderTopicClusters(data.topic_clusters);
-            renderSimilarTalks(data.submissions_list);
+            if (data.status === 'processing') {
+                pollStartTime = Date.now();
+                pollAttempt = 0;
+                pollTimer = setTimeout(pollForResults, getNextPollInterval());
+                return;
+            }
+            handleResult(data);
        })
        .catch(function(err) {
            loading.style.display = 'none';
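The client schedule ramps from one second to a 15-second ceiling, and its six-minute budget deliberately outlasts the server's 300-second lock TTL, so a lock left by a wedged worker can expire and be retried within a single polling session. The same arithmetic, sketched in Python for reference (next_interval is a hypothetical mirror of getNextPollInterval, not code from the PR):

# Reference sketch of the client backoff schedule; values are milliseconds.
INTERVALS_MS = [1000, 2000, 3000, 5000, 10000, 15000]
POLL_TIMEOUT_MS = 360_000  # client budget: 6 minutes
LOCK_TTL_MS = 300_000  # server lock: cache.add(..., timeout=300)

def next_interval(attempt: int) -> int:
    # Same rule as getNextPollInterval(): ramp up, then hold the last value.
    return INTERVALS_MS[min(attempt, len(INTERVALS_MS) - 1)]

assert POLL_TIMEOUT_MS > LOCK_TTL_MS  # polling outlives any stale server lock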