Skip to content

Commit 7cde7dd

Browse files
authored
Add informer sync timeout (#76)
* add informer sync timeout
  Signed-off-by: Eric Pickard <piceri@github.com>
* fix linting issue
  Signed-off-by: Eric Pickard <piceri@github.com>
* minor fixes
  Signed-off-by: Eric Pickard <piceri@github.com>
* address comments
  Signed-off-by: Eric Pickard <piceri@github.com>
* fix linting
  Signed-off-by: Eric Pickard <piceri@github.com>
---------
Signed-off-by: Eric Pickard <piceri@github.com>
1 parent 8a1993f commit 7cde7dd

File tree

2 files changed

+70
-2
lines changed

2 files changed

+70
-2
lines changed

internal/controller/controller.go

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,10 @@ const (
3838
// deployment record API. Once an artifact is known to be missing,
3939
// we suppress further API calls for this duration.
4040
unknownArtifactTTL = 1 * time.Hour
41+
42+
// informerSyncTimeoutDuration is the maximum duration of time allowed
43+
// for the informers to sync to prevent the controller from hanging indefinitely.
44+
informerSyncTimeoutDuration = 60 * time.Second
4145
)
4246

4347
type ttlCache interface {
@@ -92,6 +96,9 @@ type Controller struct {
9296
// best effort cache to suppress API calls for artifacts that
9397
// returned a 404 (no artifact found). Keyed by image digest.
9498
unknownArtifacts ttlCache
99+
// informerSyncTimeout is the maximum time allowed for all informers to sync
100+
// and prevents sync from hanging indefinitely.
101+
informerSyncTimeout time.Duration
95102
}
96103

97104
// New creates a new deployment tracker controller.
@@ -160,6 +167,7 @@ func New(clientset kubernetes.Interface, metadataAggregator podMetadataAggregato
160167
cfg: cfg,
161168
observedDeployments: amcache.NewExpiring(),
162169
unknownArtifacts: amcache.NewExpiring(),
170+
informerSyncTimeout: informerSyncTimeoutDuration,
163171
}
164172

165173
// Add event handlers to the informer
@@ -320,16 +328,23 @@ func (c *Controller) Run(ctx context.Context, workers int) error {
320328

321329
// Wait for the caches to be synced
322330
slog.Info("Waiting for informer caches to sync")
323-
if !cache.WaitForCacheSync(ctx.Done(),
331+
informerSyncCtx, cancel := context.WithTimeout(ctx, c.informerSyncTimeout)
332+
333+
if !cache.WaitForCacheSync(informerSyncCtx.Done(),
324334
c.podInformer.HasSynced,
325335
c.deploymentInformer.HasSynced,
326336
c.daemonSetInformer.HasSynced,
327337
c.statefulSetInformer.HasSynced,
328338
c.jobInformer.HasSynced,
329339
c.cronJobInformer.HasSynced,
330340
) {
331-
return errors.New("timed out waiting for caches to sync")
341+
cancel()
342+
if ctx.Err() != nil {
343+
return fmt.Errorf("cache sync interrupted: %w", ctx.Err())
344+
}
345+
return errors.New("timed out waiting for caches to sync - please ensure deployment tracker has the correct kubernetes permissions")
332346
}
347+
cancel()
333348

334349
slog.Info("Starting workers",
335350
"count", workers,

internal/controller/controller_test.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package controller
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"sync"
78
"testing"
@@ -12,7 +13,11 @@ import (
1213
"github.com/stretchr/testify/require"
1314
corev1 "k8s.io/api/core/v1"
1415
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
16+
"k8s.io/apimachinery/pkg/runtime"
1517
amcache "k8s.io/apimachinery/pkg/util/cache"
18+
"k8s.io/client-go/kubernetes/fake"
19+
k8stesting "k8s.io/client-go/testing"
20+
"k8s.io/client-go/util/workqueue"
1621
)
1722

1823
// mockPoster records all PostOne calls and returns a configurable error.
@@ -533,3 +538,51 @@ func TestIsTerminalPhase(t *testing.T) {
533538
})
534539
}
535540
}
541+
542+
func TestRun_InformerSyncTimeout(t *testing.T) {
543+
t.Parallel()
544+
fakeClient := fake.NewSimpleClientset()
545+
blocker := make(chan struct{})
546+
fakeClient.PrependReactor("list", "*", func(_ k8stesting.Action) (bool, runtime.Object, error) {
547+
// Block until the test completes.
548+
<-blocker
549+
return true, nil, errors.New("fail")
550+
})
551+
defer close(blocker)
552+
553+
factory := createInformerFactory(fakeClient, "", "")
554+
555+
ctrl := &Controller{
556+
clientset: fakeClient,
557+
podInformer: factory.Core().V1().Pods().Informer(),
558+
deploymentInformer: factory.Apps().V1().Deployments().Informer(),
559+
daemonSetInformer: factory.Apps().V1().DaemonSets().Informer(),
560+
statefulSetInformer: factory.Apps().V1().StatefulSets().Informer(),
561+
jobInformer: factory.Batch().V1().Jobs().Informer(),
562+
cronJobInformer: factory.Batch().V1().CronJobs().Informer(),
563+
workqueue: workqueue.NewTypedRateLimitingQueue(
564+
workqueue.DefaultTypedControllerRateLimiter[PodEvent](),
565+
),
566+
apiClient: &mockPoster{},
567+
cfg: &Config{},
568+
observedDeployments: amcache.NewExpiring(),
569+
unknownArtifacts: amcache.NewExpiring(),
570+
informerSyncTimeout: 2 * time.Second,
571+
}
572+
573+
ctx, cancel := context.WithCancel(context.Background())
574+
defer cancel()
575+
576+
errCh := make(chan error, 1)
577+
go func() {
578+
errCh <- ctrl.Run(ctx, 1)
579+
}()
580+
581+
select {
582+
case err := <-errCh:
583+
require.Error(t, err)
584+
assert.Contains(t, err.Error(), "timed out waiting for caches to sync")
585+
case <-time.After(5 * time.Second):
586+
t.Fatal("Run did not return within 5 seconds — informer sync timeout was 2 seconds")
587+
}
588+
}

0 commit comments

Comments
 (0)