From 7e721156c3377cbdde1706f18c5f44f49128afd6 Mon Sep 17 00:00:00 2001 From: Brian Dillmann Date: Mon, 27 Apr 2026 10:11:06 -0400 Subject: [PATCH] goodhistogram: add windowed histogram with Prometheus integration Cumulative histograms are the right primitive for Prometheus scraping, but operators and dashboards need rolling-window quantiles to see recent behavior. Windowed maintains a single cumulative histogram with two baseline snapshots rotated on a configurable interval. The windowed view is computed by subtracting the older baseline from the current cumulative state, covering 1-2x the window duration. Recording cost is identical to a plain Histogram (~20ns, lock-free); the mutex only protects baseline rotation. WindowedCollector and WindowedVec provide Prometheus integration for single and labeled windowed histograms, following the same adapter pattern as PrometheusCollector and HistogramVec. Co-Authored-By: roachdev-claude --- histogram.go | 20 ++ windowed.go | 134 +++++++++ windowed_benchmark_test.go | 128 ++++++++ windowed_collector.go | 52 ++++ windowed_test.go | 591 +++++++++++++++++++++++++++++++++++++ windowed_vec.go | 152 ++++++++++ 6 files changed, 1077 insertions(+) create mode 100644 windowed.go create mode 100644 windowed_benchmark_test.go create mode 100644 windowed_collector.go create mode 100644 windowed_test.go create mode 100644 windowed_vec.go diff --git a/histogram.go b/histogram.go index 6751294..aff0aa6 100644 --- a/histogram.go +++ b/histogram.go @@ -389,3 +389,23 @@ func (s *Snapshot) Merge(other *Snapshot) Snapshot { } return merged } + +// Sub returns a new Snapshot whose counts are the element-wise difference +// of s minus other. Both snapshots must share the same config. This is used +// to compute windowed views by subtracting a baseline snapshot from a +// current cumulative snapshot. +func (s *Snapshot) Sub(other *Snapshot) Snapshot { + diff := Snapshot{ + cfg: s.cfg, + Counts: make([]uint64, len(s.Counts)), + ZeroCount: s.ZeroCount - other.ZeroCount, + Underflow: s.Underflow - other.Underflow, + Overflow: s.Overflow - other.Overflow, + TotalCount: s.TotalCount - other.TotalCount, + TotalSum: s.TotalSum - other.TotalSum, + } + for i := range s.Counts { + diff.Counts[i] = s.Counts[i] - other.Counts[i] + } + return diff +} diff --git a/windowed.go b/windowed.go new file mode 100644 index 0000000..f7797ab --- /dev/null +++ b/windowed.go @@ -0,0 +1,134 @@ +// Copyright 2026 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 + +package goodhistogram + +import ( + "sync" + "time" +) + +// Windowed is a histogram that supports both cumulative and windowed snapshots. +// It maintains a single cumulative histogram and two baseline snapshots that +// are rotated on a configurable interval. The windowed snapshot is computed by +// subtracting the older baseline from the current cumulative state, providing +// a view of recent activity spanning 1x-2x the window duration. +// +// Recording is identical in cost to a plain Histogram (~20ns): values are +// atomically added to the single underlying histogram with no locking. The +// mutex only protects baseline rotation, which happens lazily on +// WindowedSnapshot() calls or explicitly via Tick(). +type Windowed struct { + h *Histogram // cumulative — the only histogram, never swapped or reset + interval time.Duration // immutable after construction + + mu struct { + sync.Mutex + prevBaseline Snapshot // baseline from 2 ticks ago (subtracted for windowed view) + curBaseline Snapshot // baseline from 1 tick ago (promoted to prev on next tick) + nextTick time.Time + } +} + +// NewWindowed creates a new Windowed histogram. The params configure the +// bucket layout (same as New), and window is the rotation interval. Panics +// if window <= 0. +func NewWindowed(p Params, window time.Duration) *Windowed { + if window <= 0 { + panic("goodhistogram: window must be > 0") + } + h := New(p) + empty := h.Snapshot() + w := &Windowed{ + h: h, + interval: window, + } + w.mu.prevBaseline = empty + w.mu.curBaseline = empty + w.mu.nextTick = time.Now().Add(window) + return w +} + +// Record adds a value to the histogram. This is the hot path: O(1), +// lock-free, no allocations. Identical cost to Histogram.Record. +func (w *Windowed) Record(v int64) { + w.h.Record(v) +} + +// Snapshot returns a cumulative (all-time) snapshot of the histogram. +// This does not trigger a tick rotation. +func (w *Windowed) Snapshot() Snapshot { + return w.h.Snapshot() +} + +// WindowedSnapshot returns a snapshot of recent activity by subtracting +// the older baseline from the current cumulative state. If the tick +// interval has elapsed, a rotation is performed first (lazy ticking). +// +// The baseline is read before the cumulative snapshot. Snapshot reads +// per-bucket atomic counters non-atomically across buckets, so reading +// cur first would let a concurrent Tick capture a snapshot containing +// increments that had not yet been observed by our partial cur read; if +// that fresher snapshot then became prevBaseline before we read it, +// cur.Sub(&base) would underflow per-bucket uint64 counts. By taking +// base first under the lock and snapshotting cur after, every per-bucket +// read in cur happens strictly after every per-bucket read in base, so +// monotonically-increasing counters guarantee cur[i] >= base[i]. +func (w *Windowed) WindowedSnapshot() Snapshot { + w.maybeTick() + w.mu.Lock() + base := w.mu.prevBaseline + w.mu.Unlock() + cur := w.h.Snapshot() + return cur.Sub(&base) +} + +// Tick manually rotates the baselines: curBaseline becomes prevBaseline, +// and a fresh snapshot of the cumulative histogram becomes curBaseline. +// The next automatic tick time is reset to now + interval. +func (w *Windowed) Tick() { + w.mu.Lock() + w.tickLocked() + w.mu.Unlock() +} + +// maybeTick checks whether the tick interval has elapsed and rotates if so. +func (w *Windowed) maybeTick() { + now := time.Now() + w.mu.Lock() + if now.Before(w.mu.nextTick) { + w.mu.Unlock() + return + } + w.tickLocked() + w.mu.Unlock() +} + +// tickLocked performs the actual baseline rotation. Must be called with w.mu held. +func (w *Windowed) tickLocked() { + w.mu.prevBaseline = w.mu.curBaseline + w.mu.curBaseline = w.h.Snapshot() + w.mu.nextTick = time.Now().Add(w.interval) +} + +// Schema returns the Prometheus native histogram schema (0-8). +func (w *Windowed) Schema() int32 { + return w.h.Schema() +} + +// Reset zeroes the cumulative histogram, resets both baselines to empty, +// and resets the tick timer. +func (w *Windowed) Reset() { + w.h.Reset() + w.mu.Lock() + defer w.mu.Unlock() + empty := w.h.Snapshot() + w.mu.prevBaseline = empty + w.mu.curBaseline = empty + w.mu.nextTick = time.Now().Add(w.interval) +} diff --git a/windowed_benchmark_test.go b/windowed_benchmark_test.go new file mode 100644 index 0000000..ec6e54d --- /dev/null +++ b/windowed_benchmark_test.go @@ -0,0 +1,128 @@ +// Copyright 2026 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 + +package goodhistogram + +import ( + "fmt" + "math/rand" + "runtime" + "testing" + "time" +) + +func BenchmarkWindowedMemory(b *testing.B) { + var before, after runtime.MemStats + + runtime.GC() + runtime.ReadMemStats(&before) + + histograms := make([]*Windowed, b.N) + for i := range histograms { + histograms[i] = NewWindowed( + Params{Lo: benchLo, Hi: benchHi, ErrorBound: benchErrBound}, + 10*time.Second, + ) + } + + runtime.GC() + runtime.ReadMemStats(&after) + + bytesPerHist := float64(after.TotalAlloc-before.TotalAlloc) / float64(b.N) + b.ReportMetric(bytesPerHist, "bytes/histogram") + runtime.KeepAlive(histograms) +} + +func BenchmarkWindowedRecordSingleThread(b *testing.B) { + rng := rand.New(rand.NewSource(42)) + vals := makeInt64Values(rng, 1<<16) + + w := NewWindowed( + Params{Lo: benchLo, Hi: benchHi, ErrorBound: benchErrBound}, + 10*time.Second, + ) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + w.Record(vals[i&(len(vals)-1)]) + } +} + +func BenchmarkWindowedRecordContention(b *testing.B) { + for _, numGoroutines := range []int{50, 100} { + b.Run(fmt.Sprintf("goroutines=%d", numGoroutines), func(b *testing.B) { + w := NewWindowed( + Params{Lo: benchLo, Hi: benchHi, ErrorBound: benchErrBound}, + 10*time.Second, + ) + b.SetParallelism(numGoroutines) + b.RunParallel(func(pb *testing.PB) { + rng := rand.New(rand.NewSource(rand.Int63())) + vals := makeInt64Values(rng, 1<<12) + i := 0 + for pb.Next() { + w.Record(vals[i&(len(vals)-1)]) + i++ + } + }) + }) + } +} + +func BenchmarkWindowedSnapshot(b *testing.B) { + w := NewWindowed( + Params{Lo: benchLo, Hi: benchHi, ErrorBound: benchErrBound}, + 10*time.Second, + ) + rng := rand.New(rand.NewSource(42)) + for i := 0; i < 10000; i++ { + w.Record(int64(rng.Float64()*benchRange + benchLo)) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _ = w.Snapshot() + } +} + +func BenchmarkWindowedWindowedSnapshot(b *testing.B) { + w := NewWindowed( + Params{Lo: benchLo, Hi: benchHi, ErrorBound: benchErrBound}, + 10*time.Second, + ) + rng := rand.New(rand.NewSource(42)) + for i := 0; i < 10000; i++ { + w.Record(int64(rng.Float64()*benchRange + benchLo)) + } + // Tick once so prevBaseline is non-empty. + w.Tick() + for i := 0; i < 5000; i++ { + w.Record(int64(rng.Float64()*benchRange + benchLo)) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _ = w.WindowedSnapshot() + } +} + +func BenchmarkWindowedTick(b *testing.B) { + w := NewWindowed( + Params{Lo: benchLo, Hi: benchHi, ErrorBound: benchErrBound}, + time.Hour, // long interval so maybeTick doesn't interfere + ) + rng := rand.New(rand.NewSource(42)) + for i := 0; i < 10000; i++ { + w.Record(int64(rng.Float64()*benchRange + benchLo)) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + w.Tick() + } +} diff --git a/windowed_collector.go b/windowed_collector.go new file mode 100644 index 0000000..30bdb00 --- /dev/null +++ b/windowed_collector.go @@ -0,0 +1,52 @@ +// Copyright 2026 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 + +package goodhistogram + +import ( + "github.com/prometheus/client_golang/prometheus" + prometheusgo "github.com/prometheus/client_model/go" +) + +// WindowedCollector wraps a Windowed histogram as a prometheus.Collector, +// allowing it to be registered with a Prometheus registry. Collect() exports +// cumulative data, as Prometheus expects monotonically increasing counters. +type WindowedCollector struct { + w *Windowed + desc *prometheus.Desc +} + +// ToPrometheusCollector returns a WindowedCollector that exposes this +// windowed histogram to a Prometheus registry. The exported data is +// cumulative (all-time), not windowed, since Prometheus expects monotonic +// counters. +func (w *Windowed) ToPrometheusCollector(desc *prometheus.Desc) *WindowedCollector { + return &WindowedCollector{w: w, desc: desc} +} + +// Describe implements prometheus.Collector. +func (c *WindowedCollector) Describe(ch chan<- *prometheus.Desc) { + ch <- c.desc +} + +// Collect implements prometheus.Collector. +func (c *WindowedCollector) Collect(ch chan<- prometheus.Metric) { + ch <- c +} + +// Desc implements prometheus.Metric. +func (c *WindowedCollector) Desc() *prometheus.Desc { + return c.desc +} + +// Write implements prometheus.Metric. +func (c *WindowedCollector) Write(m *prometheusgo.Metric) error { + snap := c.w.Snapshot() + m.Histogram = snap.ToPrometheusHistogram() + return nil +} diff --git a/windowed_test.go b/windowed_test.go new file mode 100644 index 0000000..c6e3e71 --- /dev/null +++ b/windowed_test.go @@ -0,0 +1,591 @@ +// Copyright 2026 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 + +package goodhistogram + +import ( + "fmt" + "math/rand" + "sort" + "sync" + "testing" + "time" + + "github.com/prometheus/client_golang/prometheus" + prometheusgo "github.com/prometheus/client_model/go" + "github.com/stretchr/testify/require" +) + +func TestWindowedRecordAndSnapshot(t *testing.T) { + w := NewWindowed(Params{Lo: 100, Hi: 1e9}, 10*time.Second) + + w.Record(500) + w.Record(1000) + w.Record(2000) + + snap := w.Snapshot() + require.Equal(t, uint64(3), snap.TotalCount) + require.Equal(t, int64(3500), snap.TotalSum) +} + +func TestWindowedSnapshotIsCumulative(t *testing.T) { + w := NewWindowed(Params{Lo: 100, Hi: 1e9}, 10*time.Second) + + w.Record(500) + w.Record(1000) + w.Tick() + w.Record(2000) + w.Tick() + w.Record(3000) + + // Cumulative snapshot should have all 4 observations. + snap := w.Snapshot() + require.Equal(t, uint64(4), snap.TotalCount) + require.Equal(t, int64(6500), snap.TotalSum) +} + +func TestWindowedWindowedSnapshotBeforeTick(t *testing.T) { + w := NewWindowed(Params{Lo: 100, Hi: 1e9}, time.Hour) + + w.Record(500) + w.Record(1000) + + // Before any tick, windowed = cumulative - emptyBaseline = cumulative. + windowed := w.WindowedSnapshot() + cum := w.Snapshot() + require.Equal(t, cum.TotalCount, windowed.TotalCount) + require.Equal(t, cum.TotalSum, windowed.TotalSum) +} + +func TestWindowedTick(t *testing.T) { + w := NewWindowed(Params{Lo: 100, Hi: 1e9}, time.Hour) + + // Window 1: record 3 values. + w.Record(100) + w.Record(200) + w.Record(300) + w.Tick() + + // Window 2: record 2 values. + w.Record(400) + w.Record(500) + + // Cumulative: all 5. + snap := w.Snapshot() + require.Equal(t, uint64(5), snap.TotalCount) + require.Equal(t, int64(1500), snap.TotalSum) + + // Windowed: cum - prevBaseline (which is the empty initial baseline). + // Since only one tick happened, prevBaseline is still the initial empty + // snapshot, so windowed == cumulative. + windowed := w.WindowedSnapshot() + require.Equal(t, uint64(5), windowed.TotalCount) + require.Equal(t, int64(1500), windowed.TotalSum) +} + +func TestWindowedMultipleTicks(t *testing.T) { + w := NewWindowed(Params{Lo: 100, Hi: 1e9}, time.Hour) + + // Window 1: 100 observations of 100. + for i := 0; i < 100; i++ { + w.Record(100) + } + w.Tick() + + // Window 2: 50 observations of 200. + for i := 0; i < 50; i++ { + w.Record(200) + } + w.Tick() + + // Window 3 (current): 25 observations of 300. + for i := 0; i < 25; i++ { + w.Record(300) + } + + // Cumulative: all 175 observations. + snap := w.Snapshot() + require.Equal(t, uint64(175), snap.TotalCount) + require.Equal(t, int64(100*100+50*200+25*300), snap.TotalSum) + + // Windowed: cum - prevBaseline. After 2 ticks, prevBaseline is the + // snapshot taken at tick 1 (100 observations). So windowed should + // have 75 observations (50 + 25). + windowed := w.WindowedSnapshot() + require.Equal(t, uint64(75), windowed.TotalCount) + require.Equal(t, int64(50*200+25*300), windowed.TotalSum) +} + +func TestWindowedOldDataEvicted(t *testing.T) { + w := NewWindowed(Params{Lo: 100, Hi: 1e9}, time.Hour) + + // Window 1. + for i := 0; i < 100; i++ { + w.Record(100) + } + w.Tick() + + // Window 2. + for i := 0; i < 50; i++ { + w.Record(200) + } + w.Tick() + + // Window 3. + for i := 0; i < 30; i++ { + w.Record(300) + } + w.Tick() + + // Window 4 (current): nothing recorded yet. + + // After 3 ticks, prevBaseline = snapshot at tick 2 (100 + 50 = 150 obs). + // Cumulative has 180. Windowed = 180 - 150 = 30. + windowed := w.WindowedSnapshot() + require.Equal(t, uint64(30), windowed.TotalCount) + require.Equal(t, int64(30*300), windowed.TotalSum) +} + +func TestWindowedLazyTick(t *testing.T) { + w := NewWindowed(Params{Lo: 100, Hi: 1e9}, 10*time.Millisecond) + + w.Record(500) + time.Sleep(20 * time.Millisecond) + w.Record(1000) + + // WindowedSnapshot should trigger a lazy tick. After the tick, + // prevBaseline = initial empty, curBaseline = snapshot taken now. + // So windowed = cum - empty = cum. + windowed := w.WindowedSnapshot() + require.Equal(t, uint64(2), windowed.TotalCount) +} + +func TestWindowedSnapshotDoesNotTick(t *testing.T) { + w := NewWindowed(Params{Lo: 100, Hi: 1e9}, 10*time.Millisecond) + + w.Record(500) + time.Sleep(20 * time.Millisecond) + + // Calling Snapshot() (cumulative) should NOT trigger a tick. + _ = w.Snapshot() + + time.Sleep(20 * time.Millisecond) + w.Record(1000) + + // WindowedSnapshot will lazy-tick. If Snapshot() had ticked earlier, + // prevBaseline would include the first observation, and the second + // sleep would cause another tick advancing prevBaseline further — + // yielding only 1 observation. With Snapshot() not ticking, only + // one tick happens here (from the first expired window), so + // prevBaseline remains the initial empty snapshot and windowed + // includes both observations. + windowed := w.WindowedSnapshot() + require.Equal(t, uint64(2), windowed.TotalCount) +} + +func TestWindowedReset(t *testing.T) { + w := NewWindowed(Params{Lo: 100, Hi: 1e9}, time.Hour) + + for i := 0; i < 100; i++ { + w.Record(500) + } + w.Tick() + for i := 0; i < 50; i++ { + w.Record(1000) + } + + w.Reset() + + snap := w.Snapshot() + require.Equal(t, uint64(0), snap.TotalCount) + require.Equal(t, int64(0), snap.TotalSum) + + windowed := w.WindowedSnapshot() + require.Equal(t, uint64(0), windowed.TotalCount) + require.Equal(t, int64(0), windowed.TotalSum) + + // Recording after reset should work normally. + w.Record(123) + snap = w.Snapshot() + require.Equal(t, uint64(1), snap.TotalCount) + require.Equal(t, int64(123), snap.TotalSum) +} + +func TestWindowedConcurrentRecord(t *testing.T) { + w := NewWindowed(Params{Lo: 1, Hi: 1e6}, time.Hour) + const goroutines = 8 + const recordsPerGoroutine = 10000 + + var wg sync.WaitGroup + wg.Add(goroutines) + for g := 0; g < goroutines; g++ { + go func(seed int64) { + defer wg.Done() + rng := rand.New(rand.NewSource(seed)) + for i := 0; i < recordsPerGoroutine; i++ { + v := rng.Int63n(1e6) + 1 + w.Record(v) + } + }(int64(g)) + } + wg.Wait() + + snap := w.Snapshot() + require.Equal(t, uint64(goroutines*recordsPerGoroutine), snap.TotalCount) +} + +func TestWindowedConcurrentRecordAndTick(t *testing.T) { + w := NewWindowed(Params{Lo: 1, Hi: 1e6}, time.Hour) + const goroutines = 8 + const recordsPerGoroutine = 10000 + + var wg sync.WaitGroup + wg.Add(goroutines + 1) + + // Recording goroutines. + for g := 0; g < goroutines; g++ { + go func(seed int64) { + defer wg.Done() + rng := rand.New(rand.NewSource(seed)) + for i := 0; i < recordsPerGoroutine; i++ { + v := rng.Int63n(1e6) + 1 + w.Record(v) + } + }(int64(g)) + } + + // Ticking goroutine. + go func() { + defer wg.Done() + for i := 0; i < 100; i++ { + w.Tick() + _ = w.WindowedSnapshot() + } + }() + + wg.Wait() + + // Cumulative should have all observations. + snap := w.Snapshot() + require.Equal(t, uint64(goroutines*recordsPerGoroutine), snap.TotalCount) +} + +// TestWindowedSnapshotConcurrentTickNoUnderflow asserts that WindowedSnapshot +// never underflows when racing against concurrent Record and Tick calls. The +// invariant is that every diff field stays bounded by the total number of +// records ever made; underflow on uint64 subtraction would produce values +// near 2^64. +func TestWindowedSnapshotConcurrentTickNoUnderflow(t *testing.T) { + w := NewWindowed(Params{Lo: 1, Hi: 1e6}, time.Hour) + const recorders = 8 + const recordsPerGoroutine = 50000 + const maxTotal = uint64(recorders * recordsPerGoroutine) + + var recordersWg sync.WaitGroup + var helpersWg sync.WaitGroup + stop := make(chan struct{}) + + check := func(field string, got uint64) { + if got > maxTotal { + t.Errorf("WindowedSnapshot %s underflowed: got %d, max possible %d", + field, got, maxTotal) + } + } + + recordersWg.Add(recorders) + for g := 0; g < recorders; g++ { + go func(seed int64) { + defer recordersWg.Done() + rng := rand.New(rand.NewSource(seed)) + for i := 0; i < recordsPerGoroutine; i++ { + w.Record(rng.Int63n(1e6) + 1) + } + }(int64(g)) + } + + // Ticker: rotate baselines as fast as possible to maximize the chance + // of two ticks landing inside a single Snapshot read. + helpersWg.Add(1) + go func() { + defer helpersWg.Done() + for { + select { + case <-stop: + return + default: + w.Tick() + } + } + }() + + // Observer: take WindowedSnapshots and check the invariant. + helpersWg.Add(1) + go func() { + defer helpersWg.Done() + for { + select { + case <-stop: + return + default: + snap := w.WindowedSnapshot() + check("TotalCount", snap.TotalCount) + check("ZeroCount", snap.ZeroCount) + check("Underflow", snap.Underflow) + check("Overflow", snap.Overflow) + for i, c := range snap.Counts { + check(fmt.Sprintf("bucket[%d]", i), c) + } + } + } + }() + + recordersWg.Wait() + close(stop) + helpersWg.Wait() +} + +func TestWindowedSchema(t *testing.T) { + w := NewWindowed(Params{Lo: 1, Hi: 1e6, ErrorBound: 0.05}, time.Hour) + require.Equal(t, int32(3), w.Schema()) +} + +func TestNewWindowedPanicsOnZeroWindow(t *testing.T) { + require.Panics(t, func() { + NewWindowed(Params{Lo: 1, Hi: 1000}, 0) + }) + require.Panics(t, func() { + NewWindowed(Params{Lo: 1, Hi: 1000}, -time.Second) + }) +} + +// WindowedCollector tests. + +func TestWindowedCollectorRegistration(t *testing.T) { + w := NewWindowed(Params{Lo: 100, Hi: 1e9}, time.Hour) + desc := prometheus.NewDesc("test_windowed", "test", nil, nil) + collector := w.ToPrometheusCollector(desc) + + reg := prometheus.NewRegistry() + require.NoError(t, reg.Register(collector)) + + w.Record(500) + w.Record(1000) + w.Record(2000) + + families, err := reg.Gather() + require.NoError(t, err) + require.Len(t, families, 1) + require.Equal(t, "test_windowed", *families[0].Name) + + metrics := families[0].Metric + require.Len(t, metrics, 1) + require.Equal(t, uint64(3), *metrics[0].Histogram.SampleCount) + require.Equal(t, float64(3500), *metrics[0].Histogram.SampleSum) +} + +func TestWindowedCollectorExportsCumulative(t *testing.T) { + w := NewWindowed(Params{Lo: 100, Hi: 1e9}, time.Hour) + + // Record in window 1. + w.Record(500) + w.Record(1000) + w.Tick() + + // Record in window 2. + w.Record(2000) + w.Tick() + + // Record in window 3. + w.Record(3000) + + // Collector should export cumulative data (all 4 observations), + // not windowed data. + var collected prometheusgo.Metric + desc := prometheus.NewDesc("test", "test", nil, nil) + collector := w.ToPrometheusCollector(desc) + require.NoError(t, collector.Write(&collected)) + + require.Equal(t, uint64(4), collected.Histogram.GetSampleCount()) + require.Equal(t, float64(6500), collected.Histogram.GetSampleSum()) +} + +// WindowedVec tests. + +func TestWindowedVecGather(t *testing.T) { + vec := NewWindowedVec( + Params{Lo: 100, Hi: 1e9}, time.Hour, + "request_duration_ns", "Request duration", []string{"method", "path"}, + ) + reg := prometheus.NewRegistry() + require.NoError(t, reg.Register(vec)) + + vec.WithLabelValues("GET", "/api").Record(1000) + vec.WithLabelValues("GET", "/api").Record(2000) + vec.WithLabelValues("POST", "/api").Record(5000) + + families, err := reg.Gather() + require.NoError(t, err) + require.Len(t, families, 1) + require.Equal(t, "request_duration_ns", *families[0].Name) + + metrics := families[0].Metric + require.Len(t, metrics, 2) + + sort.Slice(metrics, func(i, j int) bool { + return *metrics[i].Label[0].Value < *metrics[j].Label[0].Value + }) + + require.Equal(t, "GET", *metrics[0].Label[0].Value) + require.Equal(t, uint64(2), *metrics[0].Histogram.SampleCount) + require.Equal(t, float64(3000), *metrics[0].Histogram.SampleSum) + + require.Equal(t, "POST", *metrics[1].Label[0].Value) + require.Equal(t, uint64(1), *metrics[1].Histogram.SampleCount) + require.Equal(t, float64(5000), *metrics[1].Histogram.SampleSum) +} + +func TestWindowedVecSamePointer(t *testing.T) { + vec := NewWindowedVec( + Params{Lo: 100, Hi: 1e9}, time.Hour, + "test", "test", []string{"x"}, + ) + w1 := vec.WithLabelValues("a") + w2 := vec.WithLabelValues("a") + require.True(t, w1 == w2, "WithLabelValues should return the same *Windowed") +} + +func TestWindowedVecWrongLabelCountPanics(t *testing.T) { + vec := NewWindowedVec( + Params{Lo: 100, Hi: 1e9}, time.Hour, + "test", "test", []string{"method", "path"}, + ) + require.Panics(t, func() { vec.WithLabelValues("GET") }) +} + +func TestWindowedVecDeleteAndReset(t *testing.T) { + vec := NewWindowedVec( + Params{Lo: 100, Hi: 1e9}, time.Hour, + "test", "test", []string{"x"}, + ) + reg := prometheus.NewRegistry() + require.NoError(t, reg.Register(vec)) + + vec.WithLabelValues("a").Record(100) + vec.WithLabelValues("b").Record(200) + + require.True(t, vec.DeleteLabelValues("a")) + require.False(t, vec.DeleteLabelValues("a")) + + families, err := reg.Gather() + require.NoError(t, err) + require.Len(t, families[0].Metric, 1) + require.Equal(t, "b", *families[0].Metric[0].Label[0].Value) + + vec.Reset() + families, err = reg.Gather() + require.NoError(t, err) + require.Empty(t, families) +} + +func TestWindowedVecTickAll(t *testing.T) { + vec := NewWindowedVec( + Params{Lo: 100, Hi: 1e9}, time.Hour, + "test", "test", []string{"x"}, + ) + + // Record in window 1. + vec.WithLabelValues("a").Record(100) + vec.WithLabelValues("b").Record(200) + vec.TickAll() + + // Record in window 2. + vec.WithLabelValues("a").Record(300) + vec.WithLabelValues("b").Record(400) + vec.TickAll() + + // Record in window 3. + vec.WithLabelValues("a").Record(500) + vec.WithLabelValues("b").Record(600) + + // Windowed snapshots should only cover windows 2+3 (after 2 ticks, + // prevBaseline = snapshot at tick 1). + wA := vec.WithLabelValues("a").WindowedSnapshot() + require.Equal(t, uint64(2), wA.TotalCount) // 300 + 500 + require.Equal(t, int64(800), wA.TotalSum) + + wB := vec.WithLabelValues("b").WindowedSnapshot() + require.Equal(t, uint64(2), wB.TotalCount) // 400 + 600 + require.Equal(t, int64(1000), wB.TotalSum) +} + +func TestWindowedVecConcurrent(t *testing.T) { + vec := NewWindowedVec( + Params{Lo: 100, Hi: 1e9}, time.Hour, + "test", "test", []string{"x"}, + ) + + var wg sync.WaitGroup + labels := []string{"a", "b", "c", "d"} + for _, l := range labels { + for g := 0; g < 10; g++ { + wg.Add(1) + go func(label string) { + defer wg.Done() + w := vec.WithLabelValues(label) + for i := 0; i < 1000; i++ { + w.Record(500) + } + }(l) + } + } + wg.Wait() + + reg := prometheus.NewRegistry() + require.NoError(t, reg.Register(vec)) + families, err := reg.Gather() + require.NoError(t, err) + require.Len(t, families[0].Metric, 4) + for _, m := range families[0].Metric { + require.Equal(t, uint64(10000), *m.Histogram.SampleCount) + } +} + +// Snapshot.Sub tests. + +func TestSnapshotSub(t *testing.T) { + h := New(Params{Lo: 100, Hi: 1e9}) + + // Take baseline after 3 observations. + h.Record(100) + h.Record(200) + h.Record(300) + baseline := h.Snapshot() + + // Record 2 more observations. + h.Record(400) + h.Record(500) + current := h.Snapshot() + + diff := current.Sub(&baseline) + require.Equal(t, uint64(2), diff.TotalCount) + require.Equal(t, int64(900), diff.TotalSum) // 400 + 500 +} + +func TestSnapshotSubIdentity(t *testing.T) { + h := New(Params{Lo: 100, Hi: 1e9}) + h.Record(500) + h.Record(1000) + + snap := h.Snapshot() + diff := snap.Sub(&snap) + require.Equal(t, uint64(0), diff.TotalCount) + require.Equal(t, int64(0), diff.TotalSum) + for _, c := range diff.Counts { + require.Equal(t, uint64(0), c) + } +} diff --git a/windowed_vec.go b/windowed_vec.go new file mode 100644 index 0000000..b8028c0 --- /dev/null +++ b/windowed_vec.go @@ -0,0 +1,152 @@ +// Copyright 2026 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 + +package goodhistogram + +import ( + "fmt" + "strings" + "sync" + "time" + + "github.com/prometheus/client_golang/prometheus" + prometheusgo "github.com/prometheus/client_model/go" +) + +// WindowedVec is a collection of Windowed histograms partitioned by label +// values. It implements prometheus.Collector so the entire vec can be +// registered with a Prometheus registry. Recording is done on the individual +// *Windowed returned by WithLabelValues. +type WindowedVec struct { + params Params + window time.Duration + desc *prometheus.Desc + labelNames []string + + mu struct { + sync.RWMutex + histograms map[string]*labeledWindowed + } +} + +type labeledWindowed struct { + w *Windowed + labelPairs []*prometheusgo.LabelPair +} + +// NewWindowedVec creates a new WindowedVec. All child histograms share the +// same Params and window duration. +func NewWindowedVec( + p Params, window time.Duration, name, help string, labelNames []string, +) *WindowedVec { + v := &WindowedVec{ + params: p, + window: window, + desc: prometheus.NewDesc(name, help, labelNames, nil), + labelNames: labelNames, + } + v.mu.histograms = make(map[string]*labeledWindowed) + return v +} + +// WithLabelValues returns the Windowed histogram for the given label values, +// creating it if it doesn't exist. Panics if the number of values doesn't +// match the number of label names. +func (v *WindowedVec) WithLabelValues(lvs ...string) *Windowed { + if len(lvs) != len(v.labelNames) { + panic(fmt.Sprintf( + "goodhistogram: expected %d label values, got %d", + len(v.labelNames), len(lvs), + )) + } + key := strings.Join(lvs, "\xff") + + v.mu.RLock() + if lw, ok := v.mu.histograms[key]; ok { + v.mu.RUnlock() + return lw.w + } + v.mu.RUnlock() + + v.mu.Lock() + defer v.mu.Unlock() + // Re-check after upgrading to a write lock: another goroutine may + // have inserted the entry between our RUnlock and Lock. + if lw, ok := v.mu.histograms[key]; ok { + return lw.w + } + w := NewWindowed(v.params, v.window) + v.mu.histograms[key] = &labeledWindowed{ + w: w, + labelPairs: makeLabelPairs(v.labelNames, lvs), + } + return w +} + +// DeleteLabelValues removes the Windowed histogram for the given label values. +// Returns true if the entry existed. +func (v *WindowedVec) DeleteLabelValues(lvs ...string) bool { + key := strings.Join(lvs, "\xff") + v.mu.Lock() + defer v.mu.Unlock() + _, ok := v.mu.histograms[key] + delete(v.mu.histograms, key) + return ok +} + +// Reset removes all child histograms. +func (v *WindowedVec) Reset() { + v.mu.Lock() + defer v.mu.Unlock() + v.mu.histograms = make(map[string]*labeledWindowed) +} + +// TickAll manually rotates all child histograms. +func (v *WindowedVec) TickAll() { + v.mu.RLock() + defer v.mu.RUnlock() + for _, lw := range v.mu.histograms { + lw.w.Tick() + } +} + +// Describe implements prometheus.Collector. +func (v *WindowedVec) Describe(ch chan<- *prometheus.Desc) { + ch <- v.desc +} + +// Collect implements prometheus.Collector. Exports cumulative data since +// Prometheus expects monotonically increasing counters. +func (v *WindowedVec) Collect(ch chan<- prometheus.Metric) { + v.mu.RLock() + defer v.mu.RUnlock() + for _, lw := range v.mu.histograms { + ch <- &windowedMetric{ + desc: v.desc, + w: lw.w, + labelPairs: lw.labelPairs, + } + } +} + +// windowedMetric implements prometheus.Metric for a single labeled +// windowed histogram. +type windowedMetric struct { + desc *prometheus.Desc + w *Windowed + labelPairs []*prometheusgo.LabelPair +} + +func (m *windowedMetric) Desc() *prometheus.Desc { return m.desc } + +func (m *windowedMetric) Write(out *prometheusgo.Metric) error { + snap := m.w.Snapshot() // cumulative + out.Histogram = snap.ToPrometheusHistogram() + out.Label = m.labelPairs + return nil +}