Skip to content

Commit a2972b3

Browse files
last_output count fixed, other nits addressed
1 parent 1af2375 commit a2972b3

File tree

7 files changed

+173
-76
lines changed

7 files changed

+173
-76
lines changed

apps/sim/executor/execution/block-executor.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { createLogger, type Logger } from '@sim/logger'
22
import { redactApiKeys } from '@/lib/core/security/redaction'
3-
import { hydrateCacheReferences } from '@/lib/paginated-cache/paginate'
43
import { getBaseUrl } from '@/lib/core/utils/urls'
4+
import { hydrateCacheReferences } from '@/lib/paginated-cache/paginate'
55
import {
66
containsUserFileWithMetadata,
77
hydrateUserFilesWithBase64,

apps/sim/lib/paginated-cache/paginate.test.ts

Lines changed: 68 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,7 @@ vi.mock('@/lib/paginated-cache/redis-cache', () => ({
2828
import { autoPaginate, hydrateCacheReferences } from '@/lib/paginated-cache/paginate'
2929
import type { ToolResponse } from '@/tools/types'
3030

31-
function makePageResponse(
32-
items: unknown[],
33-
hasMore: boolean,
34-
cursor: string | null
35-
): ToolResponse {
31+
function makePageResponse(items: unknown[], hasMore: boolean, cursor: string | null): ToolResponse {
3632
return {
3733
success: true,
3834
output: {
@@ -206,7 +202,73 @@ describe('autoPaginate', () => {
206202
})
207203

208204
const storedCacheId = mockStoreMetadata.mock.calls[0][0] as string
209-
expect(storedCacheId).toMatch(/^exec-42:zendesk_get_tickets:tickets:\d+$/)
205+
expect(storedCacheId).toMatch(
206+
/^exec-42:zendesk_get_tickets:tickets:[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/
207+
)
208+
})
209+
210+
it('does not inject fields that the tool output does not have', async () => {
211+
const noMetadataConfig = {
212+
...paginationConfig,
213+
pageField: 'items',
214+
getItems: (output: Record<string, unknown>) => (output.items as unknown[]) ?? [],
215+
}
216+
const initialResult: ToolResponse = {
217+
success: true,
218+
output: {
219+
items: [{ id: 1 }],
220+
cursor: 'abc',
221+
},
222+
}
223+
224+
const result = await autoPaginate({
225+
initialResult,
226+
params: {},
227+
paginationConfig: noMetadataConfig,
228+
executeTool: mockExecuteTool,
229+
toolId: 'custom_tool',
230+
executionId: 'exec-1',
231+
})
232+
233+
const outputKeys = Object.keys(result.output)
234+
expect(outputKeys).toContain('items')
235+
expect(outputKeys).toContain('cursor')
236+
expect(outputKeys).not.toContain('metadata')
237+
expect(outputKeys).not.toContain('paging')
238+
})
239+
})
240+
241+
describe('cleanupPaginatedCache', () => {
242+
let mockScan: ReturnType<typeof vi.fn>
243+
let mockDel: ReturnType<typeof vi.fn>
244+
245+
beforeEach(() => {
246+
vi.clearAllMocks()
247+
mockScan = vi.fn().mockResolvedValue(['0', []])
248+
mockDel = vi.fn().mockResolvedValue(1)
249+
mockGetRedisClient.mockReturnValue({ scan: mockScan, del: mockDel })
250+
})
251+
252+
it('scans with prefix-based patterns and deletes matching keys', async () => {
253+
mockScan
254+
.mockResolvedValueOnce(['0', ['pagcache:page:exec-1:tool:field:uuid:0']])
255+
.mockResolvedValueOnce(['0', ['pagcache:meta:exec-1:tool:field:uuid']])
256+
257+
const { cleanupPaginatedCache } = await import('@/lib/paginated-cache/paginate')
258+
await cleanupPaginatedCache('exec-1')
259+
260+
expect(mockScan).toHaveBeenCalledWith('0', 'MATCH', 'pagcache:page:exec-1:*', 'COUNT', 100)
261+
expect(mockScan).toHaveBeenCalledWith('0', 'MATCH', 'pagcache:meta:exec-1:*', 'COUNT', 100)
262+
expect(mockDel).toHaveBeenCalledTimes(2)
263+
})
264+
265+
it('no-ops when Redis is unavailable', async () => {
266+
mockGetRedisClient.mockReturnValue(null)
267+
268+
const { cleanupPaginatedCache } = await import('@/lib/paginated-cache/paginate')
269+
await cleanupPaginatedCache('exec-1')
270+
271+
expect(mockScan).not.toHaveBeenCalled()
210272
})
211273
})
212274

apps/sim/lib/paginated-cache/paginate.ts

Lines changed: 46 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
1+
import crypto from 'node:crypto'
12
import { createLogger } from '@sim/logger'
23
import { getRedisClient } from '@/lib/core/config/redis'
4+
import type { PaginatedCacheStorageAdapter } from '@/lib/paginated-cache/adapter'
35
import { RedisPaginatedCache } from '@/lib/paginated-cache/redis-cache'
4-
import { isPaginatedCacheReference } from '@/lib/paginated-cache/types'
56
import type { PaginatedCacheReference, ToolPaginationConfig } from '@/lib/paginated-cache/types'
7+
import { isPaginatedCacheReference } from '@/lib/paginated-cache/types'
68
import type { ToolResponse } from '@/tools/types'
79

810
const logger = createLogger('Paginator')
911

10-
const DEFAULT_MAX_PAGES = 100
12+
const DEFAULT_MAX_PAGES = 10_000
1113

1214
interface AutoPaginateOptions {
1315
initialResult: ToolResponse
@@ -23,8 +25,14 @@ interface AutoPaginateOptions {
2325
}
2426

2527
export async function autoPaginate(options: AutoPaginateOptions): Promise<ToolResponse> {
26-
const { initialResult, params, paginationConfig: config, executeTool, toolId, executionId } =
27-
options
28+
const {
29+
initialResult,
30+
params,
31+
paginationConfig: config,
32+
executeTool,
33+
toolId,
34+
executionId,
35+
} = options
2836
const maxPages = config.maxPages ?? DEFAULT_MAX_PAGES
2937

3038
const redis = getRedisClient()
@@ -33,7 +41,7 @@ export async function autoPaginate(options: AutoPaginateOptions): Promise<ToolRe
3341
}
3442

3543
const cache = new RedisPaginatedCache(redis)
36-
const cacheId = `${executionId}:${toolId}:${config.pageField}:${Date.now()}`
44+
const cacheId = `${executionId}:${toolId}:${config.pageField}:${crypto.randomUUID()}`
3745

3846
let totalItems = 0
3947
let pageIndex = 0
@@ -97,7 +105,14 @@ export async function hydrateCacheReferences(
97105
if (!containsCacheReference(inputs)) {
98106
return inputs
99107
}
100-
return (await deepHydrate(inputs)) as Record<string, unknown>
108+
109+
const redis = getRedisClient()
110+
if (!redis) {
111+
throw new Error('Redis is required to hydrate paginated cache references but is not available')
112+
}
113+
114+
const adapter = new RedisPaginatedCache(redis)
115+
return (await deepHydrate(inputs, adapter)) as Record<string, unknown>
101116
}
102117

103118
function containsCacheReference(value: unknown): boolean {
@@ -109,37 +124,35 @@ function containsCacheReference(value: unknown): boolean {
109124
return false
110125
}
111126

112-
async function deepHydrate(value: unknown): Promise<unknown> {
127+
async function deepHydrate(
128+
value: unknown,
129+
adapter: PaginatedCacheStorageAdapter
130+
): Promise<unknown> {
113131
if (isPaginatedCacheReference(value)) {
114-
return hydrateReference(value)
132+
return hydrateReference(value, adapter)
115133
}
116134

117135
if (Array.isArray(value)) {
118-
return Promise.all(value.map(deepHydrate))
136+
return Promise.all(value.map((v) => deepHydrate(v, adapter)))
119137
}
120138

121139
if (typeof value === 'object' && value !== null) {
122140
const entries = Object.entries(value as Record<string, unknown>)
123141
const hydrated: Record<string, unknown> = {}
124142
for (const [key, val] of entries) {
125-
hydrated[key] = await deepHydrate(val)
143+
hydrated[key] = await deepHydrate(val, adapter)
126144
}
127145
return hydrated
128146
}
129147

130148
return value
131149
}
132150

133-
async function hydrateReference(ref: PaginatedCacheReference): Promise<unknown[]> {
134-
const redis = getRedisClient()
135-
if (!redis) {
136-
throw new Error(
137-
`Redis is required to hydrate paginated cache reference (cacheId: ${ref.cacheId}) but is not available`
138-
)
139-
}
140-
141-
const cache = new RedisPaginatedCache(redis)
142-
const pages = await cache.getAllPages(ref.cacheId, ref.totalPages)
151+
async function hydrateReference(
152+
ref: PaginatedCacheReference,
153+
adapter: PaginatedCacheStorageAdapter
154+
): Promise<unknown[]> {
155+
const pages = await adapter.getAllPages(ref.cacheId, ref.totalPages)
143156

144157
const items: unknown[] = []
145158
for (const page of pages) {
@@ -165,21 +178,23 @@ export async function cleanupPaginatedCache(executionId: string): Promise<void>
165178
return
166179
}
167180

168-
const pattern = `pagcache:*${executionId}:*`
181+
const patterns = [`pagcache:page:${executionId}:*`, `pagcache:meta:${executionId}:*`]
169182

170183
try {
171-
let cursor = '0'
172184
let deletedCount = 0
173185

174-
do {
175-
const [nextCursor, keys] = await redis.scan(cursor, 'MATCH', pattern, 'COUNT', 100)
176-
cursor = nextCursor
177-
178-
if (keys.length > 0) {
179-
await redis.del(...keys)
180-
deletedCount += keys.length
181-
}
182-
} while (cursor !== '0')
186+
for (const pattern of patterns) {
187+
let cursor = '0'
188+
do {
189+
const [nextCursor, keys] = await redis.scan(cursor, 'MATCH', pattern, 'COUNT', 100)
190+
cursor = nextCursor
191+
192+
if (keys.length > 0) {
193+
await redis.del(...keys)
194+
deletedCount += keys.length
195+
}
196+
} while (cursor !== '0')
197+
}
183198

184199
if (deletedCount > 0) {
185200
logger.info(`Cleaned up ${deletedCount} paginated cache entries for execution ${executionId}`)

apps/sim/lib/paginated-cache/types.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ export interface ToolPaginationConfig<O = Record<string, unknown>> {
1313
currentParams: Record<string, unknown>,
1414
token: string | number
1515
) => Record<string, unknown>
16-
/** Maximum pages to fetch. Default: 100 */
16+
/** Maximum pages to fetch. Default: 10,000 */
1717
maxPages?: number
1818
}
1919

apps/sim/tools/index.ts

Lines changed: 51 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import { generateRequestId } from '@/lib/core/utils/request'
1313
import { getBaseUrl, getInternalApiBaseUrl } from '@/lib/core/utils/urls'
1414
import { SIM_VIA_HEADER, serializeCallChain } from '@/lib/execution/call-chain'
1515
import { parseMcpToolId } from '@/lib/mcp/utils'
16+
import { autoPaginate } from '@/lib/paginated-cache/paginate'
1617
import { isCustomTool, isMcpTool } from '@/executor/constants'
1718
import { resolveSkillContent } from '@/executor/handlers/agent/skills-resolver'
1819
import type { ExecutionContext } from '@/executor/types'
@@ -26,7 +27,6 @@ import type {
2627
ToolResponse,
2728
ToolRetryConfig,
2829
} from '@/tools/types'
29-
import { autoPaginate } from '@/lib/paginated-cache/paginate'
3030
import { formatRequestParams, getTool, validateRequiredParametersAfterMerge } from '@/tools/utils'
3131
import * as toolsUtilsServer from '@/tools/utils.server'
3232

@@ -600,6 +600,40 @@ async function processFileOutputs(
600600
}
601601
}
602602

603+
/**
604+
* If the tool has a pagination config and there are more pages, auto-paginate
605+
* and replace the page field with a Redis cache reference.
606+
*/
607+
async function maybeAutoPaginate(
608+
tool: ToolConfig,
609+
finalResult: ToolResponse,
610+
contextParams: Record<string, unknown>,
611+
normalizedToolId: string,
612+
skipPostProcess: boolean,
613+
executionContext?: ExecutionContext
614+
): Promise<ToolResponse> {
615+
if (
616+
!tool.pagination ||
617+
!finalResult.success ||
618+
skipPostProcess ||
619+
!executionContext?.executionId
620+
) {
621+
return finalResult
622+
}
623+
const nextToken = tool.pagination.getNextPageToken(finalResult.output)
624+
if (nextToken === null) {
625+
return finalResult
626+
}
627+
return autoPaginate({
628+
initialResult: finalResult,
629+
params: contextParams,
630+
paginationConfig: tool.pagination,
631+
executeTool,
632+
toolId: normalizedToolId,
633+
executionId: executionContext.executionId,
634+
})
635+
}
636+
603637
/**
604638
* Execute a tool by making the appropriate HTTP request
605639
* All requests go directly - internal routes use regular fetch, external use SSRF-protected fetch
@@ -820,20 +854,14 @@ export async function executeTool(
820854
// Process file outputs if execution context is available
821855
finalResult = await processFileOutputs(finalResult, tool, executionContext)
822856

823-
// Auto-paginate if tool has pagination config and there are more pages
824-
if (tool.pagination && finalResult.success && !skipPostProcess && executionContext?.executionId) {
825-
const nextToken = tool.pagination.getNextPageToken(finalResult.output)
826-
if (nextToken !== null) {
827-
finalResult = await autoPaginate({
828-
initialResult: finalResult,
829-
params: contextParams,
830-
paginationConfig: tool.pagination,
831-
executeTool,
832-
toolId: normalizedToolId,
833-
executionId: executionContext.executionId,
834-
})
835-
}
836-
}
857+
finalResult = await maybeAutoPaginate(
858+
tool,
859+
finalResult,
860+
contextParams,
861+
normalizedToolId,
862+
skipPostProcess,
863+
executionContext
864+
)
837865

838866
// Add timing data to the result
839867
const endTime = new Date()
@@ -890,20 +918,14 @@ export async function executeTool(
890918
// Process file outputs if execution context is available
891919
finalResult = await processFileOutputs(finalResult, tool, executionContext)
892920

893-
// Auto-paginate if tool has pagination config and there are more pages
894-
if (tool.pagination && finalResult.success && !skipPostProcess && executionContext?.executionId) {
895-
const nextToken = tool.pagination.getNextPageToken(finalResult.output)
896-
if (nextToken !== null) {
897-
finalResult = await autoPaginate({
898-
initialResult: finalResult,
899-
params: contextParams,
900-
paginationConfig: tool.pagination,
901-
executeTool,
902-
toolId: normalizedToolId,
903-
executionId: executionContext.executionId,
904-
})
905-
}
906-
}
921+
finalResult = await maybeAutoPaginate(
922+
tool,
923+
finalResult,
924+
contextParams,
925+
normalizedToolId,
926+
skipPostProcess,
927+
executionContext
928+
)
907929

908930
// Add timing data to the result
909931
const endTime = new Date()

apps/sim/tools/types.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ export interface ToolConfig<P = any, R = any> {
182182
* Optional pagination configuration for tools that return paginated data.
183183
* When provided, the executor automatically fetches all pages and caches them in Redis.
184184
*/
185-
pagination?: ToolPaginationConfig
185+
pagination?: ToolPaginationConfig<R extends { output: infer O } ? O : Record<string, unknown>>
186186
}
187187

188188
export interface TableRow {

apps/sim/tools/zendesk/get_tickets.ts

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -185,18 +185,16 @@ export const zendeskGetTicketsTool: ToolConfig<ZendeskGetTicketsParams, ZendeskG
185185

186186
pagination: {
187187
pageField: 'tickets',
188-
getItems: (output: Record<string, unknown>) => (output.tickets as unknown[]) ?? [],
189-
getNextPageToken: (output: Record<string, unknown>) => {
190-
const paging = output.paging as Record<string, unknown> | undefined
191-
if (paging?.has_more && paging?.after_cursor) {
192-
return paging.after_cursor as string
188+
getItems: (output) => output.tickets ?? [],
189+
getNextPageToken: (output) => {
190+
if (output.paging?.has_more && output.paging?.after_cursor) {
191+
return output.paging.after_cursor
193192
}
194193
return null
195194
},
196-
buildNextPageParams: (params: Record<string, unknown>, token: string | number) => ({
195+
buildNextPageParams: (params, token) => ({
197196
...params,
198197
pageAfter: String(token),
199198
}),
200-
maxPages: 100,
201199
},
202200
}

0 commit comments

Comments
 (0)