Skip to content

Commit 98ea284

Browse files
committed
backendcluster: add cluster manager and cluster-scoped topology runtime
Introduce the backend cluster manager, cluster-scoped InfoSync runtime, topology aggregation, and single-cluster compatibility hooks.
1 parent b56b915 commit 98ea284

25 files changed

Lines changed: 792 additions & 90 deletions

AGENTS.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ When adding or modifying features, prefer extending existing packages before cre
4747
- `pkg/manager/config/` - Auto-reloads configuration files and provides interfaces to query them.
4848
- `pkg/manager/elect/` - Manages TiProxy owner elections (for example, metrics reader and VIP modules need an owner).
4949
- `pkg/manager/id/` - Generates global IDs.
50+
- `pkg/manager/backendcluster/` - Manages cluster-scoped backend runtimes and shared resources such as PD or etcd clients.
5051
- `pkg/manager/infosync/` - Queries the topology of TiDB and Prometheus from PD and updates TiProxy information to PD.
5152
- `pkg/manager/logger/` - Manages the logger service.
5253
- `pkg/manager/memory/` - Records heap and goroutine profiles when memory usage is high.

pkg/balance/observer/backend_fetcher.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,13 @@ func NewPDFetcher(tpFetcher TopologyFetcher, logger *zap.Logger, config *config.
4646
func (pf *PDFetcher) GetBackendList(ctx context.Context) (map[string]*BackendInfo, error) {
4747
backends := pf.fetchBackendList(ctx)
4848
infos := make(map[string]*BackendInfo, len(backends))
49-
for addr, backend := range backends {
50-
infos[addr] = &BackendInfo{
51-
Labels: backend.Labels,
52-
IP: backend.IP,
53-
StatusPort: backend.StatusPort,
49+
for key, backend := range backends {
50+
infos[key] = &BackendInfo{
51+
Addr: backend.Addr,
52+
ClusterName: backend.ClusterName,
53+
Labels: backend.Labels,
54+
IP: backend.IP,
55+
StatusPort: backend.StatusPort,
5456
}
5557
}
5658
return infos, nil
@@ -98,7 +100,7 @@ func (sf *StaticFetcher) GetBackendList(context.Context) (map[string]*BackendInf
98100
func backendListToMap(addrs []string) map[string]*BackendInfo {
99101
backends := make(map[string]*BackendInfo, len(addrs))
100102
for _, addr := range addrs {
101-
backends[addr] = &BackendInfo{}
103+
backends[addr] = &BackendInfo{Addr: addr}
102104
}
103105
return backends
104106
}

pkg/balance/observer/backend_fetcher_test.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ func TestPDFetcher(t *testing.T) {
2626
{
2727
infos: map[string]*infosync.TiDBTopologyInfo{
2828
"1.1.1.1:4000": {
29+
Addr: "1.1.1.1:4000",
2930
Labels: map[string]string{"k1": "v1"},
3031
IP: "1.1.1.1",
3132
StatusPort: 10080,
@@ -34,6 +35,7 @@ func TestPDFetcher(t *testing.T) {
3435
check: func(m map[string]*BackendInfo) {
3536
require.Len(t, m, 1)
3637
require.NotNil(t, m["1.1.1.1:4000"])
38+
require.Equal(t, "1.1.1.1:4000", m["1.1.1.1:4000"].Addr)
3739
require.Equal(t, "1.1.1.1", m["1.1.1.1:4000"].IP)
3840
require.Equal(t, uint(10080), m["1.1.1.1:4000"].StatusPort)
3941
require.Equal(t, map[string]string{"k1": "v1"}, m["1.1.1.1:4000"].Labels)
@@ -42,24 +44,43 @@ func TestPDFetcher(t *testing.T) {
4244
{
4345
infos: map[string]*infosync.TiDBTopologyInfo{
4446
"1.1.1.1:4000": {
47+
Addr: "1.1.1.1:4000",
4548
IP: "1.1.1.1",
4649
StatusPort: 10080,
4750
},
4851
"2.2.2.2:4000": {
52+
Addr: "2.2.2.2:4000",
4953
IP: "2.2.2.2",
5054
StatusPort: 10080,
5155
},
5256
},
5357
check: func(m map[string]*BackendInfo) {
5458
require.Len(t, m, 2)
5559
require.NotNil(t, m["1.1.1.1:4000"])
60+
require.Equal(t, "1.1.1.1:4000", m["1.1.1.1:4000"].Addr)
5661
require.Equal(t, "1.1.1.1", m["1.1.1.1:4000"].IP)
5762
require.Equal(t, uint(10080), m["1.1.1.1:4000"].StatusPort)
5863
require.NotNil(t, m["2.2.2.2:4000"])
64+
require.Equal(t, "2.2.2.2:4000", m["2.2.2.2:4000"].Addr)
5965
require.Equal(t, "2.2.2.2", m["2.2.2.2:4000"].IP)
6066
require.Equal(t, uint(10080), m["2.2.2.2:4000"].StatusPort)
6167
},
6268
},
69+
{
70+
infos: map[string]*infosync.TiDBTopologyInfo{
71+
"cluster-a/shared.tidb:4000": {
72+
Addr: "shared.tidb:4000",
73+
IP: "10.0.0.1",
74+
StatusPort: 10080,
75+
},
76+
},
77+
check: func(m map[string]*BackendInfo) {
78+
require.Len(t, m, 1)
79+
require.NotNil(t, m["cluster-a/shared.tidb:4000"])
80+
require.Equal(t, "shared.tidb:4000", m["cluster-a/shared.tidb:4000"].Addr)
81+
require.Equal(t, "10.0.0.1", m["cluster-a/shared.tidb:4000"].IP)
82+
},
83+
},
6384
{
6485
ctx: func() context.Context {
6586
ctx, cancel := context.WithCancel(context.Background())

pkg/balance/observer/backend_health.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -76,13 +76,17 @@ func (bh *BackendHealth) String() string {
7676

7777
// BackendInfo stores the status info of each backend.
7878
type BackendInfo struct {
79-
Labels map[string]string
80-
IP string
81-
StatusPort uint
79+
Addr string
80+
ClusterName string
81+
Labels map[string]string
82+
IP string
83+
StatusPort uint
8284
}
8385

8486
func (bi BackendInfo) Equals(other BackendInfo) bool {
85-
return bi.IP == other.IP &&
87+
return bi.Addr == other.Addr &&
88+
bi.ClusterName == other.ClusterName &&
89+
bi.IP == other.IP &&
8690
bi.StatusPort == other.StatusPort &&
8791
maps.Equal(bi.Labels, other.Labels)
8892
}

pkg/balance/observer/backend_health_test.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ func TestBackendHealthToString(t *testing.T) {
1515
{},
1616
{
1717
BackendInfo: BackendInfo{
18+
Addr: "127.0.0.1:4000",
1819
IP: "127.0.0.1",
1920
StatusPort: 1,
2021
Labels: map[string]string{"k1": "v1", "k2": "v2"},
@@ -45,13 +46,15 @@ func TestBackendHealthEquals(t *testing.T) {
4546
{
4647
a: BackendHealth{
4748
BackendInfo: BackendInfo{
49+
Addr: "127.0.0.1:4000",
4850
IP: "127.0.0.1",
4951
StatusPort: 1,
5052
Labels: map[string]string{"k1": "v1", "k2": "v2"},
5153
},
5254
},
5355
b: BackendHealth{
5456
BackendInfo: BackendInfo{
57+
Addr: "127.0.0.1:4000",
5558
IP: "127.0.0.1",
5659
StatusPort: 1,
5760
},
@@ -61,13 +64,15 @@ func TestBackendHealthEquals(t *testing.T) {
6164
{
6265
a: BackendHealth{
6366
BackendInfo: BackendInfo{
67+
Addr: "127.0.0.1:4000",
6468
IP: "127.0.0.1",
6569
StatusPort: 1,
6670
Labels: map[string]string{"k1": "v1", "k2": "v2"},
6771
},
6872
},
6973
b: BackendHealth{
7074
BackendInfo: BackendInfo{
75+
Addr: "127.0.0.1:4000",
7176
IP: "127.0.0.1",
7277
StatusPort: 1,
7378
Labels: map[string]string{"k1": "v1", "k2": "v2"},
@@ -78,6 +83,7 @@ func TestBackendHealthEquals(t *testing.T) {
7883
{
7984
a: BackendHealth{
8085
BackendInfo: BackendInfo{
86+
Addr: "127.0.0.1:4000",
8187
IP: "127.0.0.1",
8288
StatusPort: 1,
8389
Labels: map[string]string{"k1": "v1", "k2": "v2"},

pkg/balance/observer/backend_observer_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,7 @@ func (ts *observerTestSuite) addBackend() (string, BackendInfo) {
279279
ts.backendIdx++
280280
addr := fmt.Sprintf("%d", ts.backendIdx)
281281
info := &BackendInfo{
282+
Addr: addr,
282283
IP: "127.0.0.1",
283284
StatusPort: uint(ts.backendIdx),
284285
}

pkg/balance/observer/health_check.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import (
2121

2222
// HealthCheck is used to check the backends of one backend. One can pass a customized health check function to the observer.
2323
type HealthCheck interface {
24-
Check(ctx context.Context, addr string, info *BackendInfo, lastHealth *BackendHealth) *BackendHealth
24+
Check(ctx context.Context, backendID string, info *BackendInfo, lastHealth *BackendHealth) *BackendHealth
2525
}
2626

2727
const (
@@ -62,7 +62,7 @@ func NewDefaultHealthCheck(httpCli *http.Client, cfg *config.HealthCheck, logger
6262
}
6363
}
6464

65-
func (dhc *DefaultHealthCheck) Check(ctx context.Context, addr string, info *BackendInfo, lastBh *BackendHealth) *BackendHealth {
65+
func (dhc *DefaultHealthCheck) Check(ctx context.Context, _ string, info *BackendInfo, lastBh *BackendHealth) *BackendHealth {
6666
bh := &BackendHealth{
6767
BackendInfo: *info,
6868
Healthy: true,
@@ -80,16 +80,22 @@ func (dhc *DefaultHealthCheck) Check(ctx context.Context, addr string, info *Bac
8080
if !bh.Healthy {
8181
return bh
8282
}
83-
dhc.checkSqlPort(ctx, addr, bh)
83+
dhc.checkSqlPort(ctx, info, bh)
8484
if !bh.Healthy {
8585
return bh
8686
}
8787
dhc.queryConfig(ctx, info, bh, lastBh)
8888
return bh
8989
}
9090

91-
func (dhc *DefaultHealthCheck) checkSqlPort(ctx context.Context, addr string, bh *BackendHealth) {
91+
func (dhc *DefaultHealthCheck) checkSqlPort(ctx context.Context, info *BackendInfo, bh *BackendHealth) {
9292
// Also dial the SQL port just in case that the SQL port hangs.
93+
if info == nil || info.Addr == "" {
94+
bh.Healthy = false
95+
bh.PingErr = errors.New("backend address is empty")
96+
return
97+
}
98+
addr := info.Addr
9399
b := backoff.WithContext(backoff.WithMaxRetries(backoff.NewConstantBackOff(dhc.cfg.RetryInterval), uint64(dhc.cfg.MaxRetries)), ctx)
94100
err := http.ConnectWithRetry(func() error {
95101
startTime := time.Now()

pkg/balance/observer/health_check_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ func newBackendServer(t *testing.T) (*backendServer, *BackendInfo) {
143143
backend.setSqlResp(true)
144144
backend.startSQLServer()
145145
return backend, &BackendInfo{
146+
Addr: backend.sqlAddr,
146147
IP: backend.ip,
147148
StatusPort: backend.statusPort,
148149
}

pkg/balance/observer/mock_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -82,11 +82,11 @@ func newMockHealthCheck() *mockHealthCheck {
8282
}
8383
}
8484

85-
func (mhc *mockHealthCheck) Check(_ context.Context, addr string, info *BackendInfo, _ *BackendHealth) *BackendHealth {
85+
func (mhc *mockHealthCheck) Check(_ context.Context, backendID string, info *BackendInfo, _ *BackendHealth) *BackendHealth {
8686
mhc.Lock()
8787
defer mhc.Unlock()
88-
mhc.backends[addr].BackendInfo = *info
89-
return mhc.backends[addr]
88+
mhc.backends[backendID].BackendInfo = *info
89+
return mhc.backends[backendID]
9090
}
9191

9292
func (mhc *mockHealthCheck) setBackend(addr string, health *BackendHealth) {

pkg/balance/router/group.go

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -158,17 +158,17 @@ func (g *Group) RefreshCidr() {
158158
}
159159
}
160160

161-
func (g *Group) AddBackend(addr string, backend *backendWrapper) {
161+
func (g *Group) AddBackend(backendID string, backend *backendWrapper) {
162162
g.Lock()
163163
defer g.Unlock()
164-
g.backends[addr] = backend
164+
g.backends[backendID] = backend
165165
backend.group = g
166166
}
167167

168-
func (g *Group) RemoveBackend(addr string) {
168+
func (g *Group) RemoveBackend(backendID string) {
169169
g.Lock()
170170
defer g.Unlock()
171-
delete(g.backends, addr)
171+
delete(g.backends, backendID)
172172
}
173173

174174
func (g *Group) Empty() bool {
@@ -200,7 +200,7 @@ func (g *Group) Route(excluded []BackendInst) (policy.BackendCtx, error) {
200200
// Exclude the backends that are already tried.
201201
found := false
202202
for _, e := range excluded {
203-
if backend.Addr() == e.Addr() {
203+
if backend.ID() == e.ID() {
204204
found = true
205205
break
206206
}
@@ -273,7 +273,7 @@ func (g *Group) Balance(ctx context.Context) {
273273
func (g *Group) onCreateConn(backendInst BackendInst, conn RedirectableConn, succeed bool) {
274274
g.Lock()
275275
defer g.Unlock()
276-
backend := g.ensureBackend(backendInst.Addr())
276+
backend := g.ensureBackend(backendInst.ID())
277277
if succeed {
278278
connWrapper := &connWrapper{
279279
RedirectableConn: conn,
@@ -319,23 +319,24 @@ func (g *Group) RedirectConnections() error {
319319
return nil
320320
}
321321

322-
func (g *Group) ensureBackend(addr string) *backendWrapper {
323-
backend, ok := g.backends[addr]
322+
func (g *Group) ensureBackend(backendID string) *backendWrapper {
323+
backend, ok := g.backends[backendID]
324324
if ok {
325325
return backend
326326
}
327327
// The backend should always exist if it will be needed. Add a warning and add it back.
328-
g.lg.Warn("backend is not found in the router", zap.String("backend_addr", addr), zap.Stack("stack"))
329-
ip, _, _ := net.SplitHostPort(addr)
330-
backend = newBackendWrapper(addr, observer.BackendHealth{
328+
g.lg.Warn("backend is not found in the router", zap.String("backend_id", backendID), zap.Stack("stack"))
329+
ip, _, _ := net.SplitHostPort(backendID)
330+
backend = newBackendWrapper(backendID, observer.BackendHealth{
331331
BackendInfo: observer.BackendInfo{
332+
Addr: backendID,
332333
IP: ip,
333334
StatusPort: 10080, // impossible anyway
334335
},
335336
SupportRedirection: true,
336337
Healthy: false,
337338
})
338-
g.backends[addr] = backend
339+
g.backends[backendID] = backend
339340
return backend
340341
}
341342

0 commit comments

Comments
 (0)