diff --git a/core/js/src/main/scala/cats/effect/IOApp.scala b/core/js/src/main/scala/cats/effect/IOApp.scala index 878d40281e..c838720be1 100644 --- a/core/js/src/main/scala/cats/effect/IOApp.scala +++ b/core/js/src/main/scala/cats/effect/IOApp.scala @@ -143,9 +143,7 @@ import scala.util.Try * @see * [[IOApp.Simple]] */ -trait IOApp { - - private[this] var _runtime: unsafe.IORuntime = null +trait IOApp extends IOAppPlatform { /** * The runtime which will be used by `IOApp` to evaluate the [[IO]] produced by the `run` @@ -160,7 +158,7 @@ trait IOApp { * * This value is guaranteed to be equal to [[unsafe.IORuntime.global]]. */ - protected def runtime: unsafe.IORuntime = _runtime + protected def runtime: unsafe.IORuntime = installedRuntime /** * The configuration used to initialize the [[runtime]] which will evaluate the [[IO]] @@ -202,34 +200,7 @@ trait IOApp { def run(args: List[String]): IO[ExitCode] final def main(args: Array[String]): Unit = { - val installed = if (runtime == null) { - import unsafe.IORuntime - - val installed = IORuntime installGlobal { - val compute = IORuntime.createBatchingMacrotaskExecutor(reportFailure = t => - reportFailure(t).unsafeRunAndForgetWithoutCallback()(runtime)) - - IORuntime( - compute, - compute, - IORuntime.defaultScheduler, - () => IORuntime.resetGlobal(), - runtimeConfig) - } - - _runtime = IORuntime.global - - installed - } else { - unsafe.IORuntime.installGlobal(runtime) - } - - if (!installed) { - System - .err - .println( - "WARNING: Cats Effect global runtime already initialized; custom configurations will be ignored") - } + setupGlobalRuntime() if (LinkingInfo.developmentMode && isStackTracing) { val listener: js.Function0[Unit] = () => diff --git a/core/js/src/main/scala/cats/effect/IOAppPlatform.scala b/core/js/src/main/scala/cats/effect/IOAppPlatform.scala new file mode 100644 index 0000000000..20551b1ab7 --- /dev/null +++ b/core/js/src/main/scala/cats/effect/IOAppPlatform.scala @@ -0,0 +1,35 @@ +/* + * Copyright 2020-2025 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect + +import cats.effect.unsafe.IORuntime + +trait IOAppPlatform extends IOAppCommon { + this: IOApp => + + private[effect] def defaultGlobalRuntime: IORuntime = { + val compute = IORuntime.createBatchingMacrotaskExecutor(reportFailure = t => + reportFailure(t).unsafeRunAndForgetWithoutCallback()(runtime)) + + IORuntime( + compute, + compute, + IORuntime.defaultScheduler, + () => IORuntime.resetGlobal(), + runtimeConfig) + } +} diff --git a/core/jvm-native/src/main/scala/cats/effect/IOAppMultiThreaded.scala b/core/jvm-native/src/main/scala/cats/effect/IOAppMultiThreaded.scala new file mode 100644 index 0000000000..f9cdb74408 --- /dev/null +++ b/core/jvm-native/src/main/scala/cats/effect/IOAppMultiThreaded.scala @@ -0,0 +1,33 @@ +/* + * Copyright 2020-2025 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect + +import scala.concurrent.ExecutionContext + +import java.util.concurrent.ArrayBlockingQueue + +trait IOAppMultiThreaded { + // arbitrary constant is arbitrary + private[effect] lazy val queue = new ArrayBlockingQueue[AnyRef](32) + + private[effect] def handleTerminalFailure(t: Throwable): Unit = { + queue.clear() + queue.put(t) + } + + private[effect] def defaultMainThread: ExecutionContext +} diff --git a/core/jvm/src/main/scala/cats/effect/IOApp.scala b/core/jvm/src/main/scala/cats/effect/IOApp.scala index 0ae9bc9df5..0ad165df8a 100644 --- a/core/jvm/src/main/scala/cats/effect/IOApp.scala +++ b/core/jvm/src/main/scala/cats/effect/IOApp.scala @@ -25,7 +25,7 @@ import cats.syntax.all._ import scala.concurrent.{blocking, CancellationException, ExecutionContext} import scala.concurrent.duration._ -import java.util.concurrent.{ArrayBlockingQueue, CountDownLatch} +import java.util.concurrent.CountDownLatch import java.util.concurrent.atomic.AtomicInteger /** @@ -140,9 +140,7 @@ import java.util.concurrent.atomic.AtomicInteger * @see * [[IOApp.Simple]] */ -trait IOApp { - - private[this] var _runtime: unsafe.IORuntime = null +trait IOApp extends IOAppPlatform { /** * The runtime which will be used by `IOApp` to evaluate the [[IO]] produced by the `run` @@ -157,7 +155,7 @@ trait IOApp { * * This value is guaranteed to be equal to [[unsafe.IORuntime.global]]. */ - protected def runtime: unsafe.IORuntime = _runtime + protected def runtime: unsafe.IORuntime = installedRuntime /** * The configuration used to initialize the [[runtime]] which will evaluate the [[IO]] @@ -198,14 +196,6 @@ trait IOApp { protected def computeWorkerThreadCount: Int = Math.max(2, Runtime.getRuntime().availableProcessors()) - // arbitrary constant is arbitrary - private[this] lazy val queue = new ArrayBlockingQueue[AnyRef](32) - - private[this] def handleTerminalFailure(t: Throwable): Unit = { - queue.clear() - queue.put(t) - } - /** * Executes the provided actions on the JVM's `main` thread. Note that this is, by definition, * a single-threaded executor, and should not be used for anything which requires a meaningful @@ -221,27 +211,7 @@ trait IOApp { * calling thread (for example, LWJGL). In these scenarios, it is recommended that the * absolute minimum possible amount of work is handed off to the main thread. */ - protected def MainThread: ExecutionContext = - if (queue eq queue) - new ExecutionContext { - def reportFailure(t: Throwable): Unit = - t match { - case t if UnsafeNonFatal(t) => - IOApp.this.reportFailure(t).unsafeRunAndForgetWithoutCallback()(runtime) - - case t => - handleTerminalFailure(t) - } - - def execute(r: Runnable): Unit = - if (!queue.offer(r)) { - runtime.blocking.execute(() => queue.put(r)) - } - } - else - throw new UnsupportedOperationException( - "Your IOApp's super class has not been recompiled against Cats Effect 3.4.0+." - ) + protected def MainThread: ExecutionContext = defaultMainThread /** * Configures the action to perform when unhandled errors are caught by the runtime. An @@ -395,52 +365,7 @@ trait IOApp { val isForked = Thread.currentThread().getId() == 1 if (!isForked) onNonMainThreadDetected() - val installed = if (runtime == null) { - import unsafe.IORuntime - - val installed = IORuntime installGlobal { - val (compute, poller, compDown) = - IORuntime.createWorkStealingComputeThreadPool( - threads = computeWorkerThreadCount, - reportFailure = t => reportFailure(t).unsafeRunAndForgetWithoutCallback()(runtime), - blockedThreadDetectionEnabled = blockedThreadDetectionEnabled, - pollingSystem = pollingSystem, - uncaughtExceptionHandler = (_, t) => handleTerminalFailure(t) - ) - - val (blocking, blockDown) = - IORuntime.createDefaultBlockingExecutionContext( - threadPrefix = "io-blocking", - reportFailure = - (t: Throwable) => reportFailure(t).unsafeRunAndForgetWithoutCallback()(runtime) - ) - - IORuntime( - compute, - blocking, - compute, - List(poller), - { () => - compDown() - blockDown() - IORuntime.resetGlobal() - }, - runtimeConfig) - } - - _runtime = IORuntime.global - - installed - } else { - unsafe.IORuntime.installGlobal(runtime) - } - - if (!installed) { - System - .err - .println( - "WARNING: Cats Effect global runtime already initialized; custom configurations will be ignored") - } + setupGlobalRuntime() if (isStackTracing) { val liveFiberSnapshotSignal = sys diff --git a/core/jvm/src/main/scala/cats/effect/IOAppPlatform.scala b/core/jvm/src/main/scala/cats/effect/IOAppPlatform.scala new file mode 100644 index 0000000000..b862a5a4f2 --- /dev/null +++ b/core/jvm/src/main/scala/cats/effect/IOAppPlatform.scala @@ -0,0 +1,78 @@ +/* + * Copyright 2020-2025 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect + +import cats.effect.unsafe.{IORuntime, UnsafeNonFatal} + +import scala.concurrent.ExecutionContext + +trait IOAppPlatform extends IOAppCommon with IOAppMultiThreaded { + this: IOApp => + + private[effect] def defaultMainThread: ExecutionContext = { + if (queue eq queue) + new ExecutionContext { + def reportFailure(t: Throwable): Unit = + t match { + case t if UnsafeNonFatal(t) => + IOAppPlatform.this.reportFailure(t).unsafeRunAndForgetWithoutCallback()(runtime) + + case t => + handleTerminalFailure(t) + } + + def execute(r: Runnable): Unit = + if (!queue.offer(r)) { + runtime.blocking.execute(() => queue.put(r)) + } + } + else + throw new UnsupportedOperationException( + "Your IOApp's super class has not been recompiled against Cats Effect 3.4.0+." + ) + } + + private[effect] def defaultGlobalRuntime: IORuntime = { + val (compute, poller, compDown) = + IORuntime.createWorkStealingComputeThreadPool( + threads = computeWorkerThreadCount, + reportFailure = t => reportFailure(t).unsafeRunAndForgetWithoutCallback()(runtime), + blockedThreadDetectionEnabled = blockedThreadDetectionEnabled, + pollingSystem = pollingSystem, + uncaughtExceptionHandler = (_, t) => handleTerminalFailure(t) + ) + + val (blocking, blockDown) = + IORuntime.createDefaultBlockingExecutionContext( + threadPrefix = "io-blocking", + reportFailure = + (t: Throwable) => reportFailure(t).unsafeRunAndForgetWithoutCallback()(runtime) + ) + + IORuntime( + compute, + blocking, + compute, + List(poller), + { () => + compDown() + blockDown() + IORuntime.resetGlobal() + }, + runtimeConfig) + } +} diff --git a/core/native/src/main/scala/cats/effect/IOApp.scala b/core/native/src/main/scala/cats/effect/IOApp.scala index 293db2207c..c783373e70 100644 --- a/core/native/src/main/scala/cats/effect/IOApp.scala +++ b/core/native/src/main/scala/cats/effect/IOApp.scala @@ -24,7 +24,6 @@ import cats.syntax.all._ import scala.concurrent.{blocking, CancellationException, ExecutionContext} import scala.scalanative.meta.LinktimeInfo._ -import java.util.concurrent.ArrayBlockingQueue import java.util.concurrent.atomic.AtomicInteger /** @@ -139,9 +138,7 @@ import java.util.concurrent.atomic.AtomicInteger * @see * [[IOApp.Simple]] */ -trait IOApp { - - private[this] var _runtime: unsafe.IORuntime = null +trait IOApp extends IOAppPlatform { /** * The runtime which will be used by `IOApp` to evaluate the [[IO]] produced by the `run` @@ -156,7 +153,7 @@ trait IOApp { * * This value is guaranteed to be equal to [[unsafe.IORuntime.global]]. */ - protected def runtime: unsafe.IORuntime = _runtime + protected def runtime: unsafe.IORuntime = installedRuntime /** * The configuration used to initialize the [[runtime]] which will evaluate the [[IO]] @@ -197,14 +194,6 @@ trait IOApp { protected def computeWorkerThreadCount: Int = Math.max(2, Runtime.getRuntime().availableProcessors()) - // arbitrary constant is arbitrary - private[this] lazy val queue = new ArrayBlockingQueue[AnyRef](32) - - private[this] def handleTerminalFailure(t: Throwable): Unit = { - queue.clear() - queue.put(t) - } - /** * Executes the provided actions on the main thread. Note that this is, by definition, a * single-threaded executor, and should not be used for anything which requires a meaningful @@ -220,22 +209,7 @@ trait IOApp { * calling thread (for example, LWJGL). In these scenarios, it is recommended that the * absolute minimum possible amount of work is handed off to the main thread. */ - protected def MainThread: ExecutionContext = - new ExecutionContext { - def reportFailure(t: Throwable): Unit = - t match { - case t if UnsafeNonFatal(t) => - IOApp.this.reportFailure(t).unsafeRunAndForgetWithoutCallback()(runtime) - - case t => - handleTerminalFailure(t) - } - - def execute(r: Runnable): Unit = - if (!queue.offer(r)) { - runtime.blocking.execute(() => queue.put(r)) - } - } + protected def MainThread: ExecutionContext = defaultMainThread /** * Configures the action to perform when unhandled errors are caught by the runtime. An @@ -286,51 +260,7 @@ trait IOApp { def run(args: List[String]): IO[ExitCode] final def main(args: Array[String]): Unit = { - val installed = if (runtime == null) { - import unsafe.IORuntime - - val installed = IORuntime installGlobal { - val (compute, poller, compDown) = - IORuntime.createWorkStealingComputeThreadPool( - threads = computeWorkerThreadCount, - reportFailure = t => reportFailure(t).unsafeRunAndForgetWithoutCallback()(runtime), - blockedThreadDetectionEnabled = false, // TODO - pollingSystem = pollingSystem - ) - - val (blocking, blockDown) = - IORuntime.createDefaultBlockingExecutionContext( - threadPrefix = "io-blocking", - reportFailure = - (t: Throwable) => reportFailure(t).unsafeRunAndForgetWithoutCallback()(runtime) - ) - - IORuntime( - compute, - blocking, - compute, - List(poller), - { () => - compDown() - blockDown() - IORuntime.resetGlobal() - }, - runtimeConfig) - } - - _runtime = IORuntime.global - - installed - } else { - unsafe.IORuntime.installGlobal(runtime) - } - - if (!installed) { - System - .err - .println( - "WARNING: Cats Effect global runtime already initialized; custom configurations will be ignored") - } + setupGlobalRuntime() val counter = new AtomicInteger(1) diff --git a/core/native/src/main/scala/cats/effect/IOAppPlatform.scala b/core/native/src/main/scala/cats/effect/IOAppPlatform.scala new file mode 100644 index 0000000000..2cf689b336 --- /dev/null +++ b/core/native/src/main/scala/cats/effect/IOAppPlatform.scala @@ -0,0 +1,72 @@ +/* + * Copyright 2020-2025 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect + +import cats.effect.unsafe.{IORuntime, UnsafeNonFatal} + +import scala.concurrent.ExecutionContext + +trait IOAppPlatform extends IOAppCommon with IOAppMultiThreaded { + this: IOApp => + + private[effect] def defaultMainThread: ExecutionContext = { + new ExecutionContext { + def reportFailure(t: Throwable): Unit = + t match { + case t if UnsafeNonFatal(t) => + IOAppPlatform.this.reportFailure(t).unsafeRunAndForgetWithoutCallback()(runtime) + + case t => + handleTerminalFailure(t) + } + + def execute(r: Runnable): Unit = + if (!queue.offer(r)) { + runtime.blocking.execute(() => queue.put(r)) + } + } + } + + private[effect] def defaultGlobalRuntime: IORuntime = { + val (compute, poller, compDown) = + IORuntime.createWorkStealingComputeThreadPool( + threads = computeWorkerThreadCount, + reportFailure = t => reportFailure(t).unsafeRunAndForgetWithoutCallback()(runtime), + blockedThreadDetectionEnabled = false, // TODO + pollingSystem = pollingSystem + ) + + val (blocking, blockDown) = + IORuntime.createDefaultBlockingExecutionContext( + threadPrefix = "io-blocking", + reportFailure = + (t: Throwable) => reportFailure(t).unsafeRunAndForgetWithoutCallback()(runtime) + ) + + IORuntime( + compute, + blocking, + compute, + List(poller), + { () => + compDown() + blockDown() + IORuntime.resetGlobal() + }, + runtimeConfig) + } +} diff --git a/core/shared/src/main/scala/cats/effect/IOAppCommon.scala b/core/shared/src/main/scala/cats/effect/IOAppCommon.scala new file mode 100644 index 0000000000..1453516d45 --- /dev/null +++ b/core/shared/src/main/scala/cats/effect/IOAppCommon.scala @@ -0,0 +1,46 @@ +/* + * Copyright 2020-2025 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect + +trait IOAppCommon { + this: IOApp => + + private[this] var _runtime: unsafe.IORuntime = null + + private[effect] def installedRuntime: unsafe.IORuntime = _runtime + + private[effect] def setupGlobalRuntime(): Unit = { + val installed = if (runtime == null) { + import unsafe.IORuntime + + val installed = IORuntime installGlobal defaultGlobalRuntime + + _runtime = IORuntime.global + + installed + } else { + unsafe.IORuntime.installGlobal(runtime) + } + + if (!installed) { + System + .err + .println( + "WARNING: Cats Effect global runtime already initialized; custom configurations will be ignored") + } + } +}