-
Notifications
You must be signed in to change notification settings - Fork 37
config: add backend cluster schema and multi-port listener config #1103
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -9,6 +9,8 @@ import ( | |
| "net" | ||
| "os" | ||
| "path/filepath" | ||
| "slices" | ||
| "strconv" | ||
| "strings" | ||
| "time" | ||
|
|
||
|
|
@@ -63,15 +65,25 @@ type ProxyServerOnline struct { | |
| GracefulCloseConnTimeout int `yaml:"graceful-close-conn-timeout,omitempty" toml:"graceful-close-conn-timeout,omitempty" json:"graceful-close-conn-timeout,omitempty" reloadable:"true"` | ||
| // Public and private traffic are metered separately. | ||
| PublicEndpoints []string `yaml:"public-endpoints,omitempty" toml:"public-endpoints,omitempty" json:"public-endpoints,omitempty" reloadable:"true"` | ||
| // BackendClusters represents multiple backend clusters that the proxy can route to. It can be reloaded | ||
| // online. | ||
| BackendClusters []BackendCluster `yaml:"backend-clusters,omitempty" toml:"backend-clusters,omitempty" json:"backend-clusters,omitempty" reloadable:"true"` | ||
| } | ||
|
|
||
| type ProxyServer struct { | ||
| Addr string `yaml:"addr,omitempty" toml:"addr,omitempty" json:"addr,omitempty" reloadable:"false"` | ||
| AdvertiseAddr string `yaml:"advertise-addr,omitempty" toml:"advertise-addr,omitempty" json:"advertise-addr,omitempty" reloadable:"false"` | ||
| PDAddrs string `yaml:"pd-addrs,omitempty" toml:"pd-addrs,omitempty" json:"pd-addrs,omitempty" reloadable:"false"` | ||
| PortRange []int `yaml:"port-range,omitempty" toml:"port-range,omitempty" json:"port-range,omitempty" reloadable:"false"` | ||
| ProxyServerOnline `yaml:",inline" toml:",inline" json:",inline"` | ||
| } | ||
|
|
||
| type BackendCluster struct { | ||
| Name string `yaml:"name,omitempty" toml:"name,omitempty" json:"name,omitempty" reloadable:"true"` | ||
| PDAddrs string `yaml:"pd-addrs,omitempty" toml:"pd-addrs,omitempty" json:"pd-addrs,omitempty" reloadable:"true"` | ||
| NSServers []string `yaml:"ns-servers,omitempty" toml:"ns-servers,omitempty" json:"ns-servers,omitempty" reloadable:"true"` | ||
| } | ||
|
|
||
| type API struct { | ||
| Addr string `yaml:"addr,omitempty" toml:"addr,omitempty" json:"addr,omitempty" reloadable:"false"` | ||
| ProxyProtocol string `yaml:"proxy-protocol,omitempty" toml:"proxy-protocol,omitempty" json:"proxy-protocol,omitempty" reloadable:"false"` | ||
|
|
@@ -146,6 +158,11 @@ func NewConfig() *Config { | |
| func (cfg *Config) Clone() *Config { | ||
| newCfg := *cfg | ||
| newCfg.Labels = maps.Clone(cfg.Labels) | ||
| newCfg.Proxy.PublicEndpoints = slices.Clone(cfg.Proxy.PublicEndpoints) | ||
| newCfg.Proxy.BackendClusters = slices.Clone(cfg.Proxy.BackendClusters) | ||
| for i := range newCfg.Proxy.BackendClusters { | ||
| newCfg.Proxy.BackendClusters[i].NSServers = slices.Clone(newCfg.Proxy.BackendClusters[i].NSServers) | ||
| } | ||
| return &newCfg | ||
| } | ||
|
|
||
|
|
@@ -168,6 +185,9 @@ func (cfg *Config) Check() error { | |
| if cfg.Proxy.ConnBufferSize > 0 && (cfg.Proxy.ConnBufferSize > 16*1024*1024 || cfg.Proxy.ConnBufferSize < 1024) { | ||
| return errors.Wrapf(ErrInvalidConfigValue, "conn-buffer-size must be between 1K and 16M") | ||
| } | ||
| if err := cfg.Proxy.Check(); err != nil { | ||
| return err | ||
| } | ||
|
|
||
| if err := cfg.Balance.Check(); err != nil { | ||
| return err | ||
|
|
@@ -183,15 +203,16 @@ func (cfg *Config) ToBytes() ([]byte, error) { | |
| } | ||
|
|
||
| func (cfg *Config) GetIPPort() (ip, port, statusPort string, err error) { | ||
| addrs := strings.Split(cfg.Proxy.Addr, ",") | ||
| addrs, err := cfg.Proxy.GetSQLAddrs() | ||
| if err != nil { | ||
| return | ||
| } | ||
| ip, port, err = net.SplitHostPort(addrs[0]) | ||
| if err != nil { | ||
| err = errors.WithStack(err) | ||
| return | ||
| } | ||
| _, statusPort, err = net.SplitHostPort(cfg.API.Addr) | ||
| if err != nil { | ||
| err = errors.WithStack(err) | ||
| return | ||
| } | ||
| // AdvertiseAddr may be a DNS in k8s and certificate SAN typically contains DNS but not IP. | ||
|
|
@@ -217,3 +238,140 @@ func (cfg *Config) GetIPPort() (ip, port, statusPort string, err error) { | |
| } | ||
| return | ||
| } | ||
|
|
||
| // GetBackendClusters returns configured backend clusters. | ||
| // It keeps backward compatibility for the legacy `proxy.pd-addrs` setting. | ||
| func (cfg *Config) GetBackendClusters() []BackendCluster { | ||
| if len(cfg.Proxy.BackendClusters) > 0 { | ||
| return cfg.Proxy.BackendClusters | ||
| } | ||
| if strings.TrimSpace(cfg.Proxy.PDAddrs) == "" { | ||
| return nil | ||
| } | ||
| return []BackendCluster{{ | ||
| Name: "default", | ||
| PDAddrs: cfg.Proxy.PDAddrs, | ||
| }} | ||
| } | ||
|
|
||
| func (ps *ProxyServer) Check() error { | ||
| if _, err := ps.GetSQLAddrs(); err != nil { | ||
| return errors.Wrapf(ErrInvalidConfigValue, "invalid proxy.addr or proxy.port-range: %s", err.Error()) | ||
| } | ||
| if len(ps.BackendClusters) == 0 { | ||
| return nil | ||
| } | ||
|
|
||
| clusterNames := make(map[string]struct{}, len(ps.BackendClusters)) | ||
| for i, cluster := range ps.BackendClusters { | ||
| name := strings.TrimSpace(cluster.Name) | ||
| if name == "" { | ||
| return errors.Wrapf(ErrInvalidConfigValue, "proxy.backend-clusters[%d].name is empty", i) | ||
| } | ||
| if _, ok := clusterNames[name]; ok { | ||
| return errors.Wrapf(ErrInvalidConfigValue, "duplicate proxy.backend-clusters name %s", name) | ||
| } | ||
| clusterNames[name] = struct{}{} | ||
| if err := validateAddrList(cluster.PDAddrs, "proxy.backend-clusters.pd-addrs"); err != nil { | ||
| return err | ||
| } | ||
| if _, err := ParseNSServers(cluster.NSServers); err != nil { | ||
| return errors.Wrapf(ErrInvalidConfigValue, "invalid proxy.backend-clusters.ns-servers: %s", err.Error()) | ||
| } | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| func splitAddrList(addrs string) []string { | ||
| parts := strings.Split(addrs, ",") | ||
| trimmed := make([]string, 0, len(parts)) | ||
| for _, part := range parts { | ||
| addr := strings.TrimSpace(part) | ||
| if addr != "" { | ||
| trimmed = append(trimmed, addr) | ||
| } | ||
| } | ||
| return trimmed | ||
| } | ||
|
|
||
| func validateAddrList(addrs, field string) error { | ||
| parts := splitAddrList(addrs) | ||
| if len(parts) == 0 { | ||
| return errors.Wrapf(ErrInvalidConfigValue, "%s is empty", field) | ||
| } | ||
| for _, addr := range parts { | ||
| if _, _, err := net.SplitHostPort(addr); err != nil { | ||
| return errors.Wrapf(ErrInvalidConfigValue, "invalid %s address %s", field, addr) | ||
| } | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| func ParseNSServers(nsServers []string) ([]string, error) { | ||
| if len(nsServers) == 0 { | ||
| return nil, nil | ||
| } | ||
| normalized := make([]string, 0, len(nsServers)) | ||
| for _, server := range nsServers { | ||
| addr, err := normalizeNSServer(server) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| normalized = append(normalized, addr) | ||
| } | ||
| return normalized, nil | ||
| } | ||
|
|
||
| func normalizeNSServer(server string) (string, error) { | ||
| host, port, err := net.SplitHostPort(server) | ||
| if err == nil { | ||
| if host == "" { | ||
| return "", errors.Wrapf(ErrInvalidConfigValue, "host is empty") | ||
| } | ||
| portNum, err := strconv.Atoi(port) | ||
| if err != nil || portNum < 1 || portNum > 65535 { | ||
| return "", errors.Wrapf(ErrInvalidConfigValue, "port is invalid") | ||
| } | ||
| return net.JoinHostPort(host, strconv.Itoa(portNum)), nil | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why not just return
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's also fine to just return |
||
| } | ||
|
|
||
| if server == "" { | ||
| return "", errors.Wrapf(ErrInvalidConfigValue, "host is empty") | ||
| } | ||
| if strings.ContainsAny(server, "[]") { | ||
| return "", errors.Wrapf(ErrInvalidConfigValue, "host is invalid") | ||
| } | ||
| return net.JoinHostPort(server, "53"), nil | ||
| } | ||
|
|
||
| func (ps *ProxyServer) GetSQLAddrs() ([]string, error) { | ||
| addrs := splitAddrList(ps.Addr) | ||
| if len(addrs) == 0 { | ||
| if len(ps.PortRange) == 0 { | ||
| return []string{""}, nil | ||
| } | ||
| return nil, errors.Wrapf(ErrInvalidConfigValue, "proxy.addr is empty") | ||
| } | ||
| if len(ps.PortRange) == 0 { | ||
| return addrs, nil | ||
| } | ||
| if len(ps.PortRange) != 2 { | ||
| return nil, errors.Wrapf(ErrInvalidConfigValue, "proxy.port-range must contain exactly two ports") | ||
| } | ||
| start, end := ps.PortRange[0], ps.PortRange[1] | ||
| if start < 1 || start > 65535 || end < 1 || end > 65535 || start > end { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What if a port is assigned by BackendIO before it's assigned as a listening port?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Listeners are not hot-configurable, so it will listen to all ports at the beginning, and no port will be assigned before it listens the port. The |
||
| return nil, errors.Wrapf(ErrInvalidConfigValue, "proxy.port-range is invalid") | ||
| } | ||
| if len(addrs) != 1 { | ||
| return nil, errors.Wrapf(ErrInvalidConfigValue, "proxy.addr must contain exactly one host when proxy.port-range is set") | ||
| } | ||
| host, _, err := net.SplitHostPort(addrs[0]) | ||
| if err != nil { | ||
| return nil, errors.Wrapf(ErrInvalidConfigValue, "invalid proxy.addr: %s", err.Error()) | ||
| } | ||
| sqlAddrs := make([]string, 0, end-start+1) | ||
| for port := start; port <= end; port++ { | ||
| sqlAddrs = append(sqlAddrs, net.JoinHostPort(host, strconv.Itoa(port))) | ||
| } | ||
| return sqlAddrs, nil | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
reloadable means it can be updated online. Why not put it in
ProxyServerOnline?Besides, I don't see how it's hot-reloaded.
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done. I've moved it to
ProxyServerOnline. Sorry I didn't get the meaning ofProxyServerOnlinepreviously 😢Also through API and
watchConfigchannel, you can find it in later PR https://github.com/pingcap/tiproxy/pull/1104/changes#diff-3e09c0d7ae18b71320d7c24a83a3be7bc8437ca68f7aaa9de953ef7fbc0a6810R80 .