Skip to content

Commit 61942e2

Browse files
committed
feat: allow passing job opts
1 parent c59f520 commit 61942e2

2 files changed

Lines changed: 24 additions & 36 deletions

File tree

src/index.ts

Lines changed: 16 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import type {
33
FlowJob as BullFlowJob,
44
Job as BullJob,
55
QueueOptions as BullQueueOptions,
6+
FlowJob,
67
} from 'bullmq'
78
import type {
89
DefineFlowOptions,
@@ -21,19 +22,21 @@ import IORedis from 'ioredis'
2122
import { mapValues } from 'remeda'
2223
import { FlowBuilder, flowSymbol, jobSymbol, StandardSchemaV1Error } from './types'
2324

25+
export { RateLimitError, UnrecoverableError } from 'bullmq'
26+
2427
export function defineJob<Schema extends StandardSchemaV1, Output>(
2528
opts: DefineJobOptions<Schema, Output>,
2629
): Job<Schema, Output> {
2730
return {
2831
...opts,
2932
[jobSymbol]: <Job<Schema, Output>[typeof jobSymbol]>{
30-
async addToQueue(queue, payload) {
33+
async addToQueue(queue, payload, jobOpts) {
3134
const parsed = await opts.schema['~standard'].validate(payload)
3235
if (parsed.issues) throw new StandardSchemaV1Error(parsed.issues)
3336

34-
return queue.add(queue.name, parsed.value)
37+
return queue.add(queue.name, parsed.value, jobOpts)
3538
},
36-
async addToQueueBulk(queue, payloads) {
39+
async addToQueueBulk(queue, payloads, jobOpts) {
3740
return queue.addBulk(
3841
await Promise.all(
3942
payloads.map(async (payload) => {
@@ -43,6 +46,7 @@ export function defineJob<Schema extends StandardSchemaV1, Output>(
4346
return {
4447
name: queue.name,
4548
data: parsed.value,
49+
opts: jobOpts,
4650
}
4751
}),
4852
),
@@ -57,6 +61,7 @@ function buildFlowJobStack(opts: {
5761
rootInputPayload: unknown
5862
steps: FlowStep<any, any>[]
5963
flowName: string
64+
flowOpts?: FlowJob['opts']
6065
}) {
6166
if (opts.steps.length === 0) throw new Error(`Flow ${opts.flowName} has no steps`)
6267
const firstStep = opts.steps[0]!
@@ -65,21 +70,19 @@ function buildFlowJobStack(opts: {
6570
name: `${opts.flowName}_${firstStep.name}`,
6671
queueName: `${opts.flowName}_${firstStep.name}`,
6772
data: opts.rootInputPayload,
73+
opts: opts.flowOpts,
6874
}
6975

7076
for (const step of opts.steps.slice(1)) {
7177
currentStep = {
7278
name: `${opts.flowName}_${step.name}`,
7379
queueName: `${opts.flowName}_${step.name}`,
7480
children: [currentStep],
81+
opts: opts.flowOpts,
7582
}
7683
}
7784

78-
return {
79-
name: `${opts.flowName}`,
80-
queueName: `${opts.flowName}`,
81-
children: [currentStep],
82-
} satisfies BullFlowJob
85+
return currentStep
8386
}
8487

8588
export function defineFlow<Schema extends StandardSchemaV1, Output>(
@@ -90,7 +93,7 @@ export function defineFlow<Schema extends StandardSchemaV1, Output>(
9093
...opts,
9194
[flowSymbol]: <Flow<Schema, Output>[typeof flowSymbol]>{
9295
steps,
93-
async addToQueue(flowName, flowProducer, payload) {
96+
async addToQueue(flowName, flowProducer, payload, flowOpts) {
9497
const parsed = await opts.schema['~standard'].validate(payload)
9598
if (parsed.issues) throw new StandardSchemaV1Error(parsed.issues)
9699

@@ -100,9 +103,10 @@ export function defineFlow<Schema extends StandardSchemaV1, Output>(
100103
steps,
101104
flowName,
102105
}),
106+
flowOpts,
103107
)
104108
},
105-
async addToQueueBulk(flowName, flowProducer, payloads) {
109+
async addToQueueBulk(flowName, flowProducer, payloads, flowOpts) {
106110
return flowProducer.addBulk(
107111
await Promise.all(
108112
payloads.map(async (payload) => {
@@ -113,6 +117,7 @@ export function defineFlow<Schema extends StandardSchemaV1, Output>(
113117
flowName,
114118
rootInputPayload: parsed.value,
115119
steps,
120+
flowOpts,
116121
})
117122
}),
118123
),
@@ -242,32 +247,6 @@ export async function startWorkers<J extends JobDefinitionsObject>(
242247
} else if (flowSymbol in value) {
243248
const flowName = fullPath.join('-')
244249

245-
const worker = new BullWorker(
246-
flowName,
247-
async (job) => {
248-
// eslint-disable-next-line ts/no-unsafe-return
249-
const results = await job.getChildrenValues().then((res) => Object.values(res))
250-
if (results.length !== 1)
251-
throw new Error('Flow root job should have exactly one child job')
252-
253-
// eslint-disable-next-line ts/no-unsafe-return
254-
return results[0]
255-
},
256-
{
257-
...opts,
258-
...value.workerOptions,
259-
connection,
260-
},
261-
)
262-
263-
const hooks = value.workerOptions?.hooks ?? opts?.hooks
264-
if (hooks)
265-
for (const [hookName, hook] of Object.entries(hooks)) {
266-
worker.addListener(hookName, hook)
267-
}
268-
269-
workers.set(flowName, worker)
270-
271250
// add workers for each step
272251
for (const step of value[flowSymbol].steps) {
273252
const jobName = `${flowName}_${step.name}`
@@ -295,6 +274,7 @@ export async function startWorkers<J extends JobDefinitionsObject>(
295274
},
296275
)
297276

277+
const hooks = value.workerOptions?.hooks ?? opts?.hooks
298278
if (hooks)
299279
for (const [hookName, hook] of Object.entries(hooks)) {
300280
stepWorker.addListener(hookName, hook)

src/types.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,15 @@
11
import type { StandardSchemaV1 } from '@standard-schema/spec'
22
import type {
3+
BulkJobOptions,
34
Job as BullJob,
45
Queue as BullQueue,
56
WorkerListener as BullWorkerListener,
67
WorkerOptions as BullWorkerOptions,
8+
FlowJob,
9+
FlowOpts,
710
FlowProducer,
811
JobNode,
12+
JobsOptions,
913
} from 'bullmq'
1014

1115
export class StandardSchemaV1Error extends Error {
@@ -35,10 +39,12 @@ export type Job<Schema extends StandardSchemaV1, Output> = DefineJobOptions<Sche
3539
addToQueue: (
3640
queue: BullQueue<BullJob<StandardSchemaV1.InferOutput<Schema>, Output, string>>,
3741
payload: StandardSchemaV1.InferInput<Schema>,
42+
opts?: JobsOptions,
3843
) => Promise<BullJob<StandardSchemaV1.InferOutput<Schema>, Output, string>>
3944
addToQueueBulk: (
4045
queue: BullQueue<BullJob<StandardSchemaV1.InferOutput<Schema>, Output, string>>,
4146
payloads: StandardSchemaV1.InferInput<Schema>[],
47+
opts?: BulkJobOptions,
4248
) => Promise<BullJob<StandardSchemaV1.InferOutput<Schema>, Output, string>[]>
4349
}
4450
}
@@ -86,11 +92,13 @@ export type Flow<Schema extends StandardSchemaV1, Output> = DefineFlowOptions<Sc
8692
flowName: string,
8793
flowProducer: FlowProducer,
8894
payload: StandardSchemaV1.InferInput<Schema>,
95+
opts?: FlowOpts,
8996
) => Promise<JobNode>
9097
addToQueueBulk: (
9198
flowName: string,
9299
flowProducer: FlowProducer,
93100
payloads: StandardSchemaV1.InferInput<Schema>[],
101+
opts?: FlowJob['opts'],
94102
) => Promise<JobNode[]>
95103
}
96104
}

0 commit comments

Comments
 (0)