diff --git a/test/cluster-spec-sheet/mzcompose.py b/test/cluster-spec-sheet/mzcompose.py index fe6a27c37ed4f..d38fbc51891de 100644 --- a/test/cluster-spec-sheet/mzcompose.py +++ b/test/cluster-spec-sheet/mzcompose.py @@ -2646,12 +2646,21 @@ def new_connection(self) -> psycopg.Connection: ... @abstractmethod def cleanup(self) -> None: ... @abstractmethod - def replica_size_for_scale(self, scale: int) -> str: + def replica_sizes_for_scale(self, scale: int) -> list[str]: """ - Returns the replica size for a given scale. + Returns all replica sizes to test at a given scale. The cluster spec + sheet runs the strong/weak scaling scenarios once per returned size so + that we can compare equivalent cc and M.1 cluster sizes side by side. """ ... + def replica_size_for_scale(self, scale: int) -> str: + """ + Returns the default (first) replica size for a given scale. Used for + setup-only clusters where a single size is sufficient. + """ + return self.replica_sizes_for_scale(scale)[0] + def max_scale(self) -> int | None: """ Returns the maximum scale for the target, or None if there is no limit. @@ -2742,11 +2751,22 @@ def new_connection(self) -> psycopg.Connection: def cleanup(self) -> None: disable_region(self.composition, hard=True) - def replica_size_for_scale(self, scale: int) -> str: - """ - Returns the replica size for a given scale. - """ - return f"{scale}00cc" + # M.1 size with the same worker count as the {scale}00cc size. + M1_REPLICA_SIZES: dict[int, str] = { + 1: "M.1-xsmall", + 2: "M.1-small", + 4: "M.1-large", + 8: "M.1-2xlarge", + 16: "M.1-4xlarge", + 32: "M.1-8xlarge", + } + + def replica_sizes_for_scale(self, scale: int) -> list[str]: + sizes = [f"{scale}00cc"] + m1_size = self.M1_REPLICA_SIZES.get(scale) + if m1_size is not None: + sizes.append(m1_size) + return sizes class DockerTarget(BenchTarget): @@ -2780,9 +2800,9 @@ def cleanup(self) -> None: print("Stopping local Materialize instance ...") self.composition.stop("materialized") - def replica_size_for_scale(self, scale: int) -> str: + def replica_sizes_for_scale(self, scale: int) -> list[str]: # 100cc == 2 workers - return f"scale=1,workers={2*scale}" + return [f"scale=1,workers={2*scale}"] def max_scale(self) -> int | None: return 16 @@ -2827,19 +2847,19 @@ def run_scenario_strong( for replica_scale in REPLICA_SCALES: if replica_scale > max_scale: break - replica_size = target.replica_size_for_scale(replica_scale) - print( - f"--- Running strong scenario {scenario.name()} with replica size {replica_size}" - ) - # Create a cluster with the specified size - runner.run_query("DROP CLUSTER IF EXISTS c CASCADE") - runner.run_query(f"CREATE CLUSTER c SIZE '{replica_size}'") - runner.run_query("SET cluster = 'c';") - runner.run_query("SELECT * FROM t;") + for replica_size in target.replica_sizes_for_scale(replica_scale): + print( + f"--- Running strong scenario {scenario.name()} with replica size {replica_size}" + ) + # Create a cluster with the specified size + runner.run_query("DROP CLUSTER IF EXISTS c CASCADE") + runner.run_query(f"CREATE CLUSTER c SIZE '{replica_size}'") + runner.run_query("SET cluster = 'c';") + runner.run_query("SELECT * FROM t;") - runner.replica_size = replica_size + runner.replica_size = replica_size - scenario.run(runner) + scenario.run(runner) def run_scenario_envd_strong_scaling( @@ -2960,39 +2980,39 @@ def run_scenario_weak( for replica_scale in REPLICA_SCALES: if replica_scale > max_scale: break - replica_size = target.replica_size_for_scale(replica_scale) - print( - f"--- Running weak scenario {scenario.name()} with replica size {replica_size}" - ) - scenario.replica_size = replica_size - scenario.scale = initial_scale * replica_scale - runner = ScenarioRunner( - scenario.name(), - scenario.VERSION, - scenario.scale, - "weak", - connection, - results_writer, - replica_size, - target=target, - ) - for query in scenario.drop(): - runner.run_query(query) + for replica_size in target.replica_sizes_for_scale(replica_scale): + print( + f"--- Running weak scenario {scenario.name()} with replica size {replica_size}" + ) + scenario.replica_size = replica_size + scenario.scale = initial_scale * replica_scale + runner = ScenarioRunner( + scenario.name(), + scenario.VERSION, + scenario.scale, + "weak", + connection, + results_writer, + replica_size, + target=target, + ) + for query in scenario.drop(): + runner.run_query(query) - for query in scenario.setup(): - runner.run_query(query) + for query in scenario.setup(): + runner.run_query(query) - for name in scenario.materialize_views(): - runner.run_query(f"SELECT COUNT(*) > 0 FROM {name};") + for name in scenario.materialize_views(): + runner.run_query(f"SELECT COUNT(*) > 0 FROM {name};") - # Create a cluster with the specified size - print(f"--- Loading complete; creating cluster with size {replica_size}") - runner.run_query("DROP CLUSTER IF EXISTS c CASCADE") - runner.run_query(f"CREATE CLUSTER c SIZE '{replica_size}'") - runner.run_query("SET cluster = 'c';") - runner.run_query("SELECT * FROM t;") + # Create a cluster with the specified size + print(f"--- Loading complete; creating cluster with size {replica_size}") + runner.run_query("DROP CLUSTER IF EXISTS c CASCADE") + runner.run_query(f"CREATE CLUSTER c SIZE '{replica_size}'") + runner.run_query("SET cluster = 'c';") + runner.run_query("SELECT * FROM t;") - scenario.run(runner) + scenario.run(runner) def workflow_plot_cluster( @@ -3062,7 +3082,28 @@ def workflow_plot(composition: Composition, parser: WorkflowArgumentParser) -> N def analyze_cluster_results_file(file: str) -> None: print(f"--- Analyzing cluster results file {file} ...") + # M.1 sizes are named tiers, so map each one to (credits/hour, workers). + m1_sizes: dict[str, tuple[float, int]] = { + "M.1-nano": (0.75, 1), + "M.1-micro": (1.5, 1), + "M.1-xsmall": (3, 2), + "M.1-small": (6, 4), + "M.1-medium": (9, 6), + "M.1-large": (12, 8), + "M.1-1.5xlarge": (18, 12), + "M.1-2xlarge": (24, 16), + "M.1-3xlarge": (36, 24), + "M.1-4xlarge": (48, 31), + "M.1-8xlarge": (96, 62), + "M.1-16xlarge": (192, 62), + "M.1-32xlarge": (384, 62), + "M.1-64xlarge": (768, 62), + "M.1-128xlarge": (1536, 62), + } + def extract_cluster_size(s: str) -> float: + if s in m1_sizes: + return m1_sizes[s][0] match = re.search(r"(\d+)(?:(cc)|(C))", s) if match: if match.group(2): # 'cc' match @@ -3077,6 +3118,11 @@ def extract_cluster_size(s: str) -> float: return float(match.group(1)) * float(match.group(2)) / 2 raise ValueError(f"Invalid cluster size format: {s}") + def extract_workers(s: str, credits_per_h: float) -> float: + if s in m1_sizes: + return float(m1_sizes[s][1]) + return round(credits_per_h * 1.9375) + df = pd.read_csv(file) if df.empty: print(f"^^^ +++ File {file} is empty, skipping") @@ -3087,7 +3133,9 @@ def extract_cluster_size(s: str) -> float: # Cluster replica size as centi-credits/s df["ccredit_per_s"] = df["credits_per_h"] / 3600 * 100 # Number of timely workers - df["workers"] = round(df["credits_per_h"] * 1.9375) + df["workers"] = df.apply( + lambda r: extract_workers(r["cluster_size"], r["credits_per_h"]), axis=1 + ) # Throughput in MiB/s df["throughput_mb_per_s"] = df["size_bytes"] / df["time_ms"] * 1000 / 1024 / 1024 # Throughput in MiB/s/worker