8 changes: 8 additions & 0 deletions src/cloudai/registration.py
@@ -137,6 +137,8 @@ def register_all():
NixlPerftestTestDefinition,
)
from cloudai.workloads.osu_bench import (
OSUBenchComparisonReport,
OSUBenchReportGenerationStrategy,
OSUBenchSlurmCommandGenStrategy,
OSUBenchTestDefinition,
)
@@ -280,6 +282,7 @@ def register_all():
Registry().add_report(AIDynamoTestDefinition, AIDynamoReportGenerationStrategy)
Registry().add_report(AiconfiguratorTestDefinition, AiconfiguratorReportGenerationStrategy)
Registry().add_report(NixlPerftestTestDefinition, NIXLKVBenchDummyReport)
Registry().add_report(OSUBenchTestDefinition, OSUBenchReportGenerationStrategy)
Registry().add_report(VllmTestDefinition, VLLMBenchReportGenerationStrategy)

Registry().add_scenario_report("per_test", PerTestReporter, ReportConfig(enable=True))
@@ -293,6 +296,11 @@
Registry().add_scenario_report(
"nccl_comparison", NcclComparisonReport, ComparisonReportConfig(enable=True, group_by=["subtest_name"])
)
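    # Note (reviewer annotation): group_by=["benchmark"] groups runs by the OSU
    # "benchmark" cmd arg, so results from the same benchmark (e.g. osu_latency)
    # are compared against each other rather than mixed across benchmarks.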
Registry().add_scenario_report(
"osu_bench_comparison",
OSUBenchComparisonReport,
ComparisonReportConfig(enable=True, group_by=["benchmark"]),
)

Registry().add_reward_function("inverse", inverse_reward)
Registry().add_reward_function("negative", negative_reward)
6 changes: 5 additions & 1 deletion src/cloudai/workloads/osu_bench/__init__.py
@@ -1,5 +1,5 @@
# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES
-# Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -15,10 +15,14 @@
# limitations under the License.

from .osu_bench import OSUBenchCmdArgs, OSUBenchTestDefinition
from .osu_comparison_report import OSUBenchComparisonReport
from .report_generation_strategy import OSUBenchReportGenerationStrategy
from .slurm_command_gen_strategy import OSUBenchSlurmCommandGenStrategy

__all__ = [
"OSUBenchCmdArgs",
"OSUBenchComparisonReport",
"OSUBenchReportGenerationStrategy",
"OSUBenchSlurmCommandGenStrategy",
"OSUBenchTestDefinition",
]
6 changes: 5 additions & 1 deletion src/cloudai/workloads/osu_bench/osu_bench.py
@@ -1,5 +1,5 @@
# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES
-# Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -105,6 +105,10 @@ def was_run_successful(self, tr: TestRun) -> JobStatusResult:
),
)

# Special case: osu_hello and osu_init print only a summary, with no "# Size" data table.
if self.cmd_args.benchmark in ("osu_hello", "osu_init"):
return JobStatusResult(is_successful=True)

# Check for basic OSU benchmark output format
if "# Size" not in content:
return JobStatusResult(
148 changes: 148 additions & 0 deletions src/cloudai/workloads/osu_bench/osu_comparison_report.py
@@ -0,0 +1,148 @@
# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES
# Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

import logging
from pathlib import Path
from typing import TYPE_CHECKING

from rich.table import Table

from cloudai.core import System, TestRun, TestScenario
from cloudai.report_generator.comparison_report import ComparisonReport, ComparisonReportConfig
from cloudai.report_generator.groups import GroupedTestRuns
from cloudai.util.lazy_imports import lazy

from .osu_bench import OSUBenchTestDefinition

if TYPE_CHECKING:
import bokeh.plotting as bk
import pandas as pd


class OSUBenchComparisonReport(ComparisonReport):
"""Comparison report for OSU Bench."""

INFO_COLUMNS = ("size",)

def __init__(
self, system: System, test_scenario: TestScenario, results_root: Path, config: ComparisonReportConfig
) -> None:
super().__init__(system, test_scenario, results_root, config)
self.report_file_name = "osu_bench_comparison.html"

def load_test_runs(self) -> None:
super().load_test_runs()
self.trs = [tr for tr in self.trs if isinstance(tr.test, OSUBenchTestDefinition)]

def extract_data_as_df(self, tr: TestRun) -> pd.DataFrame:
csv_path = tr.output_path / "osu_bench.csv"
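        # osu_bench.csv is the per-run artifact written by
        # OSUBenchReportGenerationStrategy (added in this change): a "size"
        # column plus one or more of "avg_lat", "mb_sec", "messages_sec".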
if not csv_path.exists():
return lazy.pd.DataFrame()

df = lazy.pd.read_csv(csv_path)

if "size" not in df.columns:
logging.warning("%s: missing 'size' column, skipping", csv_path)
return lazy.pd.DataFrame()

df["size"] = df["size"].astype(int)
return df

@staticmethod
def _has_metric(dfs: list["pd.DataFrame"], col: str) -> bool:
"""Only include a metric if all compared DataFrames have it."""
return bool(dfs) and all((col in df.columns) and df[col].notna().any() for df in dfs)

def create_tables(self, cmp_groups: list[GroupedTestRuns]) -> list[Table]:
tables: list[Table] = []
for group in cmp_groups:
dfs = [self.extract_data_as_df(item.tr) for item in group.items]

if self._has_metric(dfs, "avg_lat"):
tables.append(
self.create_table(
group,
dfs=dfs,
title="Latency",
info_columns=list(self.INFO_COLUMNS),
data_columns=["avg_lat"],
)
)
if self._has_metric(dfs, "mb_sec"):
tables.append(
self.create_table(
group,
dfs=dfs,
title="Bandwidth",
info_columns=list(self.INFO_COLUMNS),
data_columns=["mb_sec"],
)
)
if self._has_metric(dfs, "messages_sec"):
tables.append(
self.create_table(
group,
dfs=dfs,
title="Message Rate",
info_columns=list(self.INFO_COLUMNS),
data_columns=["messages_sec"],
)
)

return tables

def create_charts(self, cmp_groups: list[GroupedTestRuns]) -> list[bk.figure]:
charts: list[bk.figure] = []
for group in cmp_groups:
dfs = [self.extract_data_as_df(item.tr) for item in group.items]

if self._has_metric(dfs, "avg_lat"):
charts.append(
self.create_chart(
group,
dfs,
"Latency",
list(self.INFO_COLUMNS),
["avg_lat"],
"Time (us)",
)
)
if self._has_metric(dfs, "mb_sec"):
charts.append(
self.create_chart(
group,
dfs,
"Bandwidth",
list(self.INFO_COLUMNS),
["mb_sec"],
"Bandwidth (MB/s)",
)
)
if self._has_metric(dfs, "messages_sec"):
charts.append(
self.create_chart(
group,
dfs,
"Message Rate",
list(self.INFO_COLUMNS),
["messages_sec"],
"Messages/s",
)
)

return charts
156 changes: 156 additions & 0 deletions src/cloudai/workloads/osu_bench/report_generation_strategy.py
@@ -0,0 +1,156 @@
# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES
# Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

import logging
import re
from enum import Enum
from functools import cache
from pathlib import Path
from typing import TYPE_CHECKING

from cloudai.core import ReportGenerationStrategy
from cloudai.util.lazy_imports import lazy

if TYPE_CHECKING:
import pandas as pd


class BenchmarkType(Enum):
"""Type of benchmark to extract data from."""

BANDWIDTH = 0
"""Bandwidth benchmark."""

MULTIPLE_BANDWIDTH = 1
"""Multiple bandwidth benchmark."""

LATENCY = 2
"""Latency benchmark."""


HEADERS = {
BenchmarkType.LATENCY: (
r"#\s*Size\s+Avg Latency\(us\)"
r"(?:\s+Min Latency\(us\)\s+Max Latency\(us\)\s+Iterations)?"
),
BenchmarkType.MULTIPLE_BANDWIDTH: r"#\s*Size\s+MB/s\s+Messages/s",
BenchmarkType.BANDWIDTH: r"#\s*Size\s+Bandwidth\s*\(MB/s\)",
}
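# For reference, typical OSU stdout header lines these patterns are meant to
# match (illustrative, whitespace abbreviated):
#   "# Size  Avg Latency(us)"          -> LATENCY
#   "# Size  MB/s  Messages/s"         -> MULTIPLE_BANDWIDTH
#   "# Size  Bandwidth (MB/s)"         -> BANDWIDTH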


def _detect_benchmark_type(line: str) -> BenchmarkType | None:
for b_type, header in HEADERS.items():
if re.match(header, line):
return b_type

return None


def _parse_data_row(parts: list[str], benchmark_type: BenchmarkType) -> list[str] | None:
if len(parts) < 2:
return None

try:
int(parts[0]) # message size
except ValueError:
return None

# Build the returned row according to the benchmark type.
if benchmark_type == BenchmarkType.MULTIPLE_BANDWIDTH:
if len(parts) >= 3:
try:
float(parts[1]) # MB/s
float(parts[2]) # Messages/s
except ValueError:
return None
# size, MB/s, Messages/s
return [parts[0], parts[1], parts[2]]
return None

# BANDWIDTH and LATENCY: both use size + one value; column names in _columns_for_type
try:
float(parts[1]) # metric value
except ValueError:
return None
return [parts[0], parts[1]]
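# Example: an osu_latency data row "8  1.23" splits into ["8", "1.23"] and is
# returned as-is; lines whose first token is not an integer (headers, warnings)
# yield None and are skipped by the caller.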


def _columns_for_type(benchmark_type: BenchmarkType) -> list[str]:
if benchmark_type == BenchmarkType.MULTIPLE_BANDWIDTH:
return ["size", "mb_sec", "messages_sec"]

if benchmark_type == BenchmarkType.BANDWIDTH:
return ["size", "mb_sec"]

return ["size", "avg_lat"]


@cache
def extract_osu_bench_data(stdout_file: Path) -> pd.DataFrame:
if not stdout_file.exists():
logging.debug("%s not found", stdout_file)
return lazy.pd.DataFrame()

data: list[list[str]] = []
benchmark_type: BenchmarkType | None = None

for line in stdout_file.read_text().splitlines():
if benchmark_type is None:
benchmark_type = _detect_benchmark_type(line)
continue

if row := _parse_data_row(line.split(), benchmark_type):
data.append(row)

if benchmark_type is None:
return lazy.pd.DataFrame()

columns = _columns_for_type(benchmark_type)
df = lazy.pd.DataFrame(data, columns=lazy.pd.Index(columns))

df["size"] = df["size"].astype(int)

if "mb_sec" in df.columns:
df["mb_sec"] = df["mb_sec"].astype(float)

if "messages_sec" in df.columns:
df["messages_sec"] = df["messages_sec"].astype(float)

if "avg_lat" in df.columns:
df["avg_lat"] = df["avg_lat"].astype(float)

return df
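# Illustrative use (hypothetical path): for an osu_latency run,
#   extract_osu_bench_data(Path("results/stdout.txt"))
# yields columns ["size", "avg_lat"]; bandwidth runs yield ["size", "mb_sec"]
# and multi-bandwidth runs ["size", "mb_sec", "messages_sec"].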


class OSUBenchReportGenerationStrategy(ReportGenerationStrategy):
"""Report generation strategy for OSU Bench."""

@property
def results_file(self) -> Path:
return self.test_run.output_path / "stdout.txt"

def can_handle_directory(self) -> bool:
df = extract_osu_bench_data(self.results_file)
return not df.empty

def generate_report(self) -> None:
if not self.can_handle_directory():
return

df = extract_osu_bench_data(self.results_file)
df.to_csv(self.test_run.output_path / "osu_bench.csv", index=False)
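        # Note: extract_osu_bench_data is @cache-d, so the parse already done
        # by can_handle_directory() above is reused here and the stdout file
        # is read only once.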