Skip to content

Commit ce5baa1

Browse files
authored
Merge pull request #49 from taskbadger/sk/procrastinate-periodic-tracking
fix(procrastinate): track periodic tasks via job_manager.defer_periodic_job
2 parents 053ff36 + d3fbbd8 commit ce5baa1

3 files changed

Lines changed: 96 additions & 8 deletions

File tree

taskbadger/procrastinate.py

Lines changed: 49 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -160,17 +160,22 @@ async def defer_async(**kwargs):
160160
task.defer_async = defer_async
161161

162162

163-
def _maybe_create_pending(task, kwargs):
164-
"""Decide whether to track this defer, and if so create the TaskBadger
165-
task and inject its id into ``kwargs``. Always returns the kwargs dict."""
163+
def _create_pending_task(task, task_kwargs):
164+
"""Create a PENDING TaskBadger task for ``task`` if it should be tracked.
165+
166+
Returns the created TaskBadger task, or ``None`` if Badger isn't
167+
configured, the task isn't tracked (neither manual nor auto), or the
168+
create call failed. ``task_kwargs`` is used only for the
169+
``record_task_args`` data capture.
170+
"""
166171
if not Badger.is_configured():
167-
return kwargs
172+
return None
168173

169174
system = getattr(task, "_taskbadger_system", None)
170175
manual = getattr(task, _MANUAL_ATTR, False)
171176
auto = bool(system) and system.track_task(task.name)
172177
if not manual and not auto:
173-
return kwargs
178+
return None
174179

175180
opts = dict(getattr(task, _OPTS_ATTR, {}) or {})
176181
name = opts.pop("name", None) or task.name
@@ -185,12 +190,18 @@ def _maybe_create_pending(task, kwargs):
185190
if record_args is None:
186191
record_args = bool(system) and system.record_task_args
187192
if record_args:
188-
data["procrastinate_task_kwargs"] = _serialize_kwargs(kwargs)
193+
data["procrastinate_task_kwargs"] = _serialize_kwargs(task_kwargs)
189194

190195
if data:
191196
create_kwargs["data"] = data
192197

193-
tb_task = create_task_safe(name, **create_kwargs)
198+
return create_task_safe(name, **create_kwargs)
199+
200+
201+
def _maybe_create_pending(task, kwargs):
202+
"""Decide whether to track this defer, and if so create the TaskBadger
203+
task and inject its id into ``kwargs``. Always returns the kwargs dict."""
204+
tb_task = _create_pending_task(task, kwargs)
194205
if tb_task is None:
195206
return kwargs
196207

@@ -262,6 +273,37 @@ def current_task():
262273
return safe_get_task(tb_id)
263274

264275

276+
def _patch_job_manager(app, system):
277+
"""Patch ``app.job_manager.defer_periodic_job`` so periodic tasks are tracked.
278+
279+
Procrastinate's ``PeriodicDeferrer`` enqueues jobs by calling
280+
``job_manager.defer_periodic_job(job=..., ...)`` directly, bypassing
281+
``task.defer``/``defer_async``. Without this hook, ``@app.periodic`` tasks
282+
would never get a PENDING TaskBadger task created at enqueue time.
283+
284+
Idempotent: a second call updates the system reference but doesn't
285+
re-wrap.
286+
"""
287+
jm = app.job_manager
288+
if not getattr(jm, "_taskbadger_original_defer_periodic_job", None):
289+
original = jm.defer_periodic_job
290+
jm._taskbadger_original_defer_periodic_job = original
291+
292+
@functools.wraps(original)
293+
async def patched(*, job, periodic_id, defer_timestamp):
294+
task = app.tasks.get(job.task_name)
295+
if task is not None:
296+
tb_task = _create_pending_task(task, job.task_kwargs)
297+
if tb_task is not None:
298+
new_kwargs = {**job.task_kwargs, TB_TASK_ID_KWARG: tb_task.id}
299+
job = job.evolve(task_kwargs=new_kwargs)
300+
return await jm._taskbadger_original_defer_periodic_job(
301+
job=job, periodic_id=periodic_id, defer_timestamp=defer_timestamp
302+
)
303+
304+
jm.defer_periodic_job = patched
305+
306+
265307
def _patch_app_task(app, system):
266308
"""Replace ``app.task`` with a wrapper that instruments newly-registered
267309
tasks under the supplied ``system``. Idempotent — a second call replaces

taskbadger/systems/procrastinate.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from __future__ import annotations
44

55
from taskbadger._integrations import BaseSystemIntegration
6-
from taskbadger.procrastinate import _instrument_task, _patch_app_task
6+
from taskbadger.procrastinate import _instrument_task, _patch_app_task, _patch_job_manager
77

88

99
class ProcrastinateSystemIntegration(BaseSystemIntegration):
@@ -41,6 +41,7 @@ def __init__(
4141
for task in list(app.tasks.values()):
4242
_instrument_task(task, system=self)
4343
_patch_app_task(app, system=self)
44+
_patch_job_manager(app, system=self)
4445

4546
def track_task(self, task_name):
4647
# Never auto-track Procrastinate's built-in housekeeping tasks

tests/test_procrastinate_system_integration.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import asyncio
12
from unittest import mock
23

34
import procrastinate
@@ -98,6 +99,50 @@ def late(a):
9899
create.assert_called_once()
99100

100101

102+
@pytest.mark.usefixtures("_bind_settings")
103+
def test_periodic_defer_creates_pending(app):
104+
"""Periodic tasks are deferred via ``app.job_manager.defer_periodic_job``,
105+
which bypasses ``task.defer``/``defer_async`` entirely. The system
106+
integration must hook this path too, otherwise periodic jobs are invisible
107+
to TaskBadger."""
108+
109+
@app.task(name="periodic_target")
110+
def periodic_target(timestamp):
111+
return timestamp
112+
113+
ProcrastinateSystemIntegration(app=app, auto_track_tasks=True)
114+
115+
timestamp = 1700000000
116+
job = periodic_target.configure(task_kwargs={"timestamp": timestamp}).job
117+
118+
tb = task_for_test()
119+
with mock.patch("taskbadger.procrastinate.create_task_safe", return_value=tb) as create:
120+
asyncio.run(app.job_manager.defer_periodic_job(job=job, periodic_id="every-min", defer_timestamp=timestamp))
121+
122+
create.assert_called_once()
123+
jobs_stored = list(app.connector.jobs.values())
124+
assert jobs_stored[0]["args"][TB_TASK_ID_KWARG] == tb.id
125+
126+
127+
@pytest.mark.usefixtures("_bind_settings")
128+
def test_periodic_defer_skips_excluded(app):
129+
"""Excludes apply on the periodic path too."""
130+
131+
@app.task(name="myapp.cleanup.flush")
132+
def flush(timestamp):
133+
pass
134+
135+
ProcrastinateSystemIntegration(app=app, auto_track_tasks=True, excludes=[r"myapp\.cleanup\..*"])
136+
137+
timestamp = 1700000000
138+
job = flush.configure(task_kwargs={"timestamp": timestamp}).job
139+
140+
with mock.patch("taskbadger.procrastinate.create_task_safe") as create:
141+
asyncio.run(app.job_manager.defer_periodic_job(job=job, periodic_id="every-min", defer_timestamp=timestamp))
142+
143+
create.assert_not_called()
144+
145+
101146
@pytest.mark.usefixtures("_bind_settings")
102147
def test_track_plus_auto_track_no_double_wrap(app):
103148
@track

0 commit comments

Comments
 (0)