diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/impl/fusing/StageErrorLogThrottleSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/impl/fusing/StageErrorLogThrottleSpec.scala new file mode 100644 index 00000000000..e49155e0210 --- /dev/null +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/impl/fusing/StageErrorLogThrottleSpec.scala @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * license agreements; and to You under the Apache License, version 2.0: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * This file is part of the Apache Pekko project, which was derived from Akka. + */ + +package org.apache.pekko.stream.impl.fusing + +import scala.concurrent.Await +import scala.concurrent.duration._ + +import org.apache.pekko +import pekko.stream._ +import pekko.stream.impl.fusing.GraphStages.SimpleLinearGraphStage +import pekko.stream.scaladsl._ +import pekko.stream.stage.GraphStageLogic +import pekko.stream.stage.InHandler +import pekko.stream.stage.OutHandler +import pekko.stream.testkit.StreamSpec +import pekko.testkit.EventFilter + +class StageErrorLogThrottleSpec extends StreamSpec(""" + pekko.stream.materializer.stage-errors-log-throttle-period = 2s + pekko.loglevel = DEBUG + """) { + + private def mkFailingStage: SimpleLinearGraphStage[Int] = new SimpleLinearGraphStage[Int] { + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new GraphStageLogic(shape) with InHandler with OutHandler { + setHandlers(in, out, this) + override def onPush(): Unit = throw new IllegalArgumentException("test failure") + override def onPull(): Unit = pull(in) + } + } + + "Stage error log throttling" must { + + "suppress repeated errors and flush count on stream finish" in { + // Use Broadcast to fan-out to 5 independent failing stages sharing one interpreter. + // Only the first error should be logged; the remaining 4 are suppressed by throttling. + // The disabled test below verifies all 5 errors log without throttling. + EventFilter[IllegalArgumentException](pattern = "Error in stage.*", occurrences = 1).intercept { + val done = RunnableGraph + .fromGraph(GraphDSL.createGraph(Sink.ignore) { implicit b => sink => + import GraphDSL.Implicits._ + val bcast = b.add(Broadcast[Int](5, eagerCancel = false)) + Source.single(1) ~> bcast + bcast.out(0) ~> b.add(mkFailingStage) ~> sink + for (i <- 1 until 5) { + bcast.out(i) ~> b.add(mkFailingStage) ~> b.add(Sink.ignore) + } + ClosedShape + }) + .run() + Await.ready(done, 3.seconds) + } + } + + "always log a single error even with throttling enabled" in { + EventFilter[IllegalArgumentException](pattern = "Error in stage.*", occurrences = 1).intercept { + val result = Source.single(1).via(mkFailingStage).runWith(Sink.ignore) + Await.ready(result, 3.seconds) + } + } + } +} + +class StageErrorLogThrottleDisabledSpec extends StreamSpec(""" + pekko.stream.materializer.stage-errors-log-throttle-period = off + """) { + + private def mkFailingStage: SimpleLinearGraphStage[Int] = new SimpleLinearGraphStage[Int] { + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new GraphStageLogic(shape) with InHandler with OutHandler { + setHandlers(in, out, this) + override def onPush(): Unit = throw new IllegalArgumentException("test failure") + override def onPull(): Unit = pull(in) + } + } + + "Stage error log throttling when disabled" must { + + "log every error individually without suppression" in { + // With throttling disabled, each stage error should be logged individually. + // Using 5 parallel failing stages via Broadcast, all 5 errors should appear. + EventFilter[IllegalArgumentException](pattern = "Error in stage.*", occurrences = 5).intercept { + val done = RunnableGraph + .fromGraph(GraphDSL.createGraph(Sink.ignore) { implicit b => sink => + import GraphDSL.Implicits._ + val bcast = b.add(Broadcast[Int](5, eagerCancel = false)) + Source.single(1) ~> bcast + bcast.out(0) ~> b.add(mkFailingStage) ~> sink + for (i <- 1 until 5) { + bcast.out(i) ~> b.add(mkFailingStage) ~> b.add(Sink.ignore) + } + ClosedShape + }) + .run() + Await.ready(done, 3.seconds) + } + } + + "log single error without suppression warning" in { + EventFilter[IllegalArgumentException](pattern = "Error in stage.*", occurrences = 1).intercept { + val result = Source.single(1).via(mkFailingStage).runWith(Sink.ignore) + Await.ready(result, 3.seconds) + } + } + } +} diff --git a/stream/src/main/resources/reference.conf b/stream/src/main/resources/reference.conf index e2ba88232c5..ff40f9bfd2e 100644 --- a/stream/src/main/resources/reference.conf +++ b/stream/src/main/resources/reference.conf @@ -51,6 +51,12 @@ pekko { timeout = 5s } + # Period for throttling stage error log messages. When set, only the first + # error within this period is logged; subsequent errors are counted and + # a summary is logged when the period expires. Set to "off" to disable + # throttling (every error is logged individually). + stage-errors-log-throttle-period = off + # Enable additional troubleshooting logging at DEBUG log level debug-logging = off diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphInterpreter.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphInterpreter.scala index 43a05d946b6..ac44f15ca78 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphInterpreter.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphInterpreter.scala @@ -13,7 +13,7 @@ package org.apache.pekko.stream.impl.fusing -import java.util.concurrent.ThreadLocalRandom +import java.util.concurrent.{ ThreadLocalRandom, TimeUnit } import scala.concurrent.Promise import scala.util.control.NonFatal @@ -246,6 +246,26 @@ import pekko.stream.stage._ private[this] var _subFusingMaterializer: Materializer = _ def subFusingMaterializer: Materializer = _subFusingMaterializer + // Throttle state for stage error logging + private[this] lazy val throttlePeriodNanos: Long = { + try { + val config = materializer.system.settings.config + val path = "pekko.stream.materializer.stage-errors-log-throttle-period" + val value = config.getString(path) + if (pekko.util.Helpers.toRootLowerCase(value) == "off") 0L + else { + val nanos = config.getDuration(path, TimeUnit.NANOSECONDS) + require(nanos > 0L, s"$path must be a positive duration or 'off', got: $value") + nanos + } + } catch { + case _: UnsupportedOperationException => 0L // NoMaterializer in tests + } + } + private[this] var lastErrorLogTime: Long = 0L + private[this] var errorLogInitialized: Boolean = false + private[this] var suppressedErrorCount: Int = 0 + // An event queue implemented as a circular buffer // FIXME: This calculates the maximum size ever needed, but most assemblies can run on a smaller queue private[this] val eventQueue = new Array[Connection](1 << (32 - Integer.numberOfLeadingZeros(connections.length - 1))) @@ -332,6 +352,12 @@ import pekko.stream.stage._ * Finalizes the state of all operators by calling postStop() (if necessary). */ def finish(): Unit = { + // Flush any suppressed error count so operators know errors were throttled + if (suppressedErrorCount > 0) { + log.warning("{} additional stage error(s) were suppressed during throttle period", + suppressedErrorCount) + suppressedErrorCount = 0 + } var i = 0 while (i < logics.length) { val logic = logics(i) @@ -382,8 +408,27 @@ import pekko.stream.stage._ case Some(levels) => levels.onFailure != LogLevels.Off case None => true } - if (loggingEnabled) - log.error(e, "Error in stage [{}]: {}", activeStage.toString, e.getMessage) + if (loggingEnabled) { + val shouldLog = if (throttlePeriodNanos > 0L) { + val now = System.nanoTime() + if (!errorLogInitialized || now - lastErrorLogTime >= throttlePeriodNanos) { + if (suppressedErrorCount > 0) { + log.warning("{} additional stage error(s) were suppressed during throttle period", + suppressedErrorCount) + } + errorLogInitialized = true + lastErrorLogTime = now + suppressedErrorCount = 0 + true + } else { + suppressedErrorCount += 1 + false + } + } else true + + if (shouldLog) + log.error(e, "Error in stage [{}]: {}", activeStage.toString, e.getMessage) + } activeStage.failStage(e) // Abort chasing