diff --git a/.openshift-tests-extension/openshift_payload_cluster-version-operator.json b/.openshift-tests-extension/openshift_payload_cluster-version-operator.json index e9be447240..1840d67638 100644 --- a/.openshift-tests-extension/openshift_payload_cluster-version-operator.json +++ b/.openshift-tests-extension/openshift_payload_cluster-version-operator.json @@ -72,5 +72,21 @@ "source": "openshift:payload:cluster-version-operator", "lifecycle": "blocking", "environmentSelector": {} + }, + { + "name": "[Jira:\"Cluster Version Operator\"] cluster-version-operator cvo drops invalid conditional edges", + "labels": { + "46422": {}, + "ConnectedOnly": {}, + "Lifecycle:informing": {}, + "Low": {}, + "Serial": {} + }, + "resources": { + "isolation": {} + }, + "source": "openshift:payload:cluster-version-operator", + "lifecycle": "informing", + "environmentSelector": {} } ] \ No newline at end of file diff --git a/pkg/payload/precondition/clusterversion/upgradeable.go b/pkg/payload/precondition/clusterversion/upgradeable.go index 74d4934915..ce8ba75820 100644 --- a/pkg/payload/precondition/clusterversion/upgradeable.go +++ b/pkg/payload/precondition/clusterversion/upgradeable.go @@ -180,3 +180,17 @@ func GetCurrentVersion(history []configv1.UpdateHistory) string { } return "" } + +func GetCurrentVersionAndImage(history []configv1.UpdateHistory) (string, string) { + for _, h := range history { + if h.State == configv1.CompletedUpdate { + klog.V(2).Infof("Cluster current version=%s", h.Version) + return h.Version, h.Image + } + } + // Empty history should only occur if method is called early in startup before history is populated. 
+ if len(history) != 0 { + return history[len(history)-1].Version, history[len(history)-1].Image + } + return "", "" +} diff --git a/test/cvo/cvo.go b/test/cvo/cvo.go index 7b3bf8ed89..c5c3f80b93 100644 --- a/test/cvo/cvo.go +++ b/test/cvo/cvo.go @@ -4,15 +4,26 @@ package cvo import ( "context" + "fmt" + "strings" + "time" + "github.com/blang/semver/v4" g "github.com/onsi/ginkgo/v2" o "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" + ote "github.com/openshift-eng/openshift-tests-extension/pkg/ginkgo" + configv1 "github.com/openshift/api/config/v1" + clientconfigv1 "github.com/openshift/client-go/config/clientset/versioned" + "github.com/openshift/cluster-version-operator/pkg/external" + "github.com/openshift/cluster-version-operator/pkg/payload/precondition/clusterversion" "github.com/openshift/cluster-version-operator/test/oc" ocapi "github.com/openshift/cluster-version-operator/test/oc/api" "github.com/openshift/cluster-version-operator/test/util" @@ -99,4 +110,227 @@ var _ = g.Describe(`[Jira:"Cluster Version Operator"] cluster-version-operator`, sccAnnotation := cvoPod.Annotations["openshift.io/scc"] o.Expect(sccAnnotation).To(o.Equal("hostaccess"), "Expected the annotation 'openshift.io/scc annotation' on pod %s to have the value 'hostaccess', but got %s", cvoPod.Name, sccAnnotation) }) + + g.It("cvo drops invalid conditional edges", ote.Informing(), g.Label("46422", "Low", "ConnectedOnly", "Serial"), func() { + ctx := context.Background() + err := util.SkipIfHypershift(ctx, restCfg) + o.Expect(err).NotTo(o.HaveOccurred(), "Failed to determine if cluster is HyperShift") + err = util.SkipIfMicroshift(ctx, restCfg) + o.Expect(err).NotTo(o.HaveOccurred(), "Failed to determine if cluster is MicroShift") + err = util.SkipIfNetworkRestricted(ctx, restCfg, util.FauxinnatiAPIURL) + o.Expect(err).NotTo(o.HaveOccurred(), 
"Failed to check network connectivity") + + ns, err := kubeClient.CoreV1().Namespaces().Create(ctx, &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "test-ns-46422-", + }, + }, metav1.CreateOptions{}) + o.Expect(err).NotTo(o.HaveOccurred()) + defer func() { + _ = kubeClient.CoreV1().Namespaces().Delete(ctx, ns.Name, metav1.DeleteOptions{}) + }() + + clientconfigv1, err := clientconfigv1.NewForConfig(restCfg) + o.Expect(err).NotTo(o.HaveOccurred()) + cv, err := clientconfigv1.ConfigV1().ClusterVersions().Get(ctx, "version", metav1.GetOptions{}) + o.Expect(err).NotTo(o.HaveOccurred()) + o.Expect(cv).NotTo(o.BeNil()) + + oldUpstream := cv.Spec.Upstream + oldChannel := cv.Spec.Channel + defer func() { + _, err := util.PatchUpstream(ctx, clientconfigv1, string(oldUpstream), oldChannel) + o.Expect(err).NotTo(o.HaveOccurred()) + }() + + versionString, image := clusterversion.GetCurrentVersionAndImage(cv.Status.History) + o.Expect(versionString).NotTo(o.BeEmpty()) + o.Expect(image).NotTo(o.BeEmpty()) + + ver, err := semver.Make(versionString) + o.Expect(err).NotTo(o.HaveOccurred()) + channel := fmt.Sprintf("candidate-%d.%d", ver.Major, ver.Minor) + + nodes := []util.Node{ + {Version: versionString, Payload: image, Channel: channel}, + } + edges := []util.Edge{} + conditionalEdges := []util.ConditionalEdge{ + { + Edge: util.StringEdge{ + From: versionString, + To: "", + }, + Risks: []util.Risk{ + { + Url: "https://bugzilla.redhat.com/show_bug.cgi?id=123456", + Name: "Bug 123456", + Message: "Empty target node", + Rule: map[string]interface{}{"type": "Always"}, + }, + }, + }, + } + g.By("create upstream graph template with invalid conditional edges") + buf, err := util.CreateGraphTemplate(nodes, edges, conditionalEdges) + o.Expect(err).NotTo(o.HaveOccurred()) + + label := map[string]string{"app": "test-update-service"} + g.By("run update service with the graph template") + deployment1, err := util.RunUpdateService(ctx, kubeClient, ns.Name, buf.String(), label) 
+ o.Expect(err).NotTo(o.HaveOccurred()) + logger.Info(fmt.Sprintf("deployment1: %s, %s, %d", deployment1.Spec.Template.Spec.Containers[0].Image, deployment1.Spec.Template.Spec.Containers[0].Ports[0].Name, deployment1.Spec.Template.Spec.Containers[0].Ports[0].ContainerPort)) + defer func() { _ = util.DeleteDeployment(ctx, kubeClient, ns.Name, deployment1.Name) }() + + service1, url1, err := util.CreateService(ctx, kubeClient, ns.Name, deployment1, 46422) + o.Expect(err).NotTo(o.HaveOccurred()) + logger.Info("Update Service URL: " + url1.String()) + logger.Info(fmt.Sprintf("Service: %s, %d, %v", service1.Spec.Ports[0].Name, service1.Spec.Ports[0].Port, service1.Spec.Ports[0].TargetPort)) + defer func() { _ = util.CleanupService(ctx, kubeClient, ns.Name, service1.Name) }() + + policyName := service1.Name + "-policy" + _, err = util.CreateNetworkPolicy(ctx, kubeClient, ns.Name, policyName, label) + o.Expect(err).NotTo(o.HaveOccurred()) + pollErr := wait.PollUntilContextTimeout(ctx, 5*time.Second, 60*time.Second, true, func(ctx context.Context) (bool, error) { + if res, getErr := util.NetworkRestricted(ctx, restCfg, url1.String()); getErr != nil { + return false, getErr + } else if res { + return false, fmt.Errorf("expected network to be available, but it is not") + } + return true, nil + }) + o.Expect(pollErr).NotTo(o.HaveOccurred(), fmt.Sprintf("Failed to verify network connectivity to the update service at %s: %v", url1.String(), pollErr)) + defer func() { + _ = util.DeleteNetworkPolicy(ctx, kubeClient, ns.Name, policyName) + }() + + g.By("patch upstream with null target node conditional edge") + o.Expect(err).NotTo(o.HaveOccurred()) + _, err = util.PatchUpstream(ctx, clientconfigv1, url1.String(), channel) + o.Expect(err).NotTo(o.HaveOccurred()) + pollErr = wait.PollUntilContextTimeout(ctx, 5*time.Second, 10*time.Minute, true, func(ctx context.Context) (bool, error) { + cv, err := clientconfigv1.ConfigV1().ClusterVersions().Get(ctx, "version", metav1.GetOptions{}) 
+ if err != nil { + return false, err + } + for _, condition := range cv.Status.Conditions { + if condition.Type == configv1.ClusterStatusConditionType("RetrievedUpdates") { + if condition.Status != configv1.ConditionFalse { + return false, nil + } + if !strings.Contains(condition.Message, "no node for conditional update") { + return false, nil + } + return true, nil + } + } + return false, nil + }) + o.Expect(pollErr).NotTo(o.HaveOccurred(), fmt.Sprintf("Failed to verify the cluster version condition for the null target node conditional edge: %v", pollErr)) + _, err = util.PatchUpstream(ctx, clientconfigv1, string(oldUpstream), oldChannel) + o.Expect(err).NotTo(o.HaveOccurred()) + defer func() { + _, err = util.PatchUpstream(ctx, clientconfigv1, string(oldUpstream), oldChannel) + o.Expect(err).NotTo(o.HaveOccurred()) + }() + err = util.CleanupService(ctx, kubeClient, ns.Name, service1.Name) + o.Expect(err).NotTo(o.HaveOccurred()) + err = util.DeleteDeployment(ctx, kubeClient, ns.Name, deployment1.Name) + o.Expect(err).NotTo(o.HaveOccurred()) + + g.By("prepare new graph file") + targetVersion := fmt.Sprintf("%d.%d.999", ver.Major, ver.Minor) + nodes = append(nodes, util.Node{ + Version: targetVersion, + Payload: "quay.io/openshift-release-dev/ocp-release@sha256:fc88d0bf145c81d9c9e5b8a1cfcaa7cbbacfa698f0c3a252fbbdbfbb1b2", + Channel: channel, + }) + edges = []util.Edge{} + conditionalEdges = []util.ConditionalEdge{ + { + Edge: util.StringEdge{ + From: versionString, + To: targetVersion, + }, + Risks: []util.Risk{ + { + Url: "// example.com", + Name: "InvalidURL", + Message: "Invalid URL.", + Rule: map[string]interface{}{"type": "PromQL", "promql": map[string]interface{}{"promql": "cluster_installer"}}, + }, + { + Url: "https://bug.example.com/b", + Name: "TypeNull", + Message: "MatchingRules type is null.", + Rule: "{\"type\": \"\"}", + }, + { + Url: "https://bug.example.com/c", + Name: "InvalidMatchingRulesType", + Message: "MatchingRules type is invalid, support 
Always and PromQL.", + Rule: map[string]interface{}{"type": "nonexist", "promql": map[string]interface{}{"promql": "group(cluster_version_available_updates{channel=\"buggy\"})\nor\n0 * group(cluster_version_available_updates{channel!=\"buggy\"})"}}, + }, + { + Url: "https://bug.example.com/d", + Name: "InvalidPromQLQueryReturnValue", + Message: "PromQL query return value is not supported, support 0 and 1.", + Rule: map[string]interface{}{"type": "PromQL", "promql": map[string]interface{}{"promql": "max(cluster_version)"}}, + }, + { + Url: "https://bug.example.com/d", + Name: "InvalidPromQLQuery", + Message: "Invalid PromQL Query.", + Rule: map[string]interface{}{"type": "PromQL", "promql": map[string]interface{}{"promql": "cluster_infrastructure_provider{type=~\"VSphere|None\"}"}}, + }, + }, + }, + } + g.By("create upstream graph template with multi risks conditional edges") + buf, err = util.CreateGraphTemplate(nodes, edges, conditionalEdges) + o.Expect(err).NotTo(o.HaveOccurred()) + + g.By("run update service with the multi risks graph template") + deployment2, err := util.RunUpdateService(ctx, kubeClient, ns.Name, buf.String(), label) + o.Expect(err).NotTo(o.HaveOccurred()) + defer func() { _ = util.DeleteDeployment(ctx, kubeClient, ns.Name, deployment2.Name) }() + + service2, url2, err := util.CreateService(ctx, kubeClient, ns.Name, deployment2, 46422) + o.Expect(err).NotTo(o.HaveOccurred()) + logger.Info("Update Service URL: " + url2.String()) + logger.Info(fmt.Sprintf("Service: %s, %d, %v", service2.Spec.Ports[0].Name, service2.Spec.Ports[0].Port, service2.Spec.Ports[0].TargetPort)) + pollErr = wait.PollUntilContextTimeout(ctx, 5*time.Second, 60*time.Second, true, func(ctx context.Context) (bool, error) { + if res, getErr := util.NetworkRestricted(ctx, restCfg, url2.String()); getErr != nil { + return false, getErr + } else if res { + return false, fmt.Errorf("expected network to be available, but it is not") + } + return true, nil + }) + 
o.Expect(pollErr).NotTo(o.HaveOccurred(), fmt.Sprintf("Failed to verify network connectivity to the update service at %s: %v", url2.String(), pollErr)) + defer func() { _ = util.CleanupService(ctx, kubeClient, ns.Name, service2.Name) }() + + g.By("patch upstream with multi risks conditional edge") + _, err = util.PatchUpstream(ctx, clientconfigv1, url2.String(), channel) + o.Expect(err).NotTo(o.HaveOccurred()) + pollErr = wait.PollUntilContextTimeout(ctx, 5*time.Second, 10*time.Minute, true, func(ctx context.Context) (bool, error) { + cv, err := clientconfigv1.ConfigV1().ClusterVersions().Get(ctx, "version", metav1.GetOptions{}) + if err != nil { + return false, err + } + if cv.Status.AvailableUpdates != nil { + return false, nil + } + for _, condition := range cv.Status.Conditions { + if condition.Type == configv1.ClusterStatusConditionType("RetrievedUpdates") { + if condition.Status != configv1.ConditionTrue { + return false, nil + } + return true, nil + } + } + return true, nil + }) + o.Expect(pollErr).NotTo(o.HaveOccurred(), fmt.Sprintf("Failed to verify the cluster version condition for the multi risks target node conditional edge: %v", pollErr)) + }) }) diff --git a/test/util/util.go b/test/util/util.go index 0ebfc395fd..676aace521 100644 --- a/test/util/util.go +++ b/test/util/util.go @@ -3,17 +3,26 @@ package util import ( "bytes" "context" + "encoding/json" "fmt" + "net" + "net/url" + "strconv" "strings" + "text/template" "time" g "github.com/onsi/ginkgo/v2" o "github.com/onsi/gomega" + appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" + networkingv1 "k8s.io/api/networking/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" @@ -27,6 +36,36 @@ import ( 
"github.com/openshift/cluster-version-operator/pkg/external" ) +// Define test waiting time const +const ( + defaultMaxWaitingTime = 200 * time.Second + defaultPollingTime = 2 * time.Second +) + +type Node struct { + Version string + Payload string + Channel string +} +type Edge struct { + From uint64 + To uint64 +} +type StringEdge struct { + From string + To string +} +type ConditionalEdge struct { + Edge StringEdge + Risks []Risk +} +type Risk struct { + Url string + Name string + Message string + Rule interface{} +} + // IsHypershift checks if running on a HyperShift hosted cluster // Refer to https://github.com/openshift/origin/blob/31704414237b8bd5c66ad247c105c94abc9470b1/test/extended/util/framework.go#L2301 func IsHypershift(ctx context.Context, restConfig *rest.Config) (bool, error) { @@ -212,3 +251,287 @@ func SkipIfNetworkRestricted(ctx context.Context, restConfig *rest.Config, urls } return nil } + +// WaitForDeploymentReady waits for the deployment become ready +func WaitForDeploymentReady(c kubernetes.Interface, deployName, namespace string, revision int64) error { + return WaitForDeploymentReadyWithTimeout(c, deployName, namespace, revision, defaultMaxWaitingTime) +} + +// WaitForDeploymentReadyWithTimeout waits for the deployment become ready with defined timeout +func WaitForDeploymentReadyWithTimeout(c kubernetes.Interface, deployName, namespace string, revision int64, timeout time.Duration) error { + var ( + deployment *appsv1.Deployment + getErr error + ) + pollErr := wait.PollUntilContextTimeout(context.Background(), defaultPollingTime, timeout, true, func(context.Context) (isReady bool, err error) { + deployment, getErr = c.AppsV1().Deployments(namespace).Get(context.Background(), deployName, metav1.GetOptions{}) + if getErr != nil { + return false, nil + } + if deployment.Status.AvailableReplicas == *deployment.Spec.Replicas { + return true, nil + } + if revision >= 0 && deployment.Status.ObservedGeneration != revision { + return false, nil + 
} + return false, nil + }) + + return pollErr +} + +// CreateGraphTemplate returns a graph template string which can be patch as upstream +// for example: +// +// nodes := []util.Node{ +// {Version: "4.22.0-0.nightly-2026-03-17-033403", Payload: "4.22.0-0.nightly-2023-0", Channel: "stable-4.22"}, +// {Version: "4.22.0-ec.2", Payload: "quay.io/openshift-release-dev/ocp-release@sha256:fc88d0bf145c81989a4116bbb0e3d2724d9ab937efb7d217a10e7d7ff3031c50", Channel: "stable-4.22"}, +// {Version: "4.22.0-ec.3", Payload: "quay.io/openshift-release-dev/ocp-release@sha256:58b98da1492b3f4af6129c4684b8e8cde4f2dc197e4b483bb6025971d59f92a5", Channel: "stable-4.22"}, +// } +// +// edges := []util.Edge{ +// {From: 0, To: 1}, +// } +// +// conditionalEdges := []util.ConditionalEdge{ +// { +// Edge: util.StringEdge{ +// From: versionString, +// To: "", +// }, +// Risks: []util.Risk{ +// { +// Url: "https://bugzilla.redhat.com/show_bug.cgi?id=123456", +// Name: "Bug 123456", +// Message: "Empty target node", +// Rule: map[string]interface{}{"type": "Always"}, +// }, +// }, +// }, +// } +// +// buf, err := util.CreateGraphTemplate(nodes, edges, conditionalEdges) +func CreateGraphTemplate( + graphNodes []Node, + graphEdges []Edge, + graphConditionalEdges []ConditionalEdge) (*strings.Builder, error) { + + t := template.Must(template.New("text").Funcs(template.FuncMap{ + "marshal": func(v interface{}) (string, error) { + var ruleStr string + switch v := v.(type) { + case string: + if !json.Valid([]byte(v)) { + promqlRule := map[string]interface{}{ + "type": "PromQL", + "promql": map[string]string{ + "promql": v, + }, + } + bytes, err := json.Marshal(promqlRule) + if err != nil { + return "", fmt.Errorf("failed to marshal promql rule: %w", err) + } + ruleStr = string(bytes) + } else { + ruleStr = v + } + default: + bytes, err := json.Marshal(v) + if err != nil { + return "", fmt.Errorf("failed to marshal rule: %w", err) + } + ruleStr = string(bytes) + } + return ruleStr, nil + }, + 
}).Parse(strings.TrimSpace(` +{ + "nodes": [{{range $index, $node := .Nodes}}{{if $index}},{{end}} + {"version": "{{$node.Version}}", "payload": "{{$node.Payload}}", "metadata": {"io.openshift.upgrades.graph.release.channels": "{{$node.Channel}}"}}{{end}} + ], + "edges": [{{range $index, $edge := .Edges}}{{if $index}},{{end}} + [{{$edge.From}}, {{$edge.To}}]{{end}} + ], + "conditionalEdges": [{{range $outerindex, $ce := .ConditionalEdges}}{{if $outerindex}},{{end}} + { + "edges": [{"from": "{{$ce.Edge.From}}", "to": "{{$ce.Edge.To}}"}], + "risks": [{{range $index, $risk := $ce.Risks}}{{if $index}},{{end}} + { + "url": "{{$risk.Url}}", + "name": "{{$risk.Name}}", + "message": "{{$risk.Message}}", + "matchingRules": [{{marshal $risk.Rule}}] + }{{end}} + ] + }{{end}} + ] +} +`))) + var buf strings.Builder + err := t.Execute(&buf, struct { + Nodes []Node + Edges []Edge + ConditionalEdges []ConditionalEdge + }{Nodes: graphNodes, Edges: graphEdges, ConditionalEdges: graphConditionalEdges}) + + if err != nil { + return nil, err + } + + return &buf, nil +} + +func RunUpdateService(ctx context.Context, c kubernetes.Interface, namespace string, graph string, label map[string]string) (*appsv1.Deployment, error) { + deployment, err := c.AppsV1().Deployments(namespace).Create(ctx, + &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "test-update-service-", + Labels: label, + }, + Spec: appsv1.DeploymentSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: label, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: label, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{ + Name: "update-service", + Image: "image-registry.openshift-image-registry.svc:5000/openshift/tools:latest", + Args: []string{ + "/bin/sh", + "-c", + fmt.Sprintf(`DIR="$(mktemp -d)" && +cd "${DIR}" && +printf '%%s' '%s' >graph && +python3 -m http.server 8000 +`, graph), + }, + Ports: []corev1.ContainerPort{{ + Name: "update-service", + 
ContainerPort: 8000, + }}, + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("10m"), + corev1.ResourceMemory: resource.MustParse("20Mi"), + }, + }, + }}, + }, + }, + }, + }, metav1.CreateOptions{}) + if err != nil { + return nil, err + } + + if err = WaitForDeploymentReady(c, deployment.Name, namespace, -1); err != nil { + return nil, err + } + return deployment, nil +} + +func CreateService(ctx context.Context, c kubernetes.Interface, namespace string, deployment *appsv1.Deployment, port int32) (*corev1.Service, *url.URL, error) { + service, err := c.CoreV1().Services(namespace).Create(ctx, + &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: deployment.Name, + }, + Spec: corev1.ServiceSpec{ + Selector: deployment.Spec.Template.Labels, + Ports: []corev1.ServicePort{{ + Name: deployment.Spec.Template.Spec.Containers[0].Ports[0].Name, + Protocol: corev1.ProtocolTCP, + Port: port, + TargetPort: intstr.IntOrString{IntVal: deployment.Spec.Template.Spec.Containers[0].Ports[0].ContainerPort}, + }}, + Type: corev1.ServiceTypeClusterIP, + SessionAffinity: corev1.ServiceAffinityNone, + }, + }, metav1.CreateOptions{}) + if err != nil { + return nil, nil, err + } + + // wait for ClusterIP to be assigned + var createdSvc *corev1.Service + if err := wait.PollUntilContextTimeout(context.Background(), 1*time.Second, 10*time.Second, true, func(ctx context.Context) (bool, error) { + svc, err := c.CoreV1().Services(namespace).Get(ctx, service.Name, metav1.GetOptions{}) + if err != nil { + return false, err + } + if svc.Spec.ClusterIP != "" && svc.Spec.ClusterIP != "None" { + createdSvc = svc + return true, nil + } + return false, nil + }); err != nil { + return nil, nil, fmt.Errorf("timed out waiting for ClusterIP for service %s/%s: %w", namespace, service.Name, err) + } + + return createdSvc, &url.URL{ + Scheme: "http", + Host: net.JoinHostPort(createdSvc.Spec.ClusterIP, 
strconv.Itoa(int(createdSvc.Spec.Ports[0].Port))), + Path: "graph", + }, nil +} + +func CreateNetworkPolicy(ctx context.Context, c kubernetes.Interface, namespace string, policyName string, MatchLabels map[string]string) (*networkingv1.NetworkPolicy, error) { + policy, err := c.NetworkingV1().NetworkPolicies(namespace).Create(ctx, + &networkingv1.NetworkPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Name: policyName, + }, + Spec: networkingv1.NetworkPolicySpec{ + PodSelector: metav1.LabelSelector{ + MatchLabels: MatchLabels, + }, + PolicyTypes: []networkingv1.PolicyType{networkingv1.PolicyTypeIngress}, + Ingress: []networkingv1.NetworkPolicyIngressRule{{}}, + }, + }, metav1.CreateOptions{}) + if err != nil { + return nil, err + } + + return policy, nil +} + +func DeleteNetworkPolicy(ctx context.Context, c kubernetes.Interface, namespace string, policyName string) error { + err := c.NetworkingV1().NetworkPolicies(namespace).Delete(ctx, policyName, metav1.DeleteOptions{}) + return err +} + +func CleanupService(ctx context.Context, c kubernetes.Interface, namespace string, serviceName string) error { + err := c.CoreV1().Services(namespace).Delete(ctx, serviceName, metav1.DeleteOptions{}) + return err +} + +func DeleteDeployment(ctx context.Context, c kubernetes.Interface, namespace string, deploymentName string) error { + err := c.AppsV1().Deployments(namespace).Delete(ctx, deploymentName, metav1.DeleteOptions{}) + return err +} + +func PatchUpstream(ctx context.Context, c *clientconfigv1.Clientset, url string, channel string) (*configv1.ClusterVersion, error) { + obj, err := c.ConfigV1().ClusterVersions().Get(ctx, "version", metav1.GetOptions{}) + + if err != nil { + return nil, err + } + + if obj == nil { + return nil, fmt.Errorf("ClusterVersion object not found") + } + + exists := obj.DeepCopy() + exists.Spec.Upstream = configv1.URL(url) + exists.Spec.Channel = channel + + cv, err := c.ConfigV1().ClusterVersions().Update(ctx, exists, metav1.UpdateOptions{}) + + return 
cv, err +}