From 25d9c6dc59b7cfaf678cc1180e452ba3a0424324 Mon Sep 17 00:00:00 2001
From: David Joshy
Date: Tue, 12 May 2026 10:40:34 -0400
Subject: [PATCH] drain: fix re-cordon detection

---
 pkg/controller/drain/drain_controller.go      | 27 +++++--
 pkg/controller/drain/drain_controller_test.go | 89 ++++++++++++++++---
 2 files changed, 100 insertions(+), 16 deletions(-)

diff --git a/pkg/controller/drain/drain_controller.go b/pkg/controller/drain/drain_controller.go
index 6ecd820067..42063f44de 100644
--- a/pkg/controller/drain/drain_controller.go
+++ b/pkg/controller/drain/drain_controller.go
@@ -206,10 +206,13 @@ func (ctrl *Controller) handleNodeEvent(oldObj, newObj interface{}) {
 		return
 	}
 
-	// If the desiredDrain annotation are identical between oldNode and newNode, no new action is required by the drain controller
-	if oldNode.Annotations[daemonconsts.DesiredDrainerAnnotationKey] == newNode.Annotations[daemonconsts.DesiredDrainerAnnotationKey] {
+	// Enqueue if the drain annotation changed OR if Spec.Unschedulable changed (e.g. an external uncordon during drain).
+	if oldNode.Annotations[daemonconsts.DesiredDrainerAnnotationKey] == newNode.Annotations[daemonconsts.DesiredDrainerAnnotationKey] &&
+		oldNode.Spec.Unschedulable == newNode.Spec.Unschedulable {
 		return
 	}
+	klog.V(4).Infof("node %s: enqueueing (desiredDrainer=%q unschedulable=%v)", newNode.Name,
+		newNode.Annotations[daemonconsts.DesiredDrainerAnnotationKey], newNode.Spec.Unschedulable)
 	ctrl.enqueueNode(newNode)
 }
 
@@ -290,8 +293,12 @@ func (ctrl *Controller) syncNode(key string) error {
 	}
 
 	desiredState := node.Annotations[daemonconsts.DesiredDrainerAnnotationKey]
+	desiredVerb := strings.Split(desiredState, "-")[0]
+
+	// lastAppliedDrain is only written on success, so desired == lastApplied means
+	// the controller has already completed this request.
 	if desiredState == node.Annotations[daemonconsts.LastAppliedDrainerAnnotationKey] {
-		klog.V(4).Infof("Node %v has the correct drain", key)
+		klog.V(4).Infof("Node %v has the correct drain request state", key)
 		return nil
 	}
 
@@ -320,7 +327,6 @@ func (ctrl *Controller) syncNode(key string) error {
 		return err
 	}
 
-	desiredVerb := strings.Split(desiredState, "-")[0]
 	switch desiredVerb {
 	case daemonconsts.DrainerStateUncordon:
 		ctrl.logNode(node, "uncordoning")
@@ -362,6 +368,19 @@ func (ctrl *Controller) syncNode(key string) error {
 			// from here so that we can requeue.
 			return nil
 		}
+		// Re-fetch the node before writing lastApplied. If the node was externally
+		// uncordoned during the drain, new pods may already have been scheduled, and
+		// writing lastApplied would falsely signal to the MCD that it is safe to
+		// proceed. handleNodeEvent has already re-enqueued the node on the
+		// Spec.Unschedulable change, so return without writing the annotation.
+		node, err = ctrl.nodeLister.Get(name)
+		if err != nil {
+			return err
+		}
+		if !node.Spec.Unschedulable {
+			klog.Infof("node %s: externally uncordoned during drain, skipping completion annotation", name)
+			return nil
+		}
 	default:
 		return fmt.Errorf("node %s: non-recognized drain verb detected %s", node.Name, desiredVerb)
 	}
diff --git a/pkg/controller/drain/drain_controller_test.go b/pkg/controller/drain/drain_controller_test.go
index 87dd57f3ca..e50bd370ef 100644
--- a/pkg/controller/drain/drain_controller_test.go
+++ b/pkg/controller/drain/drain_controller_test.go
@@ -11,6 +11,7 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
 	kubeinformers "k8s.io/client-go/informers"
+	coreinformersv1 "k8s.io/client-go/informers/core/v1"
 	k8sfake "k8s.io/client-go/kubernetes/fake"
 	core "k8s.io/client-go/testing"
 
@@ -71,7 +72,7 @@ func createTestMCP(name string) *mcfgv1.MachineConfigPool {
 	}
 }
 
-func createTestController(nodes []*corev1.Node, mcps []*mcfgv1.MachineConfigPool) (*Controller, *k8sfake.Clientset, *fakemcfgclientset.Clientset) {
+func createTestController(nodes []*corev1.Node, mcps []*mcfgv1.MachineConfigPool) (*Controller, *k8sfake.Clientset, *fakemcfgclientset.Clientset, coreinformersv1.NodeInformer) {
 	kubeObjs := make([]runtime.Object, len(nodes))
 	for i, node := range nodes {
 		kubeObjs[i] = node
@@ -116,7 +117,7 @@ func createTestController(nodes []*corev1.Node, mcps []*mcfgv1.MachineConfigPool
 	// Initialize ongoing drains map for testing
 	ctrl.ongoingDrains = make(map[string]time.Time)
 
-	return ctrl, kubeClient, mcfgClient
+	return ctrl, kubeClient, mcfgClient, nodeInformer
 }
 
 func createDrainTestNode(nodeName string, unschedulable bool, desiredState, lastAppliedState string) *corev1.Node {
@@ -131,7 +132,7 @@ func createDrainTestNode(nodeName string, unschedulable bool, desiredState, last
 }
 
 func setupControllerAndSync(node *corev1.Node, ongoingDrains map[string]time.Time) (*Controller, *k8sfake.Clientset, error) {
-	ctrl, kubeClient, _ := createTestController([]*corev1.Node{node}, []*mcfgv1.MachineConfigPool{createTestMCP(testPoolName)})
+	ctrl, kubeClient, _, _ := createTestController([]*corev1.Node{node}, []*mcfgv1.MachineConfigPool{createTestMCP(testPoolName)})
 
 	if ongoingDrains != nil {
 		ctrl.ongoingDrains = ongoingDrains
@@ -141,6 +142,59 @@ func setupControllerAndSync(node *corev1.Node, ongoingDrains map[string]time.Tim
 	return ctrl, kubeClient, err
 }
 
+// updateNodeInIndexer returns a fake-client reactor that writes unschedulable into the
+// informer indexer whenever a node patch containing spec.unschedulable fires. Passing
+// true mirrors the patch faithfully (normal cordon); passing false simulates an external
+// actor immediately uncordoning the node after the controller's cordon patch lands.
+func updateNodeInIndexer(nodeInformer coreinformersv1.NodeInformer, unschedulable bool) func(core.Action) (bool, runtime.Object, error) {
+	return func(action core.Action) (bool, runtime.Object, error) {
+		patchAction := action.(core.PatchAction)
+		var patch map[string]interface{}
+		if err := json.Unmarshal(patchAction.GetPatch(), &patch); err != nil {
+			return false, nil, nil
+		}
+		spec, ok := patch["spec"].(map[string]interface{})
+		if !ok {
+			return false, nil, nil
+		}
+		if _, ok := spec["unschedulable"]; !ok {
+			return false, nil, nil
+		}
+		for _, obj := range nodeInformer.Informer().GetIndexer().List() {
+			n := obj.(*corev1.Node)
+			if n.Name == patchAction.GetName() {
+				updated := n.DeepCopy()
+				updated.Spec.Unschedulable = unschedulable
+				nodeInformer.Informer().GetIndexer().Update(updated)
+				break
+			}
+		}
+		return false, nil, nil
+	}
+}
+
+// setupControllerAndSyncWithIndexerUpdate runs syncNode with a reactor that mirrors
+// cordon patches into the informer indexer. The fake client does not emit watch
+// events, so without this the lister would return stale data after drainNode
+// cordons the node. This models the happy path where no external actor
+// interferes.
+func setupControllerAndSyncWithIndexerUpdate(node *corev1.Node) (*Controller, *k8sfake.Clientset, error) {
+	ctrl, kubeClient, _, nodeInformer := createTestController([]*corev1.Node{node}, []*mcfgv1.MachineConfigPool{createTestMCP(testPoolName)})
+	kubeClient.PrependReactor("patch", "nodes", updateNodeInIndexer(nodeInformer, true))
+	err := ctrl.syncNode(testNodeName)
+	return ctrl, kubeClient, err
+}
+
+// setupControllerAndSyncWithExternalUncordon runs syncNode with a reactor that sets
+// unschedulable=false in the indexer on every cordon patch, modeling an external actor
+// uncordoning the node before syncNode's post-drain re-fetch. syncNode should detect
+// this and skip writing lastApplied rather than falsely signaling the MCD.
+func setupControllerAndSyncWithExternalUncordon(node *corev1.Node) (*Controller, *k8sfake.Clientset, error) {
+	ctrl, kubeClient, _, nodeInformer := createTestController([]*corev1.Node{node}, []*mcfgv1.MachineConfigPool{createTestMCP(testPoolName)})
+	kubeClient.PrependReactor("patch", "nodes", updateNodeInIndexer(nodeInformer, false))
+	err := ctrl.syncNode(testNodeName)
+	return ctrl, kubeClient, err
+}
+
 func verifyDrainPatches(t *testing.T, kubeClient *k8sfake.Clientset, expectedUnschedulable bool, expectedAnnotationValue string) {
 	// Collect all patch actions
 	patchActions := []core.PatchAction{}
@@ -192,23 +246,34 @@ func TestSyncNode(t *testing.T) {
 
 	t.Run("drain requested", func(t *testing.T) {
 		node := createDrainTestNode(testNodeName, false, testDrainState, "")
-		_, kubeClient, err := setupControllerAndSync(node, nil)
+		_, kubeClient, err := setupControllerAndSyncWithIndexerUpdate(node)
 		assert.NoError(t, err, "syncNode should not error for drain action")
 
 		// Verify patch operations: cordon (unschedulable=true) + completion annotation
 		verifyDrainPatches(t, kubeClient, true, testDrainState)
 	})
 
-	t.Run("re-cordon required", func(t *testing.T) {
+	t.Run("externally uncordoned during drain", func(t *testing.T) {
+		// Drain succeeds but the node is schedulable when re-fetched afterward.
+		// lastApplied must NOT be written: it is the signal to the MCD that the node
+		// is safely drained and cordoned, and writing it while schedulable would allow
+		// the MCD to proceed with the config change while new pods can still land.
 		node := createDrainTestNode(testNodeName, false, testDrainState, "")
-		// Simulate ongoing drain (but node is not cordoned - external uncordon)
-		ongoingDrains := map[string]time.Time{
-			testNodeName: time.Now().Add(-5 * time.Minute),
+		_, kubeClient, err := setupControllerAndSyncWithExternalUncordon(node)
+		assert.NoError(t, err, "syncNode should not error when externally uncordoned during drain")
+
+		patchActions := []core.PatchAction{}
+		for _, action := range kubeClient.Actions() {
+			if patchAction, ok := action.(core.PatchAction); ok {
+				patchActions = append(patchActions, patchAction)
+			}
 		}
-		_, kubeClient, err := setupControllerAndSync(node, ongoingDrains)
-		assert.NoError(t, err, "syncNode should not error for re-cordon action")
 
-		// Verify patch operations: re-cordon (unschedulable=true) + completion annotation
-		verifyDrainPatches(t, kubeClient, true, testDrainState)
+		if assert.Len(t, patchActions, 1, "should have made only the cordon patch, not the completion annotation") {
+			var patch map[string]interface{}
+			assert.NoError(t, json.Unmarshal(patchActions[0].GetPatch(), &patch))
+			spec, _ := patch["spec"].(map[string]interface{})
+			assert.Equal(t, true, spec["unschedulable"], "only patch should be the cordon")
+		}
 	})
 }
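
A note on the client-go behavior the new test helpers work around, with an illustrative sketch that is not part of the patch: the fake clientset records actions and applies patches to its internal object tracker, but it never delivers watch events to an informer that was not started, so a lister backed by that informer goes stale the moment the controller writes through the client. The reactors above bridge that gap by mirroring each cordon patch into the indexer by hand. A minimal, self-contained version of the same pattern (the node name and the hard-coded mirror value are assumptions for illustration):

package main

import (
	"context"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
	kubeinformers "k8s.io/client-go/informers"
	k8sfake "k8s.io/client-go/kubernetes/fake"
	core "k8s.io/client-go/testing"
)

func main() {
	node := &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}
	client := k8sfake.NewSimpleClientset(node)

	// Build an informer but never start it: its indexer holds only what is put
	// there by hand, which is exactly the situation in the tests above.
	factory := kubeinformers.NewSharedInformerFactory(client, 0)
	nodes := factory.Core().V1().Nodes()
	indexer := nodes.Informer().GetIndexer()
	_ = indexer.Add(node)

	// Mirror every node patch into the indexer so the lister stays current.
	// For cluster-scoped objects the indexer key is simply the object name.
	client.PrependReactor("patch", "nodes", func(action core.Action) (bool, runtime.Object, error) {
		patchAction := action.(core.PatchAction)
		if obj, exists, _ := indexer.GetByKey(patchAction.GetName()); exists {
			updated := obj.(*corev1.Node).DeepCopy()
			updated.Spec.Unschedulable = true // a real reactor would decode the patch, as updateNodeInIndexer does
			_ = indexer.Update(updated)
		}
		return false, nil, nil // not handled: the default reactor still applies the patch to the tracker
	})

	// A cordon-style patch is now immediately visible through the lister.
	_, _ = client.CoreV1().Nodes().Patch(context.TODO(), "node-1",
		types.StrategicMergePatchType, []byte(`{"spec":{"unschedulable":true}}`), metav1.PatchOptions{})

	n, _ := nodes.Lister().Get("node-1")
	fmt.Println(n.Spec.Unschedulable) // prints: true
}

Returning false from the reactor is what keeps the tracker and the indexer in agreement: the prepended reactor runs first purely for its side effect, and the default reactor then applies the same patch to the fake client's own store.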