Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import pekko.stream.testkit.Utils.TE
* INTERNAL API
*/
@InternalApi
private[pekko] object NoMaterializer extends Materializer {
private[pekko] case class NoMaterializer(override val system: ActorSystem) extends Materializer {
override def withNamePrefix(name: String): Materializer =
throw new UnsupportedOperationException("NoMaterializer cannot be named")
override def materialize[Mat](runnable: Graph[ClosedShape, Mat]): Mat =
Expand Down Expand Up @@ -70,9 +70,6 @@ private[pekko] object NoMaterializer extends Materializer {

override def isShutdown: Boolean = throw new UnsupportedOperationException("NoMaterializer cannot shutdown")

override def system: ActorSystem =
throw new UnsupportedOperationException("NoMaterializer does not have an actorsystem")

override private[pekko] def logger = throw new UnsupportedOperationException("NoMaterializer does not have a logger")

override private[pekko] def supervisor =
Expand Down Expand Up @@ -336,7 +333,7 @@ trait GraphInterpreterSpecKit extends StreamSpec {
}

_interpreter = new GraphInterpreter(
NoMaterializer,
NoMaterializer(system),
logger,
logics,
connections,
Expand Down
5 changes: 5 additions & 0 deletions stream/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ pekko {
# Enable additional troubleshooting logging at DEBUG log level
debug-logging = off

# Log any stream stage error at the specified log level: "error", "warning", "info", "debug" or "off".
# If there is a `pekko.stream.Attributes.LogLevels` attribute defined for a specific stream this value is ignored
# and the `onFailure` value of the attribute is applied instead.
stage-errors-default-log-level = error

# Maximum number of elements emitted in batch if downstream signals large demand
output-burst-limit = 1000

Expand Down
19 changes: 19 additions & 0 deletions stream/src/main/scala/org/apache/pekko/stream/Attributes.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,15 @@ import scala.reflect.{ classTag, ClassTag }
import scala.util.control.NonFatal

import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.annotation.ApiMayChange
import pekko.annotation.DoNotInherit
import pekko.annotation.InternalApi
import pekko.event.Logging
import pekko.japi.function
import pekko.stream.impl.TraversalBuilder
import pekko.util.ByteString
import pekko.util.Helpers
import pekko.util.LineNumbers

/**
Expand Down Expand Up @@ -720,6 +722,23 @@ object Attributes {

/** Use to enable logging at DEBUG level for certain operations when configuring [[Attributes#logLevels]] */
final val Debug: Logging.LogLevel = Logging.DebugLevel

/** INTERNAL API */
@InternalApi
private[pekko] def defaultErrorLevel(system: ActorSystem): Logging.LogLevel =
fromString(system.settings.config.getString("pekko.stream.materializer.stage-errors-default-log-level"))

/** INTERNAL API */
@InternalApi
private[pekko] def fromString(str: String): Logging.LogLevel = {
Helpers.toRootLowerCase(str) match {
case "off" => Off
case "error" => Error
case "warning" => Warning
case "info" => Info
case "debug" => Debug
}
}
}

/** Java API: Use to disable logging on certain operations when configuring [[Attributes#createLogLevels]] */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import org.apache.pekko
import pekko.Done
import pekko.actor.ActorRef
import pekko.annotation.{ InternalApi, InternalStableApi }
import pekko.event.Logging
import pekko.event.LoggingAdapter
import pekko.stream._
import pekko.stream.Attributes.LogLevels
Expand Down Expand Up @@ -244,6 +245,8 @@ import pekko.stream.stage._
private[this] val finalizedMark = Array.fill(logics.length)(false)

private[this] var _subFusingMaterializer: Materializer = _
private[this] lazy val defaultErrorReportingLogLevel = LogLevels.defaultErrorLevel(materializer.system)

def subFusingMaterializer: Materializer = _subFusingMaterializer

// An event queue implemented as a circular buffer
Expand Down Expand Up @@ -378,12 +381,21 @@ import pekko.stream.stage._
def reportStageError(e: Throwable): Unit = {
if (activeStage eq null) throw e
else {
val loggingEnabled = activeStage.attributes.get[LogLevels] match {
case Some(levels) => levels.onFailure != LogLevels.Off
case None => true
val logAt: Logging.LogLevel = activeStage.attributes.get[LogLevels] match {
case Some(levels) => levels.onFailure
case None => defaultErrorReportingLogLevel
}
logAt match {
case Logging.ErrorLevel =>
log.error(e, "Error in stage [{}]: {}", activeStage.toString, e.getMessage)
case Logging.WarningLevel =>
log.warning(e, "Error in stage [{}]: {}", activeStage.toString, e.getMessage)
case Logging.InfoLevel =>
log.info("Error in stage [{}]: {}", activeStage.toString, e.getMessage)
case Logging.DebugLevel =>
log.debug("Error in stage [{}]: {}", activeStage.toString, e.getMessage)
case _ => // Off, nop
}
if (loggingEnabled)
log.error(e, "Error in stage [{}]: {}", activeStage.toString, e.getMessage)
activeStage.failStage(e)

// Abort chasing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,9 @@ private abstract class RestartWithBackoffLogic[S <: Shape](

private def loggingEnabled = inheritedAttributes.get[LogLevels] match {
case Some(levels) => levels.onFailure != LogLevels.Off
case None => true
case None =>
// Allows for system wide disable at least
LogLevels.defaultErrorLevel(materializer.system) != LogLevels.Off
}

/**
Expand Down
Loading