Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 46 additions & 1 deletion evmrpc/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,18 @@ type Config struct {
// SS-pebble. Requires MemiavlOnly write mode; falls back transparently.
TraceBakeUseSnapshot bool `mapstructure:"trace_bake_use_snapshot"`
TraceBakeSnapshotWindow int64 `mapstructure:"trace_bake_snapshot_window"` // recent snapshots to keep (default 64)

// RateLimitingEnabled is a temporary Phase-1 rollout gate for the RateLimiterRegistry.
// Set to false to pass all requests through without rate limiting.
// Will be removed in Phase 3 once the feature has stabilised in production.
RateLimitingEnabled bool `mapstructure:"rate_limiting_enabled"`

// IPRateLimitRPS is the per-IP sustained request rate in requests/second.
// Zero disables per-IP rate limiting (all requests pass through).
IPRateLimitRPS float64 `mapstructure:"ip_rate_limit_rps"`

// IPRateLimitBurst is the maximum per-IP burst size.
IPRateLimitBurst int `mapstructure:"ip_rate_limit_burst"`
}

var DefaultConfig = Config{
Expand Down Expand Up @@ -200,6 +212,9 @@ var DefaultConfig = Config{
TraceBakeWindowBlocks: 0,
TraceBakeUseSnapshot: false,
TraceBakeSnapshotWindow: 64,
RateLimitingEnabled: true,
IPRateLimitRPS: 200,
IPRateLimitBurst: 400,
}

const (
Expand Down Expand Up @@ -240,6 +255,9 @@ const (
flagTraceBakeWindowBlocks = "evm.trace_bake_window_blocks"
flagTraceBakeUseSnapshot = "evm.trace_bake_use_snapshot"
flagTraceBakeSnapshotWindow = "evm.trace_bake_snapshot_window"
flagRateLimitingEnabled = "evm.rate_limiting_enabled"
flagIPRateLimitRPS = "evm.ip_rate_limit_rps"
flagIPRateLimitBurst = "evm.ip_rate_limit_burst"
)

func ReadConfig(opts servertypes.AppOptions) (Config, error) {
Expand Down Expand Up @@ -430,7 +448,21 @@ func ReadConfig(opts servertypes.AppOptions) (Config, error) {
return cfg, err
}
}

if v := opts.Get(flagRateLimitingEnabled); v != nil {
if cfg.RateLimitingEnabled, err = cast.ToBoolE(v); err != nil {
return cfg, err
}
}
if v := opts.Get(flagIPRateLimitRPS); v != nil {
if cfg.IPRateLimitRPS, err = cast.ToFloat64E(v); err != nil {
return cfg, err
}
}
if v := opts.Get(flagIPRateLimitBurst); v != nil {
if cfg.IPRateLimitBurst, err = cast.ToIntE(v); err != nil {
return cfg, err
}
}
return cfg, nil
}

Expand Down Expand Up @@ -618,4 +650,17 @@ trace_bake_use_snapshot = {{ .EVM.TraceBakeUseSnapshot }}

# Number of recent memiavl snapshots to retain for trace baking.
trace_bake_snapshot_window = {{ .EVM.TraceBakeSnapshotWindow }}

# rate_limiting_enabled is a temporary Phase-1 rollout gate for the per-IP RateLimiterRegistry.
# Set to false to disable rate limiting entirely without tuning individual RPS values.
# This flag will be removed in Phase 3 once the feature has stabilised in production.
rate_limiting_enabled = {{ .EVM.RateLimitingEnabled }}

# ip_rate_limit_rps is the per-IP sustained request rate in requests/second.
# Set to 0 to disable per-IP rate limiting (all requests pass through).
ip_rate_limit_rps = {{ .EVM.IPRateLimitRPS }}

# ip_rate_limit_burst is the maximum per-IP burst above the sustained rate.
ip_rate_limit_burst = {{ .EVM.IPRateLimitBurst }}

`
32 changes: 32 additions & 0 deletions evmrpc/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ type opts struct {
rpcStatsInterval interface{}
workerPoolSize interface{}
workerQueueSize interface{}
rateLimitingEnabled interface{}
ipRateLimitRPS interface{}
ipRateLimitBurst interface{}
}

func (o *opts) Get(k string) interface{} {
Expand Down Expand Up @@ -144,6 +147,15 @@ func (o *opts) Get(k string) interface{} {
k == "evm.trace_bake_snapshot_window" {
return nil
}
if k == "evm.rate_limiting_enabled" {
return o.rateLimitingEnabled
}
if k == "evm.ip_rate_limit_rps" {
return o.ipRateLimitRPS
}
if k == "evm.ip_rate_limit_burst" {
return o.ipRateLimitBurst
}
panic("unknown key")
}

Expand Down Expand Up @@ -180,6 +192,9 @@ func getDefaultOpts() opts {
10 * time.Second,
32,
1000,
true,
200.0,
400,
}
}

Expand Down Expand Up @@ -294,6 +309,23 @@ func TestReadConfig(t *testing.T) {
badOpts.workerQueueSize = "bad"
_, err = config.ReadConfig(&badOpts)
require.NotNil(t, err)

// Test bad types for rate limit config
badOpts = goodOpts
badOpts.rateLimitingEnabled = "bad"
_, err = config.ReadConfig(&badOpts)
require.NotNil(t, err)

badOpts = goodOpts
badOpts.ipRateLimitRPS = "bad"
_, err = config.ReadConfig(&badOpts)
require.NotNil(t, err)

badOpts = goodOpts
badOpts.ipRateLimitBurst = "bad"
_, err = config.ReadConfig(&badOpts)
require.NotNil(t, err)

}

// Test worker pool configuration values
Expand Down
27 changes: 27 additions & 0 deletions ratelimiter/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package ratelimiter

import (
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/metric"
)

var (
registryMeter = otel.Meter("ratelimiter")

registryMetrics = struct {
rejectedCounter metric.Int64Counter
}{
rejectedCounter: must(registryMeter.Int64Counter(
"rpc_rate_limit_rejected_total",
metric.WithDescription("Total RPC requests rejected by the per-IP rate limiter"),
metric.WithUnit("{request}"),
)),
}
)

func must[V any](v V, err error) V {
if err != nil {
panic(err)
}
return v
}
197 changes: 197 additions & 0 deletions ratelimiter/registry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
package ratelimiter

import (
"context"
"net"
"net/http"
"strings"
"time"

"github.com/hashicorp/golang-lru/v2/expirable"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"golang.org/x/time/rate"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
)

const (
DefaultRPS = 200.0
DefaultBurst = 400

// lruSize bounds memory to ~8 MB at 50k entries (~160 bytes each).
lruSize = 50_000
// lruTTL evicts IP entries that have been idle for 1 hour.
lruTTL = time.Hour
)

// DefaultTrustedProxyCIDRs contains RFC-1918 ranges and loopback addresses.
// Requests arriving from these CIDRs are trusted to supply a valid X-Forwarded-For header.
var DefaultTrustedProxyCIDRs = []string{
"127.0.0.0/8",
"::1/128",
"10.0.0.0/8",
"172.16.0.0/12",
"192.168.0.0/16",
"fc00::/7",
}
Comment thread
cursor[bot] marked this conversation as resolved.

// Config holds the configuration for a Registry
type Config struct {
// Enabled is a temporary rollout gate (Phase 1 only). False passes all requests through.
Enabled bool
// RPS is the sustained request rate allowed per IP in requests/second.
// Zero disables per-IP rate limiting (all requests pass).
RPS float64
// Burst is the maximum number of requests allowed in a single burst.
// Zero disables per-IP rate limiting (all requests pass).
Burst int
// TrustedProxyCIDRs lists CIDRs whose X-Forwarded-For headers are trusted.
// Empty means trust no proxy; use RemoteAddr / peer address directly.
TrustedProxyCIDRs []string
}

var DefaultConfig = Config{
Enabled: true,
RPS: DefaultRPS,
Burst: DefaultBurst,
TrustedProxyCIDRs: DefaultTrustedProxyCIDRs,
}

// Registry is a per-IP token-bucket rate limiter backed by an expirable LRU.
// It is safe for concurrent use.
type Registry struct {
cfg Config
trustedProxies []*net.IPNet
lru *expirable.LRU[string, *rate.Limiter]
}

// New creates a Registry from cfg. Invalid CIDRs in TrustedProxyCIDRs are silently skipped.
func New(cfg Config) *Registry {
return &Registry{
cfg: cfg,
trustedProxies: parseCIDRs(cfg.TrustedProxyCIDRs),
lru: expirable.NewLRU[string, *rate.Limiter](lruSize, nil, lruTTL),
}
}

// Allow reports whether the request from ip should be allowed for the given plane.
// Rejections increment rpc_rate_limit_rejected_total{plane}.
func (r *Registry) Allow(ctx context.Context, ip, plane string) bool {
if !r.cfg.Enabled || r.cfg.RPS <= 0 || r.cfg.Burst <= 0 {
return true
}
if r.getOrCreate(ip).Allow() {
return true
}
registryMetrics.rejectedCounter.Add(
ctx,
1,
metric.WithAttributes(
attribute.String("plane", plane),
),
)
Comment thread
amir-deris marked this conversation as resolved.
return false
}
Comment thread
cursor[bot] marked this conversation as resolved.

// IPFromHTTPRequest extracts the client IP from an HTTP request.
// If RemoteAddr belongs to a trusted proxy CIDR, the rightmost untrusted X-Forwarded-For
// entry is used. Walking right-to-left and skipping trusted CIDRs prevents a client from
// spoofing their IP by pre-setting X-Forwarded-For before the request reaches the proxy.
func (r *Registry) IPFromHTTPRequest(req *http.Request) string {
remoteIP := stripPort(req.RemoteAddr)
if r.isTrustedProxy(remoteIP) {
if xff := strings.Join(req.Header.Values("X-Forwarded-For"), ", "); xff != "" {
if ip := r.rightmostUntrustedIP(xff); ip != "" {
return ip
}
}
}
return remoteIP
}
Comment thread
amir-deris marked this conversation as resolved.

// IPFromGRPCContext extracts the client IP from a gRPC request context.
// If the transport peer belongs to a trusted proxy CIDR, the rightmost untrusted
// x-forwarded-for metadata entry is used.
func (r *Registry) IPFromGRPCContext(ctx context.Context) string {
peerIP := grpcPeerIP(ctx)
if peerIP != "" && r.isTrustedProxy(peerIP) {
if md, ok := metadata.FromIncomingContext(ctx); ok {
if vals := md.Get("x-forwarded-for"); len(vals) > 0 {
if ip := r.rightmostUntrustedIP(strings.Join(vals, ", ")); ip != "" {
return ip
}
}
}
}
return peerIP
}

// rightmostUntrustedIP walks the comma-separated XFF list from right to left and returns
// the first IP that is not in TrustedProxyCIDRs. This is the real client IP: proxies
// append their view of the source address, so the rightmost untrusted entry cannot be
// forged by the client.
func (r *Registry) rightmostUntrustedIP(xff string) string {
parts := strings.Split(xff, ",")
for i := len(parts) - 1; i >= 0; i-- {
candidate := strings.TrimSpace(parts[i])
if net.ParseIP(candidate) == nil {
continue
}
if !r.isTrustedProxy(candidate) {
return candidate
}
}
return ""
}
Comment thread
amir-deris marked this conversation as resolved.

// getOrCreate returns the existing limiter for ip or creates a fresh one.
// Add is called on every hit to refresh the TTL, ensuring only truly idle IPs expire.
func (r *Registry) getOrCreate(ip string) *rate.Limiter {
if l, ok := r.lru.Get(ip); ok {
r.lru.Add(ip, l)
return l
}
l := rate.NewLimiter(rate.Limit(r.cfg.RPS), r.cfg.Burst)
r.lru.Add(ip, l)
return l
}
Comment thread
cursor[bot] marked this conversation as resolved.

func (r *Registry) isTrustedProxy(ip string) bool {
parsed := net.ParseIP(ip)
if parsed == nil {
return false
}
for _, n := range r.trustedProxies {
if n.Contains(parsed) {
return true
}
}
return false
}

func parseCIDRs(cidrs []string) []*net.IPNet {
out := make([]*net.IPNet, 0, len(cidrs))
for _, cidr := range cidrs {
if _, network, err := net.ParseCIDR(cidr); err == nil {
out = append(out, network)
}
}
return out
}

func stripPort(addr string) string {
host, _, err := net.SplitHostPort(addr)
if err != nil {
return addr
}
return host
}

func grpcPeerIP(ctx context.Context) string {
p, ok := peer.FromContext(ctx)
if !ok || p.Addr == nil {
return ""
}
return stripPort(p.Addr.String())
}
Loading
Loading