1313import random
1414
1515import procrastinate
16+ import psycopg
1617import pytest
1718
1819import taskbadger
@@ -36,19 +37,36 @@ def _check_log_errors(caplog):
3637
3738
3839@pytest .fixture (scope = "session" )
39- def app ():
40- """A Procrastinate app pointed at a real Postgres instance with its schema applied."""
41- conn = procrastinate .SyncPsycopgConnector (conninfo = PROCRASTINATE_DSN )
40+ def _schema ():
41+ # apply_schema is NOT idempotent (schema.sql uses bare CREATE TYPE), so
42+ # only apply when the schema isn't already present.
43+ with psycopg .connect (PROCRASTINATE_DSN ) as conn , conn .cursor () as cur :
44+ cur .execute ("SELECT to_regclass('procrastinate_jobs')" )
45+ if cur .fetchone ()[0 ] is not None :
46+ return
47+ schema_conn = procrastinate .SyncPsycopgConnector (conninfo = PROCRASTINATE_DSN )
48+ schema_app = procrastinate .App (connector = schema_conn )
49+ with schema_app .open ():
50+ schema_app .schema_manager .apply_schema ()
51+
52+
53+ @pytest .fixture
54+ def app (_schema ):
55+ # Async connector: run_worker raises SyncConnectorConfigurationError on
56+ # SyncPsycopgConnector. Async connectors work in sync contexts too.
57+ #
58+ # Function-scoped because run_worker tears down the sync sub-connector that
59+ # PsycopgConnector spawns inside `app.open()`, leaving the next test's
60+ # defer() with no usable sync pool.
61+ conn = procrastinate .PsycopgConnector (conninfo = PROCRASTINATE_DSN )
4262 app = procrastinate .App (connector = conn )
4363 with app .open ():
44- # Apply schema (idempotent — Procrastinate's apply_schema is safe to re-run).
45- app .schema_manager .apply_schema ()
4664 yield app
4765
4866
49- def _fetch_job_args (app , job_id ):
50- """Read the stored ``args`` JSONB for a Procrastinate job."""
51- with app . connector . pool . connection ( ) as conn :
67+ def _fetch_job_args (job_id ):
68+ # Direct sync psycopg connection — the app's pool is async (see fixture).
69+ with psycopg . connect ( PROCRASTINATE_DSN ) as conn :
5270 with conn .cursor () as cur :
5371 cur .execute ("SELECT args FROM procrastinate_jobs WHERE id = %s" , (job_id ,))
5472 row = cur .fetchone ()
@@ -75,7 +93,7 @@ def add_manual(a, b):
7593
7694 # The TB task id was stashed in the job kwargs at defer time. Read it back
7795 # from Procrastinate to verify the final state.
78- args = _fetch_job_args (app , job_id )
96+ args = _fetch_job_args (job_id )
7997 tb_id = args ["__taskbadger_task_id__" ]
8098
8199 fetched = taskbadger .get_task (tb_id )
@@ -100,7 +118,7 @@ def add_auto(a, b):
100118 listen_notify = False ,
101119 )
102120
103- args = _fetch_job_args (app , job_id )
121+ args = _fetch_job_args (job_id )
104122 tb_id = args ["__taskbadger_task_id__" ]
105123
106124 fetched = taskbadger .get_task (tb_id )
0 commit comments