Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,14 @@ package org.apache.pekko.stream.testkit

import java.util.concurrent.TimeUnit

import scala.concurrent.Future
import scala.concurrent.duration._
import scala.concurrent.duration.FiniteDuration

import org.apache.pekko
import pekko.actor.{ ActorRef, ActorSystem }
import pekko.actor.ActorSystem
import pekko.stream.Materializer
import pekko.stream.impl.PhasedFusingActorMaterializer
import pekko.stream.impl.StreamSupervisor
import pekko.stream.snapshot.{ MaterializerState, StreamSnapshotImpl }
import pekko.stream.testkit.scaladsl.StreamTestKit.{ assertNoChildren, stopAllChildren }
import pekko.testkit.{ PekkoSpec, TestProbe }
import pekko.stream.testkit.scaladsl.StreamTestKit.{ assertNoChildren, printDebugDump, stopAllChildren }
import pekko.testkit.PekkoSpec
import pekko.testkit.TestKitUtils

import org.scalatest.Failed
Expand All @@ -49,31 +46,22 @@ abstract class StreamSpec(_system: ActorSystem) extends PekkoSpec(_system) with
override def withFixture(test: NoArgTest) = {
super.withFixture(test) match {
case failed: Failed =>
implicit val ec = system.dispatcher
val probe = TestProbe()(system)
// FIXME I don't think it always runs under /user anymore (typed)
// FIXME correction - I'm not sure this works at _all_ - supposed to dump stream state if test fails
val streamSupervisors = system.actorSelection("/user/" + StreamSupervisor.baseName + "*")
streamSupervisors.tell(StreamSupervisor.GetChildren, probe.ref)
val children: Seq[ActorRef] = probe
.receiveWhile(2.seconds) {
case StreamSupervisor.Children(children) => children
}
.flatten
println("--- Stream actors debug dump ---")
if (children.isEmpty) println("Stream is completed. No debug information is available")
else {
println("Stream actors alive: " + children)
Future
.sequence(children.map(MaterializerState.requestFromChild))
.foreach(snapshots =>
snapshots.foreach(s =>
pekko.stream.testkit.scaladsl.StreamTestKit.snapshotString(s.asInstanceOf[StreamSnapshotImpl])))
Materializer(_system) match {
case impl: PhasedFusingActorMaterializer =>
implicit val ec = impl.system.dispatcher
println("--- Stream actors debug dump (only works for tests using system materializer) ---")
printDebugDump(impl.supervisor)
println("--- Stream actors debug dump end ---")
stopAllChildren(impl.system, impl.supervisor)
case _ =>
}
failed
case other =>
Materializer(_system) match {
case impl: PhasedFusingActorMaterializer =>
// Note that this is different from assertAllStages stopped since it tries to
// *kill* all streams first, before checking if any is stuck. It also does not
// work for tests starting their own materializers.
stopAllChildren(impl.system, impl.supervisor)
val result = test.apply()
assertNoChildren(impl.system, impl.supervisor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,13 @@ import pekko.stream.scaladsl.{ FileIO, Keep, Source }
import pekko.stream.testkit._
import pekko.stream.testkit.Utils._
import pekko.util.ByteString
import pekko.stream.SystemMaterializer

import org.scalatest.concurrent.ScalaFutures

@nowarn
class FileSinkSpec extends StreamSpec(UnboundedMailboxConfig) with ScalaFutures {

val settings = ActorMaterializerSettings(system).withDispatcher("pekko.actor.default-dispatcher")
implicit val materializer: Materializer = ActorMaterializer(settings)
val fs = Jimfs.newFileSystem("FileSinkSpec", Configuration.unix())

val TestLines = {
Expand Down Expand Up @@ -171,7 +170,7 @@ class FileSinkSpec extends StreamSpec(UnboundedMailboxConfig) with ScalaFutures
targetFile { f =>
val forever = Source.maybe.toMat(FileIO.toPath(f))(Keep.left).run()
try {
materializer
SystemMaterializer(system).materializer
.asInstanceOf[PhasedFusingActorMaterializer]
.supervisor
.tell(StreamSupervisor.GetChildren, testActor)
Expand All @@ -195,7 +194,7 @@ class FileSinkSpec extends StreamSpec(UnboundedMailboxConfig) with ScalaFutures
Keep.left)
.run()
try {
materializer
SystemMaterializer(system).materializer
.asInstanceOf[PhasedFusingActorMaterializer]
.supervisor
.tell(StreamSupervisor.GetChildren, testActor)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import scala.concurrent.duration._
import com.google.common.jimfs.{ Configuration, Jimfs }

import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.stream._
import pekko.stream.IOResult._
import pekko.stream.impl.{ PhasedFusingActorMaterializer, StreamSupervisor }
Expand All @@ -35,6 +34,7 @@ import pekko.stream.testkit._
import pekko.stream.testkit.Utils._
import pekko.stream.testkit.scaladsl.TestSink
import pekko.util.ByteString
import pekko.stream.SystemMaterializer

object FileSourceSpec {
final case class Settings(chunkSize: Int, readAhead: Int)
Expand All @@ -43,9 +43,6 @@ object FileSourceSpec {
@nowarn
class FileSourceSpec extends StreamSpec(UnboundedMailboxConfig) {

val settings = ActorMaterializerSettings(system).withDispatcher("pekko.actor.default-dispatcher")
implicit val materializer: Materializer = ActorMaterializer(settings)

val fs = Jimfs.newFileSystem("FileSourceSpec", Configuration.unix())

val TestText = {
Expand Down Expand Up @@ -261,39 +258,30 @@ class FileSourceSpec extends StreamSpec(UnboundedMailboxConfig) {
}

"use dedicated blocking-io-dispatcher by default" in {
val sys = ActorSystem("dispatcher-testing", UnboundedMailboxConfig)
val materializer = ActorMaterializer()(sys)
try {
val p = FileIO.fromPath(manyLines).runWith(TestSink())(materializer)

materializer
.asInstanceOf[PhasedFusingActorMaterializer]
.supervisor
.tell(StreamSupervisor.GetChildren, testActor)
val ref = expectMsgType[Children].children.find(_.path.toString contains "fileSource").get
try assertDispatcher(ref, ActorAttributes.IODispatcher.dispatcher)
finally p.cancel()
} finally shutdown(sys)
val p = FileIO.fromPath(manyLines).runWith(TestSink())

SystemMaterializer(system).materializer
.asInstanceOf[PhasedFusingActorMaterializer]
.supervisor
.tell(StreamSupervisor.GetChildren, testActor)
val ref = expectMsgType[Children].children.find(_.path.toString contains "fileSource").get
try assertDispatcher(ref, ActorAttributes.IODispatcher.dispatcher)
finally p.cancel()
}

"allow overriding the dispatcher using Attributes" in {
val sys = ActorSystem("dispatcher-testing", UnboundedMailboxConfig)
val materializer = ActorMaterializer()(sys)

try {
val p = FileIO
.fromPath(manyLines)
.addAttributes(ActorAttributes.dispatcher("pekko.actor.default-dispatcher"))
.runWith(TestSink())(materializer)

materializer
.asInstanceOf[PhasedFusingActorMaterializer]
.supervisor
.tell(StreamSupervisor.GetChildren, testActor)
val ref = expectMsgType[Children].children.find(_.path.toString contains "fileSource").get
try assertDispatcher(ref, "pekko.actor.default-dispatcher")
finally p.cancel()
} finally shutdown(sys)
val p = FileIO
.fromPath(manyLines)
.addAttributes(ActorAttributes.dispatcher("pekko.actor.default-dispatcher"))
.runWith(TestSink())

SystemMaterializer(system).materializer
.asInstanceOf[PhasedFusingActorMaterializer]
.supervisor
.tell(StreamSupervisor.GetChildren, testActor)
val ref = expectMsgType[Children].children.find(_.path.toString contains "fileSource").get
try assertDispatcher(ref, "pekko.actor.default-dispatcher")
finally p.cancel()
}

"not signal onComplete more than once" in {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import pekko.Done
import pekko.stream.ActorAttributes
import pekko.stream.Materializer
import pekko.stream.Supervision
import pekko.stream.SystemMaterializer
import pekko.stream.impl.PhasedFusingActorMaterializer
import pekko.stream.impl.StreamSupervisor
import pekko.stream.impl.StreamSupervisor.Children
Expand Down Expand Up @@ -325,17 +326,15 @@ class UnfoldResourceAsyncSourceSpec extends StreamSpec(UnboundedMailboxConfig) {
}

"use dedicated blocking-io-dispatcher by default" in {
// use a separate materializer to ensure we know what child is our stream
implicit val materializer = Materializer(system)

Source
.unfoldResourceAsync[String, Unit](
() => Promise[Unit]().future, // never complete
_ => ???,
_ => ???)
.runWith(Sink.ignore)

materializer.asInstanceOf[PhasedFusingActorMaterializer].supervisor.tell(StreamSupervisor.GetChildren, testActor)
SystemMaterializer(system).materializer.asInstanceOf[PhasedFusingActorMaterializer].supervisor.tell(
StreamSupervisor.GetChildren, testActor)
val ref = expectMsgType[Children].children.find(_.path.toString contains "unfoldResourceSourceAsync").get
assertDispatcher(ref, ActorAttributes.IODispatcher.dispatcher)
}
Expand Down
Loading