Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 103 additions & 41 deletions packages/plugins/mcp/src/api/handlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@ import { Context, Effect } from "effect";

import { addGroup, capture } from "@executor-js/api";
import type {
McpConnectionAccessFailure,
McpExtensionFailure,
McpPluginExtension,
McpProbeEndpointInput,
McpSourceConfig,
McpUpdateSourceInput,
} from "../sdk/plugin";
import type { SecretBackedValue } from "../sdk/types";
import { McpConnectionError } from "../sdk/errors";
import { McpStoredSourceSchema } from "../sdk/stored-source";
import { McpGroup } from "./group";

Expand All @@ -29,6 +32,53 @@ export class McpExtensionService extends Context.Service<McpExtensionService, Mc

const ExecutorApiWithMcp = addGroup(McpGroup);

const connectionAccessToMcpError = <A, R>(
effect: Effect.Effect<A, McpExtensionFailure, R>,
): Effect.Effect<
A,
Exclude<McpExtensionFailure, McpConnectionAccessFailure> | McpConnectionError,
R
> =>
effect.pipe(
Effect.catchTags({
ConnectionNotFoundError: (err) =>
Effect.fail(
new McpConnectionError({
transport: "remote",
message: `OAuth connection "${err.connectionId}" was not found`,
}),
),
ConnectionProviderNotRegisteredError: (err) =>
Effect.fail(
new McpConnectionError({
transport: "remote",
message: `OAuth provider "${err.provider}" is not registered`,
}),
),
ConnectionRefreshNotSupportedError: (err) =>
Effect.fail(
new McpConnectionError({
transport: "remote",
message: `OAuth provider "${err.provider}" does not support refresh`,
}),
),
ConnectionReauthRequiredError: (err) =>
Effect.fail(
new McpConnectionError({
transport: "remote",
message: `OAuth connection "${err.connectionId}" requires reauthentication`,
}),
),
ConnectionRefreshError: (err) =>
Effect.fail(
new McpConnectionError({
transport: "remote",
message: `Failed to refresh OAuth connection "${err.connectionId}"`,
}),
),
}),
);

// ---------------------------------------------------------------------------
// Convert API payload → McpSourceConfig
// ---------------------------------------------------------------------------
Expand Down Expand Up @@ -100,67 +150,79 @@ export const McpHandlers = HttpApiBuilder.group(ExecutorApiWithMcp, "mcp", (hand
handlers
.handle("probeEndpoint", ({ payload }) =>
capture(
Effect.gen(function* () {
const ext = yield* McpExtensionService;
return yield* ext.probeEndpoint(payload as McpProbeEndpointInput);
}),
connectionAccessToMcpError(
Effect.gen(function* () {
const ext = yield* McpExtensionService;
return yield* ext.probeEndpoint(payload as McpProbeEndpointInput);
}),
),
),
)
.handle("addSource", ({ params: path, payload }) =>
capture(
Effect.gen(function* () {
const ext = yield* McpExtensionService;
return yield* ext.addSource(
toSourceConfig(payload as Parameters<typeof toSourceConfig>[0], path.scopeId),
);
}),
connectionAccessToMcpError(
Effect.gen(function* () {
const ext = yield* McpExtensionService;
return yield* ext.addSource(
toSourceConfig(payload as Parameters<typeof toSourceConfig>[0], path.scopeId),
);
}),
),
),
)
.handle("removeSource", ({ params: path, payload }) =>
capture(
Effect.gen(function* () {
const ext = yield* McpExtensionService;
yield* ext.removeSource(payload.namespace, path.scopeId);
return { removed: true };
}),
connectionAccessToMcpError(
Effect.gen(function* () {
const ext = yield* McpExtensionService;
yield* ext.removeSource(payload.namespace, path.scopeId);
return { removed: true };
}),
),
),
)
.handle("refreshSource", ({ params: path, payload }) =>
capture(
Effect.gen(function* () {
const ext = yield* McpExtensionService;
return yield* ext.refreshSource(payload.namespace, path.scopeId);
}),
connectionAccessToMcpError(
Effect.gen(function* () {
const ext = yield* McpExtensionService;
return yield* ext.refreshSource(payload.namespace, path.scopeId);
}),
),
),
)
.handle("getSource", ({ params: path }) =>
capture(
Effect.gen(function* () {
const ext = yield* McpExtensionService;
const source = yield* ext.getSource(path.namespace, path.scopeId);
return source
? new McpStoredSourceSchema({
namespace: source.namespace,
name: source.name,
config: source.config,
})
: null;
}),
connectionAccessToMcpError(
Effect.gen(function* () {
const ext = yield* McpExtensionService;
const source = yield* ext.getSource(path.namespace, path.scopeId);
return source
? new McpStoredSourceSchema({
namespace: source.namespace,
name: source.name,
config: source.config,
})
: null;
}),
),
),
)
.handle("updateSource", ({ params: path, payload }) =>
capture(
Effect.gen(function* () {
const ext = yield* McpExtensionService;
yield* ext.updateSource(path.namespace, path.scopeId, {
name: payload.name,
endpoint: payload.endpoint,
headers: payload.headers,
queryParams: payload.queryParams,
auth: payload.auth as McpUpdateSourceInput["auth"],
});
return { updated: true };
}),
connectionAccessToMcpError(
Effect.gen(function* () {
const ext = yield* McpExtensionService;
yield* ext.updateSource(path.namespace, path.scopeId, {
name: payload.name,
endpoint: payload.endpoint,
headers: payload.headers,
queryParams: payload.queryParams,
auth: payload.auth as McpUpdateSourceInput["auth"],
});
return { updated: true };
}),
),
),
),
);
35 changes: 18 additions & 17 deletions packages/plugins/mcp/src/react/EditMcpSource.tsx
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { useState } from "react";
import { useAtomValue, useAtomSet } from "@effect/atom-react";
import { Exit } from "effect";
import * as AsyncResult from "effect/unstable/reactivity/AsyncResult";
import { mcpSourceAtom, updateMcpSource } from "./atoms";
import { useScope } from "@executor-js/react/api/scope-context";
Expand Down Expand Up @@ -35,7 +36,7 @@ function RemoteEditForm(props: {
onSave: () => void;
}) {
const scopeId = useScope();
const doUpdate = useAtomSet(updateMcpSource, { mode: "promise" });
const doUpdate = useAtomSet(updateMcpSource, { mode: "promiseExit" });
const secretList = useSecretPickerSecrets();

const identity = useSourceIdentity({
Expand Down Expand Up @@ -64,24 +65,24 @@ function RemoteEditForm(props: {
setSaving(true);
setError(null);
const { headers, queryParams } = serializeHttpCredentials(credentials);
try {
await doUpdate({
params: { scopeId, namespace: props.sourceId },
payload: {
name: identity.name.trim() || undefined,
endpoint: endpoint.trim() || undefined,
headers,
queryParams,
},
reactivityKeys: sourceWriteKeys,
});
setDirty(false);
props.onSave();
} catch (e) {
setError(e instanceof Error ? e.message : "Failed to update source");
} finally {
const exit = await doUpdate({
params: { scopeId, namespace: props.sourceId },
payload: {
name: identity.name.trim() || undefined,
endpoint: endpoint.trim() || undefined,
headers,
queryParams,
},
reactivityKeys: sourceWriteKeys,
});
if (Exit.isFailure(exit)) {
setError("Failed to update source");
setSaving(false);
return;
}
setDirty(false);
props.onSave();
setSaving(false);
};

return (
Expand Down
24 changes: 10 additions & 14 deletions packages/plugins/mcp/src/sdk/discover.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,42 +27,38 @@ export const discoverTools = (
// Acquire connection
const connection = yield* connector.pipe(
Effect.mapError(
(err) =>
() =>
new McpToolDiscoveryError({
stage: "connect",
message: `Failed connecting to MCP server: ${err.message}`,
message: "Failed connecting to MCP server",
}),
),
);

// List tools
const listResult = yield* Effect.tryPromise({
try: () => connection.client.listTools(),
catch: (cause) =>
catch: () =>
new McpToolDiscoveryError({
stage: "list_tools",
message: `Failed listing MCP tools: ${
cause instanceof Error ? cause.message : String(cause)
}`,
message: "Failed listing MCP tools",
}),
});

if (!isListToolsResult(listResult)) {
yield* Effect.promise(() => connection.close().catch(() => {}));
return yield* Effect.fail(
new McpToolDiscoveryError({
stage: "list_tools",
message: "MCP listTools response did not match the expected schema",
}),
);
yield* Effect.ignore(Effect.tryPromise(() => connection.close()));
return yield* new McpToolDiscoveryError({
stage: "list_tools",
message: "MCP listTools response did not match the expected schema",
});
}

const manifest = extractManifestFromListToolsResult(listResult, {
serverInfo: connection.client.getServerVersion?.(),
});

// Close the connection after discovery
yield* Effect.promise(() => connection.close().catch(() => {}));
yield* Effect.ignore(Effect.tryPromise(() => connection.close()));

return manifest;
});
34 changes: 16 additions & 18 deletions packages/plugins/mcp/src/sdk/invoke.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,14 @@
// 4. Retrying once on connection failure (invalidate + reconnect).
// ---------------------------------------------------------------------------

import { Cause, Effect, Exit, Schema, ScopedCache } from "effect";
import { Cause, Effect, Exit, Predicate, Schema, ScopedCache } from "effect";

import { ElicitRequestSchema } from "@modelcontextprotocol/sdk/types.js";

import {
FormElicitation,
UrlElicitation,
type ElicitationDeclinedError,
type Elicit,
type ElicitationRequest,
} from "@executor-js/sdk/core";
Expand Down Expand Up @@ -108,14 +109,13 @@ const installElicitationHandler = (
}
const failure = exit.cause.reasons.find(Cause.isFailReason);
if (failure) {
const err = failure.error as {
readonly _tag?: string;
readonly action?: "decline" | "cancel";
};
if (err._tag === "ElicitationDeclinedError") {
return { action: err.action ?? "decline" };
const err = failure.error;
if (Predicate.isTagged(err, "ElicitationDeclinedError")) {
const action = (err as ElicitationDeclinedError).action;
return { action };
}
}
// oxlint-disable-next-line executor/no-try-catch-or-throw -- boundary: MCP SDK request handler must throw JSON-RPC failures
throw Cause.squash(exit.cause);
},
);
Expand All @@ -135,12 +135,10 @@ const useConnection = (
installElicitationHandler(connection.client, elicit);
return yield* Effect.tryPromise({
try: () => connection.client.callTool({ name: toolName, arguments: args }),
catch: (cause) =>
catch: () =>
new McpInvocationError({
toolName,
message: `MCP tool call failed for ${toolName}: ${
cause instanceof Error ? cause.message : String(cause)
}`,
message: `MCP tool call failed for ${toolName}`,
}),
}).pipe(
Effect.withSpan("plugin.mcp.client.call_tool", {
Expand All @@ -153,7 +151,7 @@ const useConnection = (
// Public API
// ---------------------------------------------------------------------------

export interface InvokeMcpToolInput {
export interface InvokeMcpToolInput<E = McpConnectionError> {
readonly toolId: string;
readonly toolName: string;
readonly args: unknown;
Expand All @@ -162,22 +160,22 @@ export interface InvokeMcpToolInput {
* connection cache key so per-user OAuth/secret resolution doesn't
* collapse multiple users onto one shared connection. */
readonly invokerScope: string;
readonly resolveConnector: () => Effect.Effect<McpConnection, McpConnectionError>;
readonly resolveConnector: () => Effect.Effect<McpConnection, E>;
readonly connectionCache: ScopedCache.ScopedCache<
string,
McpConnection,
McpConnectionError
E
>;
readonly pendingConnectors: Map<
string,
Effect.Effect<McpConnection, McpConnectionError>
Effect.Effect<McpConnection, E>
>;
readonly elicit: Elicit;
}

export const invokeMcpTool = (
input: InvokeMcpToolInput,
): Effect.Effect<unknown, McpConnectionError | McpInvocationError> => {
export const invokeMcpTool = <E = McpConnectionError>(
input: InvokeMcpToolInput<E>,
): Effect.Effect<unknown, E | McpInvocationError> => {
const transport: string =
input.sourceData.transport === "stdio"
? "stdio"
Expand Down
Loading
Loading