diff --git a/stream-testkit/src/test/scala/org/apache/pekko/stream/impl/fusing/GraphInterpreterSpecKit.scala b/stream-testkit/src/test/scala/org/apache/pekko/stream/impl/fusing/GraphInterpreterSpecKit.scala index 46bf05c5ac6..bfe8d925999 100644 --- a/stream-testkit/src/test/scala/org/apache/pekko/stream/impl/fusing/GraphInterpreterSpecKit.scala +++ b/stream-testkit/src/test/scala/org/apache/pekko/stream/impl/fusing/GraphInterpreterSpecKit.scala @@ -40,7 +40,7 @@ import pekko.stream.testkit.Utils.TE * INTERNAL API */ @InternalApi -private[pekko] object NoMaterializer extends Materializer { +private[pekko] case class NoMaterializer(override val system: ActorSystem) extends Materializer { override def withNamePrefix(name: String): Materializer = throw new UnsupportedOperationException("NoMaterializer cannot be named") override def materialize[Mat](runnable: Graph[ClosedShape, Mat]): Mat = @@ -70,9 +70,6 @@ private[pekko] object NoMaterializer extends Materializer { override def isShutdown: Boolean = throw new UnsupportedOperationException("NoMaterializer cannot shutdown") - override def system: ActorSystem = - throw new UnsupportedOperationException("NoMaterializer does not have an actorsystem") - override private[pekko] def logger = throw new UnsupportedOperationException("NoMaterializer does not have a logger") override private[pekko] def supervisor = @@ -336,7 +333,7 @@ trait GraphInterpreterSpecKit extends StreamSpec { } _interpreter = new GraphInterpreter( - NoMaterializer, + NoMaterializer(system), logger, logics, connections, diff --git a/stream/src/main/resources/reference.conf b/stream/src/main/resources/reference.conf index e2ba88232c5..bba71993ec4 100644 --- a/stream/src/main/resources/reference.conf +++ b/stream/src/main/resources/reference.conf @@ -54,6 +54,11 @@ pekko { # Enable additional troubleshooting logging at DEBUG log level debug-logging = off + # Log any stream stage error at the specified log level: "error", "warning", "info", "debug" or "off". + # If there is a `pekko.stream.Attributes.LogLevels` attribute defined for a specific stream this value is ignored + # and the `onFailure` value of the attribute is applied instead. + stage-errors-default-log-level = error + # Maximum number of elements emitted in batch if downstream signals large demand output-burst-limit = 1000 diff --git a/stream/src/main/scala/org/apache/pekko/stream/Attributes.scala b/stream/src/main/scala/org/apache/pekko/stream/Attributes.scala index 838eedc52b4..4f6d346c314 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/Attributes.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/Attributes.scala @@ -26,6 +26,7 @@ import scala.reflect.{ classTag, ClassTag } import scala.util.control.NonFatal import org.apache.pekko +import pekko.actor.ActorSystem import pekko.annotation.ApiMayChange import pekko.annotation.DoNotInherit import pekko.annotation.InternalApi @@ -33,6 +34,7 @@ import pekko.event.Logging import pekko.japi.function import pekko.stream.impl.TraversalBuilder import pekko.util.ByteString +import pekko.util.Helpers import pekko.util.LineNumbers /** @@ -720,6 +722,23 @@ object Attributes { /** Use to enable logging at DEBUG level for certain operations when configuring [[Attributes#logLevels]] */ final val Debug: Logging.LogLevel = Logging.DebugLevel + + /** INTERNAL API */ + @InternalApi + private[pekko] def defaultErrorLevel(system: ActorSystem): Logging.LogLevel = + fromString(system.settings.config.getString("pekko.stream.materializer.stage-errors-default-log-level")) + + /** INTERNAL API */ + @InternalApi + private[pekko] def fromString(str: String): Logging.LogLevel = { + Helpers.toRootLowerCase(str) match { + case "off" => Off + case "error" => Error + case "warning" => Warning + case "info" => Info + case "debug" => Debug + } + } } /** Java API: Use to disable logging on certain operations when configuring [[Attributes#createLogLevels]] */ diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphInterpreter.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphInterpreter.scala index 43a05d946b6..df3a36a39ec 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphInterpreter.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphInterpreter.scala @@ -22,6 +22,7 @@ import org.apache.pekko import pekko.Done import pekko.actor.ActorRef import pekko.annotation.{ InternalApi, InternalStableApi } +import pekko.event.Logging import pekko.event.LoggingAdapter import pekko.stream._ import pekko.stream.Attributes.LogLevels @@ -244,6 +245,8 @@ import pekko.stream.stage._ private[this] val finalizedMark = Array.fill(logics.length)(false) private[this] var _subFusingMaterializer: Materializer = _ + private[this] lazy val defaultErrorReportingLogLevel = LogLevels.defaultErrorLevel(materializer.system) + def subFusingMaterializer: Materializer = _subFusingMaterializer // An event queue implemented as a circular buffer @@ -378,12 +381,21 @@ import pekko.stream.stage._ def reportStageError(e: Throwable): Unit = { if (activeStage eq null) throw e else { - val loggingEnabled = activeStage.attributes.get[LogLevels] match { - case Some(levels) => levels.onFailure != LogLevels.Off - case None => true + val logAt: Logging.LogLevel = activeStage.attributes.get[LogLevels] match { + case Some(levels) => levels.onFailure + case None => defaultErrorReportingLogLevel + } + logAt match { + case Logging.ErrorLevel => + log.error(e, "Error in stage [{}]: {}", activeStage.toString, e.getMessage) + case Logging.WarningLevel => + log.warning(e, "Error in stage [{}]: {}", activeStage.toString, e.getMessage) + case Logging.InfoLevel => + log.info("Error in stage [{}]: {}", activeStage.toString, e.getMessage) + case Logging.DebugLevel => + log.debug("Error in stage [{}]: {}", activeStage.toString, e.getMessage) + case _ => // Off, nop } - if (loggingEnabled) - log.error(e, "Error in stage [{}]: {}", activeStage.toString, e.getMessage) activeStage.failStage(e) // Abort chasing diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/RestartFlow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/RestartFlow.scala index 8a90aa32125..65f2ff9a4c7 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/RestartFlow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/RestartFlow.scala @@ -167,7 +167,9 @@ private abstract class RestartWithBackoffLogic[S <: Shape]( private def loggingEnabled = inheritedAttributes.get[LogLevels] match { case Some(levels) => levels.onFailure != LogLevels.Off - case None => true + case None => + // Allows for system wide disable at least + LogLevels.defaultErrorLevel(materializer.system) != LogLevels.Off } /**