Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
"@hono/zod-validator": "^0.7.2",
"@microlabs/otel-cf-workers": "^1.0.0-rc.52",
"@modelcontextprotocol/sdk": "^1.20.1",
"@thoughtspot/rest-api-sdk": "^2.13.1",
"@thoughtspot/rest-api-sdk": "^2.22.0",
"agents": "^0.2.14",
"hono": "^4.10.3",
"rxjs": "^7.8.2",
Expand Down
16 changes: 16 additions & 0 deletions server.json
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,22 @@
{
"name": "getDataSourceSuggestions",
"description": "Get datasource suggestions for a given query"
},
{
"name": "createAgentConversation",
"description": "Create a conversation with a given data source"
},
{
"name": "sendAgentMessage",
"description": "Send a message in a conversation"
},
{
"name": "sendAgentMessageAsync",
"description": "Send a message in a conversation, and allow the response to be generated asynchronously"
},
{
"name": "getAgentMessageUpdates",
"description": "Retrieve the latest messages from an asynchronous conversation"
}
],
"resources": [
Expand Down
47 changes: 45 additions & 2 deletions src/cloudflare-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,24 @@ import { McpAgent } from "agents/mcp";
import { instrumentDO, type ResolveConfigFn } from '@microlabs/otel-cf-workers';
import type { BaseMCPServer, Context } from "./servers/mcp-server-base";
import type { Props } from "./utils";
import { StreamingConversationState } from "./servers/mcp-server";

export function instrumentedMCPServer<T extends BaseMCPServer>(MCPServer: new (ctx: Context) => T, config: ResolveConfigFn) {
export function instrumentedMCPServer<T extends BaseMCPServer>(MCPServer: new (
ctx: Context,
getConversationState: (
conversationId: string,
) => Promise<StreamingConversationState | undefined>,
updateConversationStateAndResetTtlTimeout: (
conversationId: string,
newState: StreamingConversationState,
) => Promise<void>,
) => T, config: ResolveConfigFn) {
Comment on lines +7 to +16
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The number of parameters in this function signature is getting quite large. Consider using an object to group related parameters for better readability and maintainability. This is especially important as more features are added to the MCP server.

For example, instead of passing getConversationState and updateConversationStateAndResetTtlTimeout as separate parameters, you could create a ConversationStateHandlers object that encapsulates these functions.

const Agent = class extends McpAgent<Env, any, Props> {
server = new MCPServer(this as Context);
server = new MCPServer(
this as Context,
this.getConversationState.bind(this),
this.updateConversationStateAndResetTtlTimeout.bind(this),
);

// Argument of type 'typeof ThoughtSpotMCPWrapper' is not assignable to parameter of type 'DOClass'.
// Cannot assign a 'protected' constructor type to a 'public' constructor type.
Expand All @@ -17,6 +31,7 @@ export function instrumentedMCPServer<T extends BaseMCPServer>(MCPServer: new (c

async init() {
await this.server.init();
this.ctx.storage
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This line this.ctx.storage does not appear to be doing anything. Is this intentional? If not, it should be removed to avoid confusion.

}

public static serve(path: string) {
Expand All @@ -36,6 +51,34 @@ export function instrumentedMCPServer<T extends BaseMCPServer>(MCPServer: new (c
}
return server;
}

private async getConversationState(conversationId: string) {
return await this.ctx.storage?.get<StreamingConversationState>(conversationId);
}

private async updateConversationStateAndResetTtlTimeout(
conversationId: string,
newState: StreamingConversationState,
) {
const oldState = await this.getConversationState(conversationId);
if (oldState?.ttlTimeoutId) {
await this.cancelSchedule(oldState.ttlTimeoutId);
}

const schedule = await this.schedule(30, 'clearConversationState' as any, {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we do delete and update here? Instead of just appending new messages ?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure I followed, there is no delete and update here? Are you referring to the scheduled timer?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, is this the TTL ? We delete the conversation after 30 sec ?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I used 30s in the POC to validate it was working but in reality it will be longer of course

conversationId,
});

await this.ctx.storage?.put(conversationId, {
...newState,
ttlTimeoutId: schedule.id,
});
}

private async clearConversationState(payload: { conversationId: string }) {
console.log('>>> clearing conversation state', payload.conversationId);
await this.ctx.storage?.delete(payload.conversationId);
}
}

return instrumentDO(Agent, config) as unknown as typeof Agent;
Expand Down
Loading