diff --git a/website/docs/job-key.md b/website/docs/job-key.md
index 35ab76fa..75805197 100644
--- a/website/docs/job-key.md
+++ b/website/docs/job-key.md
@@ -29,6 +29,17 @@ SELECT graphile_worker.add_job('send_email', '{"count": 2}', job_key := 'abc');
 COMMIT;
 ```
 
+If both the previous and new versions of the job use array payloads, they will
+be merged; for example, the `process_invoices` job will run once with the
+payload `[{id: 42},{id: 67}]`:
+
+```sql
+BEGIN;
+SELECT graphile_worker.add_job('process_invoices', '[{"id": 42}]', job_key := 'inv');
+SELECT graphile_worker.add_job('process_invoices', '[{"id": 67}]', job_key := 'inv');
+COMMIT;
+```
+
 In all cases if no match is found then a new job will be created.
 
 ### `job_key_mode`
@@ -36,14 +47,15 @@ In all cases if no match is found then a new job will be created.
 Behavior when an existing job with the same job key is found is controlled by
 the `job_key_mode` setting:
 
-- `replace` (default) - overwrites the unlocked job with the new values. This is
-  primarily useful for rescheduling, updating, or **debouncing** (delaying
-  execution until there have been no events for at least a certain time period).
-  Locked jobs will cause a new job to be scheduled instead.
-- `preserve_run_at` - overwrites the unlocked job with the new values, but
-  preserves `run_at`. This is primarily useful for **throttling** (executing at
-  most once over a given time period). Locked jobs will cause a new job to be
-  scheduled instead.
+- `replace` (default) - overwrites the unlocked job with the new values (merging
+  array payloads). This is primarily useful for rescheduling, updating, or
+  **debouncing** (delaying execution until there have been no events for at
+  least a certain time period). Locked jobs will cause a new job to be scheduled
+  instead.
+- `preserve_run_at` - overwrites the unlocked job with the new values (merging
+  array payloads), but preserves `run_at`. This is primarily useful for
+  **throttling** (executing at most once over a given time period). Locked jobs
+  will cause a new job to be scheduled instead.
 - `unsafe_dedupe` - if an existing job is found, even if it is locked or
   permanently failed, then it won't be updated. This is very dangerous as it
   means that the event that triggered this `add_job` call may not result in any
@@ -71,6 +83,49 @@ The full `job_key_mode` algorithm is roughly as follows:
 - Otherwise:
   - the job will have all its attributes updated to their new values.
+
+### Array payload merging
+
+When updating an existing job via `job_key` (except in `unsafe_dedupe` mode), if
+both the existing job's payload and the new payload are JSON arrays, they will
+be concatenated rather than overwritten. This enables a batching pattern where
+multiple events can be accumulated into a single job for more efficient
+execution; see [Handling batch jobs](./tasks.md#handling-batch-jobs) for more
+info.
+
+```sql
+-- First call creates the job with payload: [{"id": 1}]
+SELECT graphile_worker.add_job(
+  'process_events',
+  '[{"id": 1}]'::json,
+  job_key := 'my_batch',
+  job_key_mode := 'preserve_run_at',
+  run_at := NOW() + INTERVAL '10 seconds'
+);
+
+-- Second call (before the job runs) merges to: [{"id": 1}, {"id": 2}]
+SELECT graphile_worker.add_job(
+  'process_events',
+  '[{"id": 2}]'::json,
+  job_key := 'my_batch',
+  job_key_mode := 'preserve_run_at',
+  run_at := NOW() + INTERVAL '10 seconds'
+);
+```
+
+Combined with the `preserve_run_at` `job_key_mode`, this creates a fixed
+batching window: the job runs at the originally scheduled time with all
+accumulated payloads merged together. With the default `replace`
+`job_key_mode`, each new event would push the `run_at` forward, creating a
+rolling/debounce window instead.
+
+:::caution Both payloads must be arrays for merging to occur
+
+If **either** payload is not an array (e.g., one is an object, as is the default
+if no payload is specified), the standard replace behavior applies and the old
+payload will be lost.
+
+:::
 
 ## Removing jobs
 
 Pending jobs may also be removed using `job_key`:
@@ -98,3 +153,9 @@ prevented from running again, and will have the `job_key` removed from it.)
 
 Calling `remove_job` for a locked (i.e. running) job will not actually remove
 it, but will prevent it from running again on failure.
+
+There's currently a race condition in adding jobs with a job key, which means
+that under very high contention on a specific key an `add_job` call may fail
+and return `null`. You should check for this `null` and handle it
+appropriately: retrying, throwing an error, or whatever else makes sense for
+your code. See https://github.com/graphile/worker/issues/580 for more details.
diff --git a/website/docs/tasks.md b/website/docs/tasks.md
index a419ae7d..4408f849 100644
--- a/website/docs/tasks.md
+++ b/website/docs/tasks.md
@@ -193,6 +193,15 @@ any of these promises reject, then the job will be re-enqueued, but the payload
 will be overwritten to only contain the entries associated with the rejected
 promises — i.e. the successful entries will be removed.
 
+:::tip Accumulating batch payloads with job_key
+
+You can use [`job_key`](./job-key.md#array-payload-merging) with array payloads
+to accumulate multiple events into a single batch job. When both the existing
+and new payloads are arrays (and `job_key_mode` is not `unsafe_dedupe`), they
+are concatenated automatically.
+
+:::
+
 ## `helpers`
 
 ### `helpers.abortPromise`
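
---

Note on the race-condition paragraph added to job-key.md: the advice to check for `null` could be illustrated (outside this diff, e.g. in application code) with a sketch like the following. This is a hypothetical wrapper function, not part of graphile-worker or this change; it assumes `graphile_worker.add_job` returns a nullable `graphile_worker.jobs` row, and the `app_add_job_checked` name and retry count are illustrative only:

```sql
-- Hypothetical wrapper (not part of this diff) showing one way to
-- handle add_job returning null under high job_key contention.
CREATE FUNCTION app_add_job_checked(
  identifier text,
  payload json,
  key text
) RETURNS graphile_worker.jobs AS $$
DECLARE
  v_job graphile_worker.jobs;
  v_attempts int := 0;
BEGIN
  LOOP
    SELECT * INTO v_job
      FROM graphile_worker.add_job(identifier, payload, job_key := key);
    -- Under very high contention on `key`, add_job may return null
    -- (see graphile/worker issue #580); retry a few times, then raise.
    EXIT WHEN v_job.id IS NOT NULL;
    v_attempts := v_attempts + 1;
    IF v_attempts >= 3 THEN
      RAISE EXCEPTION 'add_job returned null for job_key % after % attempts',
        key, v_attempts;
    END IF;
  END LOOP;
  RETURN v_job;
END;
$$ LANGUAGE plpgsql;
```

Whether to retry inline or surface the failure to the caller depends on the application; the point is simply that the `null` case must be handled rather than silently ignored.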