From cac0f9ac3a7f49e6840edb7993c21747823d4d7d Mon Sep 17 00:00:00 2001 From: He-Pin Date: Sat, 28 Mar 2026 05:32:15 +0800 Subject: [PATCH] Fix stream IO dispatcher test failures Avoid creating custom materializers in tests that verify dispatcher assignment. Custom materializers are not covered by StreamSpec's afterEach cleanup, causing test pollution and failures. Changes: - Remove custom ActorMaterializer creation in FileSinkSpec, FileSourceSpec, and UnfoldResourceAsyncSourceSpec - Use SystemMaterializer(system).materializer to access the system materializer supervisor for dispatcher assertions - Remove separate ActorSystem creation in FileSourceSpec dispatcher tests (no longer needed with system materializer) - Clean up StreamSpec debug dump: replace non-functional stream supervisor query with printDebugDump helper, add stopAllChildren on failure for cleanup Upstream: akka/akka-core@145319d86d Cherry-picked from akka/akka-core v2.8.0, which is now Apache licensed. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../pekko/stream/testkit/StreamSpec.scala | 42 +++++--------- .../apache/pekko/stream/io/FileSinkSpec.scala | 7 +-- .../pekko/stream/io/FileSourceSpec.scala | 56 ++++++++----------- .../UnfoldResourceAsyncSourceSpec.scala | 7 +-- 4 files changed, 43 insertions(+), 69 deletions(-) diff --git a/stream-testkit/src/test/scala/org/apache/pekko/stream/testkit/StreamSpec.scala b/stream-testkit/src/test/scala/org/apache/pekko/stream/testkit/StreamSpec.scala index 6032e57c8f6..43c458f52d1 100644 --- a/stream-testkit/src/test/scala/org/apache/pekko/stream/testkit/StreamSpec.scala +++ b/stream-testkit/src/test/scala/org/apache/pekko/stream/testkit/StreamSpec.scala @@ -15,17 +15,14 @@ package org.apache.pekko.stream.testkit import java.util.concurrent.TimeUnit -import scala.concurrent.Future -import scala.concurrent.duration._ +import scala.concurrent.duration.FiniteDuration import org.apache.pekko -import pekko.actor.{ ActorRef, ActorSystem } +import pekko.actor.ActorSystem import pekko.stream.Materializer import pekko.stream.impl.PhasedFusingActorMaterializer -import pekko.stream.impl.StreamSupervisor -import pekko.stream.snapshot.{ MaterializerState, StreamSnapshotImpl } -import pekko.stream.testkit.scaladsl.StreamTestKit.{ assertNoChildren, stopAllChildren } -import pekko.testkit.{ PekkoSpec, TestProbe } +import pekko.stream.testkit.scaladsl.StreamTestKit.{ assertNoChildren, printDebugDump, stopAllChildren } +import pekko.testkit.PekkoSpec import pekko.testkit.TestKitUtils import org.scalatest.Failed @@ -49,31 +46,22 @@ abstract class StreamSpec(_system: ActorSystem) extends PekkoSpec(_system) with override def withFixture(test: NoArgTest) = { super.withFixture(test) match { case failed: Failed => - implicit val ec = system.dispatcher - val probe = TestProbe()(system) - // FIXME I don't think it always runs under /user anymore (typed) - // FIXME correction - I'm not sure this works at _all_ - supposed to dump stream state if test fails - val streamSupervisors = system.actorSelection("/user/" + StreamSupervisor.baseName + "*") - streamSupervisors.tell(StreamSupervisor.GetChildren, probe.ref) - val children: Seq[ActorRef] = probe - .receiveWhile(2.seconds) { - case StreamSupervisor.Children(children) => children - } - .flatten - println("--- Stream actors debug dump ---") - if (children.isEmpty) println("Stream is completed. No debug information is available") - else { - println("Stream actors alive: " + children) - Future - .sequence(children.map(MaterializerState.requestFromChild)) - .foreach(snapshots => - snapshots.foreach(s => - pekko.stream.testkit.scaladsl.StreamTestKit.snapshotString(s.asInstanceOf[StreamSnapshotImpl]))) + Materializer(_system) match { + case impl: PhasedFusingActorMaterializer => + implicit val ec = impl.system.dispatcher + println("--- Stream actors debug dump (only works for tests using system materializer) ---") + printDebugDump(impl.supervisor) + println("--- Stream actors debug dump end ---") + stopAllChildren(impl.system, impl.supervisor) + case _ => } failed case other => Materializer(_system) match { case impl: PhasedFusingActorMaterializer => + // Note that this is different from assertAllStages stopped since it tries to + // *kill* all streams first, before checking if any is stuck. It also does not + // work for tests starting their own materializers. stopAllChildren(impl.system, impl.supervisor) val result = test.apply() assertNoChildren(impl.system, impl.supervisor, diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/io/FileSinkSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/io/FileSinkSpec.scala index 8dfd06209c9..30e208af68d 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/io/FileSinkSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/io/FileSinkSpec.scala @@ -32,14 +32,13 @@ import pekko.stream.scaladsl.{ FileIO, Keep, Source } import pekko.stream.testkit._ import pekko.stream.testkit.Utils._ import pekko.util.ByteString +import pekko.stream.SystemMaterializer import org.scalatest.concurrent.ScalaFutures @nowarn class FileSinkSpec extends StreamSpec(UnboundedMailboxConfig) with ScalaFutures { - val settings = ActorMaterializerSettings(system).withDispatcher("pekko.actor.default-dispatcher") - implicit val materializer: Materializer = ActorMaterializer(settings) val fs = Jimfs.newFileSystem("FileSinkSpec", Configuration.unix()) val TestLines = { @@ -171,7 +170,7 @@ class FileSinkSpec extends StreamSpec(UnboundedMailboxConfig) with ScalaFutures targetFile { f => val forever = Source.maybe.toMat(FileIO.toPath(f))(Keep.left).run() try { - materializer + SystemMaterializer(system).materializer .asInstanceOf[PhasedFusingActorMaterializer] .supervisor .tell(StreamSupervisor.GetChildren, testActor) @@ -195,7 +194,7 @@ class FileSinkSpec extends StreamSpec(UnboundedMailboxConfig) with ScalaFutures Keep.left) .run() try { - materializer + SystemMaterializer(system).materializer .asInstanceOf[PhasedFusingActorMaterializer] .supervisor .tell(StreamSupervisor.GetChildren, testActor) diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/io/FileSourceSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/io/FileSourceSpec.scala index 08724f9b0a7..8f6a417568c 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/io/FileSourceSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/io/FileSourceSpec.scala @@ -24,7 +24,6 @@ import scala.concurrent.duration._ import com.google.common.jimfs.{ Configuration, Jimfs } import org.apache.pekko -import pekko.actor.ActorSystem import pekko.stream._ import pekko.stream.IOResult._ import pekko.stream.impl.{ PhasedFusingActorMaterializer, StreamSupervisor } @@ -35,6 +34,7 @@ import pekko.stream.testkit._ import pekko.stream.testkit.Utils._ import pekko.stream.testkit.scaladsl.TestSink import pekko.util.ByteString +import pekko.stream.SystemMaterializer object FileSourceSpec { final case class Settings(chunkSize: Int, readAhead: Int) @@ -43,9 +43,6 @@ object FileSourceSpec { @nowarn class FileSourceSpec extends StreamSpec(UnboundedMailboxConfig) { - val settings = ActorMaterializerSettings(system).withDispatcher("pekko.actor.default-dispatcher") - implicit val materializer: Materializer = ActorMaterializer(settings) - val fs = Jimfs.newFileSystem("FileSourceSpec", Configuration.unix()) val TestText = { @@ -261,39 +258,30 @@ class FileSourceSpec extends StreamSpec(UnboundedMailboxConfig) { } "use dedicated blocking-io-dispatcher by default" in { - val sys = ActorSystem("dispatcher-testing", UnboundedMailboxConfig) - val materializer = ActorMaterializer()(sys) - try { - val p = FileIO.fromPath(manyLines).runWith(TestSink())(materializer) - - materializer - .asInstanceOf[PhasedFusingActorMaterializer] - .supervisor - .tell(StreamSupervisor.GetChildren, testActor) - val ref = expectMsgType[Children].children.find(_.path.toString contains "fileSource").get - try assertDispatcher(ref, ActorAttributes.IODispatcher.dispatcher) - finally p.cancel() - } finally shutdown(sys) + val p = FileIO.fromPath(manyLines).runWith(TestSink()) + + SystemMaterializer(system).materializer + .asInstanceOf[PhasedFusingActorMaterializer] + .supervisor + .tell(StreamSupervisor.GetChildren, testActor) + val ref = expectMsgType[Children].children.find(_.path.toString contains "fileSource").get + try assertDispatcher(ref, ActorAttributes.IODispatcher.dispatcher) + finally p.cancel() } "allow overriding the dispatcher using Attributes" in { - val sys = ActorSystem("dispatcher-testing", UnboundedMailboxConfig) - val materializer = ActorMaterializer()(sys) - - try { - val p = FileIO - .fromPath(manyLines) - .addAttributes(ActorAttributes.dispatcher("pekko.actor.default-dispatcher")) - .runWith(TestSink())(materializer) - - materializer - .asInstanceOf[PhasedFusingActorMaterializer] - .supervisor - .tell(StreamSupervisor.GetChildren, testActor) - val ref = expectMsgType[Children].children.find(_.path.toString contains "fileSource").get - try assertDispatcher(ref, "pekko.actor.default-dispatcher") - finally p.cancel() - } finally shutdown(sys) + val p = FileIO + .fromPath(manyLines) + .addAttributes(ActorAttributes.dispatcher("pekko.actor.default-dispatcher")) + .runWith(TestSink()) + + SystemMaterializer(system).materializer + .asInstanceOf[PhasedFusingActorMaterializer] + .supervisor + .tell(StreamSupervisor.GetChildren, testActor) + val ref = expectMsgType[Children].children.find(_.path.toString contains "fileSource").get + try assertDispatcher(ref, "pekko.actor.default-dispatcher") + finally p.cancel() } "not signal onComplete more than once" in { diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/UnfoldResourceAsyncSourceSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/UnfoldResourceAsyncSourceSpec.scala index ac261e039ce..07a42be8d84 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/UnfoldResourceAsyncSourceSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/UnfoldResourceAsyncSourceSpec.scala @@ -26,6 +26,7 @@ import pekko.Done import pekko.stream.ActorAttributes import pekko.stream.Materializer import pekko.stream.Supervision +import pekko.stream.SystemMaterializer import pekko.stream.impl.PhasedFusingActorMaterializer import pekko.stream.impl.StreamSupervisor import pekko.stream.impl.StreamSupervisor.Children @@ -325,9 +326,6 @@ class UnfoldResourceAsyncSourceSpec extends StreamSpec(UnboundedMailboxConfig) { } "use dedicated blocking-io-dispatcher by default" in { - // use a separate materializer to ensure we know what child is our stream - implicit val materializer = Materializer(system) - Source .unfoldResourceAsync[String, Unit]( () => Promise[Unit]().future, // never complete @@ -335,7 +333,8 @@ class UnfoldResourceAsyncSourceSpec extends StreamSpec(UnboundedMailboxConfig) { _ => ???) .runWith(Sink.ignore) - materializer.asInstanceOf[PhasedFusingActorMaterializer].supervisor.tell(StreamSupervisor.GetChildren, testActor) + SystemMaterializer(system).materializer.asInstanceOf[PhasedFusingActorMaterializer].supervisor.tell( + StreamSupervisor.GetChildren, testActor) val ref = expectMsgType[Children].children.find(_.path.toString contains "unfoldResourceSourceAsync").get assertDispatcher(ref, ActorAttributes.IODispatcher.dispatcher) }