diff --git a/stream-testkit/src/test/scala/org/apache/pekko/stream/testkit/StreamSpec.scala b/stream-testkit/src/test/scala/org/apache/pekko/stream/testkit/StreamSpec.scala index 6032e57c8f6..43c458f52d1 100644 --- a/stream-testkit/src/test/scala/org/apache/pekko/stream/testkit/StreamSpec.scala +++ b/stream-testkit/src/test/scala/org/apache/pekko/stream/testkit/StreamSpec.scala @@ -15,17 +15,14 @@ package org.apache.pekko.stream.testkit import java.util.concurrent.TimeUnit -import scala.concurrent.Future -import scala.concurrent.duration._ +import scala.concurrent.duration.FiniteDuration import org.apache.pekko -import pekko.actor.{ ActorRef, ActorSystem } +import pekko.actor.ActorSystem import pekko.stream.Materializer import pekko.stream.impl.PhasedFusingActorMaterializer -import pekko.stream.impl.StreamSupervisor -import pekko.stream.snapshot.{ MaterializerState, StreamSnapshotImpl } -import pekko.stream.testkit.scaladsl.StreamTestKit.{ assertNoChildren, stopAllChildren } -import pekko.testkit.{ PekkoSpec, TestProbe } +import pekko.stream.testkit.scaladsl.StreamTestKit.{ assertNoChildren, printDebugDump, stopAllChildren } +import pekko.testkit.PekkoSpec import pekko.testkit.TestKitUtils import org.scalatest.Failed @@ -49,31 +46,22 @@ abstract class StreamSpec(_system: ActorSystem) extends PekkoSpec(_system) with override def withFixture(test: NoArgTest) = { super.withFixture(test) match { case failed: Failed => - implicit val ec = system.dispatcher - val probe = TestProbe()(system) - // FIXME I don't think it always runs under /user anymore (typed) - // FIXME correction - I'm not sure this works at _all_ - supposed to dump stream state if test fails - val streamSupervisors = system.actorSelection("/user/" + StreamSupervisor.baseName + "*") - streamSupervisors.tell(StreamSupervisor.GetChildren, probe.ref) - val children: Seq[ActorRef] = probe - .receiveWhile(2.seconds) { - case StreamSupervisor.Children(children) => children - } - .flatten - println("--- Stream actors debug dump ---") - if (children.isEmpty) println("Stream is completed. No debug information is available") - else { - println("Stream actors alive: " + children) - Future - .sequence(children.map(MaterializerState.requestFromChild)) - .foreach(snapshots => - snapshots.foreach(s => - pekko.stream.testkit.scaladsl.StreamTestKit.snapshotString(s.asInstanceOf[StreamSnapshotImpl]))) + Materializer(_system) match { + case impl: PhasedFusingActorMaterializer => + implicit val ec = impl.system.dispatcher + println("--- Stream actors debug dump (only works for tests using system materializer) ---") + printDebugDump(impl.supervisor) + println("--- Stream actors debug dump end ---") + stopAllChildren(impl.system, impl.supervisor) + case _ => } failed case other => Materializer(_system) match { case impl: PhasedFusingActorMaterializer => + // Note that this is different from assertAllStages stopped since it tries to + // *kill* all streams first, before checking if any is stuck. It also does not + // work for tests starting their own materializers. stopAllChildren(impl.system, impl.supervisor) val result = test.apply() assertNoChildren(impl.system, impl.supervisor, diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/io/FileSinkSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/io/FileSinkSpec.scala index 8dfd06209c9..30e208af68d 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/io/FileSinkSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/io/FileSinkSpec.scala @@ -32,14 +32,13 @@ import pekko.stream.scaladsl.{ FileIO, Keep, Source } import pekko.stream.testkit._ import pekko.stream.testkit.Utils._ import pekko.util.ByteString +import pekko.stream.SystemMaterializer import org.scalatest.concurrent.ScalaFutures @nowarn class FileSinkSpec extends StreamSpec(UnboundedMailboxConfig) with ScalaFutures { - val settings = ActorMaterializerSettings(system).withDispatcher("pekko.actor.default-dispatcher") - implicit val materializer: Materializer = ActorMaterializer(settings) val fs = Jimfs.newFileSystem("FileSinkSpec", Configuration.unix()) val TestLines = { @@ -171,7 +170,7 @@ class FileSinkSpec extends StreamSpec(UnboundedMailboxConfig) with ScalaFutures targetFile { f => val forever = Source.maybe.toMat(FileIO.toPath(f))(Keep.left).run() try { - materializer + SystemMaterializer(system).materializer .asInstanceOf[PhasedFusingActorMaterializer] .supervisor .tell(StreamSupervisor.GetChildren, testActor) @@ -195,7 +194,7 @@ class FileSinkSpec extends StreamSpec(UnboundedMailboxConfig) with ScalaFutures Keep.left) .run() try { - materializer + SystemMaterializer(system).materializer .asInstanceOf[PhasedFusingActorMaterializer] .supervisor .tell(StreamSupervisor.GetChildren, testActor) diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/io/FileSourceSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/io/FileSourceSpec.scala index 08724f9b0a7..8f6a417568c 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/io/FileSourceSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/io/FileSourceSpec.scala @@ -24,7 +24,6 @@ import scala.concurrent.duration._ import com.google.common.jimfs.{ Configuration, Jimfs } import org.apache.pekko -import pekko.actor.ActorSystem import pekko.stream._ import pekko.stream.IOResult._ import pekko.stream.impl.{ PhasedFusingActorMaterializer, StreamSupervisor } @@ -35,6 +34,7 @@ import pekko.stream.testkit._ import pekko.stream.testkit.Utils._ import pekko.stream.testkit.scaladsl.TestSink import pekko.util.ByteString +import pekko.stream.SystemMaterializer object FileSourceSpec { final case class Settings(chunkSize: Int, readAhead: Int) @@ -43,9 +43,6 @@ object FileSourceSpec { @nowarn class FileSourceSpec extends StreamSpec(UnboundedMailboxConfig) { - val settings = ActorMaterializerSettings(system).withDispatcher("pekko.actor.default-dispatcher") - implicit val materializer: Materializer = ActorMaterializer(settings) - val fs = Jimfs.newFileSystem("FileSourceSpec", Configuration.unix()) val TestText = { @@ -261,39 +258,30 @@ class FileSourceSpec extends StreamSpec(UnboundedMailboxConfig) { } "use dedicated blocking-io-dispatcher by default" in { - val sys = ActorSystem("dispatcher-testing", UnboundedMailboxConfig) - val materializer = ActorMaterializer()(sys) - try { - val p = FileIO.fromPath(manyLines).runWith(TestSink())(materializer) - - materializer - .asInstanceOf[PhasedFusingActorMaterializer] - .supervisor - .tell(StreamSupervisor.GetChildren, testActor) - val ref = expectMsgType[Children].children.find(_.path.toString contains "fileSource").get - try assertDispatcher(ref, ActorAttributes.IODispatcher.dispatcher) - finally p.cancel() - } finally shutdown(sys) + val p = FileIO.fromPath(manyLines).runWith(TestSink()) + + SystemMaterializer(system).materializer + .asInstanceOf[PhasedFusingActorMaterializer] + .supervisor + .tell(StreamSupervisor.GetChildren, testActor) + val ref = expectMsgType[Children].children.find(_.path.toString contains "fileSource").get + try assertDispatcher(ref, ActorAttributes.IODispatcher.dispatcher) + finally p.cancel() } "allow overriding the dispatcher using Attributes" in { - val sys = ActorSystem("dispatcher-testing", UnboundedMailboxConfig) - val materializer = ActorMaterializer()(sys) - - try { - val p = FileIO - .fromPath(manyLines) - .addAttributes(ActorAttributes.dispatcher("pekko.actor.default-dispatcher")) - .runWith(TestSink())(materializer) - - materializer - .asInstanceOf[PhasedFusingActorMaterializer] - .supervisor - .tell(StreamSupervisor.GetChildren, testActor) - val ref = expectMsgType[Children].children.find(_.path.toString contains "fileSource").get - try assertDispatcher(ref, "pekko.actor.default-dispatcher") - finally p.cancel() - } finally shutdown(sys) + val p = FileIO + .fromPath(manyLines) + .addAttributes(ActorAttributes.dispatcher("pekko.actor.default-dispatcher")) + .runWith(TestSink()) + + SystemMaterializer(system).materializer + .asInstanceOf[PhasedFusingActorMaterializer] + .supervisor + .tell(StreamSupervisor.GetChildren, testActor) + val ref = expectMsgType[Children].children.find(_.path.toString contains "fileSource").get + try assertDispatcher(ref, "pekko.actor.default-dispatcher") + finally p.cancel() } "not signal onComplete more than once" in { diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/UnfoldResourceAsyncSourceSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/UnfoldResourceAsyncSourceSpec.scala index ac261e039ce..07a42be8d84 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/UnfoldResourceAsyncSourceSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/UnfoldResourceAsyncSourceSpec.scala @@ -26,6 +26,7 @@ import pekko.Done import pekko.stream.ActorAttributes import pekko.stream.Materializer import pekko.stream.Supervision +import pekko.stream.SystemMaterializer import pekko.stream.impl.PhasedFusingActorMaterializer import pekko.stream.impl.StreamSupervisor import pekko.stream.impl.StreamSupervisor.Children @@ -325,9 +326,6 @@ class UnfoldResourceAsyncSourceSpec extends StreamSpec(UnboundedMailboxConfig) { } "use dedicated blocking-io-dispatcher by default" in { - // use a separate materializer to ensure we know what child is our stream - implicit val materializer = Materializer(system) - Source .unfoldResourceAsync[String, Unit]( () => Promise[Unit]().future, // never complete @@ -335,7 +333,8 @@ class UnfoldResourceAsyncSourceSpec extends StreamSpec(UnboundedMailboxConfig) { _ => ???) .runWith(Sink.ignore) - materializer.asInstanceOf[PhasedFusingActorMaterializer].supervisor.tell(StreamSupervisor.GetChildren, testActor) + SystemMaterializer(system).materializer.asInstanceOf[PhasedFusingActorMaterializer].supervisor.tell( + StreamSupervisor.GetChildren, testActor) val ref = expectMsgType[Children].children.find(_.path.toString contains "unfoldResourceSourceAsync").get assertDispatcher(ref, ActorAttributes.IODispatcher.dispatcher) }