Skip to content

Commit 33d1342

Browse files
committed
remove worker code
1 parent c61cbb0 commit 33d1342

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

63 files changed

+148
-5158
lines changed

README.md

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -74,10 +74,6 @@ docker compose -f docker-compose.prod.yml up -d
7474

7575
Open [http://localhost:3000](http://localhost:3000)
7676

77-
#### Background worker note
78-
79-
The Docker Compose stack starts a dedicated worker container by default. If `REDIS_URL` is not configured, the worker will start, log that it is idle, and do no queue processing. This is expected. Queue-backed API, webhook, and schedule execution requires Redis; installs without Redis continue to use the inline execution path.
80-
8177
Sim also supports local models via [Ollama](https://ollama.ai) and [vLLM](https://docs.vllm.ai/) — see the [Docker self-hosting docs](https://docs.sim.ai/self-hosting/docker) for setup details.
8278

8379
### Self-hosted: Manual Setup
@@ -123,12 +119,10 @@ cd packages/db && bun run db:migrate
123119
5. Start development servers:
124120

125121
```bash
126-
bun run dev:full # Starts Next.js app, realtime socket server, and the BullMQ worker
122+
bun run dev:full # Starts Next.js app and realtime socket server
127123
```
128124

129-
If `REDIS_URL` is not configured, the worker will remain idle and execution continues inline.
130-
131-
Or run separately: `bun run dev` (Next.js), `cd apps/sim && bun run dev:sockets` (realtime), and `cd apps/sim && bun run worker` (BullMQ worker).
125+
Or run separately: `bun run dev` (Next.js) and `cd apps/sim && bun run dev:sockets` (realtime).
132126

133127
## Copilot API Keys
134128

apps/sim/app/api/jobs/[jobId]/route.test.ts

Lines changed: 17 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,16 @@ import { beforeEach, describe, expect, it, vi } from 'vitest'
66

77
const {
88
mockCheckHybridAuth,
9-
mockGetDispatchJobRecord,
109
mockGetJobQueue,
1110
mockVerifyWorkflowAccess,
1211
mockGetWorkflowById,
12+
mockGetJob,
1313
} = vi.hoisted(() => ({
1414
mockCheckHybridAuth: vi.fn(),
15-
mockGetDispatchJobRecord: vi.fn(),
1615
mockGetJobQueue: vi.fn(),
1716
mockVerifyWorkflowAccess: vi.fn(),
1817
mockGetWorkflowById: vi.fn(),
18+
mockGetJob: vi.fn(),
1919
}))
2020

2121
vi.mock('@sim/logger', () => ({
@@ -32,19 +32,9 @@ vi.mock('@/lib/auth/hybrid', () => ({
3232
}))
3333

3434
vi.mock('@/lib/core/async-jobs', () => ({
35-
JOB_STATUS: {
36-
PENDING: 'pending',
37-
PROCESSING: 'processing',
38-
COMPLETED: 'completed',
39-
FAILED: 'failed',
40-
},
4135
getJobQueue: mockGetJobQueue,
4236
}))
4337

44-
vi.mock('@/lib/core/workspace-dispatch/store', () => ({
45-
getDispatchJobRecord: mockGetDispatchJobRecord,
46-
}))
47-
4838
vi.mock('@/lib/core/utils/request', () => ({
4939
generateRequestId: vi.fn().mockReturnValue('request-1'),
5040
}))
@@ -85,71 +75,51 @@ describe('GET /api/jobs/[jobId]', () => {
8575
})
8676

8777
mockGetJobQueue.mockResolvedValue({
88-
getJob: vi.fn().mockResolvedValue(null),
78+
getJob: mockGetJob,
8979
})
9080
})
9181

92-
it('returns dispatcher-aware waiting status with metadata', async () => {
93-
mockGetDispatchJobRecord.mockResolvedValue({
94-
id: 'dispatch-1',
95-
workspaceId: 'workspace-1',
96-
lane: 'runtime',
97-
queueName: 'workflow-execution',
98-
bullmqJobName: 'workflow-execution',
99-
bullmqPayload: {},
82+
it('returns job status with metadata', async () => {
83+
mockGetJob.mockResolvedValue({
84+
id: 'job-1',
85+
status: 'pending',
10086
metadata: {
10187
workflowId: 'workflow-1',
10288
},
103-
priority: 10,
104-
status: 'waiting',
105-
createdAt: 1000,
106-
admittedAt: 2000,
10789
})
10890

10991
const response = await GET(createMockRequest(), {
110-
params: Promise.resolve({ jobId: 'dispatch-1' }),
92+
params: Promise.resolve({ jobId: 'job-1' }),
11193
})
11294
const body = await response.json()
11395

11496
expect(response.status).toBe(200)
115-
expect(body.status).toBe('waiting')
116-
expect(body.metadata.queueName).toBe('workflow-execution')
117-
expect(body.metadata.lane).toBe('runtime')
118-
expect(body.metadata.workspaceId).toBe('workspace-1')
97+
expect(body.status).toBe('pending')
98+
expect(body.metadata.workflowId).toBe('workflow-1')
11999
})
120100

121-
it('returns completed output from dispatch state', async () => {
122-
mockGetDispatchJobRecord.mockResolvedValue({
123-
id: 'dispatch-2',
124-
workspaceId: 'workspace-1',
125-
lane: 'interactive',
126-
queueName: 'workflow-execution',
127-
bullmqJobName: 'direct-workflow-execution',
128-
bullmqPayload: {},
101+
it('returns completed output from job', async () => {
102+
mockGetJob.mockResolvedValue({
103+
id: 'job-2',
104+
status: 'completed',
129105
metadata: {
130106
workflowId: 'workflow-1',
131107
},
132-
priority: 1,
133-
status: 'completed',
134-
createdAt: 1000,
135-
startedAt: 2000,
136-
completedAt: 7000,
137108
output: { success: true },
138109
})
139110

140111
const response = await GET(createMockRequest(), {
141-
params: Promise.resolve({ jobId: 'dispatch-2' }),
112+
params: Promise.resolve({ jobId: 'job-2' }),
142113
})
143114
const body = await response.json()
144115

145116
expect(response.status).toBe(200)
146117
expect(body.status).toBe('completed')
147118
expect(body.output).toEqual({ success: true })
148-
expect(body.metadata.duration).toBe(5000)
149119
})
150120

151-
it('returns 404 when neither dispatch nor BullMQ job exists', async () => {
152-
mockGetDispatchJobRecord.mockResolvedValue(null)
121+
it('returns 404 when job does not exist', async () => {
122+
mockGetJob.mockResolvedValue(null)
153123

154124
const response = await GET(createMockRequest(), {
155125
params: Promise.resolve({ jobId: 'missing-job' }),

apps/sim/app/api/jobs/[jobId]/route.ts

Lines changed: 11 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@ import { type NextRequest, NextResponse } from 'next/server'
33
import { checkHybridAuth } from '@/lib/auth/hybrid'
44
import { getJobQueue } from '@/lib/core/async-jobs'
55
import { generateRequestId } from '@/lib/core/utils/request'
6-
import { presentDispatchOrJobStatus } from '@/lib/core/workspace-dispatch/status'
7-
import { getDispatchJobRecord } from '@/lib/core/workspace-dispatch/store'
86
import { createErrorResponse } from '@/app/api/workflows/utils'
97

108
const logger = createLogger('TaskStatusAPI')
@@ -25,15 +23,14 @@ export async function GET(
2523

2624
const authenticatedUserId = authResult.userId
2725

28-
const dispatchJob = await getDispatchJobRecord(taskId)
2926
const jobQueue = await getJobQueue()
30-
const job = dispatchJob ? null : await jobQueue.getJob(taskId)
27+
const job = await jobQueue.getJob(taskId)
3128

32-
if (!job && !dispatchJob) {
29+
if (!job) {
3330
return createErrorResponse('Task not found', 404)
3431
}
3532

36-
const metadataToCheck = dispatchJob?.metadata ?? job?.metadata
33+
const metadataToCheck = job.metadata
3734

3835
if (metadataToCheck?.workflowId) {
3936
const { verifyWorkflowAccess } = await import('@/socket/middleware/permissions')
@@ -61,25 +58,22 @@ export async function GET(
6158
return createErrorResponse('Access denied', 403)
6259
}
6360

64-
const presented = presentDispatchOrJobStatus(dispatchJob, job)
65-
const response: any = {
61+
const response: Record<string, unknown> = {
6662
success: true,
6763
taskId,
68-
status: presented.status,
69-
metadata: presented.metadata,
64+
status: job.status,
65+
metadata: job.metadata,
7066
}
7167

72-
if (presented.output !== undefined) response.output = presented.output
73-
if (presented.error !== undefined) response.error = presented.error
74-
if (presented.estimatedDuration !== undefined) {
75-
response.estimatedDuration = presented.estimatedDuration
76-
}
68+
if (job.output !== undefined) response.output = job.output
69+
if (job.error !== undefined) response.error = job.error
7770

7871
return NextResponse.json(response)
79-
} catch (error: any) {
72+
} catch (error: unknown) {
73+
const errorMessage = error instanceof Error ? error.message : String(error)
8074
logger.error(`[${requestId}] Error fetching task status:`, error)
8175

82-
if (error.message?.includes('not found') || error.status === 404) {
76+
if (errorMessage?.includes('not found')) {
8377
return createErrorResponse('Task not found', 404)
8478
}
8579

apps/sim/app/api/resume/[workflowId]/[executionId]/[contextId]/route.ts

Lines changed: 5 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,11 @@
11
import { createLogger } from '@sim/logger'
22
import { type NextRequest, NextResponse } from 'next/server'
33
import { AuthType } from '@/lib/auth/hybrid'
4-
import { getJobQueue, shouldUseBullMQ } from '@/lib/core/async-jobs'
5-
import { createBullMQJobData } from '@/lib/core/bullmq'
4+
import { getJobQueue } from '@/lib/core/async-jobs'
65
import { generateRequestId } from '@/lib/core/utils/request'
76
import { SSE_HEADERS } from '@/lib/core/utils/sse'
87
import { getBaseUrl } from '@/lib/core/utils/urls'
98
import { generateId } from '@/lib/core/utils/uuid'
10-
import { enqueueWorkspaceDispatch } from '@/lib/core/workspace-dispatch'
119
import { setExecutionMeta } from '@/lib/execution/event-buffer'
1210
import { preprocessExecution } from '@/lib/execution/preprocessing'
1311
import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-manager'
@@ -227,26 +225,10 @@ export async function POST(
227225

228226
let jobId: string
229227
try {
230-
const useBullMQ = shouldUseBullMQ()
231-
if (useBullMQ) {
232-
jobId = await enqueueWorkspaceDispatch({
233-
id: enqueueResult.resumeExecutionId,
234-
workspaceId: workflow.workspaceId,
235-
lane: 'runtime',
236-
queueName: 'resume-execution',
237-
bullmqJobName: 'resume-execution',
238-
bullmqPayload: createBullMQJobData(resumePayload, {
239-
workflowId,
240-
userId,
241-
}),
242-
metadata: { workflowId, userId },
243-
})
244-
} else {
245-
const jobQueue = await getJobQueue()
246-
jobId = await jobQueue.enqueue('resume-execution', resumePayload, {
247-
metadata: { workflowId, workspaceId: workflow.workspaceId, userId },
248-
})
249-
}
228+
const jobQueue = await getJobQueue()
229+
jobId = await jobQueue.enqueue('resume-execution', resumePayload, {
230+
metadata: { workflowId, workspaceId: workflow.workspaceId, userId },
231+
})
250232
logger.info('Enqueued async resume execution', {
251233
jobId,
252234
resumeExecutionId: enqueueResult.resumeExecutionId,

apps/sim/app/api/schedules/execute/route.test.ts

Lines changed: 21 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ const {
1414
mockDbReturning,
1515
mockDbUpdate,
1616
mockEnqueue,
17-
mockEnqueueWorkspaceDispatch,
1817
mockStartJob,
1918
mockCompleteJob,
2019
mockMarkJobFailed,
@@ -24,7 +23,6 @@ const {
2423
const mockDbSet = vi.fn().mockReturnValue({ where: mockDbWhere })
2524
const mockDbUpdate = vi.fn().mockReturnValue({ set: mockDbSet })
2625
const mockEnqueue = vi.fn().mockResolvedValue('job-id-1')
27-
const mockEnqueueWorkspaceDispatch = vi.fn().mockResolvedValue('job-id-1')
2826
const mockStartJob = vi.fn().mockResolvedValue(undefined)
2927
const mockCompleteJob = vi.fn().mockResolvedValue(undefined)
3028
const mockMarkJobFailed = vi.fn().mockResolvedValue(undefined)
@@ -42,7 +40,6 @@ const {
4240
mockDbReturning,
4341
mockDbUpdate,
4442
mockEnqueue,
45-
mockEnqueueWorkspaceDispatch,
4643
mockStartJob,
4744
mockCompleteJob,
4845
mockMarkJobFailed,
@@ -75,15 +72,6 @@ vi.mock('@/lib/core/async-jobs', () => ({
7572
shouldExecuteInline: vi.fn().mockReturnValue(false),
7673
}))
7774

78-
vi.mock('@/lib/core/bullmq', () => ({
79-
isBullMQEnabled: vi.fn().mockReturnValue(true),
80-
createBullMQJobData: vi.fn((payload: unknown) => ({ payload })),
81-
}))
82-
83-
vi.mock('@/lib/core/workspace-dispatch', () => ({
84-
enqueueWorkspaceDispatch: mockEnqueueWorkspaceDispatch,
85-
}))
86-
8775
vi.mock('@/lib/workflows/utils', () => ({
8876
getWorkflowById: vi.fn().mockResolvedValue({
8977
id: 'workflow-1',
@@ -175,8 +163,6 @@ const SINGLE_JOB = [
175163
cronExpression: '0 * * * *',
176164
failedCount: 0,
177165
lastQueuedAt: undefined,
178-
sourceUserId: 'user-1',
179-
sourceWorkspaceId: 'workspace-1',
180166
sourceType: 'job',
181167
},
182168
]
@@ -250,56 +236,48 @@ describe('Scheduled Workflow Execution API Route', () => {
250236
expect(data).toHaveProperty('executedCount', 2)
251237
})
252238

253-
it('should queue mothership jobs to BullMQ when available', async () => {
239+
it('should execute mothership jobs inline', async () => {
254240
mockDbReturning.mockReturnValueOnce([]).mockReturnValueOnce(SINGLE_JOB)
255241

256242
const response = await GET(createMockRequest())
257243

258244
expect(response.status).toBe(200)
259-
expect(mockEnqueueWorkspaceDispatch).toHaveBeenCalledWith(
245+
expect(mockExecuteJobInline).toHaveBeenCalledWith(
260246
expect.objectContaining({
261-
workspaceId: 'workspace-1',
262-
lane: 'runtime',
263-
queueName: 'mothership-job-execution',
264-
bullmqJobName: 'mothership-job-execution',
265-
bullmqPayload: {
266-
payload: {
267-
scheduleId: 'job-1',
268-
cronExpression: '0 * * * *',
269-
failedCount: 0,
270-
now: expect.any(String),
271-
},
272-
},
247+
scheduleId: 'job-1',
248+
cronExpression: '0 * * * *',
249+
failedCount: 0,
250+
now: expect.any(String),
273251
})
274252
)
275-
expect(mockExecuteJobInline).not.toHaveBeenCalled()
276253
})
277254

278-
it('should enqueue preassigned correlation metadata for schedules', async () => {
279-
mockDbReturning.mockReturnValue(SINGLE_SCHEDULE)
255+
it('should enqueue schedule with correlation metadata via job queue', async () => {
256+
mockDbReturning.mockReturnValueOnce(SINGLE_SCHEDULE).mockReturnValueOnce([])
280257

281258
const response = await GET(createMockRequest())
282259

283260
expect(response.status).toBe(200)
284-
expect(mockEnqueueWorkspaceDispatch).toHaveBeenCalledWith(
261+
expect(mockEnqueue).toHaveBeenCalledWith(
262+
'schedule-execution',
263+
expect.objectContaining({
264+
scheduleId: 'schedule-1',
265+
workflowId: 'workflow-1',
266+
executionId: 'schedule-execution-1',
267+
requestId: 'test-request-id',
268+
}),
285269
expect.objectContaining({
286-
id: 'schedule-execution-1',
287-
workspaceId: 'workspace-1',
288-
lane: 'runtime',
289-
queueName: 'schedule-execution',
290-
bullmqJobName: 'schedule-execution',
291-
metadata: {
270+
metadata: expect.objectContaining({
292271
workflowId: 'workflow-1',
293-
correlation: {
272+
workspaceId: 'workspace-1',
273+
correlation: expect.objectContaining({
294274
executionId: 'schedule-execution-1',
295275
requestId: 'test-request-id',
296276
source: 'schedule',
297277
workflowId: 'workflow-1',
298278
scheduleId: 'schedule-1',
299-
triggerType: 'schedule',
300-
scheduledFor: '2025-01-01T00:00:00.000Z',
301-
},
302-
},
279+
}),
280+
}),
303281
})
304282
)
305283
})

0 commit comments

Comments
 (0)