Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 149 additions & 0 deletions p2p/host/resource-manager/limits_metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
package rcmgr

import (
"testing"

"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
)

func TestReportSystemLimits(t *testing.T) {
// Register the metrics
reg := prometheus.NewRegistry()
reg.MustRegister(limits)

// Create a simple limiter with known limits
limiter := NewFixedLimiter(ConcreteLimitConfig{
system: BaseLimit{
Memory: 1024 * 1024 * 1024, // 1GB
FD: 256,
Conns: 100,
ConnsInbound: 50,
ConnsOutbound: 50,
Streams: 200,
StreamsInbound: 100,
StreamsOutbound: 100,
},
transient: BaseLimit{
Memory: 512 * 1024 * 1024, // 512MB
FD: 128,
Conns: 50,
ConnsInbound: 25,
ConnsOutbound: 25,
Streams: 100,
StreamsInbound: 50,
StreamsOutbound: 50,
},
})

// Create a stats reporter
reporter, err := NewStatsTraceReporter()
if err != nil {
t.Fatal(err)
}

// Report the limits
reporter.ReportSystemLimits(limiter)

// Verify that metrics were set
metrics, err := reg.Gather()
if err != nil {
t.Fatal(err)
}

// Find the limits metric
var limitsMetric *dto.MetricFamily
for _, m := range metrics {
if m.GetName() == "libp2p_rcmgr_limit" {
limitsMetric = m
break
}
}

if limitsMetric == nil {
t.Fatal("limits metric not found")
}

// Verify we have metrics for both system and transient scopes
foundSystem := false
foundTransient := false
for _, metric := range limitsMetric.GetMetric() {
for _, label := range metric.GetLabel() {
if label.GetName() == "scope" {
if label.GetValue() == "system" {
foundSystem = true
}
if label.GetValue() == "transient" {
foundTransient = true
}
}
}
}

if !foundSystem {
t.Error("system scope limits not reported")
}
if !foundTransient {
t.Error("transient scope limits not reported")
}

// Verify specific limit values
expectedLimits := map[string]map[string]float64{
"system": {
"memory": 1024 * 1024 * 1024,
"fd": 256,
"conns": 100,
"conns_inbound": 50,
"conns_outbound": 50,
"streams": 200,
"streams_inbound": 100,
"streams_outbound": 100,
},
"transient": {
"memory": 512 * 1024 * 1024,
"fd": 128,
"conns": 50,
"conns_inbound": 25,
"conns_outbound": 25,
"streams": 100,
"streams_inbound": 50,
"streams_outbound": 50,
},
}

for _, metric := range limitsMetric.GetMetric() {
var scope, resource string
for _, label := range metric.GetLabel() {
if label.GetName() == "scope" {
scope = label.GetValue()
}
if label.GetName() == "resource" {
resource = label.GetValue()
}
}

if scope == "" || resource == "" {
continue
}

expectedValue, ok := expectedLimits[scope][resource]
if !ok {
continue
}

actualValue := metric.GetGauge().GetValue()
if actualValue != expectedValue {
t.Errorf("limit mismatch for %s/%s: expected %v, got %v", scope, resource, expectedValue, actualValue)
}
}
}

func TestReportSystemLimitsNilLimiter(t *testing.T) {
reporter, err := NewStatsTraceReporter()
if err != nil {
t.Fatal(err)
}

// Should not panic with nil limiter
reporter.ReportSystemLimits(nil)
}
7 changes: 5 additions & 2 deletions p2p/host/resource-manager/rcmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,17 +178,20 @@ func NewResourceManager(limits Limiter, opts ...Option) (network.ResourceManager
r.verifySourceAddressRateLimiter = newVerifySourceAddressRateLimiter(r.connLimiter)

if !r.disableMetrics {
var sr TraceReporter
sr, err := NewStatsTraceReporter()
if err != nil {
log.Error("failed to initialise StatsTraceReporter", "err", err)
} else {
// Report system limits to Prometheus
sr.ReportSystemLimits(limits)

if r.trace == nil {
r.trace = &trace{}
}
found := false
for _, rep := range r.trace.reporters {
if rep == sr {
// Compare the actual reporter, not the interface
if _, ok := rep.(StatsTraceReporter); ok {
found = true
break
}
Expand Down
48 changes: 47 additions & 1 deletion p2p/host/resource-manager/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package rcmgr
import (
"strings"

"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/p2p/metricshelper"
"github.com/prometheus/client_golang/prometheus"
)
Expand Down Expand Up @@ -123,6 +124,13 @@ var (
Name: "blocked_resources",
Help: "Number of blocked resources",
}, []string{"dir", "scope", "resource"})

// System limits
limits = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: metricNamespace,
Name: "limit",
Help: "Resource manager limits",
}, []string{"scope", "resource"})
)

var (
Expand Down Expand Up @@ -157,6 +165,7 @@ func MustRegisterWith(reg prometheus.Registerer) {
previousConnMemory,
fds,
blockedResources,
limits,
)
}

Expand All @@ -171,10 +180,47 @@ func WithMetricsDisabled() Option {
type StatsTraceReporter struct{}

func NewStatsTraceReporter() (StatsTraceReporter, error) {
// TODO tell prometheus the system limits
return StatsTraceReporter{}, nil
}

// reportLimit reports a limit value to Prometheus
func reportLimit(scope, resource string, value int64) {
limits.With(prometheus.Labels{
"scope": scope,
"resource": resource,
}).Set(float64(value))
}

// ReportSystemLimits reports the system limits to Prometheus.
// This should be called after creating the StatsTraceReporter with the resource manager's limits.
func (r StatsTraceReporter) ReportSystemLimits(limiter Limiter) {
if limiter == nil {
return
}

// System limits
systemLimits := limiter.GetSystemLimits()
reportLimit("system", "memory", systemLimits.GetMemoryLimit())
reportLimit("system", "fd", int64(systemLimits.GetFDLimit()))
reportLimit("system", "conns", int64(systemLimits.GetConnTotalLimit()))
reportLimit("system", "conns_inbound", int64(systemLimits.GetConnLimit(network.DirInbound)))
reportLimit("system", "conns_outbound", int64(systemLimits.GetConnLimit(network.DirOutbound)))
reportLimit("system", "streams", int64(systemLimits.GetStreamTotalLimit()))
reportLimit("system", "streams_inbound", int64(systemLimits.GetStreamLimit(network.DirInbound)))
reportLimit("system", "streams_outbound", int64(systemLimits.GetStreamLimit(network.DirOutbound)))

// Transient limits
transientLimits := limiter.GetTransientLimits()
reportLimit("transient", "memory", transientLimits.GetMemoryLimit())
reportLimit("transient", "fd", int64(transientLimits.GetFDLimit()))
reportLimit("transient", "conns", int64(transientLimits.GetConnTotalLimit()))
reportLimit("transient", "conns_inbound", int64(transientLimits.GetConnLimit(network.DirInbound)))
reportLimit("transient", "conns_outbound", int64(transientLimits.GetConnLimit(network.DirOutbound)))
reportLimit("transient", "streams", int64(transientLimits.GetStreamTotalLimit()))
reportLimit("transient", "streams_inbound", int64(transientLimits.GetStreamLimit(network.DirInbound)))
reportLimit("transient", "streams_outbound", int64(transientLimits.GetStreamLimit(network.DirOutbound)))
}

func (r StatsTraceReporter) ConsumeEvent(evt TraceEvt) {
tags := metricshelper.GetStringSlice()
defer metricshelper.PutStringSlice(tags)
Expand Down