Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 12 additions & 11 deletions cmd/bench/c2client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ import (
"sync"
"time"

"github.com/goceleris/benchmarks/internal/bench"
"github.com/goceleris/loadgen"
"github.com/goceleris/loadgen/checkpoint"
)

// C2Client handles communication with the C2 orchestration server.
Expand Down Expand Up @@ -46,15 +47,15 @@ type C2BenchResult struct {
Errors int64 `json:"errors"`

// System metrics — client-side and server-side resource usage.
ClientCPUPercent float64 `json:"client_cpu_percent,omitempty"`
ServerCPUPercent float64 `json:"server_cpu_percent,omitempty"`
ServerCPUUserPercent float64 `json:"server_cpu_user_percent,omitempty"`
ServerCPUSysPercent float64 `json:"server_cpu_sys_percent,omitempty"`
ServerMemoryRSSMB float64 `json:"server_memory_rss_mb,omitempty"`
GCTotalPauseMs float64 `json:"gc_total_pause_ms,omitempty"`
GCMaxPauseMs float64 `json:"gc_max_pause_ms,omitempty"`
GCNumGC int `json:"gc_num_gc,omitempty"`
Timeseries []bench.TimeseriesPoint `json:"timeseries,omitempty"`
ClientCPUPercent float64 `json:"client_cpu_percent,omitempty"`
ServerCPUPercent float64 `json:"server_cpu_percent,omitempty"`
ServerCPUUserPercent float64 `json:"server_cpu_user_percent,omitempty"`
ServerCPUSysPercent float64 `json:"server_cpu_sys_percent,omitempty"`
ServerMemoryRSSMB float64 `json:"server_memory_rss_mb,omitempty"`
GCTotalPauseMs float64 `json:"gc_total_pause_ms,omitempty"`
GCMaxPauseMs float64 `json:"gc_max_pause_ms,omitempty"`
GCNumGC int `json:"gc_num_gc,omitempty"`
Timeseries []loadgen.TimeseriesPoint `json:"timeseries,omitempty"`
}

// runHeartbeat sends periodic heartbeats to C2.
Expand Down Expand Up @@ -116,7 +117,7 @@ func (c *C2Client) sendHeartbeat(ctx context.Context) {
}

// sendResult sends a single benchmark result to C2.
func (c *C2Client) sendResult(ctx context.Context, sr bench.ServerResult, result *bench.Result) error {
func (c *C2Client) sendResult(ctx context.Context, sr checkpoint.ServerResult, result *loadgen.Result) error {
url := fmt.Sprintf("%s/api/worker/results", c.endpoint)

// Convert to C2 format with proper duration values
Expand Down
26 changes: 13 additions & 13 deletions cmd/bench/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"strings"
"time"

"github.com/goceleris/benchmarks/internal/bench"
"github.com/goceleris/loadgen/checkpoint"
)

// benchConfig holds all parsed configuration for the benchmark runner.
Expand Down Expand Up @@ -333,22 +333,22 @@ func parseFlags() *benchConfig {
}

// setupCheckpoint creates or resumes a benchmark checkpoint.
func setupCheckpoint(cfg *benchConfig) *bench.Checkpoint {
var checkpoint *bench.Checkpoint
func setupCheckpoint(cfg *benchConfig) *checkpoint.Checkpoint {
var cp *checkpoint.Checkpoint
if cfg.resume {
if cp, err := bench.LoadCheckpoint(cfg.checkpointFile); err == nil {
checkpoint = cp
log.Printf("Resuming from checkpoint: %s (%d results completed)", cfg.checkpointFile, len(cp.Results))
if loaded, err := checkpoint.LoadCheckpoint(cfg.checkpointFile); err == nil {
cp = loaded
log.Printf("Resuming from checkpoint: %s (%d results completed)", cfg.checkpointFile, len(loaded.Results))
} else {
log.Printf("No existing checkpoint found, starting fresh")
}
}

if checkpoint == nil {
output := &bench.BenchmarkOutput{
if cp == nil {
output := &checkpoint.BenchmarkOutput{
Timestamp: time.Now().UTC().Format("2006-01-02T15_04_05Z"),
Architecture: cfg.arch,
Config: bench.BenchmarkConfig{
Config: checkpoint.BenchmarkConfig{
Duration: cfg.duration.String(),
DurationSecs: cfg.duration.Seconds(),
WarmupSecs: cfg.warmup.Seconds(),
Expand All @@ -359,9 +359,9 @@ func setupCheckpoint(cfg *benchConfig) *bench.Checkpoint {
H2MaxStreams: cfg.h2Streams,
BodySizeBytes: defaultBodySize,
},
Results: []bench.ServerResult{},
Results: []checkpoint.ServerResult{},
}
checkpoint = bench.NewCheckpoint(output)
cp = checkpoint.NewCheckpoint(output)
}

if cfg.skipBenchmarks != "" {
Expand All @@ -374,7 +374,7 @@ func setupCheckpoint(cfg *benchConfig) *bench.Checkpoint {
}
parts := strings.SplitN(pair, ":", 2)
if len(parts) == 2 {
checkpoint.MarkCompleted(parts[0], parts[1])
cp.MarkCompleted(parts[0], parts[1])
skippedCount++
}
}
Expand All @@ -383,5 +383,5 @@ func setupCheckpoint(cfg *benchConfig) *bench.Checkpoint {
}
}

return checkpoint
return cp
}
56 changes: 33 additions & 23 deletions cmd/bench/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,16 @@ import (
"syscall"
"time"

"github.com/goceleris/benchmarks/internal/bench"
"github.com/goceleris/loadgen"
"github.com/goceleris/loadgen/checkpoint"

"github.com/goceleris/benchmarks/internal/dashboard"
)

// benchRunner coordinates benchmark execution across servers.
type benchRunner struct {
cfg *benchConfig
checkpoint *bench.Checkpoint
checkpoint *checkpoint.Checkpoint
rc *RemoteController
c2 *C2Client
}
Expand Down Expand Up @@ -150,19 +152,20 @@ func (r *benchRunner) runServerBenchmarks(ctx context.Context, serverType, serve
}

isH2 := strings.Contains(serverType, "-h2") || strings.Contains(serverType, "-hybrid")
cfg := bench.Config{
URL: fmt.Sprintf("http://%s:%s%s", serverHost, r.cfg.port, bt.Path),
Method: bt.Method,
Body: bt.Body,
Headers: bt.Headers,
Duration: r.cfg.duration,
Connections: r.cfg.connections,
Workers: r.cfg.workers,
WarmupTime: r.cfg.warmup,
KeepAlive: true,
H2C: isH2,
H2Connections: r.cfg.h2Conns,
H2MaxStreams: r.cfg.h2Streams,
cfg := loadgen.Config{
URL: fmt.Sprintf("http://%s:%s%s", serverHost, r.cfg.port, bt.Path),
Method: bt.Method,
Body: bt.Body,
Headers: bt.Headers,
Duration: r.cfg.duration,
Connections: r.cfg.connections,
Workers: r.cfg.workers,
Warmup: r.cfg.warmup,
HTTP2: isH2,
HTTP2Options: loadgen.HTTP2Options{
Connections: r.cfg.h2Conns,
MaxStreams: r.cfg.h2Streams,
},
}

// Fetch baseline server metrics before benchmark (remote mode only)
Expand All @@ -174,13 +177,17 @@ func (r *benchRunner) runServerBenchmarks(ctx context.Context, serverType, serve
}

// In remote mode, wrap the benchmark with retry logic
var result *bench.Result
var result *loadgen.Result
var err error

if r.cfg.remoteMode {
result, err = runBenchmarkWithRetry(ctx, cfg, r.rc, serverType, r.cfg.benchmarkTimeout, r.cfg.serverRetryInterval)
} else {
benchmarker := bench.New(cfg)
benchmarker, benchErr := loadgen.New(cfg)
if benchErr != nil {
log.Printf("ERROR: Failed to create benchmarker for %s/%s: %v — continuing with next benchmark", serverType, bt.Name, benchErr)
continue
}
result, err = benchmarker.Run(ctx)
}

Expand All @@ -204,21 +211,21 @@ func (r *benchRunner) runServerBenchmarks(ctx context.Context, serverType, serve

log.Printf("Completed: %.2f req/s, avg latency: %s", result.RequestsPerSec, result.Latency.Avg)

serverResult := result.ToServerResult(serverType, bt.Name, bt.Method, bt.Path)
serverResult := checkpoint.NewServerResult(result, serverType, bt.Name, bt.Method, bt.Path)

// Fetch post-benchmark server metrics and compute deltas
if r.cfg.remoteMode && r.rc != nil {
if postMetrics, err := r.rc.GetServerMetrics(ctx); err == nil {
sm := serverResult.Metrics
if sm == nil {
sm = &bench.SystemMetrics{}
sm = &checkpoint.SystemMetrics{}
}
sm.ServerCPUPercent = postMetrics.CPUPercent
sm.ServerCPUUserPercent = postMetrics.CPUUserPercent
sm.ServerCPUSysPercent = postMetrics.CPUSysPercent
sm.ServerMemoryRSSMB = postMetrics.MemoryRSSMB
if postMetrics.HasGC {
gc := &bench.GCPauseStats{
gc := &checkpoint.GCPauseStats{
TotalPauseMs: postMetrics.GCTotalPauseMs,
MaxPauseMs: postMetrics.GCMaxPauseMs,
NumGC: postMetrics.GCNumGC,
Expand Down Expand Up @@ -253,7 +260,7 @@ func (r *benchRunner) runServerBenchmarks(ctx context.Context, serverType, serve
// saveResults writes final output files, dashboard JSON, and logs the summary.
func (r *benchRunner) saveResults(completedBefore, skipped int) {
if r.cfg.mergeFile != "" {
if otherCP, err := bench.LoadCheckpoint(r.cfg.mergeFile); err == nil {
if otherCP, err := checkpoint.LoadCheckpoint(r.cfg.mergeFile); err == nil {
beforeMerge := len(r.checkpoint.Results)
r.checkpoint.MergeResults(otherCP)
afterMerge := len(r.checkpoint.Results)
Expand Down Expand Up @@ -326,7 +333,7 @@ func (r *benchRunner) saveResults(completedBefore, skipped int) {
}

// runBenchmarkWithRetry runs a benchmark with automatic retry on server failure.
func runBenchmarkWithRetry(ctx context.Context, cfg bench.Config, rc *RemoteController, serverType string, timeout, interval time.Duration) (*bench.Result, error) {
func runBenchmarkWithRetry(ctx context.Context, cfg loadgen.Config, rc *RemoteController, serverType string, timeout, interval time.Duration) (*loadgen.Result, error) {
deadline := time.Now().Add(timeout)
var lastErr error

Expand Down Expand Up @@ -383,7 +390,10 @@ func runBenchmarkWithRetry(ctx context.Context, cfg bench.Config, rc *RemoteCont
}

// Run the benchmark
benchmarker := bench.New(cfg)
benchmarker, benchErr := loadgen.New(cfg)
if benchErr != nil {
return nil, benchErr
}
result, err := benchmarker.Run(ctx)
if err != nil {
// Check if it's a connection error (server might have died)
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ require (
github.com/gin-gonic/gin v1.12.0
github.com/go-chi/chi/v5 v5.2.5
github.com/goceleris/celeris v1.0.0
github.com/goceleris/loadgen v1.0.0
github.com/gofiber/fiber/v2 v2.52.12
github.com/google/uuid v1.6.0
github.com/hertz-contrib/http2 v0.1.8
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ github.com/goccy/go-yaml v1.19.2 h1:PmFC1S6h8ljIz6gMRBopkjP1TVT7xuwrButHID66PoM=
github.com/goccy/go-yaml v1.19.2/go.mod h1:XBurs7gK8ATbW4ZPGKgcbrY1Br56PdM69F7LkFRi1kA=
github.com/goceleris/celeris v1.0.0 h1:MZsvsQVqck4lSxVom+6w02IHJva5hgwvydlMFtFTzP0=
github.com/goceleris/celeris v1.0.0/go.mod h1:RiNVnUsmvfsPxGQaT7XQduPCF+amSn1AsipteFNO+2o=
github.com/goceleris/loadgen v1.0.0 h1:3ViKRy+r1MUXtRvNrgWekZd1g9+9e+Yt/v43LnhUL2k=
github.com/goceleris/loadgen v1.0.0/go.mod h1:/GSoaCJ2VR2Kt/nzfRLrJTXLrZN5lEF+bR9AqfzQRTc=
github.com/gofiber/fiber/v2 v2.52.12 h1:0LdToKclcPOj8PktUdIKo9BUohjjwfnQl42Dhw8/WUw=
github.com/gofiber/fiber/v2 v2.52.12/go.mod h1:YEcBbO/FB+5M1IZNBP9FO3J9281zgPAreiI1oqg8nDw=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
Expand Down
9 changes: 5 additions & 4 deletions internal/dashboard/format.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ import (
"strings"
"time"

"github.com/goceleris/benchmarks/internal/bench"
"github.com/goceleris/loadgen/checkpoint"

"github.com/goceleris/benchmarks/internal/version"
)

Expand Down Expand Up @@ -127,7 +128,7 @@ type Fastest struct {
}

// ConvertToDashboard converts a BenchmarkOutput to the dashboard JSON format.
func ConvertToDashboard(output *bench.BenchmarkOutput, env EnvironmentInfo) *DashboardData {
func ConvertToDashboard(output *checkpoint.BenchmarkOutput, env EnvironmentInfo) *DashboardData {
// Group results by benchmark type
suiteMap := make(map[string][]ServerBenchmark)
suiteOrder := []string{}
Expand Down Expand Up @@ -232,7 +233,7 @@ func ConvertToDashboard(output *bench.BenchmarkOutput, env EnvironmentInfo) *Das
}

// convertSystemMetrics converts bench.SystemMetrics to DashboardSystemMetrics.
func convertSystemMetrics(m *bench.SystemMetrics) *DashboardSystemMetrics {
func convertSystemMetrics(m *checkpoint.SystemMetrics) *DashboardSystemMetrics {
dm := &DashboardSystemMetrics{
ServerCPUPercent: m.ServerCPUPercent,
ServerCPUUserPercent: m.ServerCPUUserPercent,
Expand Down Expand Up @@ -355,7 +356,7 @@ func parseCelerisObjective(server string) string {
}

// parseLatency converts string latency values to float64 milliseconds.
func parseLatency(l bench.LatencyResult) DashboardLatency {
func parseLatency(l checkpoint.LatencyResult) DashboardLatency {
return DashboardLatency{
Avg: parseDurationMs(l.Avg),
Max: parseDurationMs(l.Max),
Expand Down
Loading
Loading