-
Notifications
You must be signed in to change notification settings - Fork 5.3k
Expand file tree
/
Copy pathmcp_client.py
More file actions
95 lines (80 loc) · 3.07 KB
/
mcp_client.py
File metadata and controls
95 lines (80 loc) · 3.07 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
import sys
from contextlib import AsyncExitStack
from typing import Any, Awaitable, Callable, ClassVar, Self
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from mcp_client import chat
from mcp_client.handlers import OpenAIQueryHandler
class MCPClient:
"""MCP client to interact with MCP server.
Usage:
async with MCPClient(server_path) as client:
# Call client methods here...
"""
client_session: ClassVar[ClientSession]
def __init__(self, server_path: str):
self.server_path = server_path
self.exit_stack = AsyncExitStack()
async def __aenter__(self) -> Self:
cls = type(self)
cls.client_session = await self._connect_to_server()
return self
async def __aexit__(self, *_) -> None:
await self.exit_stack.aclose()
async def _connect_to_server(self) -> ClientSession:
try:
read, write = await self.exit_stack.enter_async_context(
stdio_client(
server=StdioServerParameters(
command="sh",
args=[
"-c",
f"{sys.executable} {self.server_path} 2>/dev/null",
],
env=None,
)
)
)
client_session = await self.exit_stack.enter_async_context(
ClientSession(read, write)
)
await client_session.initialize()
return client_session
except Exception:
raise RuntimeError("Error: Failed to connect to server")
async def list_all_members(self) -> None:
"""List all available tools, prompts, and resources."""
print("MCP Server Members")
print("=" * 50)
sections = {
"tools": self.client_session.list_tools,
"prompts": self.client_session.list_prompts,
"resources": self.client_session.list_resources,
}
for section, listing_method in sections.items():
await self._list_section(section, listing_method)
print("\n" + "=" * 50)
async def _list_section(
self,
section: str,
list_method: Callable[[], Awaitable[Any]],
) -> None:
try:
items = getattr(await list_method(), section)
if items:
print(f"\n{section.upper()} ({len(items)}):")
print("-" * 30)
for item in items:
description = item.description or "No description"
print(f" > {item.name} - {description}")
else:
print(f"\n{section.upper()}: None available")
except Exception as e:
print(f"\n{section.upper()}: Error - {e}")
async def run_chat(self) -> None:
"""Start interactive chat with MCP server using OpenAI."""
try:
handler = OpenAIQueryHandler(self.client_session)
await chat.run_chat(handler)
except RuntimeError as e:
print(e)