77 changes: 69 additions & 8 deletions website/docs/job-key.md
```sql
BEGIN;
SELECT graphile_worker.add_job('send_email', '{"count": 2}', job_key := 'abc');
COMMIT;
```

If both the previous and new versions of the job use array payloads, they will
be merged; for example, the `process_invoices` job below will run once with the
payload `[{id: 42}, {id: 67}]`:

```sql
BEGIN;
SELECT graphile_worker.add_job('process_invoices', '[{"id": 42}]', job_key := 'inv');
SELECT graphile_worker.add_job('process_invoices', '[{"id": 67}]', job_key := 'inv');
COMMIT;
```

In all cases, if no existing job matches the `job_key`, a new job will be
created.
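The payload rule can be sketched in plain JavaScript. This is an illustration of the semantics only; the real logic runs inside graphile_worker's SQL functions:

```javascript
// Sketch of the payload rule applied when an unlocked job with the same
// job_key already exists. Illustrative only: the actual implementation is
// SQL inside the graphile_worker schema.
function nextPayload(existingPayload, newPayload) {
  if (Array.isArray(existingPayload) && Array.isArray(newPayload)) {
    // Both payloads are arrays: concatenate them (batching).
    return existingPayload.concat(newPayload);
  }
  // Otherwise the new payload replaces the old one entirely.
  return newPayload;
}

nextPayload([{ id: 42 }], [{ id: 67 }]); // merged: [{id: 42}, {id: 67}]
nextPayload({ count: 1 }, { count: 2 }); // replaced: {count: 2}
```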

### `job_key_mode`

The behavior when an existing job with the same job key is found is controlled
by the `job_key_mode` setting:

- `replace` (default) - overwrites the unlocked job with the new values (merging
array payloads). This is primarily useful for rescheduling, updating, or
**debouncing** (delaying execution until there have been no events for at
least a certain time period). Locked jobs will cause a new job to be scheduled
instead.
- `preserve_run_at` - overwrites the unlocked job with the new values (merging
array payloads), but preserves `run_at`. This is primarily useful for
**throttling** (executing at most once over a given time period). Locked jobs
will cause a new job to be scheduled instead.
- `unsafe_dedupe` - if an existing job is found, even if it is locked or
  permanently failed, then it won't be updated. This is very dangerous, as it
  means that the event that triggered this `add_job` call may not result in any
  action.
The full `job_key_mode` algorithm is roughly as follows:
- Otherwise:
- the job will have all its attributes updated to their new values.
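The decision procedure can be sketched as follows. This is a JavaScript illustration of the rules above, not the actual implementation (which is SQL), and the `locked` field name is an assumption of the sketch:

```javascript
// Illustrative decision table for add_job when a job with the same job_key
// may already exist. The `locked` field name is an assumption.
function resolveJobKey(existing, mode) {
  if (!existing) return "create_new_job";
  if (mode === "unsafe_dedupe") {
    // Matches even locked or permanently failed jobs, and never updates.
    return "do_nothing";
  }
  if (existing.locked) {
    // replace / preserve_run_at cannot touch a running job.
    return "create_new_job";
  }
  // Update the unlocked job; preserve_run_at additionally keeps run_at.
  return mode === "preserve_run_at"
    ? "update_but_keep_run_at"
    : "update_all_attributes";
}
```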

### Array payload merging

When updating an existing job via `job_key` (except in `unsafe_dedupe` mode), if
both the existing job's payload and the new payload are JSON arrays, they will
be concatenated rather than overwritten. This enables a batching pattern where
multiple events can be accumulated into a single job for more efficient
execution; see [Handling batch jobs](./tasks.md#handling-batch-jobs) for more
info.

```sql
-- First call creates job with payload: [{"id": 1}]
SELECT graphile_worker.add_job(
'process_events',
'[{"id": 1}]'::json,
job_key := 'my_batch',
job_key_mode := 'preserve_run_at',
run_at := NOW() + INTERVAL '10 seconds'
);

-- Second call (before job runs) merges to: [{"id": 1}, {"id": 2}]
SELECT graphile_worker.add_job(
'process_events',
'[{"id": 2}]'::json,
job_key := 'my_batch',
job_key_mode := 'preserve_run_at',
run_at := NOW() + INTERVAL '10 seconds'
);
```

Combined with the `preserve_run_at` job_key_mode, this creates a fixed batching
window: the job runs at the originally scheduled time with all accumulated
payloads merged together. With the default `replace` job_key_mode, each new
event would push `run_at` forward, creating a rolling (debounce) window
instead.
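The difference between the two windows can be shown with a small simulation. The `nextRunAt` helper is hypothetical (not part of the worker API), and times are plain numbers in seconds:

```javascript
// Simulate how run_at evolves as events arrive, under the two modes.
// Hypothetical helper for illustration only.
function nextRunAt(existingRunAt, requestedRunAt, mode) {
  if (existingRunAt !== null && mode === "preserve_run_at") {
    return existingRunAt; // fixed window: the first event decides run time
  }
  return requestedRunAt; // replace: every new event pushes run_at forward
}

// Three events at t = 0, 4, 8, each asking for run_at = t + 10:
let fixed = null;
let rolling = null;
for (const t of [0, 4, 8]) {
  fixed = nextRunAt(fixed, t + 10, "preserve_run_at");
  rolling = nextRunAt(rolling, t + 10, "replace");
}
// fixed === 10 (runs at the originally scheduled time)
// rolling === 18 (debounced: keeps moving while events keep arriving)
```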

:::caution Both payloads must be arrays for merging to occur

If **either** payload is not an array (e.g., one is an object, as is the default
if no payload is specified), the standard replace behavior applies and the old
payload will be lost.

:::

## Removing jobs

Pending jobs may also be removed using `job_key`:
Expand Down Expand Up @@ -98,3 +153,9 @@ prevented from running again, and will have the `job_key` removed from it.)

Calling `remove_job` for a locked (i.e. running) job will not actually remove
it, but will prevent it from running again on failure.

There is currently a race condition when adding jobs with a job key: under very
high contention on a specific key, an `add_job` call may fail and return
`null`. You should check for this `null` and handle it appropriately, whether
by retrying, throwing an error, or whatever else makes sense for your code. See
https://github.com/graphile/worker/issues/580 for more details.
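One way to handle that `null` is a simple bounded retry. In this sketch, `tryAddJob` is a hypothetical async wrapper around `SELECT graphile_worker.add_job(...)`, not part of the library's API:

```javascript
// Retry when add_job returns null under heavy job_key contention.
// `tryAddJob` is a hypothetical async function wrapping the SQL call.
async function addJobWithRetry(tryAddJob, attempts = 3) {
  for (let i = 0; i < attempts; i++) {
    const job = await tryAddJob();
    if (job != null) return job; // normal case: the created/updated job row
  }
  throw new Error("add_job returned null repeatedly; see graphile/worker#580");
}
```

Whether to retry, throw, or ignore depends on how critical the triggering event is to your application.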
9 changes: 9 additions & 0 deletions website/docs/tasks.md
any of these promises reject, then the job will be re-enqueued, but the payload
will be overwritten to only contain the entries associated with the rejected
promises — i.e. the successful entries will be removed.
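The re-enqueue rule described above can be sketched like this; it is an illustration of the contract, not graphile-worker's actual internals:

```javascript
// Given an array payload and one promise per entry, return the entries
// whose promises rejected: these would form the payload of the re-enqueued
// job. Illustrative helper, not the library's implementation.
async function rejectedEntries(payload, promises) {
  const results = await Promise.allSettled(promises);
  return payload.filter((_, i) => results[i].status === "rejected");
}

// Example: entry 2's promise rejects, so only it would be retried.
rejectedEntries(
  [{ id: 1 }, { id: 2 }, { id: 3 }],
  [Promise.resolve(), Promise.reject(new Error("boom")), Promise.resolve()]
); // resolves to [{ id: 2 }]
```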

:::tip Accumulating batch payloads with job_key

You can use [`job_key`](./job-key.md#array-payload-merging) with array payloads
to accumulate multiple events into a single batch job. When both the existing
and new payloads are arrays (and `job_key_mode` is not `unsafe_dedupe`), they
are concatenated automatically.

:::

## `helpers`

### `helpers.abortPromise`