Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
228 changes: 227 additions & 1 deletion cron/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,15 @@
from events.models import FossMdlCourses, TestAttendance, State, City, InstituteType
from creation.models import FossCategory

from django.db import transaction
from training.models import TrainingAttend
from events.models import Test, TestAttendance
from mdldjango.helper import get_moodle_user
from events.helpers import get_fossmdlcourse
from django.db import close_old_connections



def bulk_email(taskid, *args, **kwargs):
task = AsyncCronMail.objects.get(pk=taskid)
if task.log_file.name == "":
Expand Down Expand Up @@ -166,4 +175,221 @@ def filter_student_grades(key=None):


def async_filter_student_grades(key):
TOPPER_QUEUE.enqueue(filter_student_grades, key, job_id=key, job_timeout='72h')
TOPPER_QUEUE.enqueue(filter_student_grades, key, job_id=key, job_timeout='72h')



def process_test_attendance(test_id):
"""
Background task:
- Create TestAttendance
- Sync Moodle users
"""
close_old_connections()
job = get_current_job()

print(f"\033[92m job ****** {job} \033[0m")
def meta_update(**updates):
"""Update RQ job meta safely"""
if not job:
return
job.meta.update(updates)
job.save_meta()

meta_update(
test_id=test_id,
status="starting",
started_at=int(time.time()),
progress_total=0,
progress_processed=0,
progress_pct=0,
stats_created_attendance=0,
stats_skipped_existing=0,
stats_missing_moodle=0,
message="Starting attendance processing.."
)

try:
test = Test.objects.select_related('training', 'foss').get(pk=test_id)
except Test.DoesNotExist:
meta_update(status="done", message="Test not found. Exiting")
return

if not test.training_id:
meta_update(status="done", message="No training attached to test. Exiting.")
return

meta_update(status="running", message="Loading training attendees...")

tras = TrainingAttend.objects.select_related(
'student__user',
'training__training_planner'
).filter(training=test.training)

fossmdlcourse = get_fossmdlcourse(
test.foss_id,
fossmdlmap_id=test.training.fossmdlmap_id
)

# total count for progress (1 extra query; useful for dashboard)
total = tras.count()
meta_update(progress_total=total, progress_processed=0, progress_pct=0)

existing = set(
TestAttendance.objects.filter(test=test)
.values_list("student_id", "mdluser_id")
)

mdluser_cache = {}
new_rows = []

processed = 0
skipped_existing = 0
missing_moodle = 0

# Update job meta every N rows to avoid excessive Redis writes
UPDATE_EVERY = 10
meta_update(message=f"Processing {total} attendees...")

for tra in tras.iterator():
user = tra.student.user

key = (
tra.training.training_planner.academic_id,
user.first_name,
user.last_name,
tra.student.gender,
user.email
)

if key not in mdluser_cache:
mdluser_cache[key] = get_moodle_user(*key)

mdluser = mdluser_cache[key]
if not mdluser:
missing_moodle += 1
processed += 1
# progress update
if (processed % UPDATE_EVERY == 0) or (processed == total):
pct = int((processed * 100) / total) if total else 100
meta_update(
progress_processed=processed,
progress_pct=pct,
stats_created_attendance=len(new_rows),
stats_skipped_existing=skipped_existing,
stats_missing_moodle=missing_moodle,
message=f"Processing... ({processed}/{total})",
)
continue

pair = (tra.student.id, mdluser.id)
if pair in existing:
skipped_existing += 1
processed += 1
if (processed % UPDATE_EVERY == 0) or (processed == total):
pct = int((processed * 100) / total) if total else 100
meta_update(
progress_processed=processed,
progress_pct=pct,
stats_created_attendance=len(new_rows),
stats_skipped_existing=skipped_existing,
stats_missing_moodle=missing_moodle,
message=f"Processing... ({processed}/{total})",
)
continue

new_rows.append(
TestAttendance(
student_id=tra.student.id,
test=test,
mdluser_id=mdluser.id,
mdlcourse_id=fossmdlcourse.mdlcourse_id,
mdlquiz_id=fossmdlcourse.mdlquiz_id,
mdlattempt_id=0,
status=0
)
)
processed += 1

if (processed % UPDATE_EVERY == 0) or (processed == total):
pct = int((processed * 100) / total) if total else 100
meta_update(
progress_processed=processed,
progress_pct=pct,
stats_created_attendance=len(new_rows),
stats_skipped_existing=skipped_existing,
stats_missing_moodle=missing_moodle,
message=f"Processing... ({processed}/{total})",
)

# --- write phase ---
meta_update(status="writing", message=f"Writing {len(new_rows)} new TestAttendance rows...")

if new_rows:
close_old_connections()
with transaction.atomic():
TestAttendance.objects.bulk_create(new_rows)

meta_update(
status="done",
finished_at=int(time.time()),
stats_created_attendance=len(new_rows),
stats_skipped_existing=skipped_existing,
stats_missing_moodle=missing_moodle,
progress_processed=total,
progress_pct=100,
message="Done.",
)


def process_test_post_save(test_id, user_id, message,academic_id):
"""
Background task:
- Event log
- Notifications
"""
close_old_connections()
from events.views import (
update_events_log,
update_events_notification
)

update_events_log(
user_id=user_id,
role=0,
category=1,
category_id=test_id,
academic=academic_id,
status=0
)

update_events_notification(
user_id=user_id,
role=0,
category=1,
category_id=test_id,
academic=academic_id,
status=0,
message=message
)


def async_process_test_attendance(test):
DEFAULT_QUEUE.enqueue(
process_test_attendance,
test.pk,
job_id="test_attendance_%s" % test.pk,
job_timeout='72h'
)


def async_test_post_save(test, user, message):
DEFAULT_QUEUE.enqueue(
process_test_post_save,
test.pk,
user.pk,
message,
test.academic_id,
job_id="test_post_save_%s" % test.pk,
job_timeout='24h'
)
Loading