diff --git a/pkg/leakybucket/manager_run.go b/pkg/leakybucket/manager_run.go index 29bbd7cdb88..1db8bfd7ff1 100644 --- a/pkg/leakybucket/manager_run.go +++ b/pkg/leakybucket/manager_run.go @@ -76,18 +76,14 @@ func PourItemToBucket( sigclosed := 0 failed_sent := 0 - attempts := 0 start := time.Now().UTC() - for { - attempts += 1 - /* Warn the user if we used more than a 100 ms to pour an event, it's at least an half lock*/ - if attempts%100000 == 0 && start.Add(100*time.Millisecond).Before(time.Now().UTC()) { - holder.logger.Warningf("stuck for %s sending event to %s (sigclosed:%d failed_sent:%d attempts:%d)", time.Since(start), - buckey, sigclosed, failed_sent, attempts) - } + // Warn if we're stuck for too long trying to pour + warnTicker := time.NewTicker(100 * time.Millisecond) + defer warnTicker.Stop() - /* check if leak routine is up */ + for { + // If bucket is dead, recreate and retry. select { case <-bucket.done: // the bucket was found and dead, get a new one and continue @@ -100,6 +96,8 @@ func PourItemToBucket( } continue // holder.logger.Tracef("Signal exists, try to pour :)") + case <-ctx.Done(): + return ctx.Err() default: // nothing to read, but not closed, try to pour // holder.logger.Tracef("Signal exists but empty, try to pour :)") @@ -131,7 +129,9 @@ func PourItemToBucket( } } } - // the bucket seems to be up & running + + // Block until we can send, or we learn it's dead/canceled, or we warn periodically. + select { case bucket.In <- parsed: // holder.logger.Tracef("Successfully sent !") @@ -141,11 +141,23 @@ func PourItemToBucket( } holder.logger.Debugf("bucket '%s' is poured", holder.Spec.Name) return nil + // XXX: bucket died while we were waiting to send. + // case <- bucket.done: + + case <-ctx.Done(): + return ctx.Err() + + case <-warnTicker.C: + // We are blocked because bucket.In isn't being read fast enough (or at all). + holder.logger.Warningf( + "stuck for %s sending event to %s (sigclosed:%d failed_sent:%d", + time.Since(start), + buckey, + sigclosed, + failed_sent, + ) + failed_sent++ default: - failed_sent += 1 - // holder.logger.Tracef("Failed to send, try again") - continue - } } }