Skip to content

Commit 407674e

Browse files
committed
refactor: rename defineQueues to defineJobs & remove proxy
1 parent 53a3df0 commit 407674e

5 files changed

Lines changed: 63 additions & 60 deletions

File tree

README.md

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ npm add @falcondev-oss/queue
1313
#### 1. Define your job schemas and handlers
1414

1515
```ts
16-
const jobs = {
16+
const jobs = defineJobs({
1717
video: {
1818
process: defineJob({
1919
schema: z.object({
@@ -39,32 +39,26 @@ const jobs = {
3939
// send email to `payload.to` with `payload.subject` and `payload.body`
4040
},
4141
}),
42-
}
43-
```
44-
45-
#### 2. Define queues
46-
47-
```ts
48-
const queues = defineQueues(jobs)
42+
})
4943
```
5044

51-
#### 3. Start workers
45+
#### 2. Start workers
5246

5347
```ts
5448
await startWorkers(jobs)
5549
```
5650

57-
#### 4. Queue jobs
51+
#### 3. Queue jobs
5852

5953
Single jobs:
6054

6155
```ts
62-
queues.video.process.queue({
56+
await jobs.video.process.queue({
6357
// payload type inferred from schema
6458
outputFormat: 'mp4',
6559
path: '/path/to/video.mov',
6660
})
67-
queues.sendEmail.queue({
61+
await jobs.sendEmail.queue({
6862
to: 'user@example.com',
6963
subject: 'Hello',
7064
body: 'This is a test email',
@@ -74,7 +68,7 @@ queues.sendEmail.queue({
7468
Bulk jobs:
7569

7670
```ts
77-
queues.sendEmail.queueBulk([
71+
await jobs.sendEmail.queueBulk([
7872
{
7973
to: 'user@example.com',
8074
subject: 'Hello',

package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,8 @@
4444
"@standard-schema/spec": "^1.0.0",
4545
"@types/node": "^24.5.2",
4646
"bullmq": "^5.58.7",
47-
"ioredis": "^5.7.0"
47+
"ioredis": "^5.7.0",
48+
"remeda": "^2.32.0"
4849
},
4950
"devDependencies": {
5051
"@falcondev-oss/configs": "^5.0.2",

pnpm-lock.yaml

Lines changed: 16 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/index.ts

Lines changed: 32 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -8,16 +8,17 @@ import type {
88
DefineFlowOptions,
99
DefineJobOptions,
1010
Flow,
11+
FlowAccessor,
1112
FlowStep,
1213
Job,
14+
JobAccessor,
1315
JobDefinitionsObject,
14-
QueueFlowProxyAccessor,
15-
QueueJobProxy,
16-
QueueJobProxyAccessor,
16+
Jobs,
1717
WorkerOptions,
1818
} from './types'
1919
import { Queue as BullQueue, Worker as BullWorker, FlowProducer } from 'bullmq'
2020
import IORedis from 'ioredis'
21+
import { mapValues } from 'remeda'
2122
import { FlowBuilder, flowSymbol, jobSymbol, StandardSchemaV1Error } from './types'
2223

2324
export function defineJob<Schema extends StandardSchemaV1, Output>(
@@ -121,7 +122,7 @@ export function defineFlow<Schema extends StandardSchemaV1, Output>(
121122
}
122123
}
123124

124-
export function defineQueues<J extends JobDefinitionsObject>(jobs: J, opts?: BullQueueOptions) {
125+
export function defineJobs<J extends JobDefinitionsObject>(jobs: J, opts?: BullQueueOptions) {
125126
const connection =
126127
opts?.connection ??
127128
new IORedis({
@@ -162,49 +163,40 @@ export function defineQueues<J extends JobDefinitionsObject>(jobs: J, opts?: Bul
162163
return flowProducer
163164
}
164165

165-
function createProxy(obj: JobDefinitionsObject, path: string[]) {
166-
return new Proxy(obj, {
167-
get(target, p, receiver) {
168-
if (typeof p !== 'string') return
166+
function traverse(obj: JobDefinitionsObject, path: string[]): Jobs<any> {
167+
return mapValues(obj, (jobOrJobs, p) => {
168+
if (typeof jobOrJobs !== 'object') throw new Error('Job definition must be an object')
169169

170-
const jobOrJobs = Reflect.get(target, p, receiver)
171-
if (typeof jobOrJobs !== 'object') return
170+
const fullPath = [...path, p]
172171

173-
const fullPath = [...path, p]
174-
175-
if (jobSymbol in jobOrJobs) {
176-
const jobName = fullPath.join('-')
172+
if (jobSymbol in jobOrJobs) {
173+
const jobName = fullPath.join('-')
177174

178-
return {
179-
queue: async (payload: unknown) => {
180-
return jobOrJobs[jobSymbol].addToQueue(await getQueue(jobName), payload)
181-
},
182-
queueBulk: async (payloads: unknown[]) => {
183-
return jobOrJobs[jobSymbol].addToQueueBulk(await getQueue(jobName), payloads)
184-
},
185-
} satisfies QueueJobProxyAccessor<any, any>
186-
} else if (flowSymbol in jobOrJobs) {
187-
const flowName = fullPath.join('-')
175+
return {
176+
queue: async (payload: unknown) => {
177+
return jobOrJobs[jobSymbol].addToQueue(await getQueue(jobName), payload)
178+
},
179+
queueBulk: async (payloads: unknown[]) => {
180+
return jobOrJobs[jobSymbol].addToQueueBulk(await getQueue(jobName), payloads)
181+
},
182+
} satisfies JobAccessor<any, any>
183+
} else if (flowSymbol in jobOrJobs) {
184+
const flowName = fullPath.join('-')
188185

189-
return {
190-
queue: async (payload: unknown) => {
191-
return jobOrJobs[flowSymbol].addToQueue(flowName, await getFlowProducer(), payload)
192-
},
193-
queueBulk: async (payloads: unknown[]) => {
194-
return jobOrJobs[flowSymbol].addToQueueBulk(
195-
flowName,
196-
await getFlowProducer(),
197-
payloads,
198-
)
199-
},
200-
} satisfies QueueFlowProxyAccessor<any>
201-
}
186+
return {
187+
queue: async (payload: unknown) => {
188+
return jobOrJobs[flowSymbol].addToQueue(flowName, await getFlowProducer(), payload)
189+
},
190+
queueBulk: async (payloads: unknown[]) => {
191+
return jobOrJobs[flowSymbol].addToQueueBulk(flowName, await getFlowProducer(), payloads)
192+
},
193+
} satisfies FlowAccessor<any>
194+
}
202195

203-
return createProxy(jobOrJobs, fullPath)
204-
},
196+
return traverse(jobOrJobs, fullPath)
205197
})
206198
}
207-
return createProxy(jobs, []) as unknown as QueueJobProxy<J>
199+
return traverse(jobs, []) as Jobs<J>
208200
}
209201

210202
export async function startWorkers<J extends JobDefinitionsObject>(

src/types.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -99,25 +99,25 @@ export type JobDefinitionsObject = {
9999
[key: string]: Job<any, any> | Flow<any, any> | JobDefinitionsObject
100100
}
101101

102-
export type QueueJobProxyAccessor<S extends StandardSchemaV1, Output> = {
102+
export type JobAccessor<S extends StandardSchemaV1, Output> = {
103103
queue: (
104104
payload: StandardSchemaV1.InferInput<S>,
105105
) => Promise<BullJob<StandardSchemaV1.InferOutput<S>, Output, string>>
106106
queueBulk: (
107107
payloads: StandardSchemaV1.InferInput<S>[],
108108
) => Promise<BullJob<StandardSchemaV1.InferOutput<S>, Output, string>[]>
109109
}
110-
export type QueueFlowProxyAccessor<S extends StandardSchemaV1> = {
110+
export type FlowAccessor<S extends StandardSchemaV1> = {
111111
queue: (payload: StandardSchemaV1.InferInput<S>) => Promise<JobNode>
112112
queueBulk: (payloads: StandardSchemaV1.InferInput<S>[]) => Promise<JobNode[]>
113113
}
114-
export type QueueJobProxy<J extends JobDefinitionsObject> = {
114+
export type Jobs<J extends JobDefinitionsObject> = {
115115
[K in keyof J]: J[K] extends Job<infer S, infer O>
116-
? QueueJobProxyAccessor<S, O>
116+
? JobAccessor<S, O>
117117
: J[K] extends Flow<infer S, any>
118-
? QueueFlowProxyAccessor<S>
118+
? FlowAccessor<S>
119119
: J[K] extends JobDefinitionsObject
120-
? QueueJobProxy<J[K]>
120+
? Jobs<J[K]>
121121
: never
122122
}
123123

0 commit comments

Comments
 (0)