-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
106 lines (79 loc) · 3.07 KB
/
main.py
File metadata and controls
106 lines (79 loc) · 3.07 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
import yaml
import numpy as np
from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType
def load_config(path="config.yml"):
    """Load the application configuration from a YAML file.

    Args:
        path: Path to the YAML configuration file. Defaults to
            "config.yml" in the current working directory, preserving
            the original behavior for existing callers.

    Returns:
        The parsed configuration (typically a dict with a "milvus" key,
        as consumed by run_workloads).

    Raises:
        FileNotFoundError: If the file does not exist.
        yaml.YAMLError: If the file is not valid YAML.
    """
    # safe_load never constructs arbitrary Python objects, so a hostile
    # config file cannot execute code.
    with open(path, "r", encoding="utf-8") as config_file:
        return yaml.safe_load(config_file)
def load_workloads(path="workloads.yml"):
    """Load the benchmark workload definitions from a YAML file.

    Args:
        path: Path to the workloads YAML file. Defaults to
            "workloads.yml" in the current working directory, preserving
            the original behavior for existing callers.

    Returns:
        The list stored under the top-level "workloads" key; each entry
        is expected to carry "name", "dataset_size", "query_size" and
        "search_params" (as read by run_workloads).

    Raises:
        FileNotFoundError: If the file does not exist.
        KeyError: If the document has no top-level "workloads" key.
    """
    # safe_load avoids executing arbitrary YAML tags on untrusted input.
    with open(path, "r", encoding="utf-8") as workloads_file:
        return yaml.safe_load(workloads_file)["workloads"]
def connect_to_milvus(host, port):
    """Establish the default Milvus connection.

    Args:
        host: Hostname or IP of the Milvus instance.
        port: Port the Milvus service listens on.
    """
    endpoint = f"{host}:{port}"
    # Registers the connection under the "default" alias used implicitly
    # by Collection operations elsewhere in this module.
    connections.connect(alias="default", host=host, port=port)
    print(f"Connected to Milvus at {endpoint}")
def create_collection(collection_name, dim):
    """Create (or bind to) a Milvus collection for float vectors.

    The schema is fixed: an auto-generated INT64 primary key "id" plus a
    FLOAT_VECTOR field "vector" of the requested dimensionality.

    Args:
        collection_name: Name of the collection to create or reuse.
        dim: Dimensionality of the vector field.

    Returns:
        The pymilvus Collection handle.
    """
    id_field = FieldSchema(
        name="id", dtype=DataType.INT64, is_primary=True, auto_id=True
    )
    vector_field = FieldSchema(
        name="vector", dtype=DataType.FLOAT_VECTOR, dim=dim
    )
    schema = CollectionSchema(
        [id_field, vector_field], description="Benchmark Collection"
    )
    handle = Collection(name=collection_name, schema=schema)
    print(f"Collection {collection_name} created with dimension {dim}.")
    return handle
def insert_data(collection, dataset_size, dim):
    """Fill the collection with uniformly random vectors, then flush.

    Args:
        collection: pymilvus Collection to insert into.
        dataset_size: Number of random vectors to generate.
        dim: Dimensionality of each vector (must match the schema).
    """
    print(f"Inserting {dataset_size} vectors into the collection...")
    random_rows = np.random.random((dataset_size, dim)).tolist()
    # Single-field insert: the auto_id primary key is generated
    # server-side, so only the vector column is supplied.
    collection.insert([random_rows])
    # Flush so the inserted segment is persisted and countable.
    collection.flush()
    print(f"Inserted {dataset_size} vectors.")
def perform_search(collection, query_size, dim, search_params):
    """Run an ANN search with random queries and print hit counts.

    Args:
        collection: Loaded pymilvus Collection to search.
        query_size: Number of random query vectors to issue.
        dim: Dimensionality of each query vector.
        search_params: Index-specific search parameters (e.g. {"ef": N}).
    """
    print(f"Performing search with {query_size} query vectors...")
    queries = np.random.random((query_size, dim)).tolist()
    hits_per_query = collection.search(
        data=queries,
        anns_field="vector",
        param=search_params,
        limit=10,
        expr=None,  # no scalar filter applied
    )
    # One result set per query vector; report how many hits each got.
    for idx, hits in enumerate(hits_per_query):
        print(f"Query {idx}: {len(hits)} results")
def run_workloads(config, workloads):
    """Drive every benchmark workload against the configured collection.

    For each workload: bind the collection, insert random data, load it
    into memory, then run searches with the workload's parameters.

    Args:
        config: Parsed config dict; its "milvus" section supplies host,
            port, collection_name and dim.
        workloads: List of workload dicts with "name", "dataset_size",
            "query_size" and "search_params".

    NOTE(review): every workload reuses the same collection name, so data
    from earlier workloads accumulates across iterations — presumably
    intentional for this benchmark; confirm.
    """
    milvus_cfg = config["milvus"]
    connect_to_milvus(milvus_cfg["host"], milvus_cfg["port"])

    for workload in workloads:
        print(f"Running workload: {workload['name']}")

        collection = create_collection(
            collection_name=milvus_cfg["collection_name"],
            dim=milvus_cfg["dim"],
        )
        insert_data(
            collection=collection,
            dataset_size=workload["dataset_size"],
            dim=milvus_cfg["dim"],
        )

        # The collection must be loaded into memory before searching.
        collection.load()

        search_params = {"ef": workload["search_params"]["ef"]}
        perform_search(
            collection=collection,
            query_size=workload["query_size"],
            dim=milvus_cfg["dim"],
            search_params=search_params,
        )
        print(f"Completed workload: {workload['name']}")
if __name__ == "__main__":
    # Entry point: read settings and workload definitions, then run the
    # full benchmark pipeline.
    run_workloads(load_config(), load_workloads())