From 34a667fdd599be8a25a293eaa65324222cbbb9c0 Mon Sep 17 00:00:00 2001 From: Austin Beattie Date: Tue, 14 Apr 2026 21:50:11 -0700 Subject: [PATCH 1/3] refactor: split controller.go - Split various functionality out of controller.go - Add reporting.go file to controller package containing logic related to reporting deployments to the Artifact Metadata API - Add workload package with encapsulated logic for resolving workload identity for pods. --- internal/controller/controller.go | 683 ++---------------- .../controller/controller_integration_test.go | 15 +- internal/controller/controller_test.go | 488 +------------ internal/controller/reporting.go | 319 ++++++++ internal/controller/reporting_test.go | 112 +++ internal/workload/workload.go | 293 ++++++++ internal/workload/workload_test.go | 414 +++++++++++ 7 files changed, 1214 insertions(+), 1110 deletions(-) create mode 100644 internal/controller/reporting.go create mode 100644 internal/controller/reporting_test.go create mode 100644 internal/workload/workload.go create mode 100644 internal/workload/workload_test.go diff --git a/internal/controller/controller.go b/internal/controller/controller.go index b722d3b..f1d353b 100644 --- a/internal/controller/controller.go +++ b/internal/controller/controller.go @@ -5,25 +5,21 @@ import ( "errors" "fmt" "log/slog" - "slices" "strings" "time" "github.com/github/deployment-tracker/internal/metadata" + "github.com/github/deployment-tracker/internal/workload" "github.com/github/deployment-tracker/pkg/deploymentrecord" "github.com/github/deployment-tracker/pkg/dtmetrics" - "github.com/github/deployment-tracker/pkg/ociutil" amcache "k8s.io/apimachinery/pkg/util/cache" corev1 "k8s.io/api/core/v1" - k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" - appslisters "k8s.io/client-go/listers/apps/v1" - 
batchlisters "k8s.io/client-go/listers/batch/v1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" ) @@ -58,6 +54,13 @@ type podMetadataAggregator interface { BuildAggregatePodMetadata(ctx context.Context, obj *metav1.PartialObjectMetadata) *metadata.AggregatePodMetadata } +// workloadResolver is an interface for resolving the workload identity of a pod +// and determining if a workload is active. +type workloadResolver interface { + Resolve(pod *corev1.Pod) workload.Identity + IsActive(namespace string, identity workload.Identity) bool +} + // PodEvent represents a pod event to be processed. type PodEvent struct { Key string @@ -65,30 +68,15 @@ type PodEvent struct { DeletedPod *corev1.Pod // Only populated for delete events } -// workloadRef describes the top-level workload that owns a pod. -type workloadRef struct { - Name string - Kind string // "Deployment", "DaemonSet", "StatefulSet", "CronJob", or "Job" -} - // Controller is the Kubernetes controller for tracking deployments. 
type Controller struct { - clientset kubernetes.Interface - metadataAggregator podMetadataAggregator - podInformer cache.SharedIndexInformer - deploymentInformer cache.SharedIndexInformer - deploymentLister appslisters.DeploymentLister - daemonSetInformer cache.SharedIndexInformer - daemonSetLister appslisters.DaemonSetLister - statefulSetInformer cache.SharedIndexInformer - statefulSetLister appslisters.StatefulSetLister - jobInformer cache.SharedIndexInformer - jobLister batchlisters.JobLister - cronJobInformer cache.SharedIndexInformer - cronJobLister batchlisters.CronJobLister - workqueue workqueue.TypedRateLimitingInterface[PodEvent] - apiClient deploymentRecordPoster - cfg *Config + informerFactory informers.SharedInformerFactory + podInformer cache.SharedIndexInformer + workqueue workqueue.TypedRateLimitingInterface[PodEvent] + workloadResolver workloadResolver + metadataAggregator podMetadataAggregator + apiClient deploymentRecordPoster + cfg *Config // best effort cache to avoid redundant posts // post requests are idempotent, so if this cache fails due to // restarts or other events, nothing will break. @@ -104,19 +92,18 @@ type Controller struct { // New creates a new deployment tracker controller. 
func New(clientset kubernetes.Interface, metadataAggregator podMetadataAggregator, namespace string, excludeNamespaces string, cfg *Config) (*Controller, error) { // Create informer factory - factory := createInformerFactory(clientset, namespace, excludeNamespaces) - - podInformer := factory.Core().V1().Pods().Informer() - deploymentInformer := factory.Apps().V1().Deployments().Informer() - deploymentLister := factory.Apps().V1().Deployments().Lister() - daemonSetInformer := factory.Apps().V1().DaemonSets().Informer() - daemonSetLister := factory.Apps().V1().DaemonSets().Lister() - statefulSetInformer := factory.Apps().V1().StatefulSets().Informer() - statefulSetLister := factory.Apps().V1().StatefulSets().Lister() - jobInformer := factory.Batch().V1().Jobs().Informer() - jobLister := factory.Batch().V1().Jobs().Lister() - cronJobInformer := factory.Batch().V1().CronJobs().Informer() - cronJobLister := factory.Batch().V1().CronJobs().Lister() + informerFactory := createInformerFactory(clientset, namespace, excludeNamespaces) + + podInformer := informerFactory.Core().V1().Pods().Informer() + + // Create the workload resolver with listers from the factory + resolver := workload.NewResolver( + informerFactory.Apps().V1().Deployments().Lister(), + informerFactory.Apps().V1().DaemonSets().Lister(), + informerFactory.Apps().V1().StatefulSets().Lister(), + informerFactory.Batch().V1().Jobs().Lister(), + informerFactory.Batch().V1().CronJobs().Lister(), + ) // Create work queue with rate limiting queue := workqueue.NewTypedRateLimitingQueue( @@ -149,20 +136,11 @@ func New(clientset kubernetes.Interface, metadataAggregator podMetadataAggregato } cntrl := &Controller{ - clientset: clientset, - metadataAggregator: metadataAggregator, + informerFactory: informerFactory, podInformer: podInformer, - deploymentInformer: deploymentInformer, - deploymentLister: deploymentLister, - daemonSetInformer: daemonSetInformer, - daemonSetLister: daemonSetLister, - statefulSetInformer: 
statefulSetInformer, - statefulSetLister: statefulSetLister, - jobInformer: jobInformer, - jobLister: jobLister, - cronJobInformer: cronJobInformer, - cronJobLister: cronJobLister, workqueue: queue, + workloadResolver: resolver, + metadataAggregator: metadataAggregator, apiClient: apiClient, cfg: cfg, observedDeployments: amcache.NewExpiring(), @@ -183,7 +161,7 @@ func New(clientset kubernetes.Interface, metadataAggregator podMetadataAggregato // Only process pods that are running and belong // to a supported workload (Deployment, DaemonSet, StatefulSet, Job, or CronJob) - if pod.Status.Phase == corev1.PodRunning && hasSupportedOwner(pod) { + if pod.Status.Phase == corev1.PodRunning && workload.HasSupportedOwner(pod) { key, err := cache.MetaNamespaceKeyFunc(obj) // For our purposes, there are in practice @@ -199,7 +177,7 @@ func New(clientset kubernetes.Interface, metadataAggregator podMetadataAggregato // Also process Job-owned pods that completed before // we observed them in Running phase (e.g. sub-second Jobs). - if isTerminalPhase(pod) && getJobOwnerName(pod) != "" { + if workload.IsTerminalPhase(pod) && workload.GetJobOwnerName(pod) != "" { key, err := cache.MetaNamespaceKeyFunc(obj) if err == nil { queue.Add(PodEvent{ @@ -232,10 +210,10 @@ func New(clientset kubernetes.Interface, metadataAggregator podMetadataAggregato // processed — this catches short-lived Jobs that skip Running. // We exclude Running→terminal transitions since those pods // were already enqueued when they entered Running. 
- isJobTerminal := !isTerminalPhase(oldPod) && isTerminalPhase(newPod) && - oldPod.Status.Phase != corev1.PodRunning && getJobOwnerName(newPod) != "" + isJobTerminal := !workload.IsTerminalPhase(oldPod) && workload.IsTerminalPhase(newPod) && + oldPod.Status.Phase != corev1.PodRunning && workload.GetJobOwnerName(newPod) != "" if !isJobTerminal { - if newPod.DeletionTimestamp != nil || !hasSupportedOwner(newPod) { + if newPod.DeletionTimestamp != nil || !workload.HasSupportedOwner(newPod) { return } } @@ -287,7 +265,7 @@ func New(clientset kubernetes.Interface, metadataAggregator podMetadataAggregato } // Only process pods that belong to a supported workload - if !hasSupportedOwner(pod) { + if !workload.HasSupportedOwner(pod) { return } @@ -318,33 +296,24 @@ func (c *Controller) Run(ctx context.Context, workers int) error { slog.Info("Starting informers") - // Start the informers - go c.podInformer.Run(ctx.Done()) - go c.deploymentInformer.Run(ctx.Done()) - go c.daemonSetInformer.Run(ctx.Done()) - go c.statefulSetInformer.Run(ctx.Done()) - go c.jobInformer.Run(ctx.Done()) - go c.cronJobInformer.Run(ctx.Done()) + // Start all informers via the factory + c.informerFactory.Start(ctx.Done()) // Wait for the caches to be synced slog.Info("Waiting for informer caches to sync") informerSyncCtx, cancel := context.WithTimeout(ctx, c.informerSyncTimeout) - if !cache.WaitForCacheSync(informerSyncCtx.Done(), - c.podInformer.HasSynced, - c.deploymentInformer.HasSynced, - c.daemonSetInformer.HasSynced, - c.statefulSetInformer.HasSynced, - c.jobInformer.HasSynced, - c.cronJobInformer.HasSynced, - ) { - cancel() - if ctx.Err() != nil { - return fmt.Errorf("cache sync interrupted: %w", ctx.Err()) + syncResults := c.informerFactory.WaitForCacheSync(informerSyncCtx.Done()) + cancel() + + for _, synced := range syncResults { + if !synced { + if ctx.Err() != nil { + return fmt.Errorf("cache sync interrupted: %w", ctx.Err()) + } + return errors.New("timed out waiting for caches to sync - 
please ensure deployment tracker has the correct kubernetes permissions") } - return errors.New("timed out waiting for caches to sync - please ensure deployment tracker has the correct kubernetes permissions") } - cancel() slog.Info("Starting workers", "count", workers, @@ -401,291 +370,6 @@ func (c *Controller) processNextItem(ctx context.Context) bool { return true } -// processEvent processes a single pod event. -func (c *Controller) processEvent(ctx context.Context, event PodEvent) error { - var pod *corev1.Pod - var wl workloadRef - - if event.EventType == EventDeleted { - // For delete events, use the pod captured at deletion time - pod = event.DeletedPod - if pod == nil { - slog.Error("Delete event missing pod data", - "key", event.Key, - ) - return nil - } - - // Check if the parent workload still exists. - // If it does, this is just a scale-down event (or a completed - // Job pod while the CronJob is still active), skip it. - // - // If a workload changes image versions, this will not - // fire delete/decommissioned events to the remote API. - // This is as intended, as the server will keep track of - // the (cluster unique) deployment name, and just update - // the referenced image digest to the newly observed (via - // the create event). 
- wl = c.getWorkloadRef(pod) - if wl.Name != "" && c.workloadActive(pod.Namespace, wl) { - slog.Debug("Parent workload still exists, skipping pod delete", - "namespace", pod.Namespace, - "workload_kind", wl.Kind, - "workload_name", wl.Name, - "pod", pod.Name, - ) - return nil - } - } else { - // For create events, get the pod from the informer's cache - obj, exists, err := c.podInformer.GetIndexer().GetByKey(event.Key) - if err != nil { - slog.Error("Failed to get pod from cache", - "key", event.Key, - "error", err, - ) - return nil - } - if !exists { - // Pod no longer exists in cache, skip processing - return nil - } - - var ok bool - pod, ok = obj.(*corev1.Pod) - if !ok { - slog.Error("Invalid object type in cache", - "key", event.Key, - ) - return nil - } - } - - // Resolve the workload name for the deployment record. - // For delete events, wl was already resolved above. - if wl.Name == "" { - wl = c.getWorkloadRef(pod) - } - if wl.Name == "" { - slog.Debug("Could not resolve workload name for pod, skipping", - "namespace", pod.Namespace, - "pod", pod.Name, - ) - return nil - } - - var lastErr error - - // Gather aggregate metadata for adds/updates - var aggPodMetadata *metadata.AggregatePodMetadata - if event.EventType != EventDeleted { - aggPodMetadata = c.metadataAggregator.BuildAggregatePodMetadata(ctx, podToPartialMetadata(pod)) - } - - // Record info for each container in the pod - for _, container := range pod.Spec.Containers { - if err := c.recordContainer(ctx, pod, container, event.EventType, wl.Name, aggPodMetadata); err != nil { - lastErr = err - } - } - - // Also record init containers - for _, container := range pod.Spec.InitContainers { - if err := c.recordContainer(ctx, pod, container, event.EventType, wl.Name, aggPodMetadata); err != nil { - lastErr = err - } - } - - return lastErr -} - -// deploymentExists checks if a deployment exists in the local informer cache. 
-func (c *Controller) deploymentExists(namespace, name string) bool { - _, err := c.deploymentLister.Deployments(namespace).Get(name) - if err != nil { - if k8serrors.IsNotFound(err) { - return false - } - slog.Warn("Failed to check if deployment exists in cache, assuming it does", - "namespace", namespace, - "deployment", name, - "error", err, - ) - return true - } - return true -} - -// recordContainer records a single container's deployment info. -func (c *Controller) recordContainer(ctx context.Context, pod *corev1.Pod, container corev1.Container, eventType, workloadName string, aggPodMetadata *metadata.AggregatePodMetadata) error { - var cacheKey string - - status := deploymentrecord.StatusDeployed - if eventType == EventDeleted { - status = deploymentrecord.StatusDecommissioned - } - - dn := getARDeploymentName(pod, container, c.cfg.Template, workloadName) - digest := getContainerDigest(pod, container.Name) - - if dn == "" || digest == "" { - slog.Debug("Skipping container: missing deployment name or digest", - "namespace", pod.Namespace, - "pod", pod.Name, - "container", container.Name, - "deployment_name", dn, - "has_digest", digest != "", - ) - return nil - } - - // Check if we've already recorded this deployment - switch status { - case deploymentrecord.StatusDeployed: - cacheKey = getCacheKey(EventCreated, dn, digest) - if _, exists := c.observedDeployments.Get(cacheKey); exists { - slog.Debug("Deployment already observed, skipping post", - "deployment_name", dn, - "digest", digest, - ) - return nil - } - case deploymentrecord.StatusDecommissioned: - cacheKey = getCacheKey(EventDeleted, dn, digest) - if _, exists := c.observedDeployments.Get(cacheKey); exists { - slog.Debug("Deployment already deleted, skipping post", - "deployment_name", dn, - "digest", digest, - ) - return nil - } - default: - return fmt.Errorf("invalid status: %s", status) - } - - // Check if this artifact was previously unknown (404 from the API) - if _, exists := 
c.unknownArtifacts.Get(digest); exists { - dtmetrics.PostDeploymentRecordUnknownArtifactCacheHit.Inc() - slog.Debug("Artifact previously returned 404, skipping post", - "deployment_name", dn, - "digest", digest, - ) - return nil - } - - // Extract image name and tag - imageName, version := ociutil.ExtractName(container.Image) - - // Format runtime risks and tags - var runtimeRisks []deploymentrecord.RuntimeRisk - var tags map[string]string - if aggPodMetadata != nil { - for risk := range aggPodMetadata.RuntimeRisks { - runtimeRisks = append(runtimeRisks, risk) - } - slices.Sort(runtimeRisks) - tags = aggPodMetadata.Tags - } - - // Create deployment record - record := deploymentrecord.NewDeploymentRecord( - imageName, - digest, - version, - c.cfg.LogicalEnvironment, - c.cfg.PhysicalEnvironment, - c.cfg.Cluster, - status, - dn, - runtimeRisks, - tags, - ) - - if err := c.apiClient.PostOne(ctx, record); err != nil { - // Return if no artifact is found and cache the digest - var noArtifactErr *deploymentrecord.NoArtifactError - if errors.As(err, &noArtifactErr) { - c.unknownArtifacts.Set(digest, true, unknownArtifactTTL) - slog.Info("No artifact found, digest cached as unknown", - "deployment_name", dn, - "digest", digest, - ) - return nil - } - - // Make sure to not retry on client error messages - var clientErr *deploymentrecord.ClientError - if errors.As(err, &clientErr) { - slog.Warn("Failed to post record", - "event_type", eventType, - "name", record.Name, - "deployment_name", record.DeploymentName, - "status", record.Status, - "digest", record.Digest, - "error", err, - ) - return nil - } - - slog.Error("Failed to post record", - "event_type", eventType, - "name", record.Name, - "deployment_name", record.DeploymentName, - "status", record.Status, - "digest", record.Digest, - "error", err, - ) - return err - } - - slog.Info("Posted record", - "event_type", eventType, - "name", record.Name, - "deployment_name", record.DeploymentName, - "status", record.Status, - 
"digest", record.Digest, - "runtime_risks", record.RuntimeRisks, - "tags", record.Tags, - ) - - // Update cache after successful post - switch status { - case deploymentrecord.StatusDeployed: - cacheKey = getCacheKey(EventCreated, dn, digest) - c.observedDeployments.Set(cacheKey, true, 2*time.Minute) - // If there was a previous delete event, remove that - cacheKey = getCacheKey(EventDeleted, dn, digest) - c.observedDeployments.Delete(cacheKey) - case deploymentrecord.StatusDecommissioned: - cacheKey = getCacheKey(EventDeleted, dn, digest) - c.observedDeployments.Set(cacheKey, true, 2*time.Minute) - // If there was a previous create event, remove that - cacheKey = getCacheKey(EventCreated, dn, digest) - c.observedDeployments.Delete(cacheKey) - default: - return fmt.Errorf("invalid status: %s", status) - } - - return nil -} - -func getCacheKey(ev, dn, digest string) string { - return ev + "||" + dn + "||" + digest -} - -// isNumeric returns true if s is non-empty and consists entirely of ASCII digits. -func isNumeric(s string) bool { - if s == "" { - return false - } - for _, c := range s { - if c < '0' || c > '9' { - return false - } - } - return true -} - // createInformerFactory creates a shared informer factory with the given resync period. // If excludeNamespaces is non-empty, it will exclude those namespaces from being watched. // If namespace is non-empty, it will only watch that namespace. @@ -735,276 +419,3 @@ func createInformerFactory(clientset kubernetes.Interface, namespace string, exc return factory } - -// getARDeploymentName converts the pod's metadata into the correct format -// for the deployment name for the artifact registry (this is not the same -// as the K8s deployment's name!) -// The deployment name must unique within logical, physical environment and -// the cluster. 
-func getARDeploymentName(p *corev1.Pod, c corev1.Container, tmpl, workloadName string) string { - res := tmpl - res = strings.ReplaceAll(res, TmplNS, p.Namespace) - res = strings.ReplaceAll(res, TmplDN, workloadName) - res = strings.ReplaceAll(res, TmplCN, c.Name) - return res -} - -// getContainerDigest extracts the image digest from the container status. -// The spec only contains the desired state, so any resolved digests must -// be pulled from the status field. -func getContainerDigest(pod *corev1.Pod, containerName string) string { - // Check regular container statuses - for _, status := range pod.Status.ContainerStatuses { - if status.Name == containerName { - return ociutil.ExtractDigest(status.ImageID) - } - } - - // Check init container statuses - for _, status := range pod.Status.InitContainerStatuses { - if status.Name == containerName { - return ociutil.ExtractDigest(status.ImageID) - } - } - - return "" -} - -// hasSupportedOwner returns true if the pod is owned by a supported -// workload controller (ReplicaSet for Deployments, DaemonSet, StatefulSet, or Job for Jobs/CronJobs). -func hasSupportedOwner(pod *corev1.Pod) bool { - for _, owner := range pod.OwnerReferences { - if owner.Kind == "ReplicaSet" || owner.Kind == "DaemonSet" || owner.Kind == "StatefulSet" || owner.Kind == "Job" { - return true - } - } - return false -} - -// isTerminalPhase returns true if the pod has reached a terminal phase -// (Succeeded or Failed). Used to catch short-lived Job pods that complete -// before the controller observes them in the Running phase. -func isTerminalPhase(pod *corev1.Pod) bool { - return pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodFailed -} - -// getDeploymentName returns the deployment name for a pod, if it belongs -// to one. 
-func getDeploymentName(pod *corev1.Pod) string { - // Pods created by Deployments are owned by ReplicaSets - // The ReplicaSet name follows the pattern: - - for _, owner := range pod.OwnerReferences { - if owner.Kind == "ReplicaSet" { - // Extract deployment name by removing the hash suffix - // ReplicaSet name format: - - rsName := owner.Name - lastDash := strings.LastIndex(rsName, "-") - if lastDash > 0 { - return rsName[:lastDash] - } - return rsName - } - } - return "" -} - -// getJobOwnerName returns the Job name from the pod's owner references, -// if the pod is owned by a Job. -func getJobOwnerName(pod *corev1.Pod) string { - for _, owner := range pod.OwnerReferences { - if owner.Kind == "Job" { - return owner.Name - } - } - return "" -} - -// getDaemonSetName returns the DaemonSet name for a pod, if it belongs -// to one. DaemonSet pods are owned directly by the DaemonSet. -func getDaemonSetName(pod *corev1.Pod) string { - for _, owner := range pod.OwnerReferences { - if owner.Kind == "DaemonSet" { - return owner.Name - } - } - return "" -} - -// getStatefulSetName returns the StatefulSet name for a pod, if it belongs -// to one. StatefulSet pods are owned directly by the StatefulSet. -func getStatefulSetName(pod *corev1.Pod) string { - for _, owner := range pod.OwnerReferences { - if owner.Kind == "StatefulSet" { - return owner.Name - } - } - return "" -} - -// getWorkloadRef resolves the top-level workload that owns a pod. -// For Deployment-owned pods (via ReplicaSets), returns the Deployment name. -// For DaemonSet-owned pods, returns the DaemonSet name. -// For StatefulSet-owned pods, returns the StatefulSet name. -// For CronJob-owned pods (via Jobs), returns the CronJob name. -// For standalone Job-owned pods, returns the Job name. 
-func (c *Controller) getWorkloadRef(pod *corev1.Pod) workloadRef { - // Check for Deployment (via ReplicaSet) - if dn := getDeploymentName(pod); dn != "" { - return workloadRef{Name: dn, Kind: "Deployment"} - } - - // Check for DaemonSet (direct ownership) - if dsn := getDaemonSetName(pod); dsn != "" { - return workloadRef{Name: dsn, Kind: "DaemonSet"} - } - - // Check for StatefulSet (direct ownership) - if ssn := getStatefulSetName(pod); ssn != "" { - return workloadRef{Name: ssn, Kind: "StatefulSet"} - } - - // Check for Job - jobName := getJobOwnerName(pod) - if jobName == "" { - return workloadRef{} - } - - return c.resolveJobWorkload(pod.Namespace, jobName) -} - -// resolveJobWorkload determines whether a Job is owned by a CronJob or is standalone. -func (c *Controller) resolveJobWorkload(namespace, jobName string) workloadRef { - // Try to look up the Job to check for CronJob ownership - if c.jobLister != nil { - job, err := c.jobLister.Jobs(namespace).Get(jobName) - if err == nil { - for _, owner := range job.OwnerReferences { - if owner.Kind == "CronJob" { - return workloadRef{Name: owner.Name, Kind: "CronJob"} - } - } - return workloadRef{Name: jobName, Kind: "Job"} - } - } - - // Job not found in cache - try CronJob name derivation as fallback. - // CronJob-created Jobs follow the naming pattern: - - // where the suffix is always numeric. We validate the suffix is all digits to - // reduce false matches from standalone Jobs that coincidentally share a prefix - // with an existing CronJob. A residual false positive is still possible if a - // standalone Job is named exactly -, but the primary path - // (checking Job OwnerReferences) handles the common case; this fallback only - // fires when the Job has already been garbage-collected. 
- if c.cronJobLister != nil { - lastDash := strings.LastIndex(jobName, "-") - if lastDash > 0 { - suffix := jobName[lastDash+1:] - if isNumeric(suffix) { - potentialCronJobName := jobName[:lastDash] - if c.cronJobExists(namespace, potentialCronJobName) { - return workloadRef{Name: potentialCronJobName, Kind: "CronJob"} - } - } - } - } - - // Standalone Job (possibly already deleted) - return workloadRef{Name: jobName, Kind: "Job"} -} - -// workloadActive checks if the parent workload for a pod still exists -// in the local informer cache. -func (c *Controller) workloadActive(namespace string, ref workloadRef) bool { - switch ref.Kind { - case "Deployment": - return c.deploymentExists(namespace, ref.Name) - case "DaemonSet": - return c.daemonSetExists(namespace, ref.Name) - case "StatefulSet": - return c.statefulSetExists(namespace, ref.Name) - case "CronJob": - return c.cronJobExists(namespace, ref.Name) - case "Job": - return c.jobExists(namespace, ref.Name) - default: - return false - } -} - -// jobExists checks if a job exists in the local informer cache. -func (c *Controller) jobExists(namespace, name string) bool { - _, err := c.jobLister.Jobs(namespace).Get(name) - if err != nil { - if k8serrors.IsNotFound(err) { - return false - } - slog.Warn("Failed to check if job exists in cache, assuming it does", - "namespace", namespace, - "job", name, - "error", err, - ) - return true - } - return true -} - -// daemonSetExists checks if a daemonset exists in the local informer cache. -func (c *Controller) daemonSetExists(namespace, name string) bool { - _, err := c.daemonSetLister.DaemonSets(namespace).Get(name) - if err != nil { - if k8serrors.IsNotFound(err) { - return false - } - slog.Warn("Failed to check if daemonset exists in cache, assuming it does", - "namespace", namespace, - "daemonset", name, - "error", err, - ) - return true - } - return true -} - -// statefulSetExists checks if a statefulset exists in the local informer cache. 
-func (c *Controller) statefulSetExists(namespace, name string) bool { - _, err := c.statefulSetLister.StatefulSets(namespace).Get(name) - if err != nil { - if k8serrors.IsNotFound(err) { - return false - } - slog.Warn("Failed to check if statefulset exists in cache, assuming it does", - "namespace", namespace, - "statefulset", name, - "error", err, - ) - return true - } - return true -} - -// cronJobExists checks if a cronjob exists in the local informer cache. -func (c *Controller) cronJobExists(namespace, name string) bool { - _, err := c.cronJobLister.CronJobs(namespace).Get(name) - if err != nil { - if k8serrors.IsNotFound(err) { - return false - } - slog.Warn("Failed to check if cronjob exists in cache, assuming it does", - "namespace", namespace, - "cronjob", name, - "error", err, - ) - return true - } - return true -} - -func podToPartialMetadata(pod *corev1.Pod) *metav1.PartialObjectMetadata { - return &metav1.PartialObjectMetadata{ - TypeMeta: metav1.TypeMeta{ - APIVersion: "v1", - Kind: "Pod", - }, - ObjectMeta: pod.ObjectMeta, - } -} diff --git a/internal/controller/controller_integration_test.go b/internal/controller/controller_integration_test.go index bfc3818..fce91ed 100644 --- a/internal/controller/controller_integration_test.go +++ b/internal/controller/controller_integration_test.go @@ -21,7 +21,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" k8smetadata "k8s.io/client-go/metadata" - "k8s.io/client-go/tools/cache" "sigs.k8s.io/controller-runtime/pkg/envtest" ) @@ -120,15 +119,11 @@ func setup(t *testing.T, onlyNamespace string, excludeNamespaces string) (*kuber go func() { _ = ctrl.Run(ctx, 1) }() - if !cache.WaitForCacheSync(ctx.Done(), - ctrl.podInformer.HasSynced, - ctrl.deploymentInformer.HasSynced, - ctrl.daemonSetInformer.HasSynced, - ctrl.statefulSetInformer.HasSynced, - ctrl.jobInformer.HasSynced, - ctrl.cronJobInformer.HasSynced, - ) { - t.Fatal("timed out waiting for informer cache to sync") + 
syncResults := ctrl.informerFactory.WaitForCacheSync(ctx.Done()) + for _, synced := range syncResults { + if !synced { + t.Fatal("timed out waiting for informer cache to sync") + } } return clientset, mockDeploymentRecordPoster diff --git a/internal/controller/controller_test.go b/internal/controller/controller_test.go index e66a61b..6ef25dc 100644 --- a/internal/controller/controller_test.go +++ b/internal/controller/controller_test.go @@ -8,6 +8,7 @@ import ( "testing" "time" + "github.com/github/deployment-tracker/internal/workload" "github.com/github/deployment-tracker/pkg/deploymentrecord" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -40,6 +41,17 @@ func (m *mockPoster) getCalls() int { return m.calls } +// mockResolver is a test double for the workloadResolver interface. +type mockResolver struct{} + +func (m *mockResolver) Resolve(pod *corev1.Pod) workload.Identity { + return workload.Identity{} +} + +func (m *mockResolver) IsActive(_ string, _ workload.Identity) bool { + return false +} + // newTestController creates a minimal Controller suitable for unit-testing // recordContainer without a real Kubernetes cluster. 
func newTestController(poster *mockPoster) *Controller { @@ -51,6 +63,7 @@ func newTestController(poster *mockPoster) *Controller { PhysicalEnvironment: "test", Cluster: "test", }, + workloadResolver: &mockResolver{}, observedDeployments: amcache.NewExpiring(), unknownArtifacts: amcache.NewExpiring(), } @@ -82,463 +95,6 @@ func testPod(digest string) (*corev1.Pod, corev1.Container) { return pod, container } -func TestRecordContainer_UnknownArtifactCachePopulatedOn404(t *testing.T) { - t.Parallel() - digest := "sha256:unknown404digest" - poster := &mockPoster{ - lastErr: &deploymentrecord.NoArtifactError{}, - } - ctrl := newTestController(poster) - pod, container := testPod(digest) - - // First call should hit the API and get a 404 - err := ctrl.recordContainer(context.Background(), pod, container, EventCreated, "test-deployment", nil) - require.NoError(t, err) - assert.Equal(t, 1, poster.getCalls()) - - // Digest should now be in the unknown artifacts cache - _, exists := ctrl.unknownArtifacts.Get(digest) - assert.True(t, exists, "digest should be cached after 404") -} - -func TestRecordContainer_UnknownArtifactCacheSkipsAPICall(t *testing.T) { - t.Parallel() - digest := "sha256:cacheddigest" - poster := &mockPoster{ - lastErr: &deploymentrecord.NoArtifactError{}, - } - ctrl := newTestController(poster) - pod, container := testPod(digest) - - // First call — API returns 404, populates cache - err := ctrl.recordContainer(context.Background(), pod, container, EventCreated, "test-deployment", nil) - require.NoError(t, err) - assert.Equal(t, 1, poster.getCalls()) - - // Second call — should be served from cache, no API call - err = ctrl.recordContainer(context.Background(), pod, container, EventCreated, "test-deployment", nil) - require.NoError(t, err) - assert.Equal(t, 1, poster.getCalls(), "API should not be called for cached unknown artifact") -} - -func TestRecordContainer_UnknownArtifactCacheAppliesToDecommission(t *testing.T) { - t.Parallel() - digest := 
"sha256:decommission404" - poster := &mockPoster{ - lastErr: &deploymentrecord.NoArtifactError{}, - } - ctrl := newTestController(poster) - pod, container := testPod(digest) - - // Deploy call — 404, populates cache - err := ctrl.recordContainer(context.Background(), pod, container, EventCreated, "test-deployment", nil) - require.NoError(t, err) - assert.Equal(t, 1, poster.getCalls()) - - // Decommission call for same digest — should skip API - err = ctrl.recordContainer(context.Background(), pod, container, EventDeleted, "test-deployment", nil) - require.NoError(t, err) - assert.Equal(t, 1, poster.getCalls(), "decommission should also be skipped for cached unknown artifact") -} - -func TestRecordContainer_UnknownArtifactCacheExpires(t *testing.T) { - t.Parallel() - digest := "sha256:expiringdigest" - poster := &mockPoster{ - lastErr: &deploymentrecord.NoArtifactError{}, - } - ctrl := newTestController(poster) - pod, container := testPod(digest) - - // Seed the cache with a very short TTL to test expiry - ctrl.unknownArtifacts.Set(digest, true, 50*time.Millisecond) - - // Immediately — should be cached - err := ctrl.recordContainer(context.Background(), pod, container, EventCreated, "test-deployment", nil) - require.NoError(t, err) - assert.Equal(t, 0, poster.getCalls(), "should skip API while cached") - - // Wait for expiry - time.Sleep(100 * time.Millisecond) - - // After expiry — should call API again - err = ctrl.recordContainer(context.Background(), pod, container, EventCreated, "test-deployment", nil) - require.NoError(t, err) - assert.Equal(t, 1, poster.getCalls(), "should call API after cache expiry") -} - -func TestRecordContainer_SuccessfulPostDoesNotPopulateUnknownCache(t *testing.T) { - t.Parallel() - digest := "sha256:knowndigest" - poster := &mockPoster{lastErr: nil} // success - ctrl := newTestController(poster) - pod, container := testPod(digest) - - err := ctrl.recordContainer(context.Background(), pod, container, EventCreated, "test-deployment", 
nil) - require.NoError(t, err) - assert.Equal(t, 1, poster.getCalls()) - - // Digest should NOT be in the unknown artifacts cache - _, exists := ctrl.unknownArtifacts.Get(digest) - assert.False(t, exists, "successful post should not cache digest as unknown") -} - -func TestHasSupportedOwner(t *testing.T) { - t.Parallel() - tests := []struct { - name string - pod *corev1.Pod - expected bool - }{ - { - name: "pod owned by ReplicaSet", - pod: &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - OwnerReferences: []metav1.OwnerReference{{ - Kind: "ReplicaSet", - Name: "test-rs-abc123", - }}, - }, - }, - expected: true, - }, - { - name: "pod owned by Job", - pod: &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - OwnerReferences: []metav1.OwnerReference{{ - Kind: "Job", - Name: "test-job", - }}, - }, - }, - expected: true, - }, - { - name: "pod with no owner", - pod: &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{}, - }, - expected: false, - }, - { - name: "pod owned by DaemonSet", - pod: &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - OwnerReferences: []metav1.OwnerReference{{ - Kind: "DaemonSet", - Name: "test-ds", - }}, - }, - }, - expected: true, - }, - { - name: "pod owned by StatefulSet", - pod: &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - OwnerReferences: []metav1.OwnerReference{{ - Kind: "StatefulSet", - Name: "test-ss", - }}, - }, - }, - expected: true, - }, - { - name: "pod owned by ReplicationController", - pod: &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - OwnerReferences: []metav1.OwnerReference{{ - Kind: "ReplicationController", - Name: "test-rc", - }}, - }, - }, - expected: false, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - t.Parallel() - result := hasSupportedOwner(tt.pod) - assert.Equal(t, tt.expected, result) - }) - } -} - -func TestGetJobOwnerName(t *testing.T) { - t.Parallel() - tests := []struct { - name string - pod *corev1.Pod - expected string - }{ - { - name: "pod owned by Job", - pod: &corev1.Pod{ - ObjectMeta: 
metav1.ObjectMeta{ - OwnerReferences: []metav1.OwnerReference{{ - Kind: "Job", - Name: "my-job", - }}, - }, - }, - expected: "my-job", - }, - { - name: "pod not owned by Job", - pod: &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - OwnerReferences: []metav1.OwnerReference{{ - Kind: "ReplicaSet", - Name: "my-rs-abc123", - }}, - }, - }, - expected: "", - }, - { - name: "pod with no owner", - pod: &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{}, - }, - expected: "", - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - t.Parallel() - result := getJobOwnerName(tt.pod) - assert.Equal(t, tt.expected, result) - }) - } -} - -func TestGetWorkloadRef_Deployment(t *testing.T) { - t.Parallel() - ctrl := newTestController(&mockPoster{}) - pod := &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - OwnerReferences: []metav1.OwnerReference{{ - Kind: "ReplicaSet", - Name: "my-deployment-abc123", - }}, - }, - } - wl := ctrl.getWorkloadRef(pod) - assert.Equal(t, "my-deployment", wl.Name) - assert.Equal(t, "Deployment", wl.Kind) -} - -func TestGetWorkloadRef_DaemonSet(t *testing.T) { - t.Parallel() - ctrl := newTestController(&mockPoster{}) - pod := &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - OwnerReferences: []metav1.OwnerReference{{ - Kind: "DaemonSet", - Name: "my-daemonset", - }}, - }, - } - wl := ctrl.getWorkloadRef(pod) - assert.Equal(t, "my-daemonset", wl.Name) - assert.Equal(t, "DaemonSet", wl.Kind) -} - -func TestGetDaemonSetName(t *testing.T) { - t.Parallel() - tests := []struct { - name string - pod *corev1.Pod - expected string - }{ - { - name: "pod owned by DaemonSet", - pod: &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - OwnerReferences: []metav1.OwnerReference{{ - Kind: "DaemonSet", - Name: "my-daemonset", - }}, - }, - }, - expected: "my-daemonset", - }, - { - name: "pod not owned by DaemonSet", - pod: &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - OwnerReferences: []metav1.OwnerReference{{ - Kind: "ReplicaSet", - Name: "my-rs-abc123", - }}, - 
}, - }, - expected: "", - }, - { - name: "pod with no owner", - pod: &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{}, - }, - expected: "", - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - t.Parallel() - result := getDaemonSetName(tt.pod) - assert.Equal(t, tt.expected, result) - }) - } -} - -func TestGetStatefulSetName(t *testing.T) { - t.Parallel() - tests := []struct { - name string - pod *corev1.Pod - expected string - }{ - { - name: "pod owned by StatefulSet", - pod: &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - OwnerReferences: []metav1.OwnerReference{{ - Kind: "StatefulSet", - Name: "my-statefulset", - }}, - }, - }, - expected: "my-statefulset", - }, - { - name: "pod not owned by StatefulSet", - pod: &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - OwnerReferences: []metav1.OwnerReference{{ - Kind: "ReplicaSet", - Name: "my-rs-abc123", - }}, - }, - }, - expected: "", - }, - { - name: "pod with no owner", - pod: &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{}, - }, - expected: "", - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - t.Parallel() - result := getStatefulSetName(tt.pod) - assert.Equal(t, tt.expected, result) - }) - } -} - -func TestGetWorkloadRef_StatefulSet(t *testing.T) { - t.Parallel() - ctrl := newTestController(&mockPoster{}) - pod := &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - OwnerReferences: []metav1.OwnerReference{{ - Kind: "StatefulSet", - Name: "my-statefulset", - }}, - }, - } - wl := ctrl.getWorkloadRef(pod) - assert.Equal(t, "my-statefulset", wl.Name) - assert.Equal(t, "StatefulSet", wl.Kind) -} - -func TestIsNumeric(t *testing.T) { - t.Parallel() - tests := []struct { - input string - expected bool - }{ - {"28485120", true}, - {"0", true}, - {"123456789", true}, - {"", false}, - {"abc", false}, - {"123abc", false}, - {"12-34", false}, - } - - for _, tt := range tests { - t.Run(tt.input, func(t *testing.T) { - t.Parallel() - assert.Equal(t, tt.expected, isNumeric(tt.input)) 
- }) - } -} - -func TestGetWorkloadRef_StandaloneJob(t *testing.T) { - t.Parallel() - // With nil listers, resolveJobWorkload falls back to standalone Job - ctrl := newTestController(&mockPoster{}) - pod := &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - OwnerReferences: []metav1.OwnerReference{{ - Kind: "Job", - Name: "my-standalone-job", - }}, - }, - } - wl := ctrl.getWorkloadRef(pod) - assert.Equal(t, "my-standalone-job", wl.Name) - assert.Equal(t, "Job", wl.Kind) -} - -func TestGetWorkloadRef_NoOwner(t *testing.T) { - t.Parallel() - ctrl := newTestController(&mockPoster{}) - pod := &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{}, - } - wl := ctrl.getWorkloadRef(pod) - assert.Empty(t, wl.Name) - assert.Empty(t, wl.Kind) -} - -func TestIsTerminalPhase(t *testing.T) { - t.Parallel() - tests := []struct { - name string - phase corev1.PodPhase - expected bool - }{ - {"Succeeded", corev1.PodSucceeded, true}, - {"Failed", corev1.PodFailed, true}, - {"Running", corev1.PodRunning, false}, - {"Pending", corev1.PodPending, false}, - {"Unknown", corev1.PodUnknown, false}, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - t.Parallel() - pod := &corev1.Pod{ - Status: corev1.PodStatus{Phase: tt.phase}, - } - assert.Equal(t, tt.expected, isTerminalPhase(pod)) - }) - } -} - func TestRun_InformerSyncTimeout(t *testing.T) { t.Parallel() fakeClient := fake.NewSimpleClientset() @@ -552,17 +108,21 @@ func TestRun_InformerSyncTimeout(t *testing.T) { factory := createInformerFactory(fakeClient, "", "") + // Ensure the informers are registered with the factory by accessing them + factory.Core().V1().Pods().Informer() + factory.Apps().V1().Deployments().Informer() + factory.Apps().V1().DaemonSets().Informer() + factory.Apps().V1().StatefulSets().Informer() + factory.Batch().V1().Jobs().Informer() + factory.Batch().V1().CronJobs().Informer() + ctrl := &Controller{ - clientset: fakeClient, - podInformer: factory.Core().V1().Pods().Informer(), - deploymentInformer: 
factory.Apps().V1().Deployments().Informer(), - daemonSetInformer: factory.Apps().V1().DaemonSets().Informer(), - statefulSetInformer: factory.Apps().V1().StatefulSets().Informer(), - jobInformer: factory.Batch().V1().Jobs().Informer(), - cronJobInformer: factory.Batch().V1().CronJobs().Informer(), + informerFactory: factory, + podInformer: factory.Core().V1().Pods().Informer(), workqueue: workqueue.NewTypedRateLimitingQueue( workqueue.DefaultTypedControllerRateLimiter[PodEvent](), ), + workloadResolver: &mockResolver{}, apiClient: &mockPoster{}, cfg: &Config{}, observedDeployments: amcache.NewExpiring(), diff --git a/internal/controller/reporting.go b/internal/controller/reporting.go new file mode 100644 index 0000000..9d95b9a --- /dev/null +++ b/internal/controller/reporting.go @@ -0,0 +1,319 @@ +package controller + +import ( + "context" + "errors" + "fmt" + "log/slog" + "slices" + "strings" + "time" + + "github.com/github/deployment-tracker/internal/metadata" + "github.com/github/deployment-tracker/internal/workload" + "github.com/github/deployment-tracker/pkg/deploymentrecord" + "github.com/github/deployment-tracker/pkg/dtmetrics" + "github.com/github/deployment-tracker/pkg/ociutil" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// processEvent processes a single pod event. +func (c *Controller) processEvent(ctx context.Context, event PodEvent) error { + var pod *corev1.Pod + var wl workload.Identity + + if event.EventType == EventDeleted { + // For delete events, use the pod captured at deletion time + pod = event.DeletedPod + if pod == nil { + slog.Error("Delete event missing pod data", + "key", event.Key, + ) + return nil + } + + // Check if the parent workload still exists. + // If it does, this is just a scale-down event (or a completed + // Job pod while the CronJob is still active), skip it. + // + // If a workload changes image versions, this will not + // fire delete/decommissioned events to the remote API. 
+ // This is as intended, as the server will keep track of + // the (cluster unique) deployment name, and just update + // the referenced image digest to the newly observed (via + // the create event). + wl = c.workloadResolver.Resolve(pod) + if wl.Name != "" && c.workloadResolver.IsActive(pod.Namespace, wl) { + slog.Debug("Parent workload still exists, skipping pod delete", + "namespace", pod.Namespace, + "workload_kind", wl.Kind, + "workload_name", wl.Name, + "pod", pod.Name, + ) + return nil + } + } else { + // For create events, get the pod from the informer's cache + obj, exists, err := c.podInformer.GetIndexer().GetByKey(event.Key) + if err != nil { + slog.Error("Failed to get pod from cache", + "key", event.Key, + "error", err, + ) + return nil + } + if !exists { + // Pod no longer exists in cache, skip processing + return nil + } + + var ok bool + pod, ok = obj.(*corev1.Pod) + if !ok { + slog.Error("Invalid object type in cache", + "key", event.Key, + ) + return nil + } + } + + // Resolve the workload name for the deployment record. + // For delete events, wl was already resolved above. 
+ if wl.Name == "" { + wl = c.workloadResolver.Resolve(pod) + } + if wl.Name == "" { + slog.Debug("Could not resolve workload name for pod, skipping", + "namespace", pod.Namespace, + "pod", pod.Name, + ) + return nil + } + + var lastErr error + + // Gather aggregate metadata for adds/updates + var aggPodMetadata *metadata.AggregatePodMetadata + if event.EventType != EventDeleted { + aggPodMetadata = c.metadataAggregator.BuildAggregatePodMetadata(ctx, podToPartialMetadata(pod)) + } + + // Record info for each container in the pod + for _, container := range pod.Spec.Containers { + if err := c.recordContainer(ctx, pod, container, event.EventType, wl.Name, aggPodMetadata); err != nil { + lastErr = err + } + } + + // Also record init containers + for _, container := range pod.Spec.InitContainers { + if err := c.recordContainer(ctx, pod, container, event.EventType, wl.Name, aggPodMetadata); err != nil { + lastErr = err + } + } + + return lastErr +} + +// recordContainer records a single container's deployment info. 
+func (c *Controller) recordContainer(ctx context.Context, pod *corev1.Pod, container corev1.Container, eventType, workloadName string, aggPodMetadata *metadata.AggregatePodMetadata) error { + var cacheKey string + + status := deploymentrecord.StatusDeployed + if eventType == EventDeleted { + status = deploymentrecord.StatusDecommissioned + } + + dn := getARDeploymentName(pod, container, c.cfg.Template, workloadName) + digest := getContainerDigest(pod, container.Name) + + if dn == "" || digest == "" { + slog.Debug("Skipping container: missing deployment name or digest", + "namespace", pod.Namespace, + "pod", pod.Name, + "container", container.Name, + "deployment_name", dn, + "has_digest", digest != "", + ) + return nil + } + + // Check if we've already recorded this deployment + switch status { + case deploymentrecord.StatusDeployed: + cacheKey = getCacheKey(EventCreated, dn, digest) + if _, exists := c.observedDeployments.Get(cacheKey); exists { + slog.Debug("Deployment already observed, skipping post", + "deployment_name", dn, + "digest", digest, + ) + return nil + } + case deploymentrecord.StatusDecommissioned: + cacheKey = getCacheKey(EventDeleted, dn, digest) + if _, exists := c.observedDeployments.Get(cacheKey); exists { + slog.Debug("Deployment already deleted, skipping post", + "deployment_name", dn, + "digest", digest, + ) + return nil + } + default: + return fmt.Errorf("invalid status: %s", status) + } + + // Check if this artifact was previously unknown (404 from the API) + if _, exists := c.unknownArtifacts.Get(digest); exists { + dtmetrics.PostDeploymentRecordUnknownArtifactCacheHit.Inc() + slog.Debug("Artifact previously returned 404, skipping post", + "deployment_name", dn, + "digest", digest, + ) + return nil + } + + // Extract image name and tag + imageName, version := ociutil.ExtractName(container.Image) + + // Format runtime risks and tags + var runtimeRisks []deploymentrecord.RuntimeRisk + var tags map[string]string + if aggPodMetadata != nil { 
+ for risk := range aggPodMetadata.RuntimeRisks { + runtimeRisks = append(runtimeRisks, risk) + } + slices.Sort(runtimeRisks) + tags = aggPodMetadata.Tags + } + + // Create deployment record + record := deploymentrecord.NewDeploymentRecord( + imageName, + digest, + version, + c.cfg.LogicalEnvironment, + c.cfg.PhysicalEnvironment, + c.cfg.Cluster, + status, + dn, + runtimeRisks, + tags, + ) + + if err := c.apiClient.PostOne(ctx, record); err != nil { + // Return if no artifact is found and cache the digest + var noArtifactErr *deploymentrecord.NoArtifactError + if errors.As(err, &noArtifactErr) { + c.unknownArtifacts.Set(digest, true, unknownArtifactTTL) + slog.Info("No artifact found, digest cached as unknown", + "deployment_name", dn, + "digest", digest, + ) + return nil + } + + // Make sure to not retry on client error messages + var clientErr *deploymentrecord.ClientError + if errors.As(err, &clientErr) { + slog.Warn("Failed to post record", + "event_type", eventType, + "name", record.Name, + "deployment_name", record.DeploymentName, + "status", record.Status, + "digest", record.Digest, + "error", err, + ) + return nil + } + + slog.Error("Failed to post record", + "event_type", eventType, + "name", record.Name, + "deployment_name", record.DeploymentName, + "status", record.Status, + "digest", record.Digest, + "error", err, + ) + return err + } + + slog.Info("Posted record", + "event_type", eventType, + "name", record.Name, + "deployment_name", record.DeploymentName, + "status", record.Status, + "digest", record.Digest, + "runtime_risks", record.RuntimeRisks, + "tags", record.Tags, + ) + + // Update cache after successful post + switch status { + case deploymentrecord.StatusDeployed: + cacheKey = getCacheKey(EventCreated, dn, digest) + c.observedDeployments.Set(cacheKey, true, 2*time.Minute) + // If there was a previous delete event, remove that + cacheKey = getCacheKey(EventDeleted, dn, digest) + c.observedDeployments.Delete(cacheKey) + case 
deploymentrecord.StatusDecommissioned:
+		cacheKey = getCacheKey(EventDeleted, dn, digest)
+		c.observedDeployments.Set(cacheKey, true, 2*time.Minute)
+		// If there was a previous create event, remove that
+		cacheKey = getCacheKey(EventCreated, dn, digest)
+		c.observedDeployments.Delete(cacheKey)
+	default:
+		return fmt.Errorf("invalid status: %s", status)
+	}
+
+	return nil
+}
+
+func getCacheKey(ev, dn, digest string) string {
+	return ev + "||" + dn + "||" + digest
+}
+
+// getARDeploymentName converts the pod's metadata into the correct format
+// for the deployment name for the artifact registry (this is not the same
+// as the K8s deployment's name!)
+// The deployment name must be unique within the logical and physical environment
+// and the cluster.
+func getARDeploymentName(p *corev1.Pod, c corev1.Container, tmpl, workloadName string) string {
+	res := tmpl
+	res = strings.ReplaceAll(res, TmplNS, p.Namespace)
+	res = strings.ReplaceAll(res, TmplDN, workloadName)
+	res = strings.ReplaceAll(res, TmplCN, c.Name)
+	return res
+}
+
+// getContainerDigest extracts the image digest from the container status.
+// The spec only contains the desired state, so any resolved digests must
+// be pulled from the status field.
+func getContainerDigest(pod *corev1.Pod, containerName string) string { + // Check regular container statuses + for _, status := range pod.Status.ContainerStatuses { + if status.Name == containerName { + return ociutil.ExtractDigest(status.ImageID) + } + } + + // Check init container statuses + for _, status := range pod.Status.InitContainerStatuses { + if status.Name == containerName { + return ociutil.ExtractDigest(status.ImageID) + } + } + + return "" +} + +func podToPartialMetadata(pod *corev1.Pod) *metav1.PartialObjectMetadata { + return &metav1.PartialObjectMetadata{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "Pod", + }, + ObjectMeta: pod.ObjectMeta, + } +} diff --git a/internal/controller/reporting_test.go b/internal/controller/reporting_test.go new file mode 100644 index 0000000..b4073f2 --- /dev/null +++ b/internal/controller/reporting_test.go @@ -0,0 +1,112 @@ +package controller + +import ( + "context" + "testing" + "time" + + "github.com/github/deployment-tracker/pkg/deploymentrecord" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestRecordContainer_UnknownArtifactCachePopulatedOn404(t *testing.T) { + t.Parallel() + digest := "sha256:unknown404digest" + poster := &mockPoster{ + lastErr: &deploymentrecord.NoArtifactError{}, + } + ctrl := newTestController(poster) + pod, container := testPod(digest) + + // First call should hit the API and get a 404 + err := ctrl.recordContainer(context.Background(), pod, container, EventCreated, "test-deployment", nil) + require.NoError(t, err) + assert.Equal(t, 1, poster.getCalls()) + + // Digest should now be in the unknown artifacts cache + _, exists := ctrl.unknownArtifacts.Get(digest) + assert.True(t, exists, "digest should be cached after 404") +} + +func TestRecordContainer_UnknownArtifactCacheSkipsAPICall(t *testing.T) { + t.Parallel() + digest := "sha256:cacheddigest" + poster := &mockPoster{ + lastErr: &deploymentrecord.NoArtifactError{}, + } + ctrl := 
newTestController(poster) + pod, container := testPod(digest) + + // First call — API returns 404, populates cache + err := ctrl.recordContainer(context.Background(), pod, container, EventCreated, "test-deployment", nil) + require.NoError(t, err) + assert.Equal(t, 1, poster.getCalls()) + + // Second call — should be served from cache, no API call + err = ctrl.recordContainer(context.Background(), pod, container, EventCreated, "test-deployment", nil) + require.NoError(t, err) + assert.Equal(t, 1, poster.getCalls(), "API should not be called for cached unknown artifact") +} + +func TestRecordContainer_UnknownArtifactCacheAppliesToDecommission(t *testing.T) { + t.Parallel() + digest := "sha256:decommission404" + poster := &mockPoster{ + lastErr: &deploymentrecord.NoArtifactError{}, + } + ctrl := newTestController(poster) + pod, container := testPod(digest) + + // Deploy call — 404, populates cache + err := ctrl.recordContainer(context.Background(), pod, container, EventCreated, "test-deployment", nil) + require.NoError(t, err) + assert.Equal(t, 1, poster.getCalls()) + + // Decommission call for same digest — should skip API + err = ctrl.recordContainer(context.Background(), pod, container, EventDeleted, "test-deployment", nil) + require.NoError(t, err) + assert.Equal(t, 1, poster.getCalls(), "decommission should also be skipped for cached unknown artifact") +} + +func TestRecordContainer_UnknownArtifactCacheExpires(t *testing.T) { + t.Parallel() + digest := "sha256:expiringdigest" + poster := &mockPoster{ + lastErr: &deploymentrecord.NoArtifactError{}, + } + ctrl := newTestController(poster) + pod, container := testPod(digest) + + // Seed the cache with a very short TTL to test expiry + ctrl.unknownArtifacts.Set(digest, true, 50*time.Millisecond) + + // Immediately — should be cached + err := ctrl.recordContainer(context.Background(), pod, container, EventCreated, "test-deployment", nil) + require.NoError(t, err) + assert.Equal(t, 0, poster.getCalls(), "should skip 
API while cached") + + // Wait for expiry + time.Sleep(100 * time.Millisecond) + + // After expiry — should call API again + err = ctrl.recordContainer(context.Background(), pod, container, EventCreated, "test-deployment", nil) + require.NoError(t, err) + assert.Equal(t, 1, poster.getCalls(), "should call API after cache expiry") +} + +func TestRecordContainer_SuccessfulPostDoesNotPopulateUnknownCache(t *testing.T) { + t.Parallel() + digest := "sha256:knowndigest" + poster := &mockPoster{lastErr: nil} // success + ctrl := newTestController(poster) + pod, container := testPod(digest) + + err := ctrl.recordContainer(context.Background(), pod, container, EventCreated, "test-deployment", nil) + require.NoError(t, err) + assert.Equal(t, 1, poster.getCalls()) + + // Digest should NOT be in the unknown artifacts cache + _, exists := ctrl.unknownArtifacts.Get(digest) + assert.False(t, exists, "successful post should not cache digest as unknown") +} diff --git a/internal/workload/workload.go b/internal/workload/workload.go new file mode 100644 index 0000000..f6a38e1 --- /dev/null +++ b/internal/workload/workload.go @@ -0,0 +1,293 @@ +package workload + +import ( + "log/slog" + "strings" + + corev1 "k8s.io/api/core/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + appslisters "k8s.io/client-go/listers/apps/v1" + batchlisters "k8s.io/client-go/listers/batch/v1" +) + +// Identity describes the top-level workload that owns a pod. +type Identity struct { + Name string + Kind string // "Deployment", "DaemonSet", "StatefulSet", "CronJob", or "Job" +} + +// Resolver resolves pod ownership and checks workload liveness +// using Kubernetes informer caches. +type Resolver struct { + deploymentLister appslisters.DeploymentLister + daemonSetLister appslisters.DaemonSetLister + statefulSetLister appslisters.StatefulSetLister + jobLister batchlisters.JobLister + cronJobLister batchlisters.CronJobLister +} + +// NewResolver creates a new Resolver with the given Kubernetes listers. 
+func NewResolver( + deploymentLister appslisters.DeploymentLister, + daemonSetLister appslisters.DaemonSetLister, + statefulSetLister appslisters.StatefulSetLister, + jobLister batchlisters.JobLister, + cronJobLister batchlisters.CronJobLister, +) *Resolver { + return &Resolver{ + deploymentLister: deploymentLister, + daemonSetLister: daemonSetLister, + statefulSetLister: statefulSetLister, + jobLister: jobLister, + cronJobLister: cronJobLister, + } +} + +// Resolve returns the top-level workload that owns the pod. +func (r *Resolver) Resolve(pod *corev1.Pod) Identity { + // Check for Deployment (via ReplicaSet) + if dn := GetDeploymentName(pod); dn != "" { + return Identity{Name: dn, Kind: "Deployment"} + } + + // Check for DaemonSet (direct ownership) + if dsn := GetDaemonSetName(pod); dsn != "" { + return Identity{Name: dsn, Kind: "DaemonSet"} + } + + // Check for StatefulSet (direct ownership) + if ssn := GetStatefulSetName(pod); ssn != "" { + return Identity{Name: ssn, Kind: "StatefulSet"} + } + + // Check for Job + jobName := GetJobOwnerName(pod) + if jobName == "" { + return Identity{} + } + + return r.resolveJobWorkload(pod.Namespace, jobName) +} + +// IsActive checks if the parent workload for a pod still exists +// in the local informer cache. +func (r *Resolver) IsActive(namespace string, ref Identity) bool { + switch ref.Kind { + case "Deployment": + return r.deploymentExists(namespace, ref.Name) + case "DaemonSet": + return r.daemonSetExists(namespace, ref.Name) + case "StatefulSet": + return r.statefulSetExists(namespace, ref.Name) + case "CronJob": + return r.cronJobExists(namespace, ref.Name) + case "Job": + return r.jobExists(namespace, ref.Name) + default: + return false + } +} + +// resolveJobWorkload determines whether a Job is owned by a CronJob or is standalone. 
+func (r *Resolver) resolveJobWorkload(namespace, jobName string) Identity {
+	// Try to look up the Job to check for CronJob ownership
+	if r.jobLister != nil {
+		job, err := r.jobLister.Jobs(namespace).Get(jobName)
+		if err == nil {
+			for _, owner := range job.OwnerReferences {
+				if owner.Kind == "CronJob" {
+					return Identity{Name: owner.Name, Kind: "CronJob"}
+				}
+			}
+			return Identity{Name: jobName, Kind: "Job"}
+		}
+	}
+
+	// Job not found in cache - try CronJob name derivation as fallback.
+	// CronJob-created Jobs follow the naming pattern: <cronjob-name>-<numeric-suffix>
+	// where the suffix is always numeric. We validate the suffix is all digits to
+	// reduce false matches from standalone Jobs that coincidentally share a prefix
+	// with an existing CronJob. A residual false positive is still possible if a
+	// standalone Job is named exactly <existing-cronjob>-<digits>, but the primary path
+	// (checking Job OwnerReferences) handles the common case; this fallback only
+	// fires when the Job has already been garbage-collected.
+	if r.cronJobLister != nil {
+		lastDash := strings.LastIndex(jobName, "-")
+		if lastDash > 0 {
+			suffix := jobName[lastDash+1:]
+			if isNumeric(suffix) {
+				potentialCronJobName := jobName[:lastDash]
+				if r.cronJobExists(namespace, potentialCronJobName) {
+					return Identity{Name: potentialCronJobName, Kind: "CronJob"}
+				}
+			}
+		}
+	}
+
+	// Standalone Job (possibly already deleted)
+	return Identity{Name: jobName, Kind: "Job"}
+}
+
+func (r *Resolver) deploymentExists(namespace, name string) bool {
+	_, err := r.deploymentLister.Deployments(namespace).Get(name)
+	if err != nil {
+		if k8serrors.IsNotFound(err) {
+			return false
+		}
+		slog.Warn("Failed to check if deployment exists in cache, assuming it does",
+			"namespace", namespace,
+			"deployment", name,
+			"error", err,
+		)
+		return true
+	}
+	return true
+}
+
+func (r *Resolver) jobExists(namespace, name string) bool {
+	_, err := r.jobLister.Jobs(namespace).Get(name)
+	if err != nil {
+		if k8serrors.IsNotFound(err) {
+			return false
+		}
+ slog.Warn("Failed to check if job exists in cache, assuming it does", + "namespace", namespace, + "job", name, + "error", err, + ) + return true + } + return true +} + +func (r *Resolver) daemonSetExists(namespace, name string) bool { + _, err := r.daemonSetLister.DaemonSets(namespace).Get(name) + if err != nil { + if k8serrors.IsNotFound(err) { + return false + } + slog.Warn("Failed to check if daemonset exists in cache, assuming it does", + "namespace", namespace, + "daemonset", name, + "error", err, + ) + return true + } + return true +} + +func (r *Resolver) statefulSetExists(namespace, name string) bool { + _, err := r.statefulSetLister.StatefulSets(namespace).Get(name) + if err != nil { + if k8serrors.IsNotFound(err) { + return false + } + slog.Warn("Failed to check if statefulset exists in cache, assuming it does", + "namespace", namespace, + "statefulset", name, + "error", err, + ) + return true + } + return true +} + +func (r *Resolver) cronJobExists(namespace, name string) bool { + _, err := r.cronJobLister.CronJobs(namespace).Get(name) + if err != nil { + if k8serrors.IsNotFound(err) { + return false + } + slog.Warn("Failed to check if cronjob exists in cache, assuming it does", + "namespace", namespace, + "cronjob", name, + "error", err, + ) + return true + } + return true +} + +// HasSupportedOwner returns true if the pod is owned by a supported +// workload controller (ReplicaSet for Deployments, DaemonSet, StatefulSet, or Job for Jobs/CronJobs). +func HasSupportedOwner(pod *corev1.Pod) bool { + for _, owner := range pod.OwnerReferences { + if owner.Kind == "ReplicaSet" || owner.Kind == "DaemonSet" || owner.Kind == "StatefulSet" || owner.Kind == "Job" { + return true + } + } + return false +} + +// IsTerminalPhase returns true if the pod has reached a terminal phase +// (Succeeded or Failed). Used to catch short-lived Job pods that complete +// before the controller observes them in the Running phase. 
+func IsTerminalPhase(pod *corev1.Pod) bool { + return pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodFailed +} + +// GetDeploymentName returns the deployment name for a pod, if it belongs +// to one. +func GetDeploymentName(pod *corev1.Pod) string { + // Pods created by Deployments are owned by ReplicaSets + // The ReplicaSet name follows the pattern: <deployment-name>-<hash> + for _, owner := range pod.OwnerReferences { + if owner.Kind == "ReplicaSet" { + // Extract deployment name by removing the hash suffix + // ReplicaSet name format: <deployment-name>-<hash> + rsName := owner.Name + lastDash := strings.LastIndex(rsName, "-") + if lastDash > 0 { + return rsName[:lastDash] + } + return rsName + } + } + return "" +} + +// GetJobOwnerName returns the Job name from the pod's owner references, +// if the pod is owned by a Job. +func GetJobOwnerName(pod *corev1.Pod) string { + for _, owner := range pod.OwnerReferences { + if owner.Kind == "Job" { + return owner.Name + } + } + return "" +} + +// GetDaemonSetName returns the DaemonSet name for a pod, if it belongs +// to one. DaemonSet pods are owned directly by the DaemonSet. +func GetDaemonSetName(pod *corev1.Pod) string { + for _, owner := range pod.OwnerReferences { + if owner.Kind == "DaemonSet" { + return owner.Name + } + } + return "" +} + +// GetStatefulSetName returns the StatefulSet name for a pod, if it belongs +// to one. StatefulSet pods are owned directly by the StatefulSet. +func GetStatefulSetName(pod *corev1.Pod) string { + for _, owner := range pod.OwnerReferences { + if owner.Kind == "StatefulSet" { + return owner.Name + } + } + return "" +} + +// isNumeric returns true if s is non-empty and consists entirely of ASCII digits.
+func isNumeric(s string) bool { + if s == "" { + return false + } + for _, c := range s { + if c < '0' || c > '9' { + return false + } + } + return true +} diff --git a/internal/workload/workload_test.go b/internal/workload/workload_test.go new file mode 100644 index 0000000..cb526c0 --- /dev/null +++ b/internal/workload/workload_test.go @@ -0,0 +1,414 @@ +package workload + +import ( + "testing" + + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func TestHasSupportedOwner(t *testing.T) { + t.Parallel() + tests := []struct { + name string + pod *corev1.Pod + expected bool + }{ + { + name: "pod owned by ReplicaSet", + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + OwnerReferences: []metav1.OwnerReference{{ + Kind: "ReplicaSet", + Name: "test-rs-abc123", + }}, + }, + }, + expected: true, + }, + { + name: "pod owned by Job", + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + OwnerReferences: []metav1.OwnerReference{{ + Kind: "Job", + Name: "test-job", + }}, + }, + }, + expected: true, + }, + { + name: "pod with no owner", + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{}, + }, + expected: false, + }, + { + name: "pod owned by DaemonSet", + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + OwnerReferences: []metav1.OwnerReference{{ + Kind: "DaemonSet", + Name: "test-ds", + }}, + }, + }, + expected: true, + }, + { + name: "pod owned by StatefulSet", + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + OwnerReferences: []metav1.OwnerReference{{ + Kind: "StatefulSet", + Name: "test-ss", + }}, + }, + }, + expected: true, + }, + { + name: "pod owned by ReplicationController", + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + OwnerReferences: []metav1.OwnerReference{{ + Kind: "ReplicationController", + Name: "test-rc", + }}, + }, + }, + expected: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + result := 
HasSupportedOwner(tt.pod) + assert.Equal(t, tt.expected, result) + }) + } +} + +func TestGetJobOwnerName(t *testing.T) { + t.Parallel() + tests := []struct { + name string + pod *corev1.Pod + expected string + }{ + { + name: "pod owned by Job", + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + OwnerReferences: []metav1.OwnerReference{{ + Kind: "Job", + Name: "my-job", + }}, + }, + }, + expected: "my-job", + }, + { + name: "pod not owned by Job", + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + OwnerReferences: []metav1.OwnerReference{{ + Kind: "ReplicaSet", + Name: "my-rs-abc123", + }}, + }, + }, + expected: "", + }, + { + name: "pod with no owner", + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{}, + }, + expected: "", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + result := GetJobOwnerName(tt.pod) + assert.Equal(t, tt.expected, result) + }) + } +} + +func TestGetDeploymentName(t *testing.T) { + t.Parallel() + tests := []struct { + name string + pod *corev1.Pod + expected string + }{ + { + name: "pod owned by ReplicaSet", + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + OwnerReferences: []metav1.OwnerReference{{ + Kind: "ReplicaSet", + Name: "my-deployment-abc123", + }}, + }, + }, + expected: "my-deployment", + }, + { + name: "pod not owned by ReplicaSet", + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + OwnerReferences: []metav1.OwnerReference{{ + Kind: "DaemonSet", + Name: "my-ds", + }}, + }, + }, + expected: "", + }, + { + name: "pod with no owner", + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{}, + }, + expected: "", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + result := GetDeploymentName(tt.pod) + assert.Equal(t, tt.expected, result) + }) + } +} + +func TestGetDaemonSetName(t *testing.T) { + t.Parallel() + tests := []struct { + name string + pod *corev1.Pod + expected string + }{ + { + name: "pod owned by DaemonSet", + pod: 
&corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + OwnerReferences: []metav1.OwnerReference{{ + Kind: "DaemonSet", + Name: "my-daemonset", + }}, + }, + }, + expected: "my-daemonset", + }, + { + name: "pod not owned by DaemonSet", + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + OwnerReferences: []metav1.OwnerReference{{ + Kind: "ReplicaSet", + Name: "my-rs-abc123", + }}, + }, + }, + expected: "", + }, + { + name: "pod with no owner", + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{}, + }, + expected: "", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + result := GetDaemonSetName(tt.pod) + assert.Equal(t, tt.expected, result) + }) + } +} + +func TestGetStatefulSetName(t *testing.T) { + t.Parallel() + tests := []struct { + name string + pod *corev1.Pod + expected string + }{ + { + name: "pod owned by StatefulSet", + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + OwnerReferences: []metav1.OwnerReference{{ + Kind: "StatefulSet", + Name: "my-statefulset", + }}, + }, + }, + expected: "my-statefulset", + }, + { + name: "pod not owned by StatefulSet", + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + OwnerReferences: []metav1.OwnerReference{{ + Kind: "ReplicaSet", + Name: "my-rs-abc123", + }}, + }, + }, + expected: "", + }, + { + name: "pod with no owner", + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{}, + }, + expected: "", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + result := GetStatefulSetName(tt.pod) + assert.Equal(t, tt.expected, result) + }) + } +} + +func TestIsTerminalPhase(t *testing.T) { + t.Parallel() + tests := []struct { + name string + phase corev1.PodPhase + expected bool + }{ + {"Succeeded", corev1.PodSucceeded, true}, + {"Failed", corev1.PodFailed, true}, + {"Running", corev1.PodRunning, false}, + {"Pending", corev1.PodPending, false}, + {"Unknown", corev1.PodUnknown, false}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t 
*testing.T) { + t.Parallel() + pod := &corev1.Pod{ + Status: corev1.PodStatus{Phase: tt.phase}, + } + assert.Equal(t, tt.expected, IsTerminalPhase(pod)) + }) + } +} + +func TestIsNumeric(t *testing.T) { + t.Parallel() + tests := []struct { + input string + expected bool + }{ + {"28485120", true}, + {"0", true}, + {"123456789", true}, + {"", false}, + {"abc", false}, + {"123abc", false}, + {"12-34", false}, + } + + for _, tt := range tests { + t.Run(tt.input, func(t *testing.T) { + t.Parallel() + assert.Equal(t, tt.expected, isNumeric(tt.input)) + }) + } +} + +func TestResolve_Deployment(t *testing.T) { + t.Parallel() + r := &Resolver{} + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + OwnerReferences: []metav1.OwnerReference{{ + Kind: "ReplicaSet", + Name: "my-deployment-abc123", + }}, + }, + } + ref := r.Resolve(pod) + assert.Equal(t, "my-deployment", ref.Name) + assert.Equal(t, "Deployment", ref.Kind) +} + +func TestResolve_DaemonSet(t *testing.T) { + t.Parallel() + r := &Resolver{} + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + OwnerReferences: []metav1.OwnerReference{{ + Kind: "DaemonSet", + Name: "my-daemonset", + }}, + }, + } + ref := r.Resolve(pod) + assert.Equal(t, "my-daemonset", ref.Name) + assert.Equal(t, "DaemonSet", ref.Kind) +} + +func TestResolve_StatefulSet(t *testing.T) { + t.Parallel() + r := &Resolver{} + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + OwnerReferences: []metav1.OwnerReference{{ + Kind: "StatefulSet", + Name: "my-statefulset", + }}, + }, + } + ref := r.Resolve(pod) + assert.Equal(t, "my-statefulset", ref.Name) + assert.Equal(t, "StatefulSet", ref.Kind) +} + +func TestResolve_StandaloneJob(t *testing.T) { + t.Parallel() + // With nil listers, resolveJobWorkload falls back to standalone Job + r := &Resolver{} + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + OwnerReferences: []metav1.OwnerReference{{ + Kind: "Job", + Name: "my-standalone-job", + }}, + }, + } + ref := r.Resolve(pod) + assert.Equal(t, 
"my-standalone-job", ref.Name) + assert.Equal(t, "Job", ref.Kind) +} + +func TestResolve_NoOwner(t *testing.T) { + t.Parallel() + r := &Resolver{} + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{}, + } + ref := r.Resolve(pod) + assert.Empty(t, ref.Name) + assert.Empty(t, ref.Kind) +} From ffc019f366f76d2a72da9a6f4e4392b460e9857c Mon Sep 17 00:00:00 2001 From: Austin Beattie Date: Tue, 14 Apr 2026 22:46:04 -0700 Subject: [PATCH 2/3] fix: address lint and PR comment issues --- internal/controller/controller_test.go | 4 ++-- internal/workload/workload.go | 15 +++++++++++++++ internal/workload/workload_test.go | 17 +++++++++-------- 3 files changed, 26 insertions(+), 10 deletions(-) diff --git a/internal/controller/controller_test.go b/internal/controller/controller_test.go index 6ef25dc..2bba0f9 100644 --- a/internal/controller/controller_test.go +++ b/internal/controller/controller_test.go @@ -44,11 +44,11 @@ func (m *mockPoster) getCalls() int { // mockResolver is a test double for the workloadResolver interface. 
type mockResolver struct{} -func (m *mockResolver) Resolve(pod *corev1.Pod) workload.Identity { +func (*mockResolver) Resolve(_ *corev1.Pod) workload.Identity { return workload.Identity{} } -func (m *mockResolver) IsActive(_ string, _ workload.Identity) bool { +func (*mockResolver) IsActive(_ string, _ workload.Identity) bool { return false } diff --git a/internal/workload/workload.go b/internal/workload/workload.go index f6a38e1..8712cb1 100644 --- a/internal/workload/workload.go +++ b/internal/workload/workload.go @@ -74,14 +74,29 @@ func (r *Resolver) Resolve(pod *corev1.Pod) Identity { func (r *Resolver) IsActive(namespace string, ref Identity) bool { switch ref.Kind { case "Deployment": + if r.deploymentLister == nil { + return false + } return r.deploymentExists(namespace, ref.Name) case "DaemonSet": + if r.daemonSetLister == nil { + return false + } return r.daemonSetExists(namespace, ref.Name) case "StatefulSet": + if r.statefulSetLister == nil { + return false + } return r.statefulSetExists(namespace, ref.Name) case "CronJob": + if r.cronJobLister == nil { + return false + } return r.cronJobExists(namespace, ref.Name) case "Job": + if r.jobLister == nil { + return false + } return r.jobExists(namespace, ref.Name) default: return false diff --git a/internal/workload/workload_test.go b/internal/workload/workload_test.go index cb526c0..93867e7 100644 --- a/internal/workload/workload_test.go +++ b/internal/workload/workload_test.go @@ -317,20 +317,21 @@ func TestIsTerminalPhase(t *testing.T) { func TestIsNumeric(t *testing.T) { t.Parallel() tests := []struct { + name string input string expected bool }{ - {"28485120", true}, - {"0", true}, - {"123456789", true}, - {"", false}, - {"abc", false}, - {"123abc", false}, - {"12-34", false}, + {"digits", "28485120", true}, + {"zero", "0", true}, + {"long digits", "123456789", true}, + {"empty string", "", false}, + {"letters", "abc", false}, + {"mixed", "123abc", false}, + {"hyphenated", "12-34", false}, } for _, tt 
:= range tests { - t.Run(tt.input, func(t *testing.T) { + t.Run(tt.name, func(t *testing.T) { t.Parallel() assert.Equal(t, tt.expected, isNumeric(tt.input)) }) From 094c87ce1e3200148f2dc7e2c6dc84edb813669b Mon Sep 17 00:00:00 2001 From: Austin Beattie Date: Wed, 15 Apr 2026 11:23:46 -0700 Subject: [PATCH 3/3] fix: incorporate DeletionHandlingMetaNamespaceKeyFunc fix --- internal/controller/controller.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/controller/controller.go b/internal/controller/controller.go index f1d353b..c80f063 100644 --- a/internal/controller/controller.go +++ b/internal/controller/controller.go @@ -269,7 +269,7 @@ func New(clientset kubernetes.Interface, metadataAggregator podMetadataAggregato return } - key, err := cache.MetaNamespaceKeyFunc(obj) + key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) // For our purposes, there are in practice // no error event we care about, so don't // bother with handling it.