From 25af834c4569d38bc0fe79df72ebfda163c3f8d0 Mon Sep 17 00:00:00 2001
From: Loick MAHIEUX
Date: Fri, 13 Mar 2026 16:32:36 +0100
Subject: [PATCH 1/2] fix(machine): propagate MaxConcurrentReconciles to drain controller

The drain controller was always created with default options, ignoring
the MaxConcurrentReconciles value passed via caller opts. This meant
platforms like Azure that set concurrent reconciles to 10 would still
have the drain controller running with a single worker, creating a
bottleneck during large-scale drain operations.

Propagate MaxConcurrentReconciles from the incoming opts to the drain
controller options so both controllers scale equally.

Case: #04399828
---
 pkg/controller/machine/controller.go | 1 +
 1 file changed, 1 insertion(+)

diff --git a/pkg/controller/machine/controller.go b/pkg/controller/machine/controller.go
index 0bb0b3d038..d77ed76bd5 100644
--- a/pkg/controller/machine/controller.go
+++ b/pkg/controller/machine/controller.go
@@ -100,6 +100,7 @@ func AddWithActuatorOpts(mgr manager.Manager, actuator Actuator, opts controller
 	}
 
 	if err := addWithOpts(mgr, controller.Options{
+		MaxConcurrentReconciles: opts.MaxConcurrentReconciles,
 		Reconciler:  newDrainController(mgr),
 		RateLimiter: newDrainRateLimiter(),
 	}, "machine-drain-controller"); err != nil {

From 7200e7efcee16ea5151f055abc014c5422dfaf94 Mon Sep 17 00:00:00 2001
From: Loick MAHIEUX
Date: Wed, 25 Mar 2026 15:31:12 +0100
Subject: [PATCH 2/2] fix(drain): ensure only one control plane node is cordoned at a time

Hold a mutex across the isDrainAllowed check and the cordon so that
only one control plane node can be cordoned at a time. Without it,
concurrent reconciles can each observe that no other control plane node
is cordoned and then cordon and drain simultaneously (a TOCTOU race),
taking more than one control plane node down at the same time.
---
 pkg/controller/machine/drain_controller.go | 42 +++++++--
 .../machine/drain_controller_test.go | 92 +++++++++++++++++++
 2 files changed, 126 insertions(+), 8 deletions(-)

diff --git a/pkg/controller/machine/drain_controller.go b/pkg/controller/machine/drain_controller.go
index d75b146dd5..fdf02804d2 100644
--- a/pkg/controller/machine/drain_controller.go
+++ b/pkg/controller/machine/drain_controller.go
@@ -3,6 +3,7 @@ package machine
 import (
 	"context"
 	"fmt"
+	"sync"
 	"time"
 
 	"golang.org/x/time/rate"
@@ -38,6 +39,13 @@ type machineDrainController struct {
 	scheme        *runtime.Scheme
 	eventRecorder record.EventRecorder
+
+	// controlPlaneDrainLock ensures only one control plane node can pass the
+	// isDrainAllowed check and be cordoned at a time. Without this, concurrent
+	// reconciles of different control plane machines can both pass
+	// isDrainAllowed before either cordons, leading to simultaneous control
+	// plane node drains (TOCTOU race).
+ controlPlaneDrainLock sync.Mutex } // newDrainController returns a new reconcile.Reconciler for machine-drain-controller @@ -135,10 +143,6 @@ func (d *machineDrainController) drainNode(ctx context.Context, machine *machine return fmt.Errorf("unable to get node %q: %v", machine.Status.NodeRef.Name, err) } - if err := d.isDrainAllowed(ctx, node); err != nil { - return fmt.Errorf("drain not permitted: %w", err) - } - drainer := &drain.Helper{ Ctx: ctx, Client: kubeClient, @@ -170,10 +174,8 @@ func (d *machineDrainController) drainNode(ctx context.Context, machine *machine drainer.GracePeriodSeconds = 1 } - if err := drain.RunCordonOrUncordon(drainer, node, true); err != nil { - // Can't cordon a node - klog.Warningf("cordon failed for node %q: %v", node.Name, err) - return &RequeueAfterError{RequeueAfter: 20 * time.Second} + if err := d.cordonNode(ctx, drainer, node); err != nil { + return err } if err := drain.RunNodeDrain(drainer, node.Name); err != nil { @@ -193,6 +195,30 @@ func (d *machineDrainController) drainNode(ctx context.Context, machine *machine return nil } +// cordonNode checks whether draining is allowed and cordons the node. +// For uncordoned control plane nodes, the check and cordon are held under +// controlPlaneDrainLock so that only one CP node at a time can pass the +// isDrainAllowed check and get cordoned, preventing a TOCTOU race where +// concurrent reconciles would each see no other CP node cordoned and then +// all proceed to drain simultaneously. +func (d *machineDrainController) cordonNode(ctx context.Context, drainer *drain.Helper, node *corev1.Node) error { + if isControlPlaneNode(*node) && !node.Spec.Unschedulable { + d.controlPlaneDrainLock.Lock() + defer d.controlPlaneDrainLock.Unlock() + } + + if err := d.isDrainAllowed(ctx, node); err != nil { + return fmt.Errorf("drain not permitted: %w", err) + } + + if err := drain.RunCordonOrUncordon(drainer, node, true); err != nil { + klog.Warningf("cordon failed for node %q: %v", node.Name, err) + return &RequeueAfterError{RequeueAfter: 20 * time.Second} + } + + return nil +} + // isDrainAllowed checks whether the drain is permitted at this time. // It checks the following: // - Is the node cordoned, if so allow draining to complete any previous attempt to drain. diff --git a/pkg/controller/machine/drain_controller_test.go b/pkg/controller/machine/drain_controller_test.go index e00df049a8..3602f33c3d 100644 --- a/pkg/controller/machine/drain_controller_test.go +++ b/pkg/controller/machine/drain_controller_test.go @@ -2,6 +2,7 @@ package machine import ( "context" + "sync" "testing" "time" @@ -11,8 +12,11 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + kubefake "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/scheme" + clienttesting "k8s.io/client-go/testing" "k8s.io/client-go/tools/record" + "k8s.io/kubectl/pkg/drain" "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/client/fake" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -345,3 +349,91 @@ func controlPlaneLabel(n *corev1.Node) { func masterLabel(n *corev1.Node) { n.GetLabels()[nodeMasterLabel] = "" } + +func TestCordonNodeSerializesCPDrains(t *testing.T) { + g := NewGomegaWithT(t) + + cpNode1 := newNode("cp-node-1", controlPlaneLabel) + cpNode2 := newNode("cp-node-2", controlPlaneLabel) + workerNode := newNode("worker-1") + + // controller-runtime fake client for isDrainAllowed (List) + crClient := fake.NewClientBuilder(). + WithScheme(scheme.Scheme). 
+		WithRuntimeObjects(cpNode1, cpNode2, workerNode).
+		Build()
+
+	// kube fake clientset for drain.RunCordonOrUncordon (node update)
+	kubeClient := kubefake.NewSimpleClientset(cpNode1.DeepCopy(), cpNode2.DeepCopy(), workerNode.DeepCopy())
+
+	// Bridge: when the kube client cordons a node via patch (sets Unschedulable=true),
+	// also update the controller-runtime fake client so isDrainAllowed sees it.
+	// This simulates the informer cache catching up after a direct API write.
+	// RunCordonOrUncordon uses a strategic merge patch, so we intercept "patch" actions.
+	kubeClient.PrependReactor("patch", "nodes", func(action clienttesting.Action) (bool, runtime.Object, error) {
+		patchAction, ok := action.(clienttesting.PatchAction)
+		if !ok {
+			return false, nil, nil
+		}
+		nodeName := patchAction.GetName()
+
+		// Propagate the cordon to the controller-runtime fake client (only cordon patches occur in this test)
+		existing := &corev1.Node{}
+		if err := crClient.Get(context.Background(), types.NamespacedName{Name: nodeName}, existing); err == nil {
+			existing.Spec.Unschedulable = true
+			if err := crClient.Update(context.Background(), existing); err != nil {
+				t.Logf("bridge: failed to update CR client for node %s: %v", nodeName, err)
+			}
+		}
+		return false, nil, nil // fall through to the default handler
+	})
+
+	d := &machineDrainController{
+		Client: crClient,
+	}
+
+	makeDrainer := func() *drain.Helper {
+		return &drain.Helper{
+			Ctx:                 context.Background(),
+			Client:              kubeClient,
+			Force:               true,
+			IgnoreAllDaemonSets: true,
+			DeleteEmptyDirData:  true,
+			GracePeriodSeconds:  -1,
+			Timeout:             20 * time.Second,
+			Out:                 writer{t.Log},
+			ErrOut:              writer{t.Log},
+		}
+	}
+
+	var wg sync.WaitGroup
+	errs := make([]error, 2)
+
+	wg.Add(2)
+	go func() {
+		defer wg.Done()
+		errs[0] = d.cordonNode(context.Background(), makeDrainer(), cpNode1)
+	}()
+	go func() {
+		defer wg.Done()
+		errs[1] = d.cordonNode(context.Background(), makeDrainer(), cpNode2)
+	}()
+	wg.Wait()
+
+	// Exactly one call should succeed and the other should be denied with a
+	// "drain not permitted" error: the mutex serializes the check+cordon, so
+	// the second goroutine sees the first node as already cordoned.
+	succeeded := 0
+	denied := 0
+	for _, err := range errs {
+		if err == nil {
+			succeeded++
+		} else {
+			g.Expect(err).To(MatchError(ContainSubstring("drain not permitted")))
+			denied++
+		}
+	}
+
+	g.Expect(succeeded).To(Equal(1), "exactly one CP drain should succeed")
+	g.Expect(denied).To(Equal(1), "exactly one CP drain should be denied")
+}
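
For context, the critical section patch 2/2 introduces can be shown in
isolation. The program below is a standalone sketch, not part of the
patches; the cluster type and cordonIfAllowed are hypothetical stand-ins
for the node list and cordonNode. It models why the isDrainAllowed-style
check and the cordon must run under one mutex: the second goroutine is
forced to observe the first goroutine's cordon and is denied, rather than
both passing the check before either acts.

package main

import (
	"fmt"
	"sync"
)

type cluster struct {
	mu       sync.Mutex      // stands in for controlPlaneDrainLock
	cordoned map[string]bool // node name -> cordoned
}

// cordonIfAllowed mirrors cordonNode: the "is any other node cordoned?"
// check and the cordon itself are held under the same lock. Without the
// lock, both goroutines could pass the loop before either writes to the
// map -- the TOCTOU race described in the commit message.
func (c *cluster) cordonIfAllowed(node string) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	for name, isCordoned := range c.cordoned {
		if name != node && isCordoned {
			return fmt.Errorf("drain not permitted: %s is already cordoned", name)
		}
	}
	c.cordoned[node] = true
	return nil
}

func main() {
	c := &cluster{cordoned: map[string]bool{"cp-1": false, "cp-2": false}}
	var wg sync.WaitGroup
	for _, node := range []string{"cp-1", "cp-2"} {
		wg.Add(1)
		go func(n string) {
			defer wg.Done()
			if err := c.cordonIfAllowed(n); err != nil {
				fmt.Println(err) // exactly one goroutine is denied
				return
			}
			fmt.Println("cordoned", n)
		}(node)
	}
	wg.Wait()
}

Deleting the Lock/Unlock pair re-opens the window: both goroutines can
scan the map, see nothing cordoned, and both write -- the interleaving
the drain controller was exposed to across concurrent reconciles once
MaxConcurrentReconciles exceeds one.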