Conversation
- Use latest TS API SDK, update code to use latest bindings
- Add code to support local dev
- Add code to use KV storage from Durable Object inside MCP Server
- Service layer and tool definitions
- Accumulate and retrieve latest messages asynchronously
- Store and retrieve streaming message data using SQLite storage
- Add TTL for storage of 30 sec, automatically reset when data is touched
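The touch-to-reset TTL behavior described above can be sketched as a small in-memory store. This is an illustrative model only (the `TtlStore` name is made up, and a clock is injected so the logic stays testable); it is not the Durable Object implementation from the PR:

```typescript
// Hypothetical sketch of touch-to-reset TTL semantics: every read or write
// pushes the entry's expiry out by `ttlMs` from "now" (clock injected for testing).
class TtlStore<V> {
  private entries = new Map<string, { value: V; expiresAt: number }>();

  constructor(private ttlMs: number, private now: () => number = Date.now) {}

  set(key: string, value: V): void {
    this.entries.set(key, { value, expiresAt: this.now() + this.ttlMs });
  }

  // Reading an entry "touches" it, resetting its TTL.
  get(key: string): V | undefined {
    const entry = this.entries.get(key);
    if (!entry) return undefined;
    if (entry.expiresAt <= this.now()) {
      this.entries.delete(key); // lazily evict expired entries
      return undefined;
    }
    entry.expiresAt = this.now() + this.ttlMs;
    return entry.value;
  }
}
```

In the PR the same effect is achieved differently: a 30-second Durable Object schedule is cancelled and re-created whenever the conversation state is updated.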
Summary of Changes (Gemini Code Assist)

This pull request introduces foundational support for Spotter 3 by upgrading the ThoughtSpot REST API SDK and integrating agent-based conversations. It enables creating, managing, and interacting with agent conversations, including handling streaming responses and maintaining conversation state over time.
Code Review
This pull request introduces support for Spotter 3 by updating dependencies, adding new API endpoints for agent conversations, and modifying the MCP server to handle them. The changes add new schemas and tool definitions for agent conversations and extend the instrumentedMCPServer function with conversation state management. The review focuses on correctness and maintainability, specifically the new agent conversation features and the integration of conversation state management.
```typescript
console.log('>>> streaming response update with # lines', lines.length);

for (const line of lines) {
    if (!line.startsWith("data: ")) continue;
```
It's crucial to validate each line before attempting to parse it as JSON. A line that does not start with "data: " may contain a keep-alive comment, partial data, or an error message, and parsing it could lead to unexpected errors. The startsWith("data: ") guard here skips such lines before JSON parsing; keep it in place, and make sure the subsequent JSON.parse stays wrapped in error handling.
```typescript
async callSendAgentMessageAsync(request: z.infer<typeof CallToolRequestSchema>) {
    const { conversationId, messages } = SendAgentMessageSchema.parse(request.params.arguments);
    const response = await this.getThoughtSpotService().sendAgentMessageStreaming(conversationId, { messages });

    const reader = response.body?.getReader();
    if (!reader) {
        throw new Error("Failed to get reader from response body");
    }

    const updateConversationState = async (isDone: boolean, latestMessages?: AgentMessage[]) => {
        let conversationState = await this.getConversationState(conversationId);
        if (!conversationState) {
            conversationState = {
                latestMessages: [],
                isDone: false,
            };
        }

        await this.updateConversationStateAndResetTtlTimeout(conversationId, {
            ...conversationState,
            isDone,
            ...(latestMessages ? { latestMessages: [...conversationState.latestMessages, ...latestMessages] } : {}),
        });
    };

    setTimeout(async () => {
        const decoder = new TextDecoder();
        let buffer = "";

        while (true) {
            const { done, value } = await reader.read();
            if (done) {
                console.log('>>> streaming response done');
                await updateConversationState(true);
                break;
            }

            buffer += decoder.decode(value, { stream: true });
            const lines = buffer.split("\n");
            buffer = lines.pop() || "";
            console.log('>>> streaming response update with # lines', lines.length);

            for (const line of lines) {
                if (!line.startsWith("data: ")) continue;

                try {
                    const data = JSON.parse(line.slice(6));
                    for (const item of data) {
                        if (item.type === 'text-chunk') {
                            await updateConversationState(false, [{
                                type: 'text-chunk',
                                text: item.content,
                            }]);
                        } else if (item.type === 'answer') {
                            await updateConversationState(false, [{
                                type: 'answer',
                                answerTitle: item.metadata.title,
                                answerQuery: item.metadata.sage_query,
                            }]);
                        } else {
                            console.log('>>> unknown item in event stream', item);
                        }
                    }
                } catch (error) {
                    console.log('>>> error parsing line', line, error);
                }
            }
        }
    });
```
The callSendAgentMessageAsync function uses setTimeout to process the streaming response. This approach might not be reliable in all environments, especially if the worker is terminated before the timeout completes. Consider using a more robust mechanism for handling asynchronous tasks, such as a queue or a dedicated background process. Also, there is no error handling if the reader.read() fails.
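On Cloudflare Workers, one more robust option than a bare setTimeout is handing the background promise to the runtime via ctx.waitUntil, which keeps the worker alive until the promise settles. A rough sketch with a mocked execution context follows; MockExecutionContext, processStream, and handleRequest are illustrative names, not code from the PR:

```typescript
// Hypothetical sketch: defer work without losing it when the handler returns.
// Cloudflare's ExecutionContext.waitUntil(promise) tells the runtime not to
// terminate the worker until the promise settles; we mock that contract here.
class MockExecutionContext {
  pending: Promise<unknown>[] = [];
  waitUntil(p: Promise<unknown>): void {
    this.pending.push(p);
  }
}

const processed: string[] = [];

async function processStream(lines: string[]): Promise<void> {
  for (const line of lines) {
    if (!line.startsWith("data: ")) continue; // skip keep-alives/comments
    processed.push(line.slice(6));
  }
}

function handleRequest(ctx: MockExecutionContext): string {
  // Respond immediately; stream processing continues in the background.
  ctx.waitUntil(processStream(["data: hello", ": keep-alive", "data: world"]));
  return "accepted";
}
```

The reviewer's second point stands regardless of the mechanism: wrapping the reader.read() loop in a try/catch and recording an error state would keep a failed read from silently stalling the conversation.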
```typescript
const reader = response.body?.getReader();
if (!reader) {
    throw new Error("Failed to get reader from response body");
```
Failing to get a reader from the response body is a critical error that should be handled more gracefully. Instead of throwing an error, consider returning an error response to the client with a descriptive message. This will prevent the entire worker from crashing and provide more informative feedback to the user.
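The graceful alternative could look roughly like this. The ToolResult shape below is loosely modeled on an MCP tool result, and errorResponse/checkReader are hypothetical helpers, not the PR's actual API:

```typescript
// Hypothetical sketch: return a structured error result instead of throwing,
// so a missing response body does not crash the worker.
interface ToolResult {
  isError: boolean;
  content: { type: "text"; text: string }[];
}

function errorResponse(message: string): ToolResult {
  return { isError: true, content: [{ type: "text", text: message }] };
}

function checkReader(body: ReadableStream<Uint8Array> | null): ToolResult | null {
  const reader = body?.getReader();
  if (!reader) {
    return errorResponse("Streaming is unavailable: the response had no readable body.");
  }
  reader.releaseLock(); // sketch only; the real code would keep reading
  return null; // null means "ok, proceed with streaming"
}
```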
```typescript
export function instrumentedMCPServer<T extends BaseMCPServer>(MCPServer: new (
    ctx: Context,
    getConversationState: (
        conversationId: string,
    ) => Promise<StreamingConversationState | undefined>,
    updateConversationStateAndResetTtlTimeout: (
        conversationId: string,
        newState: StreamingConversationState,
    ) => Promise<void>,
) => T, config: ResolveConfigFn) {
```
The number of parameters in this function signature is getting quite large. Consider using an object to group related parameters for better readability and maintainability. This is especially important as more features are added to the MCP server.
For example, instead of passing getConversationState and updateConversationStateAndResetTtlTimeout as separate parameters, you could create a ConversationStateHandlers object that encapsulates these functions.
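That suggestion could look roughly like this. ConversationStateHandlers is the reviewer's proposed name; the rest (including ExampleMCPServer and the simplified state type) is an illustrative sketch, not the PR's actual types:

```typescript
// Hypothetical grouping of the conversation-state callbacks into one object.
interface StreamingConversationState {
  latestMessages: unknown[];
  isDone: boolean;
}

interface ConversationStateHandlers {
  getConversationState(
    conversationId: string,
  ): Promise<StreamingConversationState | undefined>;
  updateConversationStateAndResetTtlTimeout(
    conversationId: string,
    newState: StreamingConversationState,
  ): Promise<void>;
}

// The constructor now takes a single handlers object, so adding more
// state-management hooks later does not widen the signature again.
class ExampleMCPServer {
  constructor(private handlers: ConversationStateHandlers) {}

  async isDone(conversationId: string): Promise<boolean> {
    const state = await this.handlers.getConversationState(conversationId);
    return state?.isDone ?? false;
  }
}
```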
```typescript
async init() {
    await this.server.init();
    this.ctx.storage
```

```typescript
async callCreateAgentConversation(request: z.infer<typeof CallToolRequestSchema>) {
    const { metadata_context, conversation_settings } = CreateAgentConversationSchema.parse(request.params.arguments);
    const conversation = await this.getThoughtSpotService().createAgentConversation({
        metadata_context,
        conversation_settings,
    });

    return this.createStructuredContentSuccessResponse(
        { conversation_id: conversation.conversation_id },
        "Agent conversation created successfully"
    );
```
```typescript
// Wait up to 5 seconds for the conversation state to be available
let conversationState: StreamingConversationState | undefined;
for (let i = 0; i < 5; i++) {
    conversationState = await this.getConversationState(conversationId);
    if ((conversationState?.latestMessages?.length ?? 0) > 0) break;
    await new Promise(resolve => setTimeout(resolve, 1000));
}
```
Waiting for a fixed duration (5 seconds) for the conversation state might not be sufficient in all cases. Consider using a more adaptive approach, such as a retry mechanism with exponential backoff, to handle cases where the conversation state is not immediately available. This will make the system more resilient to temporary delays or network issues.
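A retry schedule with exponential backoff, as suggested, could be derived from a small helper. This is a generic sketch (the backoffDelays name is made up), with jitter omitted for clarity:

```typescript
// Hypothetical helper: compute exponential backoff delays (in ms).
// Attempt 0 waits baseMs, attempt 1 waits 2*baseMs, and so on, capped at maxMs.
function backoffDelays(attempts: number, baseMs: number, maxMs = 30_000): number[] {
  const delays: number[] = [];
  for (let i = 0; i < attempts; i++) {
    delays.push(Math.min(baseMs * 2 ** i, maxMs));
  }
  return delays;
}

// The polling loop would then await each delay in turn instead of a fixed 1s:
//   for (const delay of backoffDelays(5, 250)) {
//     const state = await this.getConversationState(conversationId);
//     if ((state?.latestMessages?.length ?? 0) > 0) return state;
//     await new Promise(resolve => setTimeout(resolve, delay));
//   }
```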
```typescript
if (!conversationState) {
    console.log('>>> no latest status for conversationId, assuming done and evicted', conversationId);
```
Logging "no latest status for conversationId, assuming done and evicted" suggests a potential issue with conversation state management. Investigate why the conversation state is being evicted prematurely and consider implementing a mechanism to prevent this from happening, such as increasing the TTL or using a more persistent storage solution.
```typescript
    });
}

setTimeout(async () => {
```
Comment: Why do you need to wrap this in a setTimeout?

Reply: This was my simple way to defer event processing to async and return the original request synchronously. We can go with something more idiomatic in the final version.
```typescript
        break;
    }

    buffer += decoder.decode(value, { stream: true });
```
It seems this stream parsing logic can be moved down to the service layer.
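Extracting the parsing into the service layer could start from a pure, testable helper like this sketch. parseSseChunk is a made-up name; it mirrors the handler's buffer-splitting logic (carry over the trailing partial line, keep only "data: " payloads):

```typescript
// Hypothetical pure helper: fold a new chunk into the carry-over buffer and
// return the complete "data: " payloads plus the unfinished remainder.
function parseSseChunk(buffer: string, chunk: string): { events: unknown[]; rest: string } {
  const combined = buffer + chunk;
  const lines = combined.split("\n");
  const rest = lines.pop() ?? ""; // the last element may be a partial line
  const events: unknown[] = [];
  for (const line of lines) {
    if (!line.startsWith("data: ")) continue; // skip comments and keep-alives
    try {
      events.push(JSON.parse(line.slice(6)));
    } catch {
      // malformed payloads are dropped; the caller can log them
    }
  }
  return { events, rest };
}
```

A pure function like this keeps the Durable Object handler thin and lets the split/parse behavior be unit tested without a live stream.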
```typescript
    console.log('>>> no latest status for conversationId, assuming done and evicted', conversationId);
}

await this.updateConversationStateAndResetTtlTimeout(conversationId, {
```
Comment: Why do we need to update here? There is a chance you override updates coming from the backend in another thread, since you are updating them here.

Reply: Yes, I wanted to rethink the message storage contract for the prod version.
```typescript
    await this.cancelSchedule(oldState.ttlTimeoutId);
}

const schedule = await this.schedule(30, 'clearConversationState' as any, {
```
Comment: Why do we do a delete and update here, instead of just appending new messages?

Reply: Not sure I followed; there is no delete and update here. Are you referring to the scheduled timer?

Comment: Yes. Is this the TTL? Do we delete the conversation after 30 seconds?

Reply: Yes, I used 30s in the POC to validate it was working, but in reality it will be longer, of course.
```typescript
// Wait up to 5 seconds for the conversation state to be available
let conversationState: StreamingConversationState | undefined;
for (let i = 0; i < 5; i++) {
```
Comment: How does this breaking loop help? Should we also take the id of the last message received as an argument? That way we can check whether more messages were received after it and wait for more messages before responding.

Reply: Yes, the approach here is simplified just for the POC, but the overall concept is that I wanted a max timeout for getUpdates, so that it can't get stuck waiting indefinitely for updates in case the Spotter BE crashed or something.
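The reviewer's last-message-id idea could be sketched as a pure selector over the stored state. Note this is entirely hypothetical: getMessagesSince, IdentifiedMessage, and the id field are made up, since the PR's AgentMessage type carries no id today:

```typescript
// Hypothetical sketch: if each stored message carried a monotonically
// increasing id, getUpdates could return only what arrived after the
// client's last seen id, and keep waiting (up to a max timeout) when empty.
interface IdentifiedMessage {
  id: number;
  type: string;
  text?: string;
}

function getMessagesSince(messages: IdentifiedMessage[], lastSeenId: number): IdentifiedMessage[] {
  return messages.filter(m => m.id > lastSeenId);
}
```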
- Ability to display charts in an embedded iframe
Not intended to be merged; just sharing for reference purposes.