-
Notifications
You must be signed in to change notification settings - Fork 116
Expand file tree
/
Copy pathlangchainv1_http.py
More file actions
94 lines (78 loc) · 2.82 KB
/
langchainv1_http.py
File metadata and controls
94 lines (78 loc) · 2.82 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
import asyncio
import logging
import os
from datetime import datetime
import azure.identity
from dotenv import load_dotenv
from langchain.agents import create_agent
from langchain_core.messages import HumanMessage, SystemMessage
from langchain_mcp_adapters.client import MultiServerMCPClient
from langchain_openai import ChatOpenAI
from pydantic import SecretStr
from rich.logging import RichHandler
# Configure logging: root logger stays at WARNING so chatty third-party
# libraries are quiet, while this module's logger emits INFO via Rich.
logging.basicConfig(level=logging.WARNING, format="%(message)s", datefmt="[%X]", handlers=[RichHandler()])
logger = logging.getLogger("langchainv1_http")
logger.setLevel(logging.INFO)

# Load environment variables from a local .env file, overriding any
# values already present in the shell environment.
load_dotenv(override=True)

# Constants
MCP_SERVER_URL = os.getenv("MCP_SERVER_URL", "http://localhost:8000/mcp/")

# Configure language model based on API_HOST (azure | ollama | openai).
API_HOST = os.getenv("API_HOST", "azure")

if API_HOST == "azure":
    # Keyless auth: the bearer-token provider fetches and refreshes
    # Entra ID tokens for the Cognitive Services scope on demand.
    token_provider = azure.identity.get_bearer_token_provider(
        azure.identity.DefaultAzureCredential(), "https://cognitiveservices.azure.com/.default"
    )
    base_model = ChatOpenAI(
        # Required variable: use a hard lookup so a missing deployment name
        # fails fast with a KeyError instead of silently passing model=None
        # (consistent with AZURE_OPENAI_ENDPOINT on the next line).
        model=os.environ["AZURE_OPENAI_CHAT_DEPLOYMENT"],
        base_url=os.environ["AZURE_OPENAI_ENDPOINT"] + "/openai/v1/",
        # NOTE(review): a callable token provider is passed where a static
        # key normally goes — confirm the installed openai/langchain-openai
        # versions accept a callable api_key.
        api_key=token_provider,
        use_responses_api=True,
    )
elif API_HOST == "ollama":
    base_model = ChatOpenAI(
        model=os.environ.get("OLLAMA_MODEL", "gemma4:e2b"),
        base_url=os.environ.get("OLLAMA_ENDPOINT", "http://localhost:11434/v1"),
        # Ollama ignores the key, but the client requires a non-empty value.
        api_key=SecretStr(os.getenv("OLLAMA_API_KEY", "no-key-needed")),
        use_responses_api=True,
    )
elif API_HOST == "openai":
    base_model = ChatOpenAI(
        model=os.getenv("OPENAI_MODEL", "gpt-5.2"),
        api_key=SecretStr(os.environ["OPENAI_API_KEY"]),
        use_responses_api=True,
    )
else:
    raise ValueError(f"Unsupported API_HOST '{API_HOST}'. Use one of: azure, ollama, openai.")
async def run_agent() -> None:
    """
    Run the agent to process expense-related queries using MCP tools.
    """
    # Connect to the MCP server over streamable HTTP and discover its tools.
    mcp_client = MultiServerMCPClient(
        {
            "expenses": {
                "url": MCP_SERVER_URL,
                "transport": "streamable_http",
            }
        }
    )
    discovered_tools = await mcp_client.get_tools()
    agent = create_agent(base_model, discovered_tools)

    # Supply today's date so relative phrases like "yesterday" can resolve.
    today = datetime.now().strftime("%Y-%m-%d")
    user_query = "yesterday I bought a laptop for $1200 using my visa."
    conversation = [
        SystemMessage(content=f"Today's date is {today}."),
        HumanMessage(content=user_query),
    ]

    result = await agent.ainvoke({"messages": conversation})

    # The last message in the returned state carries the agent's final answer.
    print(result["messages"][-1].text)
def main() -> None:
    """Synchronous entry point: drive run_agent on a fresh event loop."""
    asyncio.run(run_agent())


if __name__ == "__main__":
    main()