Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 98 additions & 50 deletions test/cluster-spec-sheet/mzcompose.py
Original file line number Diff line number Diff line change
Expand Up @@ -2646,12 +2646,21 @@ def new_connection(self) -> psycopg.Connection: ...
@abstractmethod
def cleanup(self) -> None: ...
@abstractmethod
def replica_size_for_scale(self, scale: int) -> str:
def replica_sizes_for_scale(self, scale: int) -> list[str]:
"""
Returns the replica size for a given scale.
Returns all replica sizes to test at a given scale. The cluster spec
sheet runs the strong/weak scaling scenarios once per returned size so
that we can compare equivalent cc and M.1 cluster sizes side by side.
"""
...

def replica_size_for_scale(self, scale: int) -> str:
"""
Returns the default (first) replica size for a given scale. Used for
setup-only clusters where a single size is sufficient.
"""
return self.replica_sizes_for_scale(scale)[0]

def max_scale(self) -> int | None:
"""
Returns the maximum scale for the target, or None if there is no limit.
Expand Down Expand Up @@ -2742,11 +2751,22 @@ def new_connection(self) -> psycopg.Connection:
def cleanup(self) -> None:
disable_region(self.composition, hard=True)

def replica_size_for_scale(self, scale: int) -> str:
"""
Returns the replica size for a given scale.
"""
return f"{scale}00cc"
# M.1 size with the same worker count as the {scale}00cc size.
M1_REPLICA_SIZES: dict[int, str] = {
1: "M.1-xsmall",
2: "M.1-small",
4: "M.1-large",
8: "M.1-2xlarge",
16: "M.1-4xlarge",
32: "M.1-8xlarge",
}

def replica_sizes_for_scale(self, scale: int) -> list[str]:
sizes = [f"{scale}00cc"]
m1_size = self.M1_REPLICA_SIZES.get(scale)
if m1_size is not None:
sizes.append(m1_size)
return sizes


class DockerTarget(BenchTarget):
Expand Down Expand Up @@ -2780,9 +2800,9 @@ def cleanup(self) -> None:
print("Stopping local Materialize instance ...")
self.composition.stop("materialized")

def replica_size_for_scale(self, scale: int) -> str:
def replica_sizes_for_scale(self, scale: int) -> list[str]:
# 100cc == 2 workers
return f"scale=1,workers={2*scale}"
return [f"scale=1,workers={2*scale}"]

def max_scale(self) -> int | None:
return 16
Expand Down Expand Up @@ -2827,19 +2847,19 @@ def run_scenario_strong(
for replica_scale in REPLICA_SCALES:
if replica_scale > max_scale:
break
replica_size = target.replica_size_for_scale(replica_scale)
print(
f"--- Running strong scenario {scenario.name()} with replica size {replica_size}"
)
# Create a cluster with the specified size
runner.run_query("DROP CLUSTER IF EXISTS c CASCADE")
runner.run_query(f"CREATE CLUSTER c SIZE '{replica_size}'")
runner.run_query("SET cluster = 'c';")
runner.run_query("SELECT * FROM t;")
for replica_size in target.replica_sizes_for_scale(replica_scale):
print(
f"--- Running strong scenario {scenario.name()} with replica size {replica_size}"
)
# Create a cluster with the specified size
runner.run_query("DROP CLUSTER IF EXISTS c CASCADE")
runner.run_query(f"CREATE CLUSTER c SIZE '{replica_size}'")
runner.run_query("SET cluster = 'c';")
runner.run_query("SELECT * FROM t;")

runner.replica_size = replica_size
runner.replica_size = replica_size

scenario.run(runner)
scenario.run(runner)


def run_scenario_envd_strong_scaling(
Expand Down Expand Up @@ -2960,39 +2980,39 @@ def run_scenario_weak(
for replica_scale in REPLICA_SCALES:
if replica_scale > max_scale:
break
replica_size = target.replica_size_for_scale(replica_scale)
print(
f"--- Running weak scenario {scenario.name()} with replica size {replica_size}"
)
scenario.replica_size = replica_size
scenario.scale = initial_scale * replica_scale
runner = ScenarioRunner(
scenario.name(),
scenario.VERSION,
scenario.scale,
"weak",
connection,
results_writer,
replica_size,
target=target,
)
for query in scenario.drop():
runner.run_query(query)
for replica_size in target.replica_sizes_for_scale(replica_scale):
print(
f"--- Running weak scenario {scenario.name()} with replica size {replica_size}"
)
scenario.replica_size = replica_size
scenario.scale = initial_scale * replica_scale
runner = ScenarioRunner(
scenario.name(),
scenario.VERSION,
scenario.scale,
"weak",
connection,
results_writer,
replica_size,
target=target,
)
for query in scenario.drop():
runner.run_query(query)

for query in scenario.setup():
runner.run_query(query)
for query in scenario.setup():
runner.run_query(query)

for name in scenario.materialize_views():
runner.run_query(f"SELECT COUNT(*) > 0 FROM {name};")
for name in scenario.materialize_views():
runner.run_query(f"SELECT COUNT(*) > 0 FROM {name};")

# Create a cluster with the specified size
print(f"--- Loading complete; creating cluster with size {replica_size}")
runner.run_query("DROP CLUSTER IF EXISTS c CASCADE")
runner.run_query(f"CREATE CLUSTER c SIZE '{replica_size}'")
runner.run_query("SET cluster = 'c';")
runner.run_query("SELECT * FROM t;")
# Create a cluster with the specified size
print(f"--- Loading complete; creating cluster with size {replica_size}")
runner.run_query("DROP CLUSTER IF EXISTS c CASCADE")
runner.run_query(f"CREATE CLUSTER c SIZE '{replica_size}'")
runner.run_query("SET cluster = 'c';")
runner.run_query("SELECT * FROM t;")

scenario.run(runner)
scenario.run(runner)


def workflow_plot_cluster(
Expand Down Expand Up @@ -3062,7 +3082,28 @@ def workflow_plot(composition: Composition, parser: WorkflowArgumentParser) -> N
def analyze_cluster_results_file(file: str) -> None:
print(f"--- Analyzing cluster results file {file} ...")

# M.1 sizes are named tiers, so map each one to (credits/hour, workers).
m1_sizes: dict[str, tuple[float, int]] = {
"M.1-nano": (0.75, 1),
"M.1-micro": (1.5, 1),
"M.1-xsmall": (3, 2),
"M.1-small": (6, 4),
"M.1-medium": (9, 6),
"M.1-large": (12, 8),
"M.1-1.5xlarge": (18, 12),
"M.1-2xlarge": (24, 16),
"M.1-3xlarge": (36, 24),
"M.1-4xlarge": (48, 31),
"M.1-8xlarge": (96, 62),
"M.1-16xlarge": (192, 62),
"M.1-32xlarge": (384, 62),
"M.1-64xlarge": (768, 62),
"M.1-128xlarge": (1536, 62),
}

def extract_cluster_size(s: str) -> float:
if s in m1_sizes:
return m1_sizes[s][0]
match = re.search(r"(\d+)(?:(cc)|(C))", s)
if match:
if match.group(2): # 'cc' match
Expand All @@ -3077,6 +3118,11 @@ def extract_cluster_size(s: str) -> float:
return float(match.group(1)) * float(match.group(2)) / 2
raise ValueError(f"Invalid cluster size format: {s}")

def extract_workers(s: str, credits_per_h: float) -> float:
if s in m1_sizes:
return float(m1_sizes[s][1])
return round(credits_per_h * 1.9375)

df = pd.read_csv(file)
if df.empty:
print(f"^^^ +++ File {file} is empty, skipping")
Expand All @@ -3087,7 +3133,9 @@ def extract_cluster_size(s: str) -> float:
# Cluster replica size as centi-credits/s
df["ccredit_per_s"] = df["credits_per_h"] / 3600 * 100
# Number of timely workers
df["workers"] = round(df["credits_per_h"] * 1.9375)
df["workers"] = df.apply(
lambda r: extract_workers(r["cluster_size"], r["credits_per_h"]), axis=1
)
# Throughput in MiB/s
df["throughput_mb_per_s"] = df["size_bytes"] / df["time_ms"] * 1000 / 1024 / 1024
# Throughput in MiB/s/worker
Expand Down
Loading