Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 70 additions & 64 deletions javascript/src/rpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ export class RetoolRPC {
private _environmentName: string
private _pollingIntervalMs: number
private _pollingTimeoutMs: number
private _startMultipleFunctions: boolean
private _version: string
private _agentUuid: string
private _versionHash: string | undefined
Expand All @@ -52,6 +53,7 @@ export class RetoolRPC {
? Math.max(config.pollingIntervalMs, MINIMUM_POLLING_INTERVAL_MS)
: DEFAULT_POLLING_INTERVAL_MS
this._pollingTimeoutMs = config.pollingTimeoutMs || DEFAULT_POLLING_TIMEOUT_MS
this._startMultipleFunctions = config.startMultipleFunctions || false
this._version = config.version || DEFAULT_VERSION
this._agentUuid = config.agentUuid || uuidv4()

Expand Down Expand Up @@ -190,77 +192,81 @@ export class RetoolRPC {
* Fetches a query from the Retool server and executes it.
*/
private async fetchQueryAndExecute(): Promise<AgentServerStatus> {
const pendingQueryFetch = await this._retoolApi.popQuery({
resourceId: this._resourceId,
environmentName: this._environmentName,
agentUuid: this._agentUuid,
versionHash: this._versionHash,
})
do {
const pendingQueryFetch = await this._retoolApi.popQuery({
resourceId: this._resourceId,
environmentName: this._environmentName,
agentUuid: this._agentUuid,
versionHash: this._versionHash,
})

if (!pendingQueryFetch.ok) {
if (isClientError(pendingQueryFetch.status)) {
this._logger.error(`Error fetching query (${pendingQueryFetch.status}): ${await pendingQueryFetch.text()}`)
return 'stop'
}
if (!pendingQueryFetch.ok) {
if (isClientError(pendingQueryFetch.status)) {
this._logger.error(`Error fetching query (${pendingQueryFetch.status}): ${await pendingQueryFetch.text()}`)
return 'stop'
}

throw new Error(`Server error when fetching query: ${pendingQueryFetch.status}. Retrying...`)
}
throw new Error(`Server error when fetching query: ${pendingQueryFetch.status}. Retrying...`)
}

const { query } = await pendingQueryFetch.json()
if (query) {
this._logger.debug('Executing query', query) // This might contain sensitive information
const { query } = await pendingQueryFetch.json()
if (query) {
this._logger.debug('Executing query', query) // This might contain sensitive information

const agentReceivedQueryAt = new Date().toISOString()
const agentReceivedQueryAt = new Date().toISOString()

const queryUuid: string = query.queryUuid
const { method, parameters, context } = query.queryInfo
const queryUuid: string = query.queryUuid
const { method, parameters, context } = query.queryInfo

let status: 'success' | 'error'
let executionResponse: unknown = undefined
let executionArguments: Record<string, unknown> | undefined = undefined
let agentError: AgentServerError | undefined = undefined
let status: 'success' | 'error'
let executionResponse: unknown = undefined
let executionArguments: Record<string, unknown> | undefined = undefined
let agentError: AgentServerError | undefined = undefined

this.executeFunction(method, parameters, context)
.then((executionResult) => {
executionResponse = executionResult.result
executionArguments = executionResult.arguments
status = 'success'
})
.catch((err) => {
agentError = createAgentServerError(err)
status = 'error'
})
.finally(() => {
this._retoolApi
.postQueryResponse({
resourceId: this._resourceId,
environmentName: this._environmentName,
versionHash: this._versionHash,
agentUuid: this._agentUuid,
queryUuid,
status,
data: executionResponse,
metadata: {
packageLanguage: 'javascript',
packageVersion: RetoolRPCVersion,
agentReceivedQueryAt,
agentFinishedQueryAt: new Date().toISOString(),
parameters: executionArguments,
},
error: agentError,
})
.then(async (updateQueryResponse) => {
this._logger.debug(
'Update query response status: ',
updateQueryResponse.status,
await updateQueryResponse.text(),
)
})
.catch((err) => {
this._logger.error(`Error updating query response: `, err)
})
})
}
this.executeFunction(method, parameters, context)
.then((executionResult) => {
executionResponse = executionResult.result
executionArguments = executionResult.arguments
status = 'success'
})
.catch((err) => {
agentError = createAgentServerError(err)
status = 'error'
})
.finally(() => {
this._retoolApi
.postQueryResponse({
resourceId: this._resourceId,
environmentName: this._environmentName,
versionHash: this._versionHash,
agentUuid: this._agentUuid,
queryUuid,
status,
data: executionResponse,
metadata: {
packageLanguage: 'javascript',
packageVersion: RetoolRPCVersion,
agentReceivedQueryAt,
agentFinishedQueryAt: new Date().toISOString(),
parameters: executionArguments,
},
error: agentError,
})
.then(async (updateQueryResponse) => {
this._logger.debug(
'Update query response status: ',
updateQueryResponse.status,
await updateQueryResponse.text(),
)
})
.catch((err) => {
this._logger.error(`Error updating query response: `, err)
})
})
} else {
break
}
} while (this._startMultipleFunctions)

return 'continue'
}
Expand Down
2 changes: 2 additions & 0 deletions javascript/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ export type RetoolRPCConfig = {
pollingIntervalMs?: number
/** The optional polling timeout in milliseconds. Defaults to 5000. */
pollingTimeoutMs?: number
/** Whether or not multiple functions can be executed without first waiting for the polling interval to pass - boosts performance but can increase resource usage heavily. Use with caution. Defaults to false */
startMultipleFunctions?: boolean
/** The optional UUID of the agent. Will be automatically generated by default */
agentUuid?: string
/** The optional log level. */
Expand Down