Skip to content

Commit d3103b7

Browse files
committed
address comments
1 parent 56631e9 commit d3103b7

10 files changed

Lines changed: 17384 additions & 32 deletions

File tree

apps/sim/app/api/schedules/execute/route.ts

Lines changed: 27 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,9 @@ const JOB_CHUNK_SIZE = 100
4141
const MAX_TICK_DURATION_MS = 3 * 60 * 1000
4242
const STALE_SCHEDULE_CLAIM_MS = getMaxExecutionTimeout()
4343
const STALE_SCHEDULE_RECOVERY_BATCH_SIZE = 100
44-
let databaseScheduleStartQueue: Promise<void> = Promise.resolve()
44+
const DATABASE_SCHEDULE_START_TURN_WAIT_MS = 1_000
45+
type DatabaseScheduleStartResult = 'started' | 'capacity_full' | 'not_pending'
46+
let databaseScheduleStartTurn: Promise<void> | null = null
4547

4648
const dueFilter = (queuedAt: Date) =>
4749
and(
@@ -68,18 +70,30 @@ const workflowScheduleFilter = (queuedAt: Date) =>
6870
const jobScheduleFilter = (queuedAt: Date) =>
6971
and(dueFilter(queuedAt), sql`${workflowSchedule.sourceType} = 'job'`)
7072

71-
async function runWithDatabaseScheduleStartTurn<T>(operation: () => Promise<T>): Promise<T> {
72-
const previousTurn = databaseScheduleStartQueue
73+
async function runWithDatabaseScheduleStartTurn(
74+
operation: () => Promise<DatabaseScheduleStartResult>
75+
): Promise<DatabaseScheduleStartResult> {
76+
const activeTurn = databaseScheduleStartTurn
77+
if (activeTurn) {
78+
const turnOpened = await Promise.race([
79+
activeTurn.then(() => true),
80+
sleep(DATABASE_SCHEDULE_START_TURN_WAIT_MS).then(() => false),
81+
])
82+
if (!turnOpened || databaseScheduleStartTurn) return 'capacity_full'
83+
}
84+
7385
let releaseTurn = () => {}
74-
databaseScheduleStartQueue = new Promise<void>((resolve) => {
86+
const currentTurn = new Promise<void>((resolve) => {
7587
releaseTurn = resolve
7688
})
77-
78-
await previousTurn
89+
databaseScheduleStartTurn = currentTurn
7990

8091
try {
8192
return await operation()
8293
} finally {
94+
if (databaseScheduleStartTurn === currentTurn) {
95+
databaseScheduleStartTurn = null
96+
}
8397
releaseTurn()
8498
}
8599
}
@@ -286,14 +300,10 @@ function staleScheduleExecutionJobsFilter(staleStartedBefore: Date) {
286300
)
287301
}
288302

289-
function getExhaustedRecoveryNextRunAt(payload: ScheduleExecutionPayload, now: Date): Date {
290-
return (
291-
getNextRunFromCronExpression(payload.cronExpression, payload.timezone) ??
292-
new Date(now.getTime() + 24 * 60 * 60 * 1000)
293-
)
294-
}
295-
296-
function getScheduleFailureNextRunAt(schedule: DatabaseScheduleExecutionTarget, now: Date): Date {
303+
function getScheduleNextRunAt(
304+
schedule: { cronExpression?: string | null; timezone?: string },
305+
now: Date
306+
): Date {
297307
return (
298308
getNextRunFromCronExpression(schedule.cronExpression, schedule.timezone) ??
299309
new Date(now.getTime() + 24 * 60 * 60 * 1000)
@@ -313,7 +323,7 @@ async function markClaimedScheduleFailed(
313323
updatedAt: now,
314324
lastQueuedAt: null,
315325
lastFailedAt: now,
316-
nextRunAt: getScheduleFailureNextRunAt(schedule, now),
326+
nextRunAt: getScheduleNextRunAt(schedule, now),
317327
failedCount: sql`COALESCE(${workflowSchedule.failedCount}, 0) + 1`,
318328
status: sql`CASE WHEN COALESCE(${workflowSchedule.failedCount}, 0) + 1 >= ${MAX_CONSECUTIVE_FAILURES} THEN 'disabled' ELSE 'active' END`,
319329
infraRetryCount: 0,
@@ -447,7 +457,7 @@ async function recoverStaleDatabaseScheduleJobs(now: Date): Promise<void> {
447457
updatedAt: now,
448458
lastQueuedAt: null,
449459
lastFailedAt: now,
450-
nextRunAt: getExhaustedRecoveryNextRunAt(payload, now),
460+
nextRunAt: getScheduleNextRunAt(payload, now),
451461
failedCount: sql`COALESCE(${workflowSchedule.failedCount}, 0) + 1`,
452462
status: sql`CASE WHEN COALESCE(${workflowSchedule.failedCount}, 0) + 1 >= ${MAX_CONSECUTIVE_FAILURES} THEN 'disabled' ELSE 'active' END`,
453463
infraRetryCount: 0,
@@ -488,8 +498,6 @@ function isStaleDatabaseScheduleJob(job: { status: string; startedAt?: Date }):
488498
}
489499

490500
async function getDatabaseScheduleExecutionSlots(): Promise<number> {
491-
await recoverStaleDatabaseScheduleJobs(new Date())
492-
493501
const [row] = await db
494502
.select({
495503
count: sql<number>`count(*)`,
@@ -501,8 +509,6 @@ async function getDatabaseScheduleExecutionSlots(): Promise<number> {
501509
return Math.max(0, SCHEDULE_EXECUTION_CONCURRENCY_LIMIT - processingCount)
502510
}
503511

504-
type DatabaseScheduleStartResult = 'started' | 'capacity_full' | 'not_pending'
505-
506512
async function tryStartDatabaseScheduleJob(jobId: string): Promise<DatabaseScheduleStartResult> {
507513
const now = new Date()
508514

@@ -1062,6 +1068,7 @@ export const GET = withRouteHandler(async (request: NextRequest) => {
10621068
let databaseScheduleSlots = SCHEDULE_EXECUTION_CONCURRENCY_LIMIT
10631069

10641070
if (useDatabaseFallback) {
1071+
await recoverStaleDatabaseScheduleJobs(queuedAt)
10651072
databaseScheduleSlots = await getDatabaseScheduleExecutionSlots()
10661073
resumedPendingSchedules = await resumePendingDatabaseScheduleJobs(
10671074
jobQueue,

apps/sim/app/api/workspaces/[id]/environment/route.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@ import {
1919
createWorkspaceEnvCredentials,
2020
deleteWorkspaceEnvCredentials,
2121
} from '@/lib/credentials/environment'
22-
import { getPersonalAndWorkspaceEnv } from '@/lib/environment/utils'
22+
import {
23+
getPersonalAndWorkspaceEnv,
24+
invalidateEffectiveDecryptedEnvCache,
25+
} from '@/lib/environment/utils'
2326
import { getUserEntityPermissions, getWorkspaceById } from '@/lib/workspaces/permissions/utils'
2427

2528
const logger = createLogger('WorkspaceEnvironmentAPI')
@@ -136,6 +139,7 @@ export const PUT = withRouteHandler(
136139
return { existingEncrypted: existing, merged: mergedVars }
137140
})
138141

142+
invalidateEffectiveDecryptedEnvCache({ workspaceId })
139143
const newKeys = Object.keys(variables).filter((k) => !(k in existingEncrypted))
140144
await createWorkspaceEnvCredentials({ workspaceId, newKeys, actingUserId: userId })
141145

@@ -224,6 +228,7 @@ export const DELETE = withRouteHandler(
224228
return NextResponse.json({ success: true })
225229
}
226230

231+
invalidateEffectiveDecryptedEnvCache({ workspaceId })
227232
await deleteWorkspaceEnvCredentials({ workspaceId, removedKeys: keys })
228233

229234
recordAudit({

apps/sim/lib/copilot/chat/payload.test.ts

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,11 +57,12 @@ vi.mock('@/tools/params', () => ({
5757
createUserToolSchema: mockCreateUserToolSchema,
5858
}))
5959

60-
import { buildIntegrationToolSchemas } from './payload'
60+
import { buildIntegrationToolSchemas, clearIntegrationToolSchemaCacheForTests } from './payload'
6161

6262
describe('buildIntegrationToolSchemas', () => {
6363
beforeEach(() => {
6464
vi.clearAllMocks()
65+
clearIntegrationToolSchemaCacheForTests()
6566
mockCreateUserToolSchema.mockReturnValue({ type: 'object', properties: {} })
6667
})
6768

@@ -122,4 +123,16 @@ describe('buildIntegrationToolSchemas', () => {
122123
{ surface: 'copilot' }
123124
)
124125
})
126+
127+
it('briefly reuses built schemas for the same user and surface', async () => {
128+
mockGetHighestPrioritySubscription.mockResolvedValue({ plan: 'pro', status: 'active' })
129+
130+
const first = await buildIntegrationToolSchemas('user-cache')
131+
first[0].input_schema.mutated = true
132+
const second = await buildIntegrationToolSchemas('user-cache')
133+
134+
expect(mockGetHighestPrioritySubscription).toHaveBeenCalledTimes(1)
135+
expect(mockCreateUserToolSchema).toHaveBeenCalledTimes(3)
136+
expect(second[0].input_schema).not.toHaveProperty('mutated')
137+
})
125138
})

apps/sim/lib/copilot/chat/payload.ts

Lines changed: 68 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { createLogger } from '@sim/logger'
22
import { toError } from '@sim/utils/errors'
3+
import { LRUCache } from 'lru-cache'
34
import { getHighestPrioritySubscription } from '@/lib/billing/core/subscription'
45
import { isPaid } from '@/lib/billing/plan-helpers'
56
import { getToolEntry } from '@/lib/copilot/tool-executor/router'
@@ -11,6 +12,8 @@ import { tools } from '@/tools/registry'
1112
import { getLatestVersionTools, stripVersionSuffix } from '@/tools/utils'
1213

1314
const logger = createLogger('CopilotChatPayload')
15+
const INTEGRATION_TOOL_SCHEMA_CACHE_TTL_MS = 5_000
16+
const INTEGRATION_TOOL_SCHEMA_CACHE_MAX_ENTRIES = 500
1417

1518
interface BuildPayloadParams {
1619
message: string
@@ -48,6 +51,39 @@ interface BuildIntegrationToolSchemasOptions {
4851
schemaSurface?: 'default' | 'copilot'
4952
}
5053

54+
interface IntegrationToolSchemaCacheEntry {
55+
promise: Promise<ToolSchema[]>
56+
}
57+
58+
const integrationToolSchemaCache = new LRUCache<string, IntegrationToolSchemaCacheEntry>({
59+
max: INTEGRATION_TOOL_SCHEMA_CACHE_MAX_ENTRIES,
60+
ttl: INTEGRATION_TOOL_SCHEMA_CACHE_TTL_MS,
61+
})
62+
63+
function getIntegrationToolSchemaCacheKey(
64+
userId: string,
65+
workspaceId: string | undefined,
66+
schemaSurface: string
67+
): string {
68+
return JSON.stringify([userId, workspaceId ?? null, schemaSurface])
69+
}
70+
71+
function cloneToolSchemas(toolSchemas: ToolSchema[]): ToolSchema[] {
72+
return toolSchemas.map((tool) => {
73+
const cloned: ToolSchema = {
74+
...tool,
75+
input_schema: { ...tool.input_schema },
76+
}
77+
if (tool.params) cloned.params = { ...tool.params }
78+
if (tool.oauth) cloned.oauth = { ...tool.oauth }
79+
return cloned
80+
})
81+
}
82+
83+
export function clearIntegrationToolSchemaCacheForTests(): void {
84+
integrationToolSchemaCache.clear()
85+
}
86+
5187
/**
5288
* Build deferred integration tool schemas from the Sim tool registry.
5389
* Shared by the interactive chat payload builder and the non-interactive
@@ -62,6 +98,36 @@ export async function buildIntegrationToolSchemas(
6298
messageId?: string,
6399
options: BuildIntegrationToolSchemasOptions = { schemaSurface: 'copilot' },
64100
workspaceId?: string
101+
): Promise<ToolSchema[]> {
102+
const schemaSurface = options.schemaSurface ?? 'copilot'
103+
const cacheKey = getIntegrationToolSchemaCacheKey(userId, workspaceId, schemaSurface)
104+
const cached = integrationToolSchemaCache.get(cacheKey)
105+
if (cached) {
106+
return cloneToolSchemas(await cached.promise)
107+
}
108+
109+
const promise = buildIntegrationToolSchemasUncached(
110+
userId,
111+
messageId,
112+
{ schemaSurface },
113+
workspaceId
114+
).catch((error) => {
115+
integrationToolSchemaCache.delete(cacheKey)
116+
throw error
117+
})
118+
119+
integrationToolSchemaCache.set(cacheKey, {
120+
promise,
121+
})
122+
123+
return cloneToolSchemas(await promise)
124+
}
125+
126+
async function buildIntegrationToolSchemasUncached(
127+
userId: string,
128+
messageId: string | undefined,
129+
options: Required<BuildIntegrationToolSchemasOptions>,
130+
workspaceId?: string
65131
): Promise<ToolSchema[]> {
66132
const reqLogger = logger.withMetadata({ messageId })
67133
const integrationTools: ToolSchema[] = []
@@ -121,7 +187,7 @@ export async function buildIntegrationToolSchemas(
121187
}
122188
}
123189
const userSchema = createUserToolSchema(toolConfig, {
124-
surface: options.schemaSurface ?? 'copilot',
190+
surface: options.schemaSurface,
125191
})
126192
const catalogEntry = getToolEntry(strippedName)
127193
integrationTools.push({
@@ -165,7 +231,7 @@ export async function buildIntegrationToolSchemas(
165231
)
166232
}
167233

168-
return integrationTools.map((tool) => ({ ...tool, input_schema: { ...tool.input_schema } }))
234+
return integrationTools
169235
}
170236

171237
/**

apps/sim/lib/environment/utils.ts

Lines changed: 64 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { createLogger } from '@sim/logger'
44
import { getErrorMessage } from '@sim/utils/errors'
55
import { generateId } from '@sim/utils/id'
66
import { eq, inArray } from 'drizzle-orm'
7+
import { LRUCache } from 'lru-cache'
78
import { decryptSecret, encryptSecret } from '@/lib/core/security/encryption'
89
import {
910
createWorkspaceEnvCredentials,
@@ -13,6 +14,45 @@ import {
1314
import { checkWorkspaceAccess } from '@/lib/workspaces/permissions/utils'
1415

1516
const logger = createLogger('EnvironmentUtils')
17+
const EFFECTIVE_DECRYPTED_ENV_CACHE_TTL_MS = 2_000
18+
const EFFECTIVE_DECRYPTED_ENV_CACHE_MAX_ENTRIES = 1_000
19+
20+
interface EffectiveDecryptedEnvCacheEntry {
21+
userId: string
22+
workspaceId?: string
23+
promise: Promise<Record<string, string>>
24+
}
25+
26+
const effectiveDecryptedEnvCache = new LRUCache<string, EffectiveDecryptedEnvCacheEntry>({
27+
max: EFFECTIVE_DECRYPTED_ENV_CACHE_MAX_ENTRIES,
28+
ttl: EFFECTIVE_DECRYPTED_ENV_CACHE_TTL_MS,
29+
})
30+
31+
function getEffectiveDecryptedEnvCacheKey(userId: string, workspaceId?: string): string {
32+
return JSON.stringify([userId, workspaceId ?? null])
33+
}
34+
35+
function cloneEnvVars(envVars: Record<string, string>): Record<string, string> {
36+
return { ...envVars }
37+
}
38+
39+
export function invalidateEffectiveDecryptedEnvCache(input: {
40+
userId?: string
41+
workspaceId?: string
42+
}): void {
43+
const { userId, workspaceId } = input
44+
if (!userId && !workspaceId) return
45+
46+
effectiveDecryptedEnvCache.forEach((entry, cacheKey) => {
47+
if (userId && entry.userId === userId) {
48+
effectiveDecryptedEnvCache.delete(cacheKey)
49+
return
50+
}
51+
if (workspaceId && entry.workspaceId === workspaceId) {
52+
effectiveDecryptedEnvCache.delete(cacheKey)
53+
}
54+
})
55+
}
1656

1757
/**
1858
* Get environment variable keys for a user
@@ -255,6 +295,7 @@ export async function upsertPersonalEnvVars(
255295
set: { variables: finalEncrypted, updatedAt: new Date() },
256296
})
257297

298+
invalidateEffectiveDecryptedEnvCache({ userId })
258299
await syncPersonalEnvCredentialsForUser({
259300
userId,
260301
envKeys: Object.keys(finalEncrypted),
@@ -304,6 +345,7 @@ export async function upsertWorkspaceEnvVars(
304345
set: { variables: merged, updatedAt: new Date() },
305346
})
306347

348+
invalidateEffectiveDecryptedEnvCache({ workspaceId })
307349
const newKeys = Object.keys(newVars).filter((k) => !(k in existingWsEncrypted))
308350
await createWorkspaceEnvCredentials({ workspaceId, newKeys, actingUserId })
309351

@@ -317,12 +359,27 @@ export async function getEffectiveDecryptedEnv(
317359
userId: string,
318360
workspaceId?: string
319361
): Promise<Record<string, string>> {
320-
const { personalDecrypted, workspaceDecrypted } = await getPersonalAndWorkspaceEnv(
321-
userId,
322-
workspaceId
323-
)
324-
return {
325-
...personalDecrypted,
326-
...workspaceDecrypted,
362+
const cacheKey = getEffectiveDecryptedEnvCacheKey(userId, workspaceId)
363+
const cached = effectiveDecryptedEnvCache.get(cacheKey)
364+
if (cached) {
365+
return cloneEnvVars(await cached.promise)
327366
}
367+
368+
const promise = getPersonalAndWorkspaceEnv(userId, workspaceId)
369+
.then(({ personalDecrypted, workspaceDecrypted }) => ({
370+
...personalDecrypted,
371+
...workspaceDecrypted,
372+
}))
373+
.catch((error) => {
374+
effectiveDecryptedEnvCache.delete(cacheKey)
375+
throw error
376+
})
377+
378+
effectiveDecryptedEnvCache.set(cacheKey, {
379+
userId,
380+
workspaceId,
381+
promise,
382+
})
383+
384+
return cloneEnvVars(await promise)
328385
}

apps/sim/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,7 @@
149149
"json5": "2.2.3",
150150
"jszip": "3.10.1",
151151
"jwt-decode": "^4.0.0",
152+
"lru-cache": "11.5.1",
152153
"lucide-react": "^0.479.0",
153154
"mammoth": "^1.9.0",
154155
"mermaid": "11.15.0",

0 commit comments

Comments
 (0)