-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathquickstart.py
More file actions
51 lines (39 loc) · 1.61 KB
/
quickstart.py
File metadata and controls
51 lines (39 loc) · 1.61 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
"""Quickstart example for eve-api."""
import asyncio
from eve_api import EVEClient
async def main():
async with EVEClient() as eve:
# Login
await eve.login("user@example.com", "password")
# Fetch current user
me = await eve.get("/users/me")
print(f"Logged in as: {me['email']}")
# List public collections
collections = await eve.get("/collections/public")
for col in collections["data"]:
print(f" - {col['name']}")
# Create and delete a conversation
conv = await eve.post("/conversations", json={"name": "Quick Test"})
print(f"Created conversation: {conv['id']}")
await eve.delete(f"/conversations/{conv['id']}")
print("Deleted conversation")
# Stream a response (requires an existing conversation)
conv = await eve.post("/conversations", json={"name": "Stream Test"})
async for event in eve.stream(
f"/conversations/{conv['id']}/stream_messages",
json={
"query": "What is Earth Observation?",
"public_collections": ["eve-public"],
"k": 3,
},
):
if event["type"] == "token":
print(event["content"], end="", flush=True)
elif event["type"] == "final":
print() # newline after streaming
await eve.delete(f"/conversations/{conv['id']}")
# Unauthenticated request
health = await eve.request("GET", "/health", auth_required=False)
print(f"API health: {health.json()}")
if __name__ == "__main__":
asyncio.run(main())