Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 30 additions & 10 deletions pkg/controller/drain/drain_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,10 +206,13 @@ func (ctrl *Controller) handleNodeEvent(oldObj, newObj interface{}) {
return
}

// If the desiredDrain annotation are identical between oldNode and newNode, no new action is required by the drain controller
if oldNode.Annotations[daemonconsts.DesiredDrainerAnnotationKey] == newNode.Annotations[daemonconsts.DesiredDrainerAnnotationKey] {
// Enqueue if the drain annotation changed OR if Spec.Unschedulable changed (e.g. external uncordon during drain).
if oldNode.Annotations[daemonconsts.DesiredDrainerAnnotationKey] == newNode.Annotations[daemonconsts.DesiredDrainerAnnotationKey] &&
oldNode.Spec.Unschedulable == newNode.Spec.Unschedulable {
return
}
klog.V(4).Infof("node %s: enqueueing (desiredDrainer=%q unschedulable=%v)", newNode.Name,
newNode.Annotations[daemonconsts.DesiredDrainerAnnotationKey], newNode.Spec.Unschedulable)
ctrl.enqueueNode(newNode)
}

Expand Down Expand Up @@ -290,18 +293,23 @@ func (ctrl *Controller) syncNode(key string) error {
}

desiredState := node.Annotations[daemonconsts.DesiredDrainerAnnotationKey]
desiredVerb := strings.Split(desiredState, "-")[0]

// lastAppliedDrain is only written on success, so desired == lastApplied means
// the controller already completed this request.
if desiredState == node.Annotations[daemonconsts.LastAppliedDrainerAnnotationKey] {
klog.V(4).Infof("Node %v has the correct drain", key)
klog.Infof("Node %v has the correct drain request state", key)
return nil
}

drainer := &drain.Helper{
Client: ctrl.kubeClient,
Force: true,
IgnoreAllDaemonSets: true,
DeleteEmptyDirData: true,
GracePeriodSeconds: -1,
Timeout: ctrl.cfg.DrainHelperTimeout,
Client: ctrl.kubeClient,
Force: true,
IgnoreAllDaemonSets: true,
DeleteEmptyDirData: true,
GracePeriodSeconds: -1,
Timeout: ctrl.cfg.DrainHelperTimeout,
EvictErrorRetryDelay: 5 * time.Second,
OnPodDeletedOrEvicted: func(pod *corev1.Pod, usingEviction bool) {
verbStr := "Deleted"
if usingEviction {
Expand All @@ -320,7 +328,6 @@ func (ctrl *Controller) syncNode(key string) error {
return err
}

desiredVerb := strings.Split(desiredState, "-")[0]
switch desiredVerb {
case daemonconsts.DrainerStateUncordon:
ctrl.logNode(node, "uncordoning")
Expand Down Expand Up @@ -362,6 +369,19 @@ func (ctrl *Controller) syncNode(key string) error {
// from here so that we can requeue.
return nil
}
// Re-fetch the node before writing lastApplied. If the node was externally
// uncordoned during the drain, new pods may have been scheduled; writing
// lastApplied would falsely signal to the MCD that it is safe to proceed.
// The handleNodeEvent will have already re-queued the node on the Spec.Unschedulable
// change, so just return without writing the annotation.
node, err = ctrl.nodeLister.Get(name)
if err != nil {
return err
}
if !node.Spec.Unschedulable {
klog.Infof("node %s: externally uncordoned during drain, skipping completion annotation", name)
return nil
}
default:
return fmt.Errorf("node %s: non-recognized drain verb detected %s", node.Name, desiredVerb)
}
Expand Down
89 changes: 77 additions & 12 deletions pkg/controller/drain/drain_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
kubeinformers "k8s.io/client-go/informers"
coreinformersv1 "k8s.io/client-go/informers/core/v1"
k8sfake "k8s.io/client-go/kubernetes/fake"
core "k8s.io/client-go/testing"

Expand Down Expand Up @@ -71,7 +72,7 @@ func createTestMCP(name string) *mcfgv1.MachineConfigPool {
}
}

func createTestController(nodes []*corev1.Node, mcps []*mcfgv1.MachineConfigPool) (*Controller, *k8sfake.Clientset, *fakemcfgclientset.Clientset) {
func createTestController(nodes []*corev1.Node, mcps []*mcfgv1.MachineConfigPool) (*Controller, *k8sfake.Clientset, *fakemcfgclientset.Clientset, coreinformersv1.NodeInformer) {
kubeObjs := make([]runtime.Object, len(nodes))
for i, node := range nodes {
kubeObjs[i] = node
Expand Down Expand Up @@ -116,7 +117,7 @@ func createTestController(nodes []*corev1.Node, mcps []*mcfgv1.MachineConfigPool
// Initialize ongoing drains map for testing
ctrl.ongoingDrains = make(map[string]time.Time)

return ctrl, kubeClient, mcfgClient
return ctrl, kubeClient, mcfgClient, nodeInformer
}

func createDrainTestNode(nodeName string, unschedulable bool, desiredState, lastAppliedState string) *corev1.Node {
Expand All @@ -131,7 +132,7 @@ func createDrainTestNode(nodeName string, unschedulable bool, desiredState, last
}

func setupControllerAndSync(node *corev1.Node, ongoingDrains map[string]time.Time) (*Controller, *k8sfake.Clientset, error) {
ctrl, kubeClient, _ := createTestController([]*corev1.Node{node}, []*mcfgv1.MachineConfigPool{createTestMCP(testPoolName)})
ctrl, kubeClient, _, _ := createTestController([]*corev1.Node{node}, []*mcfgv1.MachineConfigPool{createTestMCP(testPoolName)})

if ongoingDrains != nil {
ctrl.ongoingDrains = ongoingDrains
Expand All @@ -141,6 +142,59 @@ func setupControllerAndSync(node *corev1.Node, ongoingDrains map[string]time.Tim
return ctrl, kubeClient, err
}

// updateNodeInIndexer returns a fake client reactor that writes unschedulable into the
// informer indexer whenever a node patch containing spec.unschedulable fires. Passing
// true mirrors the patch faithfully (normal cordon); passing false simulates an external
// actor immediately uncordoning the node after the controller's cordon patch lands.
func updateNodeInIndexer(nodeInformer coreinformersv1.NodeInformer, unschedulable bool) func(core.Action) (bool, runtime.Object, error) {
	return func(action core.Action) (bool, runtime.Object, error) {
		pa := action.(core.PatchAction)

		// Decode the patch body. Anything that is not a JSON object carrying
		// a spec.unschedulable key is passed through without touching the indexer.
		var body map[string]interface{}
		if err := json.Unmarshal(pa.GetPatch(), &body); err != nil {
			return false, nil, nil
		}
		specField, isMap := body["spec"].(map[string]interface{})
		if !isMap {
			return false, nil, nil
		}
		if _, present := specField["unschedulable"]; !present {
			return false, nil, nil
		}

		// Locate the patched node in the indexer and overwrite its
		// Spec.Unschedulable with the configured value.
		indexer := nodeInformer.Informer().GetIndexer()
		for _, stored := range indexer.List() {
			node := stored.(*corev1.Node)
			if node.Name != pa.GetName() {
				continue
			}
			clone := node.DeepCopy()
			clone.Spec.Unschedulable = unschedulable
			indexer.Update(clone)
			break
		}

		// Always return handled=false so the fake clientset still applies the patch.
		return false, nil, nil
	}
}

// setupControllerAndSyncWithIndexerUpdate runs syncNode with a reactor that mirrors cordon
// patches into the informer indexer. The fake client does not emit watch events, so without
// this the lister would return stale data after drainNode cordons the node. This models
// the happy path where no external actor interferes.
func setupControllerAndSyncWithIndexerUpdate(node *corev1.Node) (*Controller, *k8sfake.Clientset, error) {
	controller, client, _, informer := createTestController([]*corev1.Node{node}, []*mcfgv1.MachineConfigPool{createTestMCP(testPoolName)})
	// Mirror the cordon patch faithfully (unschedulable=true) into the indexer.
	client.PrependReactor("patch", "nodes", updateNodeInIndexer(informer, true))
	syncErr := controller.syncNode(testNodeName)
	return controller, client, syncErr
}

// setupControllerAndSyncWithExternalUncordon runs syncNode with a reactor that sets
// unschedulable=false in the indexer on every cordon patch, explicitly modeling an external
// actor uncordoning the node before the re-fetch at the end of drainNode. syncNode should
// detect this and skip writing lastApplied rather than falsely signalling the MCD.
func setupControllerAndSyncWithExternalUncordon(node *corev1.Node) (*Controller, *k8sfake.Clientset, error) {
	controller, client, _, informer := createTestController([]*corev1.Node{node}, []*mcfgv1.MachineConfigPool{createTestMCP(testPoolName)})
	// Invert every cordon patch (unschedulable=false) to simulate the external uncordon.
	client.PrependReactor("patch", "nodes", updateNodeInIndexer(informer, false))
	syncErr := controller.syncNode(testNodeName)
	return controller, client, syncErr
}

func verifyDrainPatches(t *testing.T, kubeClient *k8sfake.Clientset, expectedUnschedulable bool, expectedAnnotationValue string) {
// Collect all patch actions
patchActions := []core.PatchAction{}
Expand Down Expand Up @@ -192,23 +246,34 @@ func TestSyncNode(t *testing.T) {

t.Run("drain requested", func(t *testing.T) {
node := createDrainTestNode(testNodeName, false, testDrainState, "")
_, kubeClient, err := setupControllerAndSync(node, nil)
_, kubeClient, err := setupControllerAndSyncWithIndexerUpdate(node)
assert.NoError(t, err, "syncNode should not error for drain action")

// Verify patch operations: cordon (unschedulable=true) + completion annotation
verifyDrainPatches(t, kubeClient, true, testDrainState)
})

t.Run("re-cordon required", func(t *testing.T) {
t.Run("externally uncordoned during drain", func(t *testing.T) {
// Drain succeeds but the node is schedulable when re-fetched afterward.
// lastApplied must NOT be written: it is the signal to the MCD that the node
// is safely drained and cordoned, and writing it while schedulable would allow
// the MCD to proceed with the config change while new pods can still land.
node := createDrainTestNode(testNodeName, false, testDrainState, "")
// Simulate ongoing drain (but node is not cordoned - external uncordon)
ongoingDrains := map[string]time.Time{
testNodeName: time.Now().Add(-5 * time.Minute),
_, kubeClient, err := setupControllerAndSyncWithExternalUncordon(node)
assert.NoError(t, err, "syncNode should not error when externally uncordoned during drain")

patchActions := []core.PatchAction{}
for _, action := range kubeClient.Actions() {
if patchAction, ok := action.(core.PatchAction); ok {
patchActions = append(patchActions, patchAction)
}
}
_, kubeClient, err := setupControllerAndSync(node, ongoingDrains)
assert.NoError(t, err, "syncNode should not error for re-cordon action")
assert.Len(t, patchActions, 1, "should have made only the cordon patch, not the completion annotation")

// Verify patch operations: re-cordon (unschedulable=true) + completion annotation
verifyDrainPatches(t, kubeClient, true, testDrainState)
var patch map[string]any
assert.NoError(t, json.Unmarshal(patchActions[0].GetPatch(), &patch))
if spec, ok := patch["spec"].(map[string]any); ok {
assert.Equal(t, true, spec["unschedulable"], "only patch should be the cordon")
}
})
}