Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* license agreements; and to You under the Apache License, version 2.0:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* This file is part of the Apache Pekko project, which was derived from Akka.
*/

package org.apache.pekko.stream.impl.fusing

import scala.concurrent.Await
import scala.concurrent.duration._

import org.apache.pekko
import pekko.stream._
import pekko.stream.impl.fusing.GraphStages.SimpleLinearGraphStage
import pekko.stream.scaladsl._
import pekko.stream.stage.GraphStageLogic
import pekko.stream.stage.InHandler
import pekko.stream.stage.OutHandler
import pekko.stream.testkit.StreamSpec
import pekko.testkit.EventFilter

/**
 * Exercises stage error logging with `stage-errors-log-throttle-period` set to 2 seconds.
 *
 * Both tests only assert on the number of "Error in stage" error events seen by the
 * [[EventFilter]]; they do not assert on the suppression summary warning.
 */
class StageErrorLogThrottleSpec extends StreamSpec("""
pekko.stream.materializer.stage-errors-log-throttle-period = 2s
pekko.loglevel = DEBUG
""") {

  /**
   * Creates a new linear stage on every call. The stage pulls upstream on demand and
   * throws an `IllegalArgumentException("test failure")` for every element pushed to it.
   */
  private def mkFailingStage: SimpleLinearGraphStage[Int] = new SimpleLinearGraphStage[Int] {
    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
      new GraphStageLogic(shape) with InHandler with OutHandler {
        setHandlers(in, out, this)

        // Demand is simply forwarded upstream.
        override def onPull(): Unit = pull(in)

        // Every element triggers the failure under test.
        override def onPush(): Unit = throw new IllegalArgumentException("test failure")
      }
  }

  /**
   * Runs a closed graph in which a single element is broadcast to five failing stages.
   * The materialized value is the completion future of the sink attached to branch 0;
   * branches 1 to 4 each end in their own `Sink.ignore`.
   */
  private def runFiveFailingBranches() =
    RunnableGraph
      .fromGraph(GraphDSL.createGraph(Sink.ignore) { implicit builder => firstSink =>
        import GraphDSL.Implicits._

        val fanOut = builder.add(Broadcast[Int](5, eagerCancel = false))
        Source.single(1) ~> fanOut
        fanOut.out(0) ~> builder.add(mkFailingStage) ~> firstSink
        (1 until 5).foreach { port =>
          fanOut.out(port) ~> builder.add(mkFailingStage) ~> builder.add(Sink.ignore)
        }
        ClosedShape
      })
      .run()

  "Stage error log throttling" must {

    "suppress repeated errors and flush count on stream finish" in {
      // Five failing stages receive the same element (presumably within one fused
      // interpreter - TODO confirm). With the 2s throttle period configured above,
      // exactly one "Error in stage" event is expected; the other four failures are
      // expected to be suppressed. StageErrorLogThrottleDisabledSpec covers the
      // unthrottled counterpart where all five are expected.
      EventFilter[IllegalArgumentException](pattern = "Error in stage.*", occurrences = 1).intercept {
        // Await.ready does not rethrow, so the failed stream does not fail the test here.
        Await.ready(runFiveFailingBranches(), 3.seconds)
      }
    }

    "always log a single error even with throttling enabled" in {
      EventFilter[IllegalArgumentException](pattern = "Error in stage.*", occurrences = 1).intercept {
        val completion = Source.single(1).via(mkFailingStage).runWith(Sink.ignore)
        Await.ready(completion, 3.seconds)
      }
    }
  }
}

/**
 * Exercises stage error logging with `stage-errors-log-throttle-period` set to `off`.
 *
 * Both tests only assert on the number of "Error in stage" error events seen by the
 * [[EventFilter]]; nothing here asserts on the presence or absence of warnings.
 */
class StageErrorLogThrottleDisabledSpec extends StreamSpec("""
pekko.stream.materializer.stage-errors-log-throttle-period = off
""") {

  /**
   * Creates a new linear stage on every call. The stage pulls upstream on demand and
   * throws an `IllegalArgumentException("test failure")` for every element pushed to it.
   */
  private def mkFailingStage: SimpleLinearGraphStage[Int] = new SimpleLinearGraphStage[Int] {
    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
      new GraphStageLogic(shape) with InHandler with OutHandler {
        setHandlers(in, out, this)

        // Demand is simply forwarded upstream.
        override def onPull(): Unit = pull(in)

        // Every element triggers the failure under test.
        override def onPush(): Unit = throw new IllegalArgumentException("test failure")
      }
  }

  /**
   * Runs a closed graph in which a single element is broadcast to five failing stages.
   * The materialized value is the completion future of the sink attached to branch 0;
   * branches 1 to 4 each end in their own `Sink.ignore`.
   */
  private def runFiveFailingBranches() =
    RunnableGraph
      .fromGraph(GraphDSL.createGraph(Sink.ignore) { implicit builder => firstSink =>
        import GraphDSL.Implicits._

        val fanOut = builder.add(Broadcast[Int](5, eagerCancel = false))
        Source.single(1) ~> fanOut
        fanOut.out(0) ~> builder.add(mkFailingStage) ~> firstSink
        (1 until 5).foreach { port =>
          fanOut.out(port) ~> builder.add(mkFailingStage) ~> builder.add(Sink.ignore)
        }
        ClosedShape
      })
      .run()

  "Stage error log throttling when disabled" must {

    "log every error individually without suppression" in {
      // Throttling is off in this spec's config, so each of the five failing branches
      // is expected to produce its own "Error in stage" event.
      EventFilter[IllegalArgumentException](pattern = "Error in stage.*", occurrences = 5).intercept {
        // Await.ready does not rethrow, so the failed stream does not fail the test here.
        Await.ready(runFiveFailingBranches(), 3.seconds)
      }
    }

    "log single error without suppression warning" in {
      // Only the single error event is asserted by this filter.
      EventFilter[IllegalArgumentException](pattern = "Error in stage.*", occurrences = 1).intercept {
        val completion = Source.single(1).via(mkFailingStage).runWith(Sink.ignore)
        Await.ready(completion, 3.seconds)
      }
    }
  }
}
6 changes: 6 additions & 0 deletions stream/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ pekko {
timeout = 5s
}

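# Period for throttling stage error log messages. When set to a positive
# duration, only the first error within this period is logged; subsequent
# errors are counted, and a summary warning with the suppressed count is
# logged together with the next error that arrives after the period has
# elapsed, or when the interpreter finishes. Set to "off" to disable
# throttling (every error is logged individually).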
stage-errors-log-throttle-period = off

# Enable additional troubleshooting logging at DEBUG log level
debug-logging = off

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

package org.apache.pekko.stream.impl.fusing

import java.util.concurrent.ThreadLocalRandom
import java.util.concurrent.{ ThreadLocalRandom, TimeUnit }

import scala.concurrent.Promise
import scala.util.control.NonFatal
Expand Down Expand Up @@ -246,6 +246,26 @@ import pekko.stream.stage._
private[this] var _subFusingMaterializer: Materializer = _
def subFusingMaterializer: Materializer = _subFusingMaterializer

// Throttle state for stage error logging.
//
// Throttle period in nanoseconds, read from configuration on first access.
// 0L means throttling is disabled: either the setting is "off" (case-insensitive)
// or reading the configuration raised UnsupportedOperationException.
// A configured duration that is not strictly positive is rejected by `require`.
private[this] lazy val throttlePeriodNanos: Long = {
  val path = "pekko.stream.materializer.stage-errors-log-throttle-period"
  try {
    val config = materializer.system.settings.config
    val value = config.getString(path)
    pekko.util.Helpers.toRootLowerCase(value) match {
      case "off" => 0L
      case _ =>
        val nanos = config.getDuration(path, TimeUnit.NANOSECONDS)
        require(nanos > 0L, s"$path must be a positive duration or 'off', got: $value")
        nanos
    }
  } catch {
    case _: UnsupportedOperationException => 0L // NoMaterializer in tests
  }
}
// System.nanoTime() reading taken when a stage error was last actually logged.
private[this] var lastErrorLogTime: Long = 0L
// False until the first stage error has been logged; lets the first error bypass the period check.
private[this] var errorLogInitialized: Boolean = false
// Number of stage errors skipped since the last logged one; reset whenever the summary is flushed.
private[this] var suppressedErrorCount: Int = 0

// An event queue implemented as a circular buffer
// FIXME: This calculates the maximum size ever needed, but most assemblies can run on a smaller queue
private[this] val eventQueue = new Array[Connection](1 << (32 - Integer.numberOfLeadingZeros(connections.length - 1)))
Expand Down Expand Up @@ -332,6 +352,12 @@ import pekko.stream.stage._
* Finalizes the state of all operators by calling postStop() (if necessary).
*/
def finish(): Unit = {
// Flush any suppressed error count so operators know errors were throttled
if (suppressedErrorCount > 0) {
log.warning("{} additional stage error(s) were suppressed during throttle period",
suppressedErrorCount)
suppressedErrorCount = 0
}
var i = 0
while (i < logics.length) {
val logic = logics(i)
Expand Down Expand Up @@ -382,8 +408,27 @@ import pekko.stream.stage._
case Some(levels) => levels.onFailure != LogLevels.Off
case None => true
}
if (loggingEnabled)
log.error(e, "Error in stage [{}]: {}", activeStage.toString, e.getMessage)
if (loggingEnabled) {
val shouldLog = if (throttlePeriodNanos > 0L) {
val now = System.nanoTime()
if (!errorLogInitialized || now - lastErrorLogTime >= throttlePeriodNanos) {
if (suppressedErrorCount > 0) {
log.warning("{} additional stage error(s) were suppressed during throttle period",
suppressedErrorCount)
}
errorLogInitialized = true
lastErrorLogTime = now
suppressedErrorCount = 0
true
} else {
suppressedErrorCount += 1
false
}
} else true

if (shouldLog)
log.error(e, "Error in stage [{}]: {}", activeStage.toString, e.getMessage)
}
activeStage.failStage(e)

// Abort chasing
Expand Down
Loading