Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ When adding or modifying features, prefer extending existing packages before cre
- `pkg/manager/config/` - Auto-reloads configuration files and provides interfaces to query them.
- `pkg/manager/elect/` - Manages TiProxy owner elections (for example, metrics reader and VIP modules need an owner).
- `pkg/manager/id/` - Generates global IDs.
- `pkg/manager/backendcluster/` - Manages cluster-scoped backend runtimes and shared resources such as PD or etcd clients.
- `pkg/manager/infosync/` - Queries the topology of TiDB and Prometheus from PD and updates TiProxy information to PD.
- `pkg/manager/logger/` - Manages the logger service.
- `pkg/manager/memory/` - Records heap and goroutine profiles when memory usage is high.
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ require (
go.uber.org/mock v0.5.2
go.uber.org/ratelimit v0.2.0
go.uber.org/zap v1.27.0
golang.org/x/net v0.48.0
google.golang.org/grpc v1.63.2
)

Expand Down Expand Up @@ -272,7 +273,6 @@ require (
golang.org/x/crypto v0.47.0 // indirect
golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 // indirect
golang.org/x/mod v0.31.0 // indirect
golang.org/x/net v0.48.0 // indirect
golang.org/x/oauth2 v0.30.0 // indirect
golang.org/x/sync v0.19.0 // indirect
golang.org/x/sys v0.40.0 // indirect
Expand Down
7 changes: 4 additions & 3 deletions lib/config/label.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ package config
const (
// LocationLabelName indicates the label name that decides the location of TiProxy and backends.
// We use `zone` because the follower read in TiDB also uses `zone` to decide location.
LocationLabelName = "zone"
KeyspaceLabelName = "keyspace"
CidrLabelName = "cidr"
LocationLabelName = "zone"
KeyspaceLabelName = "keyspace"
CidrLabelName = "cidr"
TiProxyPortLabelName = "tiproxy-port"
)

func (cfg *Config) GetLocation() string {
Expand Down
4 changes: 3 additions & 1 deletion lib/config/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ var (
ErrInvalidConfigValue = errors.New("invalid config value")
)

const DefaultBackendClusterName = "default"

type Config struct {
Proxy ProxyServer `yaml:"proxy,omitempty" toml:"proxy,omitempty" json:"proxy,omitempty"`
API API `yaml:"api,omitempty" toml:"api,omitempty" json:"api,omitempty"`
Expand Down Expand Up @@ -249,7 +251,7 @@ func (cfg *Config) GetBackendClusters() []BackendCluster {
return nil
}
return []BackendCluster{{
Name: "default",
Name: DefaultBackendClusterName,
PDAddrs: cfg.Proxy.PDAddrs,
}}
}
Expand Down
2 changes: 1 addition & 1 deletion lib/config/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ func TestGetBackendClusters(t *testing.T) {

clusters := cfg.GetBackendClusters()
require.Len(t, clusters, 1)
require.Equal(t, "default", clusters[0].Name)
require.Equal(t, DefaultBackendClusterName, clusters[0].Name)
require.Equal(t, cfg.Proxy.PDAddrs, clusters[0].PDAddrs)

cfg.Proxy.BackendClusters = []BackendCluster{
Expand Down
4 changes: 2 additions & 2 deletions pkg/balance/factor/factor_balance.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type FactorBasedBalance struct {
factors []Factor
// to reduce memory allocation
cachedList []scoredBackend
mr metricsreader.MetricsReader
mr metricsreader.MetricsQuerier
lg *zap.Logger
factorStatus *FactorStatus
factorLabel *FactorLabel
Expand All @@ -44,7 +44,7 @@ type FactorBasedBalance struct {
routePolicy string
}

func NewFactorBasedBalance(lg *zap.Logger, mr metricsreader.MetricsReader) *FactorBasedBalance {
func NewFactorBasedBalance(lg *zap.Logger, mr metricsreader.MetricsQuerier) *FactorBasedBalance {
return &FactorBasedBalance{
lg: lg,
mr: mr,
Expand Down
4 changes: 2 additions & 2 deletions pkg/balance/factor/factor_cpu.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,13 @@ type FactorCPU struct {
lastMetricTime time.Time
// The estimated average CPU usage used by one connection.
usagePerConn float64
mr metricsreader.MetricsReader
mr metricsreader.MetricsQuerier
bitNum int
migrationsPerSecond float64
lg *zap.Logger
}

func NewFactorCPU(mr metricsreader.MetricsReader, lg *zap.Logger) *FactorCPU {
func NewFactorCPU(mr metricsreader.MetricsQuerier, lg *zap.Logger) *FactorCPU {
fc := &FactorCPU{
mr: mr,
bitNum: 5,
Expand Down
6 changes: 3 additions & 3 deletions pkg/balance/factor/factor_health.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,13 +187,13 @@ type errIndicator struct {
type FactorHealth struct {
snapshot map[string]healthBackendSnapshot
indicators []errIndicator
mr metricsreader.MetricsReader
mr metricsreader.MetricsQuerier
bitNum int
migrationsPerSecond float64
lg *zap.Logger
}

func NewFactorHealth(mr metricsreader.MetricsReader, lg *zap.Logger) *FactorHealth {
func NewFactorHealth(mr metricsreader.MetricsQuerier, lg *zap.Logger) *FactorHealth {
return &FactorHealth{
mr: mr,
snapshot: make(map[string]healthBackendSnapshot),
Expand All @@ -203,7 +203,7 @@ func NewFactorHealth(mr metricsreader.MetricsReader, lg *zap.Logger) *FactorHeal
}
}

func initErrIndicator(mr metricsreader.MetricsReader) []errIndicator {
func initErrIndicator(mr metricsreader.MetricsQuerier) []errIndicator {
indicators := make([]errIndicator, 0, len(errDefinitions))
for _, def := range errDefinitions {
indicator := errIndicator{
Expand Down
4 changes: 2 additions & 2 deletions pkg/balance/factor/factor_memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,13 @@ type FactorMemory struct {
snapshot map[string]memBackendSnapshot
// The updated time of the metric that we've read last time.
lastMetricTime time.Time
mr metricsreader.MetricsReader
mr metricsreader.MetricsQuerier
bitNum int
migrationsPerSecond float64
lg *zap.Logger
}

func NewFactorMemory(mr metricsreader.MetricsReader, lg *zap.Logger) *FactorMemory {
func NewFactorMemory(mr metricsreader.MetricsQuerier, lg *zap.Logger) *FactorMemory {
bitNum := 0
for levels := len(oomRiskLevels); ; bitNum++ {
if levels == 0 {
Expand Down
46 changes: 34 additions & 12 deletions pkg/balance/metricsreader/backend_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"fmt"
"math"
"net"
"net/url"
"slices"
"strconv"
"strings"
Expand All @@ -35,18 +36,15 @@ import (

const (
// readerOwnerKeyPrefix is the key prefix in etcd for backend reader owner election.
// For global owner, the key is "/tiproxy/metric_reader/owner".
// For zonal owner, the key is "/tiproxy/metric_reader/{zone}/owner".
readerOwnerKeyPrefix = "/tiproxy/metric_reader"
// For the default cluster, the key is "/tiproxy/metric_reader/owner".
// For a named cluster, the key is "/tiproxy/metric_reader/{cluster}/owner".
readerOwnerKeySuffix = "owner"
// sessionTTL is the session's TTL in seconds for backend reader owner election.
sessionTTL = 15
// backendMetricPath is the path of backend HTTP API to read metrics.
backendMetricPath = "/metrics"
// ownerMetricPath is the path of reading backend metrics from the backend reader owner.
ownerMetricPath = "/api/backend/metrics"
goPoolSize = 100
goMaxIdle = time.Minute
goPoolSize = 100
goMaxIdle = time.Minute
)

var (
Expand All @@ -72,6 +70,7 @@ type BackendReader struct {
marshalledHistory []byte
cfgGetter config.ConfigGetter
backendFetcher TopologyFetcher
clusterName string
lastZone string
electionCfg elect.ElectionConfig
election elect.Election
Expand All @@ -84,6 +83,11 @@ type BackendReader struct {
}

// NewBackendReader creates a BackendReader bound to the default backend
// cluster: it delegates to NewClusterBackendReader with an empty cluster
// name. Kept as a convenience constructor for existing callers.
func NewBackendReader(lg *zap.Logger, cfgGetter config.ConfigGetter, httpCli *http.Client, etcdCli *clientv3.Client,
	backendFetcher TopologyFetcher, cfg *config.HealthCheck) *BackendReader {
	return NewClusterBackendReader(lg, "", cfgGetter, httpCli, etcdCli, backendFetcher, cfg)
}

func NewClusterBackendReader(lg *zap.Logger, clusterName string, cfgGetter config.ConfigGetter, httpCli *http.Client, etcdCli *clientv3.Client,
backendFetcher TopologyFetcher, cfg *config.HealthCheck) *BackendReader {
return &BackendReader{
queryRules: make(map[string]QueryRule),
Expand All @@ -92,6 +96,7 @@ func NewBackendReader(lg *zap.Logger, cfgGetter config.ConfigGetter, httpCli *ht
lg: lg,
cfgGetter: cfgGetter,
backendFetcher: backendFetcher,
clusterName: strings.TrimSpace(clusterName),
cfg: cfg,
wgp: waitgroup.NewWaitGroupPool(goPoolSize, goMaxIdle),
electionCfg: elect.DefaultElectionConfig(sessionTTL),
Expand All @@ -118,9 +123,9 @@ func (br *BackendReader) initElection(ctx context.Context, cfg *config.Config) e
br.lastZone = cfg.GetLocation()
if len(br.lastZone) > 0 {
// Zonal owners are responsible for the backends in the same zone or not in any TiProxy zone.
key = fmt.Sprintf("%s/%s/%s", readerOwnerKeyPrefix, br.lastZone, readerOwnerKeySuffix)
key = fmt.Sprintf("%s/%s/%s", readerOwnerKeyPrefix(br.clusterName), br.lastZone, readerOwnerKeySuffix)
} else {
key = fmt.Sprintf("%s/%s", readerOwnerKeyPrefix, readerOwnerKeySuffix)
key = fmt.Sprintf("%s/%s", readerOwnerKeyPrefix(br.clusterName), readerOwnerKeySuffix)
}
br.election = elect.NewElection(br.lg.Named("elect"), br.etcdCli, br.electionCfg, id, key, br)
br.election.Start(ctx)
Expand Down Expand Up @@ -213,7 +218,8 @@ func (br *BackendReader) queryAllOwners(ctx context.Context) (zones, owners []st
// Get all owner keys.
opts := []clientv3.OpOption{clientv3.WithPrefix()}
var kvs []*mvccpb.KeyValue
kvs, err = etcd.GetKVs(ctx, br.etcdCli, readerOwnerKeyPrefix, opts, br.electionCfg.Timeout, br.electionCfg.RetryIntvl, br.electionCfg.RetryCnt)
keyPrefix := readerOwnerKeyPrefix(br.clusterName)
kvs, err = etcd.GetKVs(ctx, br.etcdCli, keyPrefix, opts, br.electionCfg.Timeout, br.electionCfg.RetryIntvl, br.electionCfg.RetryCnt)
if err != nil {
return
}
Expand All @@ -227,7 +233,7 @@ func (br *BackendReader) queryAllOwners(ctx context.Context) (zones, owners []st
ownerMap := make(map[string]ownerInfo)
for _, kv := range kvs {
key := hack.String(kv.Key)
key = key[len(readerOwnerKeyPrefix):]
key = key[len(keyPrefix):]
if len(key) == 0 || key[0] != '/' {
continue
}
Expand Down Expand Up @@ -466,7 +472,7 @@ func (br *BackendReader) GetBackendMetrics() []byte {
// If every member queries directly from backends, the backends may suffer from too much pressure.
func (br *BackendReader) readFromOwner(ctx context.Context, ownerAddr string) error {
b := backoff.WithContext(backoff.WithMaxRetries(backoff.NewConstantBackOff(br.cfg.RetryInterval), uint64(br.cfg.MaxRetries)), ctx)
resp, err := br.httpCli.Get(ownerAddr, ownerMetricPath, b, br.cfg.DialTimeout)
resp, err := br.httpCli.Get(ownerAddr, backendMetricOwnerPath(br.clusterName), b, br.cfg.DialTimeout)
if err != nil {
return err
}
Expand Down Expand Up @@ -570,6 +576,22 @@ func (br *BackendReader) Close() {
}
}

// readerOwnerKeyPrefix returns the etcd key prefix under which backend
// reader owner election keys live for the given backend cluster.
// An empty (or all-whitespace) name and the default cluster name both map
// to the bare "/tiproxy/metric_reader" prefix; any other cluster gets its
// own "/tiproxy/metric_reader/{cluster}" subtree.
func readerOwnerKeyPrefix(clusterName string) string {
	clusterName = strings.TrimSpace(clusterName)
	if clusterName == "" || clusterName == config.DefaultBackendClusterName {
		return "/tiproxy/metric_reader"
	}
	// Plain concatenation: fmt.Sprintf is unnecessary for joining two
	// fixed path segments.
	return "/tiproxy/metric_reader/" + clusterName
}

// backendMetricOwnerPath builds the HTTP path used to fetch backend metrics
// from the reader owner. A non-empty cluster name is attached as an escaped
// "cluster" query parameter; an empty (or all-whitespace) name yields the
// bare path.
func backendMetricOwnerPath(clusterName string) string {
	name := strings.TrimSpace(clusterName)
	if len(name) == 0 {
		return "/api/backend/metrics"
	}
	// url.Values.Encode query-escapes the value, matching QueryEscape
	// semantics (spaces become '+').
	params := url.Values{"cluster": []string{name}}
	return "/api/backend/metrics?" + params.Encode()
}

func purgeHistory(history []model.SamplePair, retention time.Duration, now time.Time) []model.SamplePair {
idx := -1
for i := range history {
Expand Down
18 changes: 12 additions & 6 deletions pkg/balance/metricsreader/backend_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1149,7 +1149,7 @@ func TestQueryAllOwners(t *testing.T) {
br := NewBackendReader(lg, nil, nil, suite.client, nil, nil)
for i, test := range tests {
for i, key := range test.keys {
key = fmt.Sprintf("%s%s", readerOwnerKeyPrefix, key)
key = fmt.Sprintf("%s%s", readerOwnerKeyPrefix(""), key)
suite.putKV(key, test.values[i])
}
zones, owners, err := br.queryAllOwners(context.Background())
Expand All @@ -1166,7 +1166,7 @@ func TestQueryAllOwners(t *testing.T) {
slices.Sort(zones)
require.Equal(t, test.zones, zones, "case %d", i)
}
suite.delKV(readerOwnerKeyPrefix)
suite.delKV(readerOwnerKeyPrefix(""))
}
}

Expand All @@ -1184,15 +1184,15 @@ func TestUpdateLabel(t *testing.T) {
defer br.Close()

checkKeyPrefix := func(prefix string) bool {
kvs := suite.getKV(readerOwnerKeyPrefix)
kvs := suite.getKV(readerOwnerKeyPrefix(""))
if len(kvs) != 1 {
return false
}
return strings.HasPrefix(string(kvs[0].Key), prefix)
}

// campaign for the global owner
prefix := fmt.Sprintf("%s/%s", readerOwnerKeyPrefix, readerOwnerKeySuffix)
prefix := fmt.Sprintf("%s/%s", readerOwnerKeyPrefix(""), readerOwnerKeySuffix)
require.Eventually(t, func() bool {
return checkKeyPrefix(prefix)
}, 3*time.Second, 10*time.Millisecond)
Expand All @@ -1201,7 +1201,7 @@ func TestUpdateLabel(t *testing.T) {
cfg.Labels = map[string]string{config.LocationLabelName: "east"}
err = br.ReadMetrics(context.Background())
require.NoError(t, err)
prefix = fmt.Sprintf("%s/east/%s", readerOwnerKeyPrefix, readerOwnerKeySuffix)
prefix = fmt.Sprintf("%s/east/%s", readerOwnerKeyPrefix(""), readerOwnerKeySuffix)
require.Eventually(t, func() bool {
return checkKeyPrefix(prefix)
}, 3*time.Second, 10*time.Millisecond)
Expand Down Expand Up @@ -1255,7 +1255,7 @@ func TestElection(t *testing.T) {
// setup etcd
suite := newEtcdTestSuite(t)
t.Cleanup(suite.close)
ownerKey := fmt.Sprintf("%s/%s", readerOwnerKeyPrefix, readerOwnerKeySuffix)
ownerKey := fmt.Sprintf("%s/%s", readerOwnerKeyPrefix(""), readerOwnerKeySuffix)
suite.putKV(ownerKey, addr)
require.Eventually(t, func() bool {
kvs := suite.getKV(ownerKey)
Expand Down Expand Up @@ -1330,3 +1330,9 @@ func setupTypicalBackendListener(t *testing.T, respBody string) (backendPort int
t.Cleanup(backendHttpHandler.Close)
return
}

// TestBackendMetricOwnerPath covers the owner metric path for the default
// (empty) cluster, a plain cluster name, and a name requiring query escaping.
func TestBackendMetricOwnerPath(t *testing.T) {
	tests := []struct {
		cluster string
		want    string
	}{
		{"", "/api/backend/metrics"},
		{"cluster-a", "/api/backend/metrics?cluster=cluster-a"},
		{"cluster a/b", "/api/backend/metrics?cluster=cluster+a%2Fb"},
	}
	for _, tt := range tests {
		require.Equal(t, tt.want, backendMetricOwnerPath(tt.cluster))
	}
}
Loading