Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,3 +389,23 @@ func (s *Snapshot) Merge(other *Snapshot) Snapshot {
}
return merged
}

// Sub returns a new Snapshot whose counts are the element-wise difference
// of s minus other. Both snapshots must share the same config. This is used
// to compute windowed views by subtracting a baseline snapshot from a
// current cumulative snapshot.
func (s *Snapshot) Sub(other *Snapshot) Snapshot {
diff := Snapshot{
cfg: s.cfg,
Counts: make([]uint64, len(s.Counts)),
ZeroCount: s.ZeroCount - other.ZeroCount,
Underflow: s.Underflow - other.Underflow,
Overflow: s.Overflow - other.Overflow,
TotalCount: s.TotalCount - other.TotalCount,
TotalSum: s.TotalSum - other.TotalSum,
}
for i := range s.Counts {
diff.Counts[i] = s.Counts[i] - other.Counts[i]
}
return diff
}
134 changes: 134 additions & 0 deletions windowed.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
// Copyright 2026 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0

package goodhistogram

import (
"sync"
"time"
)

// Windowed is a histogram that supports both cumulative and windowed snapshots.
// It maintains a single cumulative histogram and two baseline snapshots that
// are rotated on a configurable interval. The windowed snapshot is computed by
// subtracting the older baseline from the current cumulative state, providing
// a view of recent activity spanning 1x-2x the window duration.
//
// Recording is identical in cost to a plain Histogram (~20ns): values are
// atomically added to the single underlying histogram with no locking. The
// mutex only protects baseline rotation, which happens lazily on
// WindowedSnapshot() calls or explicitly via Tick().
type Windowed struct {
h *Histogram // cumulative — the only histogram, never swapped or reset
interval time.Duration // immutable after construction

mu struct {
sync.Mutex
prevBaseline Snapshot // baseline from 2 ticks ago (subtracted for windowed view)
curBaseline Snapshot // baseline from 1 tick ago (promoted to prev on next tick)
nextTick time.Time
}
}

// NewWindowed creates a new Windowed histogram. The params configure the
// bucket layout (same as New), and window is the rotation interval. Panics
// if window <= 0.
func NewWindowed(p Params, window time.Duration) *Windowed {
if window <= 0 {
panic("goodhistogram: window must be > 0")
}
h := New(p)
empty := h.Snapshot()
w := &Windowed{
h: h,
interval: window,
}
w.mu.prevBaseline = empty
w.mu.curBaseline = empty
w.mu.nextTick = time.Now().Add(window)
return w
}

// Record adds a value to the histogram. This is the hot path: O(1),
// lock-free, no allocations. Identical cost to Histogram.Record.
func (w *Windowed) Record(v int64) {
w.h.Record(v)
}

// Snapshot returns a cumulative (all-time) snapshot of the histogram.
// This does not trigger a tick rotation.
func (w *Windowed) Snapshot() Snapshot {
return w.h.Snapshot()
}

// WindowedSnapshot returns a snapshot of recent activity by subtracting
// the older baseline from the current cumulative state. If the tick
// interval has elapsed, a rotation is performed first (lazy ticking).
//
// The baseline is read before the cumulative snapshot. Snapshot reads
// per-bucket atomic counters non-atomically across buckets, so reading
// cur first would let a concurrent Tick capture a snapshot containing
// increments that had not yet been observed by our partial cur read; if
// that fresher snapshot then became prevBaseline before we read it,
// cur.Sub(&base) would underflow per-bucket uint64 counts. By taking
// base first under the lock and snapshotting cur after, every per-bucket
// read in cur happens strictly after every per-bucket read in base, so
// monotonically-increasing counters guarantee cur[i] >= base[i].
func (w *Windowed) WindowedSnapshot() Snapshot {
w.maybeTick()
w.mu.Lock()
base := w.mu.prevBaseline
w.mu.Unlock()
cur := w.h.Snapshot()
return cur.Sub(&base)
}

// Tick manually rotates the baselines: curBaseline becomes prevBaseline,
// and a fresh snapshot of the cumulative histogram becomes curBaseline.
// The next automatic tick time is reset to now + interval.
func (w *Windowed) Tick() {
w.mu.Lock()
w.tickLocked()
w.mu.Unlock()
}

// maybeTick checks whether the tick interval has elapsed and rotates if so.
func (w *Windowed) maybeTick() {
now := time.Now()
w.mu.Lock()
if now.Before(w.mu.nextTick) {
w.mu.Unlock()
return
}
w.tickLocked()
w.mu.Unlock()
}

// tickLocked performs the actual baseline rotation. Must be called with w.mu held.
func (w *Windowed) tickLocked() {
w.mu.prevBaseline = w.mu.curBaseline
w.mu.curBaseline = w.h.Snapshot()
w.mu.nextTick = time.Now().Add(w.interval)
}

// Schema returns the Prometheus native histogram schema (0-8).
func (w *Windowed) Schema() int32 {
return w.h.Schema()
}

// Reset zeroes the cumulative histogram, resets both baselines to empty,
// and resets the tick timer.
func (w *Windowed) Reset() {
w.h.Reset()
w.mu.Lock()
defer w.mu.Unlock()
empty := w.h.Snapshot()
w.mu.prevBaseline = empty
w.mu.curBaseline = empty
w.mu.nextTick = time.Now().Add(w.interval)
}
128 changes: 128 additions & 0 deletions windowed_benchmark_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
// Copyright 2026 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0

package goodhistogram

import (
"fmt"
"math/rand"
"runtime"
"testing"
"time"
)

func BenchmarkWindowedMemory(b *testing.B) {
var before, after runtime.MemStats

runtime.GC()
runtime.ReadMemStats(&before)

histograms := make([]*Windowed, b.N)
for i := range histograms {
histograms[i] = NewWindowed(
Params{Lo: benchLo, Hi: benchHi, ErrorBound: benchErrBound},
10*time.Second,
)
}

runtime.GC()
runtime.ReadMemStats(&after)

bytesPerHist := float64(after.TotalAlloc-before.TotalAlloc) / float64(b.N)
b.ReportMetric(bytesPerHist, "bytes/histogram")
runtime.KeepAlive(histograms)
}

func BenchmarkWindowedRecordSingleThread(b *testing.B) {
rng := rand.New(rand.NewSource(42))
vals := makeInt64Values(rng, 1<<16)

w := NewWindowed(
Params{Lo: benchLo, Hi: benchHi, ErrorBound: benchErrBound},
10*time.Second,
)

b.ResetTimer()
for i := 0; i < b.N; i++ {
w.Record(vals[i&(len(vals)-1)])
}
}

func BenchmarkWindowedRecordContention(b *testing.B) {
for _, numGoroutines := range []int{50, 100} {
b.Run(fmt.Sprintf("goroutines=%d", numGoroutines), func(b *testing.B) {
w := NewWindowed(
Params{Lo: benchLo, Hi: benchHi, ErrorBound: benchErrBound},
10*time.Second,
)
b.SetParallelism(numGoroutines)
b.RunParallel(func(pb *testing.PB) {
rng := rand.New(rand.NewSource(rand.Int63()))
vals := makeInt64Values(rng, 1<<12)
i := 0
for pb.Next() {
w.Record(vals[i&(len(vals)-1)])
i++
}
})
})
}
}

func BenchmarkWindowedSnapshot(b *testing.B) {
w := NewWindowed(
Params{Lo: benchLo, Hi: benchHi, ErrorBound: benchErrBound},
10*time.Second,
)
rng := rand.New(rand.NewSource(42))
for i := 0; i < 10000; i++ {
w.Record(int64(rng.Float64()*benchRange + benchLo))
}

b.ResetTimer()
for i := 0; i < b.N; i++ {
_ = w.Snapshot()
}
}

func BenchmarkWindowedWindowedSnapshot(b *testing.B) {
w := NewWindowed(
Params{Lo: benchLo, Hi: benchHi, ErrorBound: benchErrBound},
10*time.Second,
)
rng := rand.New(rand.NewSource(42))
for i := 0; i < 10000; i++ {
w.Record(int64(rng.Float64()*benchRange + benchLo))
}
// Tick once so prevBaseline is non-empty.
w.Tick()
for i := 0; i < 5000; i++ {
w.Record(int64(rng.Float64()*benchRange + benchLo))
}

b.ResetTimer()
for i := 0; i < b.N; i++ {
_ = w.WindowedSnapshot()
}
}

func BenchmarkWindowedTick(b *testing.B) {
w := NewWindowed(
Params{Lo: benchLo, Hi: benchHi, ErrorBound: benchErrBound},
time.Hour, // long interval so maybeTick doesn't interfere
)
rng := rand.New(rand.NewSource(42))
for i := 0; i < 10000; i++ {
w.Record(int64(rng.Float64()*benchRange + benchLo))
}

b.ResetTimer()
for i := 0; i < b.N; i++ {
w.Tick()
}
}
52 changes: 52 additions & 0 deletions windowed_collector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright 2026 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0

package goodhistogram

import (
"github.com/prometheus/client_golang/prometheus"
prometheusgo "github.com/prometheus/client_model/go"
)

// WindowedCollector wraps a Windowed histogram as a prometheus.Collector,
// allowing it to be registered with a Prometheus registry. Collect() exports
// cumulative data, as Prometheus expects monotonically increasing counters.
type WindowedCollector struct {
w *Windowed
desc *prometheus.Desc
}

// ToPrometheusCollector returns a WindowedCollector that exposes this
// windowed histogram to a Prometheus registry. The exported data is
// cumulative (all-time), not windowed, since Prometheus expects monotonic
// counters.
func (w *Windowed) ToPrometheusCollector(desc *prometheus.Desc) *WindowedCollector {
return &WindowedCollector{w: w, desc: desc}
}

// Describe implements prometheus.Collector.
func (c *WindowedCollector) Describe(ch chan<- *prometheus.Desc) {
ch <- c.desc
}

// Collect implements prometheus.Collector.
func (c *WindowedCollector) Collect(ch chan<- prometheus.Metric) {
ch <- c
}

// Desc implements prometheus.Metric.
func (c *WindowedCollector) Desc() *prometheus.Desc {
return c.desc
}

// Write implements prometheus.Metric.
func (c *WindowedCollector) Write(m *prometheusgo.Metric) error {
snap := c.w.Snapshot()
m.Histogram = snap.ToPrometheusHistogram()
return nil
}
Loading