From 521a38e8ede6786a6e5dc6981c1451a8fce2ce8e Mon Sep 17 00:00:00 2001
From: Michael Weibel
Date: Fri, 5 Dec 2025 17:31:54 +0100
Subject: [PATCH] feat: add nodeselector annotation to limit LB pool members

---
 .golangci.yml                            |   4 -
 README.md                                |  11 +
 examples/nginx-hello-nodeselector.yml    |  56 +++++
 pkg/cloudscale_ccm/cloud.go              |  32 ++-
 pkg/cloudscale_ccm/loadbalancer.go       |  95 ++++++--
 pkg/cloudscale_ccm/loadbalancer_test.go  | 281 ++++++++++++++++++++++
 pkg/cloudscale_ccm/reconcile.go          |  20 +-
 pkg/cloudscale_ccm/reconcile_test.go     |   9 +-
 pkg/cloudscale_ccm/service_info.go       |   2 +
 pkg/internal/integration/service_test.go | 289 +++++++++++++++++------
 pkg/internal/testkit/http.go             |   5 +-
 11 files changed, 687 insertions(+), 117 deletions(-)
 create mode 100644 examples/nginx-hello-nodeselector.yml
 create mode 100644 pkg/cloudscale_ccm/loadbalancer_test.go

diff --git a/.golangci.yml b/.golangci.yml
index cae4f1a..905f773 100644
--- a/.golangci.yml
+++ b/.golangci.yml
@@ -29,10 +29,6 @@ linters:
     - usestdlibvars
     - whitespace
     - wsl
-  settings:
-    lll:
-      line-length: 80
-      tab-width: 4
   exclusions:
     generated: lax
     presets:
diff --git a/README.md b/README.md
index 7675126..af16114 100644
--- a/README.md
+++ b/README.md
@@ -287,6 +287,17 @@ Changes to following annotations may lead to new connections timing out until th
 - `k8s.cloudscale.ch/loadbalancer-health-monitor-type`
 - `k8s.cloudscale.ch/loadbalancer-health-monitor-http`
 
+Changes to the `k8s.cloudscale.ch/loadbalancer-node-selector` annotation are generally safe,
+as long as the selector matches valid node labels.
+When using `externalTrafficPolicy: Local`, ensure the selector targets nodes that are
+actually running the backend pods. Otherwise, traffic will be dropped.
+
+Unlike the well-known node label [`node.kubernetes.io/exclude-from-external-load-balancers=true`](https://kubernetes.io/docs/reference/labels-annotations-taints/#node-kubernetes-io-exclude-from-external-load-balancers),
+which globally excludes nodes from *all* LoadBalancer services, this annotation allows
+targeting a specific subset of nodes on a per-service basis.
+Note that the `exclude-from-external-load-balancers` label is applied first: nodes with
+this label are excluded before the `loadbalancer-node-selector` is evaluated.
+
 ##### Listener Port Changes
 
 Changes to the outward bound service port have a downtime ranging from 15s to 120s, depending on the action. Since the name of the port is used to avoid expensive pool recreation, the impact is minimal if the port name does not change.
diff --git a/examples/nginx-hello-nodeselector.yml b/examples/nginx-hello-nodeselector.yml
new file mode 100644
index 0000000..8edb539
--- /dev/null
+++ b/examples/nginx-hello-nodeselector.yml
@@ -0,0 +1,56 @@
+# Deploys the docker.io/nginxdemos/hello:plain-text container and creates a
+# loadbalancer service with a node-selector annotation for it:
+#
+#   export KUBECONFIG=path/to/kubeconfig
+#   kubectl apply -f nginx-hello-nodeselector.yml
+#
+# Wait for `kubectl describe service hello` to show "Loadbalancer Ensured",
+# then use the IP address found under "LoadBalancer Ingress" to connect to the
+# service.
+#
+# You can also use the following shortcut:
+#
+#   curl http://$(kubectl get service hello -o jsonpath='{.status.loadBalancer.ingress[0].ip}')
+#
+# If you adjust the nodeSelector of the deployment, or the `loadbalancer-node-selector` annotation on the service,
+# you'll notice that if they don't match, the request will fail.
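+#
+# For example, you can point the load balancer at a node that does not run the
+# pod and watch the curl above start to fail (illustrative command;
+# `k8test-worker-1` is assumed to be another worker node in the cluster):
+#
+#   kubectl annotate service hello --overwrite \
+#     k8s.cloudscale.ch/loadbalancer-node-selector="kubernetes.io/hostname=k8test-worker-1"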
+# +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: hello +spec: + replicas: 2 + selector: + matchLabels: + app: hello + template: + metadata: + labels: + app: hello + spec: + containers: + - name: hello + image: docker.io/nginxdemos/hello:plain-text + nodeSelector: + kubernetes.io/hostname: k8test-worker-2 +--- +apiVersion: v1 +kind: Service +metadata: + labels: + app: hello + annotations: + k8s.cloudscale.ch/loadbalancer-node-selector: "kubernetes.io/hostname=k8test-worker-2" + name: hello +spec: + ports: + - port: 80 + protocol: TCP + targetPort: 80 + name: http + selector: + app: hello + type: LoadBalancer + externalTrafficPolicy: Local diff --git a/pkg/cloudscale_ccm/cloud.go b/pkg/cloudscale_ccm/cloud.go index 9ea001e..716257a 100644 --- a/pkg/cloudscale_ccm/cloud.go +++ b/pkg/cloudscale_ccm/cloud.go @@ -9,9 +9,12 @@ import ( "strings" "time" - cloudscale "github.com/cloudscale-ch/cloudscale-go-sdk/v6" + "github.com/cloudscale-ch/cloudscale-go-sdk/v6" "golang.org/x/oauth2" - "k8s.io/client-go/kubernetes" + corev1 "k8s.io/api/core/v1" + "k8s.io/client-go/kubernetes/scheme" + v1 "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/tools/record" "k8s.io/klog/v2" cloudprovider "k8s.io/cloud-provider" @@ -32,6 +35,8 @@ const ( type cloud struct { instances *instances loadbalancer *loadbalancer + + eventRecorder record.EventRecorder } // Register this provider with Kubernetes. @@ -112,8 +117,27 @@ func (c *cloud) Initialize( // This cannot be configured earlier, even though it seems better situated // in newCloudscaleClient - c.loadbalancer.k8s = kubernetes.NewForConfigOrDie( - clientBuilder.ConfigOrDie("cloudscale-cloud-controller-manager")) + c.loadbalancer.k8s = clientBuilder.ClientOrDie( + "cloudscale-cloud-controller-manager", + ) + + eventBroadcaster := record.NewBroadcaster() + eventBroadcaster.StartRecordingToSink(&v1.EventSinkImpl{ + Interface: c.loadbalancer.k8s.CoreV1().Events(""), + }) + c.eventRecorder = eventBroadcaster.NewRecorder(scheme.Scheme, + corev1.EventSource{ + Component: "cloudscale-cloud-controller-manager", + }, + ) + + go func() { + // wait until stop chan closes + <-stop + eventBroadcaster.Shutdown() + }() + + c.loadbalancer.recorder = c.eventRecorder } // LoadBalancer returns a balancer interface. Also returns true if the diff --git a/pkg/cloudscale_ccm/loadbalancer.go b/pkg/cloudscale_ccm/loadbalancer.go index 82ee99c..b13d405 100644 --- a/pkg/cloudscale_ccm/loadbalancer.go +++ b/pkg/cloudscale_ccm/loadbalancer.go @@ -2,18 +2,20 @@ package cloudscale_ccm import ( "context" - "errors" "fmt" "slices" "strings" - "github.com/cloudscale-ch/cloudscale-cloud-controller-manager/pkg/internal/kubeutil" "github.com/cloudscale-ch/cloudscale-go-sdk/v6" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/record" "k8s.io/klog/v2" "k8s.io/utils/ptr" + + "github.com/cloudscale-ch/cloudscale-cloud-controller-manager/pkg/internal/kubeutil" ) // Annotations used by the loadbalancer integration of cloudscale_ccm. Those @@ -208,7 +210,7 @@ const ( // connections timing out while the monitor is updated. LoadBalancerHealthMonitorTimeoutS = "k8s.cloudscale.ch/loadbalancer-health-monitor-timeout-s" - // LoadBalancerHealthMonitorDownThreshold is the number of the checks that + // LoadBalancerHealthMonitorUpThreshold is the number of the checks that // need to succeed before a pool member is considered up. Defaults to 2. 
LoadBalancerHealthMonitorUpThreshold = "k8s.cloudscale.ch/loadbalancer-health-monitor-up-threshold" @@ -278,7 +280,7 @@ const ( // Changing this annotation on an established service is considered safe. LoadBalancerListenerTimeoutMemberDataMS = "k8s.cloudscale.ch/loadbalancer-timeout-member-data-ms" - // LoadBalancerSubnetLimit is a JSON list of subnet UUIDs that the + // LoadBalancerListenerAllowedSubnets is a JSON list of subnet UUIDs that the // loadbalancer should use. By default, all subnets of a node are used: // // * `[]` means that anyone is allowed to connect (default). @@ -291,12 +293,21 @@ const ( // This is an advanced feature, useful if you have nodes that are in // multiple private subnets. LoadBalancerListenerAllowedSubnets = "k8s.cloudscale.ch/loadbalancer-listener-allowed-subnets" + + // LoadBalancerNodeSelector can be set to restrict which nodes are added to the LB pool. + // It accepts a standard Kubernetes label selector string. + // + // N.B.: If the node-selector annotation doesn't match any nodes, the LoadBalancer will remove all members + // from the LB pool, effectively causing a downtime on the LB. + // Make sure to verify the node selector well before setting it. + LoadBalancerNodeSelector = "k8s.cloudscale.ch/loadbalancer-node-selector" ) type loadbalancer struct { - lbs lbMapper - srv serverMapper - k8s kubernetes.Interface + lbs lbMapper + srv serverMapper + k8s kubernetes.Interface + recorder record.EventRecorder } // GetLoadBalancer returns whether the specified load balancer exists, and @@ -387,24 +398,34 @@ func (l *loadbalancer) EnsureLoadBalancer( return nil, err } - // Refuse to do anything if there are no nodes - if len(nodes) == 0 { - return nil, errors.New( - "no valid nodes for service found, please verify there is " + - "at least one that allows load balancers", + filteredNodes, err := filterNodesBySelector(serviceInfo, nodes) + if err != nil { + return nil, err + } + + if len(filteredNodes) == 0 { + l.recorder.Event( + service, + v1.EventTypeWarning, + "NoValidNodes", + fmt.Sprintf("No valid nodes for service found, "+ + "double-check node-selector annotation: %s: %s", + LoadBalancerNodeSelector, + serviceInfo.annotation(LoadBalancerNodeSelector), + ), ) } // Reconcile - err := reconcileLbState(ctx, l.lbs.client, func() (*lbState, error) { + err = reconcileLbState(ctx, l.lbs.client, func() (*lbState, error) { // Get the desired state from Kubernetes - servers, err := l.srv.mapNodes(ctx, nodes).All() + servers, err := l.srv.mapNodes(ctx, filteredNodes).All() if err != nil { return nil, fmt.Errorf( "unable to get load balancer for %s: %w", service.Name, err) } - return desiredLbState(serviceInfo, nodes, servers) + return desiredLbState(serviceInfo, filteredNodes, servers) }, func() (*lbState, error) { // Get the current state from cloudscale.ch return actualLbState(ctx, &l.lbs, serviceInfo) @@ -442,6 +463,28 @@ func (l *loadbalancer) EnsureLoadBalancer( return result, nil } +func filterNodesBySelector( + serviceInfo *serviceInfo, + nodes []*v1.Node, +) ([]*v1.Node, error) { + selector := labels.Everything() + if v := serviceInfo.annotation(LoadBalancerNodeSelector); v != "" { + var err error + selector, err = labels.Parse(v) + if err != nil { + return nil, fmt.Errorf("unable to parse selector: %w", err) + } + } + selectedNodes := make([]*v1.Node, 0, len(nodes)) + for _, node := range nodes { + if selector.Matches(labels.Set(node.Labels)) { + selectedNodes = append(selectedNodes, node) + } + } + + return selectedNodes, nil +} + // UpdateLoadBalancer 
updates hosts under the specified load balancer. // Implementations must treat the *v1.Service and *v1.Node // parameters as read-only and not modify them. @@ -461,16 +504,34 @@ func (l *loadbalancer) UpdateLoadBalancer( return err } + filteredNodes, err := filterNodesBySelector(serviceInfo, nodes) + if err != nil { + return err + } + + if len(filteredNodes) == 0 { + l.recorder.Event( + service, + v1.EventTypeWarning, + "NoValidNodes", + fmt.Sprintf("No valid nodes for service found, "+ + "double-check node-selector annotation: %s: %s", + LoadBalancerNodeSelector, + serviceInfo.annotation(LoadBalancerNodeSelector), + ), + ) + } + // Reconcile return reconcileLbState(ctx, l.lbs.client, func() (*lbState, error) { // Get the desired state from Kubernetes - servers, err := l.srv.mapNodes(ctx, nodes).All() + servers, err := l.srv.mapNodes(ctx, filteredNodes).All() if err != nil { return nil, fmt.Errorf( "unable to get load balancer for %s: %w", service.Name, err) } - return desiredLbState(serviceInfo, nodes, servers) + return desiredLbState(serviceInfo, filteredNodes, servers) }, func() (*lbState, error) { // Get the current state from cloudscale.ch return actualLbState(ctx, &l.lbs, serviceInfo) diff --git a/pkg/cloudscale_ccm/loadbalancer_test.go b/pkg/cloudscale_ccm/loadbalancer_test.go new file mode 100644 index 0000000..74043b0 --- /dev/null +++ b/pkg/cloudscale_ccm/loadbalancer_test.go @@ -0,0 +1,281 @@ +package cloudscale_ccm + +import ( + "testing" + + "github.com/cloudscale-ch/cloudscale-go-sdk/v6" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/version" + fakediscovery "k8s.io/client-go/discovery/fake" + "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/tools/record" + + "github.com/cloudscale-ch/cloudscale-cloud-controller-manager/pkg/internal/testkit" +) + +func TestLoadBalancer_EnsureLoadBalancer(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + annotations map[string]string + nodes []*v1.Node + setup func(*testkit.MockAPIServer) + wantErr bool + wantErrMsg string + wantEvent string + }{ + { + name: "invalid node selector returns error", + annotations: map[string]string{ + LoadBalancerNodeSelector: "invalid===syntax", + }, + nodes: []*v1.Node{ + {ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}, + }, + setup: func(apiServer *testkit.MockAPIServer) {}, + wantErr: true, + wantErrMsg: "unable to parse selector", + }, + { + name: "node selector matching no nodes emits warning event", + annotations: map[string]string{ + LoadBalancerNodeSelector: "nonexistent=value", + LoadBalancerName: "test-lb", + LoadBalancerFlavor: "lb-standard", + LoadBalancerZone: "rma1", + }, + nodes: []*v1.Node{ + {ObjectMeta: metav1.ObjectMeta{Name: "node-1", Labels: map[string]string{"env": "prod"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "node-2", Labels: map[string]string{"env": "staging"}}}, + }, + setup: func(apiServer *testkit.MockAPIServer) { + lbUUID := "00000000-0000-0000-0000-000000000001" + poolUUID := "00000000-0000-0000-0000-000000000002" + listenerUUID := "00000000-0000-0000-0000-000000000003" + monitorUUID := "00000000-0000-0000-0000-000000000004" + + apiServer.WithLoadBalancers([]cloudscale.LoadBalancer{{ + HREF: "/v1/load-balancers/" + lbUUID, + UUID: lbUUID, + Name: "test-lb", + Status: "running", + ZonalResource: cloudscale.ZonalResource{ + Zone: cloudscale.Zone{Slug: "rma1"}, + }, + Flavor: cloudscale.LoadBalancerFlavorStub{Slug: 
"lb-standard"}, + }}) + apiServer.On("/v1/load-balancers/pools", 200, []cloudscale.LoadBalancerPool{{ + HREF: "/v1/load-balancers/pools/" + poolUUID, + UUID: poolUUID, + Name: "tcp", + LoadBalancer: cloudscale.LoadBalancerStub{UUID: lbUUID}, + Algorithm: "round_robin", + Protocol: "tcp", + }}) + apiServer.On("/v1/load-balancers/pools/"+poolUUID+"/members", 200, + []cloudscale.LoadBalancerPoolMember{}) + apiServer.On("/v1/load-balancers/listeners", 200, []cloudscale.LoadBalancerListener{{ + HREF: "/v1/load-balancers/listeners/" + listenerUUID, + UUID: listenerUUID, + Name: "tcp/80", + Pool: &cloudscale.LoadBalancerPoolStub{UUID: poolUUID}, + LoadBalancer: cloudscale.LoadBalancerStub{UUID: lbUUID}, + Protocol: "tcp", + ProtocolPort: 80, + TimeoutClientDataMS: 50000, + TimeoutMemberConnectMS: 5000, + TimeoutMemberDataMS: 50000, + AllowedCIDRs: []string{}, + }}) + apiServer.On("/v1/load-balancers/health-monitors", 200, []cloudscale.LoadBalancerHealthMonitor{{ + HREF: "/v1/load-balancers/health-monitors/" + monitorUUID, + UUID: monitorUUID, + Pool: cloudscale.LoadBalancerPoolStub{UUID: poolUUID}, + Type: "tcp", + DelayS: 2, + TimeoutS: 1, + UpThreshold: 2, + DownThreshold: 3, + }}) + apiServer.On("/v1/floating-ips", 200, []cloudscale.FloatingIP{}) + }, + wantErr: false, + wantEvent: "NoValidNodes", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + apiServer := testkit.NewMockAPIServer() + if tt.setup != nil { + tt.setup(apiServer) + } + apiServer.Start() + defer apiServer.Close() + + fr := record.NewFakeRecorder(10) + client := fake.NewSimpleClientset() + + // ensure `kubeutil.IsKubernetesReleaseOrNewer` works by faking a server version which is usually not set + // when using fake client. + fakeDiscovery, ok := client.Discovery().(*fakediscovery.FakeDiscovery) + require.True(t, ok, "couldn't convert Discovery() to *FakeDiscovery") + + fakeDiscovery.FakedServerVersion = &version.Info{ + Major: "1", + Minor: "34", + } + + l := &loadbalancer{ + lbs: lbMapper{client: apiServer.Client()}, + srv: serverMapper{client: apiServer.Client()}, + k8s: client, + recorder: fr, + } + + service := &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-service", + Namespace: "default", + UID: "test-uid", + Annotations: tt.annotations, + }, + Spec: v1.ServiceSpec{ + Type: v1.ServiceTypeLoadBalancer, + Ports: []v1.ServicePort{ + {Protocol: v1.ProtocolTCP, Port: 80, NodePort: 80}, + }, + }, + } + + // Pre-create the service in the fake k8s client so annotations can be applied + _, _ = l.k8s.CoreV1().Services("default").Create(t.Context(), service, metav1.CreateOptions{}) + + _, err := l.EnsureLoadBalancer(t.Context(), "test-cluster", service, tt.nodes) + if tt.wantErr { + assert.Error(t, err) + if tt.wantErrMsg != "" { + assert.Contains(t, err.Error(), tt.wantErrMsg) + } + } else { + assert.NoError(t, err) + } + + if tt.wantEvent != "" { + select { + case event := <-fr.Events: + assert.Contains(t, event, tt.wantEvent) + default: + t.Error("expected warning event but none was recorded") + } + } + }) + } +} + +func TestFilterNodesBySelector(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + annotations map[string]string + nodes []*v1.Node + wantNames []string + wantErr bool + }{ + { + name: "selector matches subset of nodes", + annotations: map[string]string{ + LoadBalancerNodeSelector: "env=prod", + }, + nodes: []*v1.Node{ + {ObjectMeta: metav1.ObjectMeta{Name: "node-1", Labels: map[string]string{"env": "prod"}}}, + {ObjectMeta: 
metav1.ObjectMeta{Name: "node-2", Labels: map[string]string{"env": "staging"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "node-3", Labels: map[string]string{"env": "prod"}}}, + }, + wantNames: []string{"node-1", "node-3"}, + wantErr: false, + }, + { + name: "invalid selector syntax returns error", + annotations: map[string]string{ + LoadBalancerNodeSelector: "invalid===syntax", + }, + nodes: []*v1.Node{ + {ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}, + }, + wantNames: nil, + wantErr: true, + }, + { + name: "no annotation returns all nodes", + annotations: nil, + nodes: []*v1.Node{ + {ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "node-2"}}, + }, + wantNames: []string{"node-1", "node-2"}, + wantErr: false, + }, + { + name: "selector matches no nodes returns empty slice", + annotations: map[string]string{ + LoadBalancerNodeSelector: "nonexistent=value", + }, + nodes: []*v1.Node{ + {ObjectMeta: metav1.ObjectMeta{Name: "node-1", Labels: map[string]string{"env": "prod"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "node-2", Labels: map[string]string{"env": "staging"}}}, + }, + wantNames: []string{}, + wantErr: false, + }, + { + name: "selector with multiple requirements", + annotations: map[string]string{ + LoadBalancerNodeSelector: "env=prod,tier=frontend", + }, + nodes: []*v1.Node{ + {ObjectMeta: metav1.ObjectMeta{Name: "node-1", Labels: map[string]string{"env": "prod", "tier": "frontend"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "node-2", Labels: map[string]string{"env": "prod", "tier": "backend"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "node-3", Labels: map[string]string{"env": "staging", "tier": "frontend"}}}, + }, + wantNames: []string{"node-1"}, + wantErr: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + service := &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-service", + Annotations: tt.annotations, + }, + } + info := newServiceInfo(service, "test-cluster") + + got, err := filterNodesBySelector(info, tt.nodes) + if tt.wantErr { + assert.Error(t, err) + assert.Contains(t, err.Error(), "unable to parse selector") + + return + } + assert.NoError(t, err) + + gotNames := make([]string, len(got)) + for i, node := range got { + gotNames[i] = node.Name + } + assert.Equal(t, tt.wantNames, gotNames) + }) + } +} diff --git a/pkg/cloudscale_ccm/reconcile.go b/pkg/cloudscale_ccm/reconcile.go index a387c86..ef7596a 100644 --- a/pkg/cloudscale_ccm/reconcile.go +++ b/pkg/cloudscale_ccm/reconcile.go @@ -22,16 +22,13 @@ type lbState struct { // Pool pointers are used to refer to members by pool, therefore use a // pointer here as well, to not accidentally copy the struct. - pools []*cloudscale.LoadBalancerPool - members map[*cloudscale.LoadBalancerPool][]cloudscale. - LoadBalancerPoolMember - monitors map[*cloudscale.LoadBalancerPool][]cloudscale. - LoadBalancerHealthMonitor + pools []*cloudscale.LoadBalancerPool + members map[*cloudscale.LoadBalancerPool][]cloudscale.LoadBalancerPoolMember + monitors map[*cloudscale.LoadBalancerPool][]cloudscale.LoadBalancerHealthMonitor // Though not currently used that way, listeners are not // necessarily bound to any given pool. - listeners map[*cloudscale.LoadBalancerPool][]cloudscale. - LoadBalancerListener + listeners map[*cloudscale.LoadBalancerPool][]cloudscale.LoadBalancerListener // The assigned floating IPs floatingIPs []string @@ -201,15 +198,6 @@ func desiredLbState( } } - // If there are no pool members, return an error. 
It would be possible - // to just put a load balancer up that has no function, but it seems - // more useful to err instead, as there's likely something wrong. - if len(s.members[&pool]) == 0 { - return nil, fmt.Errorf( - "service %s: no private address found on any node", - serviceInfo.Service.Name) - } - // Add a health monitor for each pool monitor, err := healthMonitorForPort(serviceInfo) if err != nil { diff --git a/pkg/cloudscale_ccm/reconcile_test.go b/pkg/cloudscale_ccm/reconcile_test.go index 84c9e9a..0833be2 100644 --- a/pkg/cloudscale_ccm/reconcile_test.go +++ b/pkg/cloudscale_ccm/reconcile_test.go @@ -4,11 +4,12 @@ import ( "testing" "time" - "github.com/cloudscale-ch/cloudscale-cloud-controller-manager/pkg/internal/actions" - "github.com/cloudscale-ch/cloudscale-cloud-controller-manager/pkg/internal/testkit" "github.com/cloudscale-ch/cloudscale-go-sdk/v6" "github.com/stretchr/testify/assert" v1 "k8s.io/api/core/v1" + + "github.com/cloudscale-ch/cloudscale-cloud-controller-manager/pkg/internal/actions" + "github.com/cloudscale-ch/cloudscale-cloud-controller-manager/pkg/internal/testkit" ) func TestPoolName(t *testing.T) { @@ -1156,10 +1157,10 @@ func TestLimitSubnets(t *testing.T) { assert.Equal(t, "10.0.1.1", state.members[state.pools[0]][0].Address) assert.Equal(t, "10.0.1.2", state.members[state.pools[0]][1].Address) - // If we have no valid addresses, we get an error + // If we have no valid addresses, we get no error s.Annotations[LoadBalancerListenerAllowedSubnets] = ` ["00000000-0000-0000-0000-000000000003"]` _, err = desiredLbState(i, nodes, servers) - assert.Error(t, err) + assert.NoError(t, err) } diff --git a/pkg/cloudscale_ccm/service_info.go b/pkg/cloudscale_ccm/service_info.go index 5fe5eee..e4a51f2 100644 --- a/pkg/cloudscale_ccm/service_info.go +++ b/pkg/cloudscale_ccm/service_info.go @@ -118,6 +118,8 @@ func (s serviceInfo) annotation(key string) string { return s.annotationOrDefault(key, "50000") case LoadBalancerListenerAllowedSubnets: return s.annotationOrDefault(key, "[]") + case LoadBalancerNodeSelector: + return s.annotationOrDefault(key, "") default: return s.annotationOrElse(key, func() string { klog.Warning("unknown annotation:", key) diff --git a/pkg/internal/integration/service_test.go b/pkg/internal/integration/service_test.go index fb7f1d3..f5b066a 100644 --- a/pkg/internal/integration/service_test.go +++ b/pkg/internal/integration/service_test.go @@ -14,10 +14,7 @@ import ( "strings" "time" - "github.com/cloudscale-ch/cloudscale-cloud-controller-manager/pkg/cloudscale_ccm" - "github.com/cloudscale-ch/cloudscale-cloud-controller-manager/pkg/internal/kubeutil" - "github.com/cloudscale-ch/cloudscale-cloud-controller-manager/pkg/internal/testkit" - cloudscale "github.com/cloudscale-ch/cloudscale-go-sdk/v6" + "github.com/cloudscale-ch/cloudscale-go-sdk/v6" appsv1 "k8s.io/api/apps/v1" batchv1 "k8s.io/api/batch/v1" v1 "k8s.io/api/core/v1" @@ -25,17 +22,14 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/wait" + + "github.com/cloudscale-ch/cloudscale-cloud-controller-manager/pkg/cloudscale_ccm" + "github.com/cloudscale-ch/cloudscale-cloud-controller-manager/pkg/internal/kubeutil" + "github.com/cloudscale-ch/cloudscale-cloud-controller-manager/pkg/internal/testkit" ) func (s *IntegrationTestSuite) CreateDeployment( - name string, image string, replicas int32, protocol v1.Protocol, port int32, args ...string) { - - var command []string - - if len(args) > 0 { - command = args[:1] - 
args = args[1:] - } + name string, image string, replicas int32, protocol v1.Protocol, port int32, options ...func(*appsv1.DeploymentSpec)) { spec := appsv1.DeploymentSpec{ Replicas: &replicas, @@ -53,10 +47,8 @@ func (s *IntegrationTestSuite) CreateDeployment( Spec: v1.PodSpec{ Containers: []v1.Container{ { - Name: name, - Image: image, - Command: command, - Args: args, + Name: name, + Image: image, Ports: []v1.ContainerPort{ {ContainerPort: port, Protocol: protocol}, }, @@ -66,6 +58,10 @@ func (s *IntegrationTestSuite) CreateDeployment( }, } + for _, opt := range options { + opt(&spec) + } + _, err := s.k8s.AppsV1().Deployments(s.ns).Create( context.Background(), &appsv1.Deployment{ @@ -100,25 +96,33 @@ type ServicePortSpec struct { TargetPort int32 } -func (s *IntegrationTestSuite) ExposeDeployment( - name string, annotations map[string]string, ports ...ServicePortSpec) { - - servicePorts := make([]v1.ServicePort, len(ports)) - for i, p := range ports { - servicePorts[i] = v1.ServicePort{ - Name: fmt.Sprintf("port%d", i), - Protocol: p.Protocol, - Port: p.Port, - TargetPort: intstr.FromInt32(p.TargetPort), +func WithServicePort(sp ServicePortSpec) func(spec *v1.ServiceSpec) { + return func(spec *v1.ServiceSpec) { + if spec.Ports == nil { + spec.Ports = make([]v1.ServicePort, 0) } + + spec.Ports = append(spec.Ports, v1.ServicePort{ + Name: fmt.Sprintf("port%d", len(spec.Ports)), + Protocol: sp.Protocol, + Port: sp.Port, + TargetPort: intstr.FromInt32(sp.TargetPort), + }) } +} + +func (s *IntegrationTestSuite) ExposeDeployment( + name string, annotations map[string]string, options ...func(spec *v1.ServiceSpec)) { spec := v1.ServiceSpec{ Type: v1.ServiceTypeLoadBalancer, Selector: map[string]string{ "app": name, }, - Ports: servicePorts, + } + + for _, f := range options { + f(&spec) } service, err := s.k8s.CoreV1().Services(s.ns).Get( @@ -140,7 +144,12 @@ func (s *IntegrationTestSuite) ExposeDeployment( s.Require().NoError(err) } else { service.Spec = spec - service.ObjectMeta.Annotations = annotations + if service.Annotations == nil { + service.Annotations = map[string]string{} + } + for key, value := range annotations { + service.Annotations[key] = value + } _, err = s.k8s.CoreV1().Services(s.ns).Update( context.Background(), @@ -271,7 +280,10 @@ func (s *IntegrationTestSuite) ServiceNamed(name string) *v1.Service { } func (s *IntegrationTestSuite) AwaitServiceReady( - name string, timeout time.Duration) *v1.Service { + name string, timeout, minimumWaitDuration time.Duration) *v1.Service { + if minimumWaitDuration > 0 { + <-time.After(minimumWaitDuration) + } var service *v1.Service start := time.Now() @@ -281,7 +293,7 @@ func (s *IntegrationTestSuite) AwaitServiceReady( s.Require().NotNil(service) if service.Annotations != nil { - uuid := service.Annotations["k8s.cloudscale.ch/loadbalancer-uuid"] + uuid := service.Annotations[cloudscale_ccm.LoadBalancerUUID] // EnsureLoadBalancer sets the annotation, and then returns the // load balancer status to Kubernetes. 
This means there is a short @@ -308,15 +320,15 @@ func (s *IntegrationTestSuite) TestServiceEndToEnd() { // Deploy a TCP server that returns the hostname s.T().Log("Creating nginx deployment") - s.CreateDeployment("nginx", "nginxdemos/hello:plain-text", 2, v1.ProtocolTCP, 80) + s.CreateDeployment("nginx", "docker.io/nginxdemos/hello:plain-text", 2, v1.ProtocolTCP, 80) // Expose the deployment using a LoadBalancer service s.ExposeDeployment("nginx", nil, - ServicePortSpec{Protocol: v1.ProtocolTCP, Port: 80, TargetPort: 80}) + WithServicePort(ServicePortSpec{Protocol: v1.ProtocolTCP, Port: 80, TargetPort: 80})) // Wait for the service to be ready s.T().Log("Waiting for nginx service to be ready") - service := s.AwaitServiceReady("nginx", 180*time.Second) + service := s.AwaitServiceReady("nginx", 180*time.Second, 5*time.Second) s.Require().NotNil(service) // Ensure the annotations are set @@ -366,6 +378,141 @@ func (s *IntegrationTestSuite) TestServiceEndToEnd() { s.Assert().NotContains(lines, "Warn") } +// TestServiceEndToEndNodeSelector tests whether annotating a service with loadbalancer-node-selector works as intended. +// +// It first deploys an nginx on the first node and exposes it using a service having the annotation set to this node as well. +// After verifying accessing the nginx works, the test adjusts the annotation to the second node and verifies accessing nginx doesn't work anymore. +// As the last step, it sets the nodeSelector to a not existing label, making sure the LB has no pool members anymore. +// +// This is dependent on setting `externalTrafficPolicy: Local` on the service, since otherwise the request would just be forwarded to the other node. +func (s *IntegrationTestSuite) TestServiceEndToEndNodeSelector() { + start := time.Now() + + // fetch a valid node from k8s, since the prefix varies between test runs on CI + nodes, err := s.k8s.CoreV1().Nodes().List(s.T().Context(), metav1.ListOptions{ + LabelSelector: "!node-role.kubernetes.io/control-plane", + }) + if !s.Assert().NoError(err) || !s.Assert().NotNil(nodes) { + return + } + if !s.Assert().GreaterOrEqual(len(nodes.Items), 2) { + s.T().Log("invalid amount of nodes received", "nodes", nodes.Items) + return + } + firstNodeName := nodes.Items[0].Name + secondNodeName := nodes.Items[1].Name + + s.T().Log("Creating nginx deployment") + s.CreateDeployment("nginx-selected", "docker.io/nginxdemos/hello:plain-text", 1, v1.ProtocolTCP, 80, func(spec *appsv1.DeploymentSpec) { + spec.Template.Spec.NodeSelector = map[string]string{ + "kubernetes.io/hostname": firstNodeName, + } + }) + + s.T().Log("exposing service with valid node-selector") + s.ExposeDeployment("nginx-selected", map[string]string{cloudscale_ccm.LoadBalancerNodeSelector: fmt.Sprintf("kubernetes.io/hostname=%s", firstNodeName)}, + WithServicePort(ServicePortSpec{Protocol: v1.ProtocolTCP, Port: 80, TargetPort: 80}), + func(spec *v1.ServiceSpec) { + spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal + }) + + _ = s.verifyLBAvailability("nginx-selected", start, true) + + // adjust LoadBalancer to second node + // and ensure we can't reach nginx anymore since it's on another node and externalTrafficPolicy is Local. 
+ s.T().Log("exposing service with valid node-selector but targeting a node without pod") + s.ExposeDeployment("nginx-selected", map[string]string{cloudscale_ccm.LoadBalancerNodeSelector: fmt.Sprintf("kubernetes.io/hostname=%s", secondNodeName)}, + WithServicePort(ServicePortSpec{Protocol: v1.ProtocolTCP, Port: 80, TargetPort: 80}), + func(spec *v1.ServiceSpec) { + spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal + }) + + service := s.verifyLBAvailability("nginx-selected", start, false) + + // verify only one pool member exists, abort test if there was an error checking this. + s.Require().NoError(s.assertLBPoolMembersLen(service, 1)) + + // adjust LoadBalancer to a selector where no nodes are existing + s.T().Log("exposing service with node-selector targeting no nodes") + s.ExposeDeployment("nginx-selected", map[string]string{cloudscale_ccm.LoadBalancerNodeSelector: "kubernetes.io/hostname=notexisting"}, + WithServicePort(ServicePortSpec{Protocol: v1.ProtocolTCP, Port: 80, TargetPort: 80}), + func(spec *v1.ServiceSpec) { + spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal + }) + + service = s.verifyLBAvailability("nginx-selected", start, false) + + // verify only one pool member exists, abort test if there was an error checking this. + s.Require().NoError(s.assertLBPoolMembersLen(service, 0)) +} + +// verifyLBAvailability verifies that the service of type LoadBalancer is available. +// It checks annotations and reachability, as well as verifying there are no error/warning logs in CCM. +// +// N.B. if the service is not found, it'll abort the test. +func (s *IntegrationTestSuite) verifyLBAvailability(name string, start time.Time, reachable bool) *v1.Service { + s.T().Helper() + + // Wait for the service to be ready + s.T().Log("Waiting for nginx service to be ready") + service := s.AwaitServiceReady(name, 180*time.Second, 30*time.Second) + s.Require().NotNil(service) + + // Ensure the annotations are set + s.Assert().NotEmpty( + service.Annotations[cloudscale_ccm.LoadBalancerUUID]) + s.Assert().NotEmpty( + service.Annotations[cloudscale_ccm.LoadBalancerConfigVersion]) + s.Assert().NotEmpty( + service.Annotations[cloudscale_ccm.LoadBalancerZone]) + + // Ensure we have two public IP addresses + s.Require().Len(service.Status.LoadBalancer.Ingress, 2) + addr := service.Status.LoadBalancer.Ingress[0].IP + + response, err := testkit.HelloNginx(addr, 80) + if reachable { + s.Assert().NoError(err, "request failed") + if s.Assert().NotNil(response, "response is empty") { + s.Assert().NotEmpty(response.ServerName) + } + } else { + s.Assert().Error(err, "request successful") + s.Assert().Nil(response, "response is empty") + } + + // we expect no warnings. Errors can happen for a short time when setting to 0 nodes reachable. + s.T().Log("Checking log output for errors/warnings") + lines := s.CCMLogs(start) + + s.Assert().NotContains(lines, "warn") + s.Assert().NotContains(lines, "Warn") + return service +} + +// assertLBPoolMembersLen fetches the loadbalancer pool members and asserts the size is equal given len. 
+func (s *IntegrationTestSuite) assertLBPoolMembersLen(service *v1.Service, len int) error { + pools, err := s.api.LoadBalancerPools.List(s.T().Context()) + if err != nil { + return fmt.Errorf("unable to list loadbalancer pools: %w", err) + } + + var pool cloudscale.LoadBalancerPool + for _, p := range pools { + if p.LoadBalancer.UUID == service.Annotations[cloudscale_ccm.LoadBalancerUUID] { + pool = p + break + } + } + members, err := s.api.LoadBalancerPoolMembers.List(s.T().Context(), pool.UUID) + if err != nil { + return fmt.Errorf("unable to list pool members: %w", err) + } + + s.Assert().Len(members, len) + return nil +} + func (s *IntegrationTestSuite) TestServiceEndToEndUDP() { // Note the start for the log @@ -373,23 +520,21 @@ func (s *IntegrationTestSuite) TestServiceEndToEndUDP() { // Deploy a UDP echo server s.T().Log("Creating udp-echo deployment") - s.CreateDeployment("udp-echo", "docker.io/alpine/socat", 2, v1.ProtocolUDP, 5353, - "socat", - "-v", - "UDP4-RECVFROM:5353,fork", - "SYSTEM:echo 'I could tell you a UDP joke, but you might not get it...',pipes", - ) + s.CreateDeployment("udp-echo", "docker.io/alpine/socat", 2, v1.ProtocolUDP, 5353, func(spec *appsv1.DeploymentSpec) { + spec.Template.Spec.Containers[0].Command = []string{"socat"} + spec.Template.Spec.Containers[0].Args = []string{"-v", "UDP4-RECVFROM:5353,fork", "SYSTEM:echo 'I could tell you a UDP joke, but you might not get it...',pipes"} + }) // Expose the deployment using a LoadBalancer service with UDP annotations s.ExposeDeployment("udp-echo", map[string]string{ "k8s.cloudscale.ch/loadbalancer-health-monitor-type": "udp-connect", "k8s.cloudscale.ch/loadbalancer-health-monitor-delay-s": "3", "k8s.cloudscale.ch/loadbalancer-health-monitor-timeout-s": "2", - }, ServicePortSpec{Protocol: v1.ProtocolUDP, Port: 5000, TargetPort: 5353}) + }, WithServicePort(ServicePortSpec{Protocol: v1.ProtocolUDP, Port: 5000, TargetPort: 5353})) // Wait for the service to be ready s.T().Log("Waiting for udp-echo service to be ready") - service := s.AwaitServiceReady("udp-echo", 180*time.Second) + service := s.AwaitServiceReady("udp-echo", 180*time.Second, 5*time.Second) s.Require().NotNil(service) // Ensure the annotations are set @@ -546,13 +691,13 @@ func (s *IntegrationTestSuite) TestServiceEndToEndDualProtocol() { s.Require().NoError(err) s.ExposeDeployment("dns-server", nil, - ServicePortSpec{Protocol: v1.ProtocolTCP, Port: 53, TargetPort: 53}, - ServicePortSpec{Protocol: v1.ProtocolUDP, Port: 53, TargetPort: 53}, + WithServicePort(ServicePortSpec{Protocol: v1.ProtocolTCP, Port: 53, TargetPort: 53}), + WithServicePort(ServicePortSpec{Protocol: v1.ProtocolUDP, Port: 53, TargetPort: 53}), ) // Wait for the service to be ready s.T().Log("Waiting for dns-server service to be ready") - svc := s.AwaitServiceReady("dns-server", 180*time.Second) + svc := s.AwaitServiceReady("dns-server", 180*time.Second, 5*time.Second) s.Require().NotNil(svc) // Ensure the annotations are set @@ -661,16 +806,16 @@ func (s *IntegrationTestSuite) TestServiceVIPAddresses() { // Deploy a TCP server that returns something s.T().Log("Creating foo deployment") - s.CreateDeployment("nginx", "nginxdemos/hello:plain-text", 2, v1.ProtocolTCP, 80) + s.CreateDeployment("nginx", "docker.io/nginxdemos/hello:plain-text", 2, v1.ProtocolTCP, 80) // Expose the deployment using a LoadBalancer service s.ExposeDeployment("nginx", map[string]string{ "k8s.cloudscale.ch/loadbalancer-vip-addresses": fmt.Sprintf( `[{"subnet": "%s"}]`, subnet), - }, ServicePortSpec{Protocol: 
v1.ProtocolTCP, Port: 80, TargetPort: 80}) + }, WithServicePort(ServicePortSpec{Protocol: v1.ProtocolTCP, Port: 80, TargetPort: 80})) s.T().Log("Waiting for nginx service to be ready") - service := s.AwaitServiceReady("nginx", 180*time.Second) + service := s.AwaitServiceReady("nginx", 180*time.Second, 5*time.Second) s.Require().NotNil(service) // Use a worker as a jumphost to check if we get "foo" @@ -763,11 +908,11 @@ func (s *IntegrationTestSuite) TestServiceTrafficPolicyLocal() { } // Expose the deployment using a LoadBalancer service - s.ExposeDeployment("peeraddr", nil, ServicePortSpec{Protocol: v1.ProtocolTCP, Port: 80, TargetPort: 3000}) + s.ExposeDeployment("peeraddr", nil, WithServicePort(ServicePortSpec{Protocol: v1.ProtocolTCP, Port: 80, TargetPort: 3000})) // Wait for the service to be ready s.T().Log("Waiting for peeraddr service to be ready") - service := s.AwaitServiceReady("peeraddr", 180*time.Second) + service := s.AwaitServiceReady("peeraddr", 180*time.Second, 5*time.Second) s.Require().NotNil(service) // In its initial state, expect a natted IP address @@ -786,7 +931,7 @@ func (s *IntegrationTestSuite) TestServiceTrafficPolicyLocal() { ) s.Require().NoError(err) - service = s.AwaitServiceReady("peeraddr", 1*time.Second) + service = s.AwaitServiceReady("peeraddr", 1*time.Second, 0) s.Require().NotNil(service) // Now expect to see an IP address from the node's private network @@ -803,7 +948,7 @@ func (s *IntegrationTestSuite) TestServiceTrafficPolicyLocal() { ) s.Require().NoError(err) - service = s.AwaitServiceReady("peeraddr", 1*time.Second) + service = s.AwaitServiceReady("peeraddr", 1*time.Second, 0) s.Require().NotNil(service) assertPrefix(addr, &cluster_policy_prefix) @@ -827,17 +972,17 @@ func (s *IntegrationTestSuite) RunTestServiceWithFloatingIP( // Deploy a TCP server that returns the hostname s.T().Log("Creating nginx deployment") - s.CreateDeployment("nginx", "nginxdemos/hello:plain-text", 2, v1.ProtocolTCP, 80) + s.CreateDeployment("nginx", "docker.io/nginxdemos/hello:plain-text", 2, v1.ProtocolTCP, 80) // Expose the deployment using a LoadBalancer service with Floating IP s.ExposeDeployment("nginx", map[string]string{ "k8s.cloudscale.ch/loadbalancer-floating-ips": fmt.Sprintf( `["%s"]`, fip.Network), - }, ServicePortSpec{Protocol: v1.ProtocolTCP, Port: 80, TargetPort: 80}) + }, WithServicePort(ServicePortSpec{Protocol: v1.ProtocolTCP, Port: 80, TargetPort: 80})) // Wait for the service to be ready s.T().Log("Waiting for nginx service to be ready") - service := s.AwaitServiceReady("nginx", 180*time.Second) + service := s.AwaitServiceReady("nginx", 180*time.Second, 5*time.Second) s.Require().NotNil(service) // Ensure that we get responses from two different pods (round-robin) @@ -889,17 +1034,17 @@ func (s *IntegrationTestSuite) TestFloatingIPConflicts() { // Deploy a TCP server that returns the hostname s.T().Log("Creating nginx deployment") - s.CreateDeployment("nginx", "nginxdemos/hello:plain-text", 2, v1.ProtocolTCP, 80) + s.CreateDeployment("nginx", "docker.io/nginxdemos/hello:plain-text", 2, v1.ProtocolTCP, 80) // Expose the deployment using a LoadBalancer service with Floating IP s.ExposeDeployment("nginx", map[string]string{ "k8s.cloudscale.ch/loadbalancer-floating-ips": fmt.Sprintf( `["%s"]`, regional.Network), - }, ServicePortSpec{Protocol: v1.ProtocolTCP, Port: 80, TargetPort: 80}) + }, WithServicePort(ServicePortSpec{Protocol: v1.ProtocolTCP, Port: 80, TargetPort: 80})) // Wait for the service to be ready s.T().Log("Waiting for nginx service to be 
ready") - service := s.AwaitServiceReady("nginx", 180*time.Second) + service := s.AwaitServiceReady("nginx", 180*time.Second, 5*time.Second) s.Require().NotNil(service) // Configure a second service with the same floating IP @@ -908,7 +1053,7 @@ func (s *IntegrationTestSuite) TestFloatingIPConflicts() { s.ExposeDeployment("service-2", map[string]string{ "k8s.cloudscale.ch/loadbalancer-floating-ips": fmt.Sprintf( `["%s"]`, regional.Network), - }, ServicePortSpec{Protocol: v1.ProtocolTCP, Port: 80, TargetPort: 80}) + }, WithServicePort(ServicePortSpec{Protocol: v1.ProtocolTCP, Port: 80, TargetPort: 80})) // Wait for a moment before checking the log time.Sleep(5 * time.Second) @@ -929,13 +1074,19 @@ func (s *IntegrationTestSuite) TestServiceProxyProtocol() { // Deploy our http-echo server to check for proxy connections s.T().Log("Creating http-echo deployment", "branch", branch) - s.CreateDeployment("http-echo", "golang", 2, v1.ProtocolTCP, 80, "bash", "-c", fmt.Sprintf(` - git clone https://github.com/cloudscale-ch/cloudscale-cloud-controller-manager ccm; - cd ccm; - git checkout %s || exit 1; - cd cmd/http-echo; - go run main.go -host 0.0.0.0 -port 80 - `, branch)) + s.CreateDeployment("http-echo", "docker.io/golang", 2, v1.ProtocolTCP, 80, func(spec *appsv1.DeploymentSpec) { + spec.Template.Spec.Containers[0].Command = []string{"bash"} + spec.Template.Spec.Containers[0].Args = []string{ + "-c", + fmt.Sprintf(` + git clone https://github.com/cloudscale-ch/cloudscale-cloud-controller-manager ccm; + cd ccm; + git checkout %s || exit 1; + cd cmd/http-echo; + go run main.go -host 0.0.0.0 -port 80 + `, branch), + } + }) // Expose the deployment using a LoadBalancer service s.ExposeDeployment("http-echo", map[string]string{ @@ -944,11 +1095,11 @@ func (s *IntegrationTestSuite) TestServiceProxyProtocol() { // Make sure to get the default behavior of older Kubernetes releases, // even on newer releases. 
"k8s.cloudscale.ch/loadbalancer-ip-mode": "VIP", - }, ServicePortSpec{Protocol: v1.ProtocolTCP, Port: 80, TargetPort: 80}) + }, WithServicePort(ServicePortSpec{Protocol: v1.ProtocolTCP, Port: 80, TargetPort: 80})) // Wait for the service to be ready s.T().Log("Waiting for http-echo service to be ready") - service := s.AwaitServiceReady("http-echo", 180*time.Second) + service := s.AwaitServiceReady("http-echo", 180*time.Second, 5*time.Second) s.Require().NotNil(service) addr := service.Status.LoadBalancer.Ingress[0].IP @@ -992,7 +1143,7 @@ func (s *IntegrationTestSuite) TestServiceProxyProtocol() { "%s.cust.cloudscale.ch", strings.ReplaceAll(addr, ".", "-"), ), - }, ServicePortSpec{Protocol: v1.ProtocolTCP, Port: 80, TargetPort: 80}) + }, WithServicePort(ServicePortSpec{Protocol: v1.ProtocolTCP, Port: 80, TargetPort: 80})) s.T().Log("Testing PROXY protocol from inside with workaround") used = s.RunJob("curlimages/curl", 90*time.Second, "curl", "-s", url) @@ -1005,7 +1156,7 @@ func (s *IntegrationTestSuite) TestServiceProxyProtocol() { if newer { s.ExposeDeployment("http-echo", map[string]string{ "k8s.cloudscale.ch/loadbalancer-pool-protocol": "proxy", - }, ServicePortSpec{Protocol: v1.ProtocolTCP, Port: 80, TargetPort: 80}) + }, WithServicePort(ServicePortSpec{Protocol: v1.ProtocolTCP, Port: 80, TargetPort: 80})) s.T().Log("Testing PROXY protocol on newer Kubernetes releases") used = s.RunJob("curlimages/curl", 90*time.Second, "curl", "-s", url) diff --git a/pkg/internal/testkit/http.go b/pkg/internal/testkit/http.go index 9658654..c01cf40 100644 --- a/pkg/internal/testkit/http.go +++ b/pkg/internal/testkit/http.go @@ -19,9 +19,8 @@ type HelloResponse struct { } func HelloNginx(addr string, port uint16) (*HelloResponse, error) { - body, err := HTTPRead( - "http://" + net.JoinHostPort(addr, strconv.FormatUint( - uint64(port), 10))) + body, err := HTTPRead("http://" + net.JoinHostPort(addr, strconv.FormatUint( + uint64(port), 10))) if err != nil { return nil, err }