From 1b539a622e0bc4827cfdb976688b1d65b22e04fa Mon Sep 17 00:00:00 2001 From: yibin87 Date: Thu, 23 Apr 2026 10:37:07 +0800 Subject: [PATCH 1/2] topsql_v2: discover coprocessor workers in nextgen topology --- src/common/topology/fetch/tikv_nextgen.rs | 164 +++++++++++++++++++--- 1 file changed, 143 insertions(+), 21 deletions(-) diff --git a/src/common/topology/fetch/tikv_nextgen.rs b/src/common/topology/fetch/tikv_nextgen.rs index a38c4017..7aaf51e5 100644 --- a/src/common/topology/fetch/tikv_nextgen.rs +++ b/src/common/topology/fetch/tikv_nextgen.rs @@ -18,6 +18,23 @@ pub enum FetchError { }, } +const K8S_COMPONENT_LABEL: &str = "app.kubernetes.io/component"; +const K8S_INSTANCE_LABEL: &str = "app.kubernetes.io/instance"; + +const TIKV_COMPONENT: &str = "tikv"; +const COPROCESSOR_WORKER_COMPONENTS: [&str; 5] = [ + "coprocessor_worker", + "coprocessor-worker", + "tikv-worker", + "cloud-worker", + "cloud_worker", +]; + +const TIKV_GRPC_PORT: u16 = 20160; +const TIKV_STATUS_PORT: u16 = 20180; +const COPROCESSOR_WORKER_GRPC_PORT: u16 = 9500; +const COPROCESSOR_WORKER_HTTP_PORT: u16 = 19000; + pub struct TiKVNextGenTopologyFetcher { client: Client, label_k8s_instance: String, @@ -39,10 +56,7 @@ impl TiKVNextGenTopologyFetcher { tokio::fs::read_to_string("/var/run/secrets/kubernetes.io/serviceaccount/namespace") .await .context(GetNamespaceSnafu)?; - let label_selector = format!( - "app.kubernetes.io/component=tikv,app.kubernetes.io/instance={}", - self.label_k8s_instance - ); + let label_selector = build_label_selector(&self.label_k8s_instance); let pod_list = Api::::namespaced(self.client.clone(), &namespace) .list(&ListParams::default().labels(&label_selector)) .await @@ -51,25 +65,133 @@ impl TiKVNextGenTopologyFetcher { label_k8s_instance: self.label_k8s_instance.clone(), })?; for pod in pod_list.items { - if let Some(status) = pod.status { - if status.phase.as_deref() != Some("Running") { - continue; - } - if let Some(pod_ip) = status.pod_ip { - if pod_ip.is_empty() { - continue; - } - let pod_name = pod.metadata.name.clone().unwrap_or_default(); - components.insert(Component { - instance_type: InstanceType::TiKV, - host: pod_ip, - primary_port: 20160, - secondary_port: 20180, - instance_name: Some(pod_name), - }); - } + if let Some(component) = component_from_pod(&pod) { + components.insert(component); } } Ok(()) } } + +fn build_label_selector(label_k8s_instance: &str) -> String { + let component_values = std::iter::once(TIKV_COMPONENT) + .chain(COPROCESSOR_WORKER_COMPONENTS) + .collect::>() + .join(","); + format!( + "{K8S_COMPONENT_LABEL} in ({component_values}),{K8S_INSTANCE_LABEL}={label_k8s_instance}" + ) +} + +fn component_from_pod(pod: &Pod) -> Option { + let status = pod.status.as_ref()?; + if status.phase.as_deref() != Some("Running") { + return None; + } + + let pod_ip = status.pod_ip.as_deref()?; + if pod_ip.trim().is_empty() { + return None; + } + + let component_label = pod + .metadata + .labels + .as_ref() + .and_then(|labels| labels.get(K8S_COMPONENT_LABEL)) + .map(String::as_str) + .unwrap_or(TIKV_COMPONENT); + + let (primary_port, secondary_port) = if is_coprocessor_worker_component(component_label) { + (COPROCESSOR_WORKER_GRPC_PORT, COPROCESSOR_WORKER_HTTP_PORT) + } else { + (TIKV_GRPC_PORT, TIKV_STATUS_PORT) + }; + + Some(Component { + instance_type: InstanceType::TiKV, + host: pod_ip.to_string(), + primary_port, + secondary_port, + instance_name: pod.metadata.name.clone(), + }) +} + +fn is_coprocessor_worker_component(component: &str) -> bool { + COPROCESSOR_WORKER_COMPONENTS.contains(&component) +} + +#[cfg(test)] +mod tests { + use std::collections::BTreeMap; + + use k8s_openapi::{api::core::v1::PodStatus, apimachinery::pkg::apis::meta::v1::ObjectMeta}; + + use super::*; + + fn pod(name: &str, component: &str, phase: &str, pod_ip: &str) -> Pod { + Pod { + metadata: ObjectMeta { + name: Some(name.to_string()), + labels: Some(BTreeMap::from([( + K8S_COMPONENT_LABEL.to_string(), + component.to_string(), + )])), + ..Default::default() + }, + status: Some(PodStatus { + phase: Some(phase.to_string()), + pod_ip: Some(pod_ip.to_string()), + ..Default::default() + }), + ..Default::default() + } + } + + #[test] + fn builds_selector_with_coprocessor_worker_aliases() { + let selector = build_label_selector("demo-cluster"); + + assert!(selector.contains("app.kubernetes.io/component in (")); + assert!(selector.contains("tikv")); + assert!(selector.contains("coprocessor_worker")); + assert!(selector.contains("tikv-worker")); + assert!(selector.contains("app.kubernetes.io/instance=demo-cluster")); + } + + #[test] + fn builds_tikv_component_from_tikv_pod() { + let component = component_from_pod(&pod("tikv-0", "tikv", "Running", "10.0.0.1")).unwrap(); + + assert_eq!(component.instance_type, InstanceType::TiKV); + assert_eq!(component.host, "10.0.0.1"); + assert_eq!(component.primary_port, TIKV_GRPC_PORT); + assert_eq!(component.secondary_port, TIKV_STATUS_PORT); + assert_eq!(component.instance_name.as_deref(), Some("tikv-0")); + } + + #[test] + fn builds_tikv_component_from_coprocessor_worker_pod() { + let component = component_from_pod(&pod( + "coprocessor-worker-0", + "coprocessor_worker", + "Running", + "10.0.0.2", + )) + .unwrap(); + + assert_eq!(component.instance_type, InstanceType::TiKV); + assert_eq!(component.host, "10.0.0.2"); + assert_eq!(component.primary_port, COPROCESSOR_WORKER_GRPC_PORT); + assert_eq!(component.secondary_port, COPROCESSOR_WORKER_HTTP_PORT); + assert_eq!( + component.instance_name.as_deref(), + Some("coprocessor-worker-0") + ); + } + + #[test] + fn skips_non_running_pod() { + assert!(component_from_pod(&pod("tikv-1", "tikv", "Pending", "10.0.0.3")).is_none()); + } +} From 92d0c15864816729d135c0391833b91582e013e4 Mon Sep 17 00:00:00 2001 From: yibin87 Date: Thu, 23 Apr 2026 13:28:34 +0800 Subject: [PATCH 2/2] topsql_v2: narrow worker discovery label --- src/common/topology/fetch/tikv_nextgen.rs | 29 +++++++++++++---------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/src/common/topology/fetch/tikv_nextgen.rs b/src/common/topology/fetch/tikv_nextgen.rs index 7aaf51e5..ffe835c1 100644 --- a/src/common/topology/fetch/tikv_nextgen.rs +++ b/src/common/topology/fetch/tikv_nextgen.rs @@ -22,13 +22,7 @@ const K8S_COMPONENT_LABEL: &str = "app.kubernetes.io/component"; const K8S_INSTANCE_LABEL: &str = "app.kubernetes.io/instance"; const TIKV_COMPONENT: &str = "tikv"; -const COPROCESSOR_WORKER_COMPONENTS: [&str; 5] = [ - "coprocessor_worker", - "coprocessor-worker", - "tikv-worker", - "cloud-worker", - "cloud_worker", -]; +const COPROCESSOR_WORKER_COMPONENT: &str = "coprocessor-worker"; const TIKV_GRPC_PORT: u16 = 20160; const TIKV_STATUS_PORT: u16 = 20180; @@ -75,7 +69,7 @@ impl TiKVNextGenTopologyFetcher { fn build_label_selector(label_k8s_instance: &str) -> String { let component_values = std::iter::once(TIKV_COMPONENT) - .chain(COPROCESSOR_WORKER_COMPONENTS) + .chain(std::iter::once(COPROCESSOR_WORKER_COMPONENT)) .collect::>() .join(","); format!( @@ -118,7 +112,7 @@ fn component_from_pod(pod: &Pod) -> Option { } fn is_coprocessor_worker_component(component: &str) -> bool { - COPROCESSOR_WORKER_COMPONENTS.contains(&component) + component == COPROCESSOR_WORKER_COMPONENT } #[cfg(test)] @@ -149,13 +143,12 @@ mod tests { } #[test] - fn builds_selector_with_coprocessor_worker_aliases() { + fn builds_selector_with_coprocessor_worker_component() { let selector = build_label_selector("demo-cluster"); assert!(selector.contains("app.kubernetes.io/component in (")); assert!(selector.contains("tikv")); - assert!(selector.contains("coprocessor_worker")); - assert!(selector.contains("tikv-worker")); + assert!(selector.contains("coprocessor-worker")); assert!(selector.contains("app.kubernetes.io/instance=demo-cluster")); } @@ -174,7 +167,7 @@ mod tests { fn builds_tikv_component_from_coprocessor_worker_pod() { let component = component_from_pod(&pod( "coprocessor-worker-0", - "coprocessor_worker", + "coprocessor-worker", "Running", "10.0.0.2", )) @@ -194,4 +187,14 @@ mod tests { fn skips_non_running_pod() { assert!(component_from_pod(&pod("tikv-1", "tikv", "Pending", "10.0.0.3")).is_none()); } + + #[test] + fn does_not_treat_other_worker_aliases_as_coprocessor_worker() { + let component = + component_from_pod(&pod("tikv-worker-0", "tikv-worker", "Running", "10.0.0.4")) + .unwrap(); + + assert_eq!(component.primary_port, TIKV_GRPC_PORT); + assert_eq!(component.secondary_port, TIKV_STATUS_PORT); + } }