From 4d0d16c4b36d85d3417080725b7820d083dd12ed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Bidoul?= Date: Sat, 3 Jan 2026 11:34:18 +0100 Subject: [PATCH 1/4] [IMP] queue_job: use state constant in lock function --- queue_job/job.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/queue_job/job.py b/queue_job/job.py index 6cfe12f232..4d69046e17 100644 --- a/queue_job/job.py +++ b/queue_job/job.py @@ -263,11 +263,11 @@ def lock(self): queue_job WHERE uuid = %s - AND state='started' + AND state = %s ) FOR UPDATE; """, - [self.uuid], + [self.uuid, STARTED], ) # 1 job should be locked From 7c2a76f859cf819a9c1abf920d8099a8a9c19857 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Bidoul?= Date: Fri, 2 Jan 2026 12:18:38 +0100 Subject: [PATCH 2/4] [IMP] queue_job: refactor job acquisition In this commit we cleanly separate the job acquisition (i.e. verifying the job is in the exepected state, marking it started and locking it) from job execution. We also avoid trying to start the job if it is already locked by using SKIP LOCKED and exiting early. Indeed in such situations the job is likely already being handled by another worker so there is no point trying to start it, so we exit early and let it be handled either by the other worker or the dead job requeuer. --- queue_job/controllers/main.py | 57 ++++++++++++------- queue_job/job.py | 19 +++---- test_queue_job/tests/__init__.py | 1 + test_queue_job/tests/common.py | 10 ++++ test_queue_job/tests/test_acquire_job.py | 51 +++++++++++++++++ test_queue_job/tests/test_requeue_dead_job.py | 17 ------ 6 files changed, 107 insertions(+), 48 deletions(-) create mode 100644 test_queue_job/tests/test_acquire_job.py diff --git a/queue_job/controllers/main.py b/queue_job/controllers/main.py index a0dce3b8e9..7a81eb13a5 100644 --- a/queue_job/controllers/main.py +++ b/queue_job/controllers/main.py @@ -27,15 +27,47 @@ class RunJobController(http.Controller): - def _try_perform_job(self, env, job): - """Try to perform the job.""" + @classmethod + def _acquire_job(cls, env: api.Environment, job_uuid: str) -> Job | None: + """Acquire a job for execution. + + - make sure it is in ENQUEUED state + - mark it as STARTED and commit the state change + - acquire the job lock + + If successful, return the Job instance, otherwise return None. This + function may fail to acquire the job is not in the expected state or is + already locked by another worker. + """ + env.cr.execute( + "SELECT uuid FROM queue_job WHERE uuid=%s AND state=%s " + "FOR UPDATE SKIP LOCKED", + (job_uuid, ENQUEUED), + ) + if not env.cr.fetchone(): + _logger.warning( + "was requested to run job %s, but it does not exist, " + "or is not in state %s, or is being handled by another worker", + job_uuid, + ENQUEUED, + ) + return None + job = Job.load(env, job_uuid) + assert job and job.state == ENQUEUED job.set_started() job.store() env.cr.commit() - job.lock() + if not job.lock(): + _logger.warning( + "was requested to run job %s, but it could not be locked", + job_uuid, + ) + return None + return job + def _try_perform_job(self, env, job): + """Try to perform the job, mark it done and commit if successful.""" _logger.debug("%s started", job) - job.perform() # Triggers any stored computed fields before calling 'set_done' # so that will be part of the 'exec_time' @@ -95,23 +127,10 @@ def retry_postpone(job, message, seconds=None): job.set_pending(reset_retry=False) job.store() - # ensure the job to run is in the correct state and lock the record - env.cr.execute( - "SELECT state FROM queue_job WHERE uuid=%s AND state=%s FOR UPDATE", - (job_uuid, ENQUEUED), - ) - if not env.cr.fetchone(): - _logger.warning( - "was requested to run job %s, but it does not exist, " - "or is not in state %s", - job_uuid, - ENQUEUED, - ) + job = self._acquire_job(env, job_uuid) + if not job: return "" - job = Job.load(env, job_uuid) - assert job and job.state == ENQUEUED - try: try: self._try_perform_job(env, job) diff --git a/queue_job/job.py b/queue_job/job.py index 4d69046e17..4c78072508 100644 --- a/queue_job/job.py +++ b/queue_job/job.py @@ -221,7 +221,7 @@ def load_many(cls, env, job_uuids): recordset = cls.db_records_from_uuids(env, job_uuids) return {cls._load_from_db_record(record) for record in recordset} - def add_lock_record(self): + def add_lock_record(self) -> None: """ Create row in db to be locked while the job is being performed. """ @@ -241,13 +241,11 @@ def add_lock_record(self): [self.uuid], ) - def lock(self): - """ - Lock row of job that is being performed + def lock(self) -> bool: + """Lock row of job that is being performed. - If a job cannot be locked, - it means that the job wasn't started, - a RetryableJobError is thrown. + Return False if a job cannot be locked: it means that the job is not in + STARTED state or is already locked by another worker. """ self.env.cr.execute( """ @@ -265,16 +263,13 @@ def lock(self): uuid = %s AND state = %s ) - FOR UPDATE; + FOR UPDATE SKIP LOCKED; """, [self.uuid, STARTED], ) # 1 job should be locked - if 1 != len(self.env.cr.fetchall()): - raise RetryableJobError( - f"Trying to lock job that wasn't started, uuid: {self.uuid}" - ) + return bool(self.env.cr.fetchall()) @classmethod def _load_from_db_record(cls, job_db_record): diff --git a/test_queue_job/tests/__init__.py b/test_queue_job/tests/__init__.py index 62347148e5..0cfacebdf3 100644 --- a/test_queue_job/tests/__init__.py +++ b/test_queue_job/tests/__init__.py @@ -1,3 +1,4 @@ +from . import test_acquire_job from . import test_autovacuum from . import test_delayable from . import test_dependencies diff --git a/test_queue_job/tests/common.py b/test_queue_job/tests/common.py index 335c072625..d3173a2198 100644 --- a/test_queue_job/tests/common.py +++ b/test_queue_job/tests/common.py @@ -20,3 +20,13 @@ def _create_job(self): stored = Job.db_records_from_uuids(self.env, [test_job.uuid]) self.assertEqual(len(stored), 1) return stored + + def _get_demo_job(self, uuid): + # job created during load of demo data + job = self.env["queue.job"].search([("uuid", "=", uuid)], limit=1) + self.assertTrue( + job, + f"Demo data queue job {uuid!r} should be loaded in order " + "to make this test work", + ) + return job diff --git a/test_queue_job/tests/test_acquire_job.py b/test_queue_job/tests/test_acquire_job.py new file mode 100644 index 0000000000..3f0c92a2be --- /dev/null +++ b/test_queue_job/tests/test_acquire_job.py @@ -0,0 +1,51 @@ +# Copyright 2026 ACSONE SA/NV +# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl). +import logging +from unittest import mock + +from odoo.tests import tagged + +from odoo.addons.queue_job.controllers.main import RunJobController + +from .common import JobCommonCase + + +@tagged("post_install", "-at_install") +class TestRequeueDeadJob(JobCommonCase): + def test_acquire_enqueued_job(self): + job_record = self._get_demo_job(uuid="test_enqueued_job") + self.assertFalse( + self.env["queue.job.lock"].search( + [("queue_job_id", "=", job_record.id)], + ), + "A job lock record should not exist at this point", + ) + with mock.patch.object( + self.env.cr, "commit", mock.Mock(side_effect=self.env.flush_all) + ) as mock_commit: + job = RunJobController._acquire_job(self.env, job_uuid="test_enqueued_job") + mock_commit.assert_called_once() + self.assertIsNotNone(job) + self.assertEqual(job.uuid, "test_enqueued_job") + self.assertEqual(job.state, "started") + self.assertTrue( + self.env["queue.job.lock"].search( + [("queue_job_id", "=", job_record.id)] + ), + "A job lock record should exist at this point", + ) + + def test_acquire_started_job(self): + with ( + mock.patch.object( + self.env.cr, "commit", mock.Mock(side_effect=self.env.flush_all) + ) as mock_commit, + self.assertLogs(level=logging.WARNING) as logs, + ): + job = RunJobController._acquire_job(self.env, "test_started_job") + mock_commit.assert_not_called() + self.assertIsNone(job) + self.assertIn( + "was requested to run job test_started_job, but it does not exist", + logs.output[0], + ) diff --git a/test_queue_job/tests/test_requeue_dead_job.py b/test_queue_job/tests/test_requeue_dead_job.py index 58890adf24..510276be63 100644 --- a/test_queue_job/tests/test_requeue_dead_job.py +++ b/test_queue_job/tests/test_requeue_dead_job.py @@ -13,23 +13,6 @@ @tagged("post_install", "-at_install") class TestRequeueDeadJob(JobCommonCase): - def _get_demo_job(self, uuid): - # job created during load of demo data - job = self.env["queue.job"].search( - [ - ("uuid", "=", uuid), - ], - limit=1, - ) - - self.assertTrue( - job, - f"Demo data queue job {uuid} should be loaded in order" - " to make this tests work", - ) - - return job - def get_locks(self, uuid, cr=None): """ Retrieve lock rows From 9937ed0ebb2022260918b6762e32debd09966e17 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Bidoul?= Date: Sat, 3 Jan 2026 11:26:02 +0100 Subject: [PATCH 3/4] [IMP] queue_job: refactor runjob Extract the logic to run one job out of the /queue_job/runjob route. Towards making this logic reusable in other job executors. --- queue_job/controllers/main.py | 35 +++++++++++++++++------------------ 1 file changed, 17 insertions(+), 18 deletions(-) diff --git a/queue_job/controllers/main.py b/queue_job/controllers/main.py index 7a81eb13a5..26f1983c45 100644 --- a/queue_job/controllers/main.py +++ b/queue_job/controllers/main.py @@ -108,17 +108,7 @@ def _enqueue_dependent_jobs(self, env, job): else: break - @http.route( - "/queue_job/runjob", - type="http", - auth="none", - save_session=False, - readonly=False, - ) - def runjob(self, db, job_uuid, **kw): - http.request.session.db = db - env = http.request.env(user=SUPERUSER_ID) - + def _runjob(self, env: api.Environment, job: Job) -> None: def retry_postpone(job, message, seconds=None): job.env.clear() with Registry(job.env.cr.dbname).cursor() as new_cr: @@ -127,10 +117,6 @@ def retry_postpone(job, message, seconds=None): job.set_pending(reset_retry=False) job.store() - job = self._acquire_job(env, job_uuid) - if not job: - return "" - try: try: self._try_perform_job(env, job) @@ -151,7 +137,6 @@ def retry_postpone(job, message, seconds=None): # traceback in the logs we should have the traceback when all # retries are exhausted env.cr.rollback() - return "" except (FailedJobError, Exception) as orig_exception: buff = StringIO() @@ -171,8 +156,6 @@ def retry_postpone(job, message, seconds=None): self._enqueue_dependent_jobs(env, job) _logger.debug("%s enqueue depends done", job) - return "" - def _get_failure_values(self, job, traceback_txt, orig_exception): """Collect relevant data from exception.""" exception_name = orig_exception.__class__.__name__ @@ -187,6 +170,22 @@ def _get_failure_values(self, job, traceback_txt, orig_exception): "exc_message": exc_message, } + @http.route( + "/queue_job/runjob", + type="http", + auth="none", + save_session=False, + readonly=False, + ) + def runjob(self, db, job_uuid, **kw): + http.request.session.db = db + env = http.request.env(user=SUPERUSER_ID) + job = self._acquire_job(env, job_uuid) + if not job: + return "" + self._runjob(env, job) + return "" + # flake8: noqa: C901 @http.route("/queue_job/create_test_job", type="http", auth="user") def create_test_job( From ea803f3756b423c9029b3dffe419161662006798 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Bidoul?= Date: Sat, 3 Jan 2026 11:39:13 +0100 Subject: [PATCH 4/4] [IMP] queue_job: convert job execution logic to class method Towards making this logic reusable. --- queue_job/controllers/main.py | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/queue_job/controllers/main.py b/queue_job/controllers/main.py index 26f1983c45..28f3534848 100644 --- a/queue_job/controllers/main.py +++ b/queue_job/controllers/main.py @@ -65,7 +65,8 @@ def _acquire_job(cls, env: api.Environment, job_uuid: str) -> Job | None: return None return job - def _try_perform_job(self, env, job): + @classmethod + def _try_perform_job(cls, env, job): """Try to perform the job, mark it done and commit if successful.""" _logger.debug("%s started", job) job.perform() @@ -78,7 +79,8 @@ def _try_perform_job(self, env, job): env.cr.commit() _logger.debug("%s done", job) - def _enqueue_dependent_jobs(self, env, job): + @classmethod + def _enqueue_dependent_jobs(cls, env, job): tries = 0 while True: try: @@ -108,7 +110,8 @@ def _enqueue_dependent_jobs(self, env, job): else: break - def _runjob(self, env: api.Environment, job: Job) -> None: + @classmethod + def _runjob(cls, env: api.Environment, job: Job) -> None: def retry_postpone(job, message, seconds=None): job.env.clear() with Registry(job.env.cr.dbname).cursor() as new_cr: @@ -119,7 +122,7 @@ def retry_postpone(job, message, seconds=None): try: try: - self._try_perform_job(env, job) + cls._try_perform_job(env, job) except OperationalError as err: # Automatically retry the typical transaction serialization # errors @@ -146,17 +149,18 @@ def retry_postpone(job, message, seconds=None): job.env.clear() with Registry(job.env.cr.dbname).cursor() as new_cr: job.env = job.env(cr=new_cr) - vals = self._get_failure_values(job, traceback_txt, orig_exception) + vals = cls._get_failure_values(job, traceback_txt, orig_exception) job.set_failed(**vals) job.store() buff.close() raise _logger.debug("%s enqueue depends started", job) - self._enqueue_dependent_jobs(env, job) + cls._enqueue_dependent_jobs(env, job) _logger.debug("%s enqueue depends done", job) - def _get_failure_values(self, job, traceback_txt, orig_exception): + @classmethod + def _get_failure_values(cls, job, traceback_txt, orig_exception): """Collect relevant data from exception.""" exception_name = orig_exception.__class__.__name__ if hasattr(orig_exception, "__module__"):