From e6b3324325d4e9c04bd683a51a60c4a2c4f278cf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luis=20Miguel=20Mej=C3=ADa=20Su=C3=A1rez?= Date: Wed, 22 Nov 2023 15:35:55 -0500 Subject: [PATCH 01/17] Add AsyncSubscriber --- .../effect/std/flow/AsyncSubscriber.scala | 85 +++++++++++++++++++ 1 file changed, 85 insertions(+) create mode 100644 std/shared/src/main/scala/cats/effect/std/flow/AsyncSubscriber.scala diff --git a/std/shared/src/main/scala/cats/effect/std/flow/AsyncSubscriber.scala b/std/shared/src/main/scala/cats/effect/std/flow/AsyncSubscriber.scala new file mode 100644 index 0000000000..fc08c6de6e --- /dev/null +++ b/std/shared/src/main/scala/cats/effect/std/flow/AsyncSubscriber.scala @@ -0,0 +1,85 @@ +/* + * Copyright 2020-2023 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect.std.flow + +import java.util.Objects.requireNonNull +import java.util.concurrent.Flow.{Subscriber, Subscription} + +/** + * Implementation of a [[Subscriber]]. + * + * This is used to obtain an effect from an upstream reactive-streams system. + * + * @see + * [[https://github.com/reactive-streams/reactive-streams-jvm#2-subscriber-code]] + */ +private[flow] final class AsyncSubscriber[F[_], A]( + cb: Either[Throwable, Option[A]] => Unit +) extends Subscriber[A] { + private var subscription: Subscription = null + + /** + * Receives a subscription from the upstream reactive-streams system. + */ + override def onSubscribe(sub: Subscription): Unit = { + requireNonNull( + subscription, + "The subscription provided to onSubscribe must not be null" + ) + // When subscribed, + // we store the subscription and request a single element. + subscription = sub + subscription.request(1L) + } + + /** + * Receives the next record from the upstream reactive-streams system. + */ + override def onNext(a: A): Unit = { + requireNonNull( + a, + "The element provided to onNext must not be null" + ) + // When the element is received, + // we attempt to complete the callback with it and then cancel the upstream system. + cb(Right(Some(a))) + subscription.cancel() + } + + /** + * Called by the upstream reactive-streams system when it fails. + */ + override def onError(ex: Throwable): Unit = { + requireNonNull( + ex, + "The throwable provided to onError must not be null" + ) + // When an error is received, + // we attempt to complete the callback with it and then cancel the upstream system. + cb(Left(ex)) + subscription.cancel() + } + + /** + * Called by the upstream reactive-streams system when it has finished sending records. + */ + override def onComplete(): Unit = { + // When a completion signal is received, + // we attempt to complete the callback with a None. + cb(Right(None)) + } +} From 29d7a10fa1866eef0d2fa1d2c69cc66332676adb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luis=20Miguel=20Mej=C3=ADa=20Su=C3=A1rez?= Date: Wed, 22 Nov 2023 15:42:39 -0500 Subject: [PATCH 02/17] Add std.flow package --- .../scala/cats/effect/std/flow/package.scala | 38 +++++++++++++++++++ 1 file changed, 38 insertions(+) create mode 100644 std/shared/src/main/scala/cats/effect/std/flow/package.scala diff --git a/std/shared/src/main/scala/cats/effect/std/flow/package.scala b/std/shared/src/main/scala/cats/effect/std/flow/package.scala new file mode 100644 index 0000000000..5b613f527a --- /dev/null +++ b/std/shared/src/main/scala/cats/effect/std/flow/package.scala @@ -0,0 +1,38 @@ +/* + * Copyright 2020-2023 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect.std + +/** + * Implementation of the reactive-streams protocol for cats-effect; based on Java Flow. + * + * @example + * {{{ + * import cats.effect.{IO, Resource} + * import std.flow.syntax._ scala> + * import java.util.concurrent.Flow.Publisher + * import cats.effect.unsafe.implicits.global + * val upstream: IO[Int] = IO.pure(1) + * val publisher: Resource[IO, Publisher[Int]] = upstream.toPublisher + * val downstream: IO[Int] = publisher.use(_.toEffect[IO]) + * downstream.unsafeRunSync() + * // res0: Int = 1 + * }}} + * + * @see + * [[java.util.concurrent.Flow]] + */ +package object flow {} From 1d836df9b99dda228112f2820723cbae17c34890 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luis=20Miguel=20Mej=C3=ADa=20Su=C3=A1rez?= Date: Wed, 22 Nov 2023 15:54:44 -0500 Subject: [PATCH 03/17] Add the fundamental std.flow.fromPublisher function --- .../scala/cats/effect/std/flow/package.scala | 53 ++++++++++++++++++- 1 file changed, 52 insertions(+), 1 deletion(-) diff --git a/std/shared/src/main/scala/cats/effect/std/flow/package.scala b/std/shared/src/main/scala/cats/effect/std/flow/package.scala index 5b613f527a..8480928b91 100644 --- a/std/shared/src/main/scala/cats/effect/std/flow/package.scala +++ b/std/shared/src/main/scala/cats/effect/std/flow/package.scala @@ -16,6 +16,11 @@ package cats.effect.std +import cats.effect.kernel.Async +import cats.syntax.all._ + +import java.util.concurrent.Flow.Subscriber + /** * Implementation of the reactive-streams protocol for cats-effect; based on Java Flow. * @@ -25,6 +30,7 @@ package cats.effect.std * import std.flow.syntax._ scala> * import java.util.concurrent.Flow.Publisher * import cats.effect.unsafe.implicits.global + * * val upstream: IO[Int] = IO.pure(1) * val publisher: Resource[IO, Publisher[Int]] = upstream.toPublisher * val downstream: IO[Int] = publisher.use(_.toEffect[IO]) @@ -35,4 +41,49 @@ package cats.effect.std * @see * [[java.util.concurrent.Flow]] */ -package object flow {} +package object flow { + + /** + * Creates an effect from a `subscribe` function; analogous to a `Publisher`, but effectual. + * + * This function is useful when you actually need to provide a subscriber to a third-party. + * + * @example + * {{{ + * import cats.effect.IO + * import java.util.concurrent.Flow.{Publisher, Subscriber} + * + * def thirdPartyLibrary(subscriber: Subscriber[Int]): Unit = { + * val somePublisher: Publisher[Int] = ??? + * somePublisher.subscribe(subscriber) + * } + * + * // Interop with the third party library. + * cats.effect.std.flow.fromPublisher[IO, Int] { subscriber => + * IO.println("Subscribing!") >> + * IO.delay(thirdPartyLibrary(subscriber)) >> + * IO.println("Subscribed!") + * } + * res0: IO[Int] = IO(..) + * }}} + * + * @note + * The subscribe function will not be executed until the effect is run. + * + * @see + * the overload that only requires a [[Publisher]]. + * + * @param subscribe + * The effectual function that will be used to initiate the consumption process, it receives + * a [[Subscriber]] that should be used to subscribe to a [[Publisher]]. The `subscribe` + * operation must be called exactly once. + */ + def fromPublisher[F[_], A]( + subscribe: Subscriber[A] => F[Unit] + )( + implicit F: Async[F] + ): F[Option[A]] = + F.async { cb => + F.delay(new AsyncSubscriber(cb)).flatMap { subscriber => subscribe(subscriber).as(None) } + } +} From 011a7b42400524958f213e96b667b156cbf9f6a2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luis=20Miguel=20Mej=C3=ADa=20Su=C3=A1rez?= Date: Wed, 22 Nov 2023 17:09:48 -0500 Subject: [PATCH 04/17] Properly implement AsyncSubscriber --- .../effect/std/flow/AsyncSubscriber.scala | 237 ++++++++++++++++-- .../scala/cats/effect/std/flow/package.scala | 6 +- 2 files changed, 223 insertions(+), 20 deletions(-) diff --git a/std/shared/src/main/scala/cats/effect/std/flow/AsyncSubscriber.scala b/std/shared/src/main/scala/cats/effect/std/flow/AsyncSubscriber.scala index fc08c6de6e..e1ed44a107 100644 --- a/std/shared/src/main/scala/cats/effect/std/flow/AsyncSubscriber.scala +++ b/std/shared/src/main/scala/cats/effect/std/flow/AsyncSubscriber.scala @@ -16,8 +16,12 @@ package cats.effect.std.flow +import cats.effect.kernel.Sync + import java.util.Objects.requireNonNull import java.util.concurrent.Flow.{Subscriber, Subscription} +import java.util.concurrent.atomic.AtomicReference +import scala.util.control.NoStackTrace /** * Implementation of a [[Subscriber]]. @@ -27,23 +31,27 @@ import java.util.concurrent.Flow.{Subscriber, Subscription} * @see * [[https://github.com/reactive-streams/reactive-streams-jvm#2-subscriber-code]] */ -private[flow] final class AsyncSubscriber[F[_], A]( - cb: Either[Throwable, Option[A]] => Unit +private[flow] final class AsyncSubscriber[F[_], A] private ( + cb: Either[Throwable, Option[A]] => Unit, + currentState: AtomicReference[(AsyncSubscriber.State, () => Unit)] +)( + implicit F: Sync[F] ) extends Subscriber[A] { - private var subscription: Subscription = null + import AsyncSubscriber.State._ + import AsyncSubscriber.Input._ + import AsyncSubscriber.InvalidStateException + + // Subscriber API. /** * Receives a subscription from the upstream reactive-streams system. */ - override def onSubscribe(sub: Subscription): Unit = { + override def onSubscribe(subscription: Subscription): Unit = { requireNonNull( subscription, "The subscription provided to onSubscribe must not be null" ) - // When subscribed, - // we store the subscription and request a single element. - subscription = sub - subscription.request(1L) + nextState(input = Subscribe(subscription)) } /** @@ -54,10 +62,7 @@ private[flow] final class AsyncSubscriber[F[_], A]( a, "The element provided to onNext must not be null" ) - // When the element is received, - // we attempt to complete the callback with it and then cancel the upstream system. - cb(Right(Some(a))) - subscription.cancel() + nextState(input = Next(a)) } /** @@ -68,18 +73,212 @@ private[flow] final class AsyncSubscriber[F[_], A]( ex, "The throwable provided to onError must not be null" ) - // When an error is received, - // we attempt to complete the callback with it and then cancel the upstream system. - cb(Left(ex)) - subscription.cancel() + nextState(input = Error(ex)) } /** * Called by the upstream reactive-streams system when it has finished sending records. */ override def onComplete(): Unit = { - // When a completion signal is received, - // we attempt to complete the callback with a None. - cb(Right(None)) + nextState(input = Complete) } + + // Downstream cancellation. + + /** + * Allow downstream to cancel this subscriber. + */ + val cancel: F[Unit] = + F.delay(nextState(input = Cancel)) + + // Finite state machine. + + /** + * Helper to reduce noise when creating unary functions. + */ + private def run(block: => Unit): () => Unit = () => block + + /** + * Helper to reduce noise when failing with an InvalidStateException. + */ + private def invalidState(operation: String, state: State): Unit = + cb(Left(new InvalidStateException(operation, state))) + + /** + * Runs a single step of the state machine. + */ + private def step(input: Input): State => (State, () => Unit) = + input match { + case Subscribe(s) => { + // When subscribed: + // If we are in the initial state: + // We request a single element. + // Otherwise if we are in the canceled state: + // We cancel the subscription. + // Otherwise: + // We cancel the subscription, + // and attempt to complete the callback with an InvalidStateException. + case Initial => + Waiting(s) -> run { + s.request(1L) + } + + case Canceled => + Canceled -> run { + s.cancel() + } + + case state => + Completed -> run { + s.cancel() + invalidState( + operation = "Received subscription", + state + ) + } + } + + case Next(a) => { + // When the element is received: + // If we are in the waiting state: + // We attempt to complete the callback with the element, + // and cancel the upstream system. + // Otherwise: + // We attempt to complete the callback with an InvalidStateException. + case Waiting(s) => + Completed -> run { + s.cancel() + cb(Right(Some(a.asInstanceOf[A]))) + } + + case state => + Completed -> run { + invalidState( + operation = s"Received element [${a}]", + state + ) + } + } + + case Error(ex) => { + // When an error is received: + // If we are in the waiting state: + // We attempt to complete the callback with the error. + // Otherwise: + // We attempt to complete the callback with an InvalidStateException. + case Waiting(_) => + Completed -> run { + cb(Left(ex)) + } + + case state => + Completed -> run { + invalidState( + operation = s"Received error [${ex}]", + state + ) + } + } + + case Complete => { + // When a completion signal is received: + // If we are in the waiting state: + // We attempt to complete the callback with a None, + // and cancel the upstream system. + // Otherwise: + // We attempt to complete the callback with an InvalidStateException. + case Waiting(s) => + Completed -> run { + s.cancel() + cb(Right(None)) + } + + case state => + Completed -> run { + invalidState( + operation = s"Received completion signal", + state + ) + } + } + + case Cancel => { + // When a downstream cancellation signal is received: + // If we are in the waiting state: + // We cancel the upstream system. + // Otherwise: + // We put ourselves in the Canceled state. + case Waiting(s) => + Canceled -> run { + s.cancel() + } + + case _ => + Canceled -> (() => ()) + } + } + + /** + * Runs the next step of fsm. + * + * This function is concurrent safe, because the reactive-streams specs mention that all the + * on methods are to be called sequentially. Additionally, `Dequeue` and `Next` can't never + * happen concurrently, since they are tied together. Thus, these are the only concurrent + * options and all are covered: + `Subscribe` & `Dequeue`: No matter the order in which they + * are processed, we will end with a request call and a null buffer. + `Error` & `Dequeue`: No + * matter the order in which they are processed, we will complete the callback with the error. + * + cancellation & any other thing: Worst case, we will lose some data that we not longer + * care about; and eventually reach `Terminal`. + */ + private def nextState(input: Input): Unit = { + val (_, effect) = currentState.updateAndGet { + case (state, _) => + step(input)(state) + } + // Only run the effect after the state update took place. + effect() + } +} + +object AsyncSubscriber { + def apply[F[_], A]( + cb: Either[Throwable, Option[A]] => Unit + )( + implicit F: Sync[F] + ): F[AsyncSubscriber[F, A]] = + F.delay { + new AsyncSubscriber( + cb, + currentState = new AtomicReference( + (State.Initial, () => ()) + ) + ) + } + + private sealed trait State + private object State { + type State = AsyncSubscriber.State + + case object Initial extends State + final case class Waiting(s: Subscription) extends State + case object Completed extends State + case object Canceled extends State + } + + private sealed trait Input + private object Input { + type Input = AsyncSubscriber.Input + + final case class Subscribe(s: Subscription) extends Input + final case class Next(a: Any) extends Input + final case class Error(ex: Throwable) extends Input + case object Complete extends Input + case object Cancel extends Input + } + + private final class InvalidStateException(operation: String, state: State) + extends IllegalStateException( + s"${operation} in invalid state [${state}]" + ) + with NoStackTrace } diff --git a/std/shared/src/main/scala/cats/effect/std/flow/package.scala b/std/shared/src/main/scala/cats/effect/std/flow/package.scala index 8480928b91..de37efc7ca 100644 --- a/std/shared/src/main/scala/cats/effect/std/flow/package.scala +++ b/std/shared/src/main/scala/cats/effect/std/flow/package.scala @@ -84,6 +84,10 @@ package object flow { implicit F: Async[F] ): F[Option[A]] = F.async { cb => - F.delay(new AsyncSubscriber(cb)).flatMap { subscriber => subscribe(subscriber).as(None) } + AsyncSubscriber(cb).flatMap { subscriber => + subscribe(subscriber).as( + Some(subscriber.cancel) + ) + } } } From 5653cffd86ac4e0ea423897d0f222924c6210579 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luis=20Miguel=20Mej=C3=ADa=20Su=C3=A1rez?= Date: Wed, 22 Nov 2023 17:18:37 -0500 Subject: [PATCH 05/17] Add the simplified std.flow.fromPublisher function --- .../scala/cats/effect/std/flow/package.scala | 37 +++++++++++++++++++ .../scala/cats/effect/std/flow/syntax.scala | 32 ++++++++++++++++ 2 files changed, 69 insertions(+) create mode 100644 std/shared/src/main/scala/cats/effect/std/flow/syntax.scala diff --git a/std/shared/src/main/scala/cats/effect/std/flow/package.scala b/std/shared/src/main/scala/cats/effect/std/flow/package.scala index de37efc7ca..110acb40f0 100644 --- a/std/shared/src/main/scala/cats/effect/std/flow/package.scala +++ b/std/shared/src/main/scala/cats/effect/std/flow/package.scala @@ -46,6 +46,10 @@ package object flow { /** * Creates an effect from a `subscribe` function; analogous to a `Publisher`, but effectual. * + * This effect will request a single item from the [[Publisher]] and then cancel it. The + * return value is an [[Option]] because the [[Publisher]] may complete without providing a + * single value. + * * This function is useful when you actually need to provide a subscriber to a third-party. * * @example @@ -90,4 +94,37 @@ package object flow { ) } } + + /** + * Creates an effect from a [[Publisher]]. + * + * This effect will request a single item from the [[Publisher]] and then cancel it. The + * return value is an [[Option]] because the [[Publisher]] may complete without providing a + * single value. + * + * @example + * {{{ + * import cats.effect.IO + * import java.util.concurrent.Flow.Publisher + * + * def getThirdPartyPublisher(): Publisher[Int] = ??? + * + * // Interop with the third party library. + * IO.delay(getThirdPartyPublisher()).flatMap { publisher => + * cats.effect.std.flow.fromPublisher[IO](publisher) + * } + * res0: IO[Int] = IO(..) + * }}} + * + * @note + * The publisher will not receive a subscriber until the effect is run. + * + * @see + * the `toEffect` extension method added to [[Publisher]]. + * + * @param publisher + * The [[Publisher]] to consume. + */ + def fromPublisher[F[_]]: syntax.FromPublisherPartiallyApplied[F] = + new syntax.FromPublisherPartiallyApplied(dummy = true) } diff --git a/std/shared/src/main/scala/cats/effect/std/flow/syntax.scala b/std/shared/src/main/scala/cats/effect/std/flow/syntax.scala new file mode 100644 index 0000000000..152928dbb3 --- /dev/null +++ b/std/shared/src/main/scala/cats/effect/std/flow/syntax.scala @@ -0,0 +1,32 @@ +/* + * Copyright 2020-2023 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect.std.flow + +import cats.effect.kernel.Async + +import java.util.concurrent.Flow.Publisher + +object syntax { + final class FromPublisherPartiallyApplied[F[_]](private val dummy: Boolean) extends AnyVal { + def apply[A]( + publisher: Publisher[A] + )( + implicit F: Async[F] + ): F[Option[A]] = + fromPublisher[F, A] { subscriber => F.delay(publisher.subscribe(subscriber)) } + } +} From 43fd5cb6a4ea45bff950ba9d9f7a72346a6b10ec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luis=20Miguel=20Mej=C3=ADa=20Su=C3=A1rez?= Date: Wed, 22 Nov 2023 17:25:54 -0500 Subject: [PATCH 06/17] Add the Publisher.toEffect extension method to std.flow.syntax --- .../scala/cats/effect/std/flow/package.scala | 2 +- .../scala/cats/effect/std/flow/syntax.scala | 30 +++++++++++++++++++ 2 files changed, 31 insertions(+), 1 deletion(-) diff --git a/std/shared/src/main/scala/cats/effect/std/flow/package.scala b/std/shared/src/main/scala/cats/effect/std/flow/package.scala index 110acb40f0..f44a3310ac 100644 --- a/std/shared/src/main/scala/cats/effect/std/flow/package.scala +++ b/std/shared/src/main/scala/cats/effect/std/flow/package.scala @@ -117,7 +117,7 @@ package object flow { * }}} * * @note - * The publisher will not receive a subscriber until the effect is run. + * The [[Publisher]] will not receive a [[Subscriber]] until the effect is run. * * @see * the `toEffect` extension method added to [[Publisher]]. diff --git a/std/shared/src/main/scala/cats/effect/std/flow/syntax.scala b/std/shared/src/main/scala/cats/effect/std/flow/syntax.scala index 152928dbb3..e9f9390208 100644 --- a/std/shared/src/main/scala/cats/effect/std/flow/syntax.scala +++ b/std/shared/src/main/scala/cats/effect/std/flow/syntax.scala @@ -21,6 +21,36 @@ import cats.effect.kernel.Async import java.util.concurrent.Flow.Publisher object syntax { + implicit final class PublisherOps[A](private val publisher: Publisher[A]) extends AnyVal { + + /** + * Creates an effect from this [[Publisher]]. + * + * This effect will request a single item from the [[Publisher]] and then cancel it. The + * return value is an [[Option]] because the [[Publisher]] may complete without providing a + * single value. + * + * @example + * {{{ + * import cats.effect.IO + * import java.util.concurrent.Flow.Publisher + * + * def getThirdPartyPublisher(): Publisher[Int] = ??? + * + * // Interop with the third party library. + * IO.delay(getThirdPartyPublisher()).flatMap { publisher => + * publisher.toEffect[IO] + * } + * res0: IO[Int] = IO(..) + * }}} + * + * @note + * The [[Publisher]] will not receive a [[Subscriber]] until the effect is run. + */ + def toEffect[F[_]](implicit F: Async[F]): F[Option[A]] = + fromPublisher[F](publisher) + } + final class FromPublisherPartiallyApplied[F[_]](private val dummy: Boolean) extends AnyVal { def apply[A]( publisher: Publisher[A] From 37b9d8eb7ae8c02d524f3dd6b8371d6209f79ac2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luis=20Miguel=20Mej=C3=ADa=20Su=C3=A1rez?= Date: Wed, 22 Nov 2023 18:58:31 -0500 Subject: [PATCH 07/17] Add AsyncSubscription --- .../effect/std/flow/AsyncSubscriber.scala | 10 +- .../effect/std/flow/AsyncSubscription.scala | 140 ++++++++++++++++++ 2 files changed, 145 insertions(+), 5 deletions(-) create mode 100644 std/shared/src/main/scala/cats/effect/std/flow/AsyncSubscription.scala diff --git a/std/shared/src/main/scala/cats/effect/std/flow/AsyncSubscriber.scala b/std/shared/src/main/scala/cats/effect/std/flow/AsyncSubscriber.scala index e1ed44a107..a6aeb42514 100644 --- a/std/shared/src/main/scala/cats/effect/std/flow/AsyncSubscriber.scala +++ b/std/shared/src/main/scala/cats/effect/std/flow/AsyncSubscriber.scala @@ -46,7 +46,7 @@ private[flow] final class AsyncSubscriber[F[_], A] private ( /** * Receives a subscription from the upstream reactive-streams system. */ - override def onSubscribe(subscription: Subscription): Unit = { + override final def onSubscribe(subscription: Subscription): Unit = { requireNonNull( subscription, "The subscription provided to onSubscribe must not be null" @@ -57,7 +57,7 @@ private[flow] final class AsyncSubscriber[F[_], A] private ( /** * Receives the next record from the upstream reactive-streams system. */ - override def onNext(a: A): Unit = { + override final def onNext(a: A): Unit = { requireNonNull( a, "The element provided to onNext must not be null" @@ -68,7 +68,7 @@ private[flow] final class AsyncSubscriber[F[_], A] private ( /** * Called by the upstream reactive-streams system when it fails. */ - override def onError(ex: Throwable): Unit = { + override final def onError(ex: Throwable): Unit = { requireNonNull( ex, "The throwable provided to onError must not be null" @@ -79,7 +79,7 @@ private[flow] final class AsyncSubscriber[F[_], A] private ( /** * Called by the upstream reactive-streams system when it has finished sending records. */ - override def onComplete(): Unit = { + override final def onComplete(): Unit = { nextState(input = Complete) } @@ -240,7 +240,7 @@ private[flow] final class AsyncSubscriber[F[_], A] private ( } } -object AsyncSubscriber { +private[flow] object AsyncSubscriber { def apply[F[_], A]( cb: Either[Throwable, Option[A]] => Unit )( diff --git a/std/shared/src/main/scala/cats/effect/std/flow/AsyncSubscription.scala b/std/shared/src/main/scala/cats/effect/std/flow/AsyncSubscription.scala new file mode 100644 index 0000000000..e42e6436d4 --- /dev/null +++ b/std/shared/src/main/scala/cats/effect/std/flow/AsyncSubscription.scala @@ -0,0 +1,140 @@ +package cats.effect.std.flow + +import cats.effect.kernel.{Async, Outcome} +import cats.effect.kernel.syntax.all._ +import cats.syntax.all._ + +import java.util.concurrent.CancellationException +import java.util.concurrent.Flow.{Subscription, Subscriber} +import java.util.concurrent.atomic.AtomicReference + +/** + * Implementation of a [[Subscription]]. + * + * This is used by the [[AsyncPublisher]] to send the result of an effect to a downstream + * reactive-streams system. + * + * @see + * [[https://github.com/reactive-streams/reactive-streams-jvm#3-subscription-code]] + */ +private[flow] final class AsyncSubscription[F[_], A] private ( + fa: F[A], + subscriber: Subscriber[A], + start: AtomicReference[() => Unit], + canceled: AtomicReference[() => Unit] +)( + implicit F: Async[F] +) extends Subscription { + import AsyncSubscription.Sentinel + + // Ensure we are on a terminal state; i.e. call `cancel`, before signaling the subscriber. + private def onError(ex: Throwable): Unit = { + cancel() + subscriber.onError(ex) + } + + private def onComplete(): Unit = { + cancel() + subscriber.onComplete() + } + + val run: F[Unit] = { + val cancellation = F.asyncCheckAttempt[Unit] { cb => + F.delay { + // Check if we were already cancelled before calling run. + if (!canceled.compareAndSet(Sentinel, () => cb.apply(Either.unit))) { + Either.unit + } else { + Left(Some(F.unit)) + } + } + } + + val waitForRequest = F.async_[Unit] { cb => start.set(() => cb.apply(Either.unit)) } + + (waitForRequest >> fa) + .race(cancellation) + .guaranteeCase { + case Outcome.Succeeded(result) => + result.flatMap { + // The effect finished normally. + case Left(a) => + F.delay { + subscriber.onNext(a) + onComplete() + } + + case Right(()) => + // The effect was canceled by downstream. + F.unit + } + + case Outcome.Errored(ex) => + // The effect failed with an error. + F.delay(onError(ex)) + + case Outcome.Canceled() => + // The effect was canceled by upstream. + F.delay(onError(ex = new CancellationException("AsyncSubscription.run was canceled"))) + } + .void + } + + override final def cancel(): Unit = { + val cancelCB = canceled.getAndSet(null) + if (cancelCB ne null) { + cancelCB.apply() + } + } + + override final def request(n: Long): Unit = { + // First, confirm we are not yet cancelled. + if (canceled.get() ne null) { + // Second, ensure we were requested a positive number of elements. + if (n <= 0) { + // Otherwise, we raise an error according to the spec. + onError(ex = new IllegalArgumentException(s"Invalid number of elements [${n}]")) + } else { + // Then, we attempt to complete the start callback. + val startCB = start.getAndSet(null) + if (startCB ne null) { + startCB.apply() + } + } + } + } +} + +private[flow] object AsyncSubscription { + private final val Sentinel = () => () + + // Mostly for testing purposes. + def apply[F[_], A]( + fa: F[A], + subscriber: Subscriber[A] + )( + implicit F: Async[F] + ): F[AsyncSubscription[F, A]] = + F.delay { + val start = new AtomicReference(Sentinel) + val canceled = new AtomicReference(Sentinel) + + new AsyncSubscription( + fa, + subscriber, + start, + canceled + ) + } + + def subscribe[F[_], A]( + fa: F[A], + subscriber: Subscriber[A] + )( + implicit F: Async[F] + ): F[Unit] = + apply(fa, subscriber).flatMap { subscription => + F.delay(subscriber.onSubscribe(subscription)) >> + subscription.run + } +} From e58025e6e3caa4378b1beff596cba294ee3dac0b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luis=20Miguel=20Mej=C3=ADa=20Su=C3=A1rez?= Date: Wed, 22 Nov 2023 19:23:29 -0500 Subject: [PATCH 08/17] Add AsyncPublisher --- .../cats/effect/std/flow/AsyncPublisher.scala | 67 +++++++++++++++++++ .../effect/std/flow/AsyncSubscription.scala | 6 +- 2 files changed, 70 insertions(+), 3 deletions(-) create mode 100644 std/shared/src/main/scala/cats/effect/std/flow/AsyncPublisher.scala diff --git a/std/shared/src/main/scala/cats/effect/std/flow/AsyncPublisher.scala b/std/shared/src/main/scala/cats/effect/std/flow/AsyncPublisher.scala new file mode 100644 index 0000000000..4533518c18 --- /dev/null +++ b/std/shared/src/main/scala/cats/effect/std/flow/AsyncPublisher.scala @@ -0,0 +1,67 @@ +/* + * Copyright 2020-2023 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect.std.flow + +import cats.effect.kernel.{Async, Resource} +import cats.effect.std.Dispatcher + +import java.util.Objects.requireNonNull +import java.util.concurrent.Flow.{Publisher, Subscriber, Subscription} +import scala.util.control.NoStackTrace + +private[flow] final class AsyncPublisher[F[_], A] private ( + fa: F[A], + startDispatcher: Dispatcher[F] +)( + implicit F: Async[F] +) extends Publisher[A] { + override final def subscribe(subscriber: Subscriber[_ >: A]): Unit = { + requireNonNull( + subscriber, + "The subscriber provided to subscribe must not be null" + ) + try + startDispatcher.unsafeRunAndForget( + AsyncSubscription.subscribe(fa, subscriber) + ) + catch { + case _: IllegalStateException => + subscriber.onSubscribe(new Subscription { + override def cancel(): Unit = () + override def request(x$1: Long): Unit = () + }) + subscriber.onError(AsyncPublisher.CanceledStreamPublisherException) + } + } +} + +private[flow] object AsyncPublisher { + def apply[F[_], A]( + fa: F[A] + )( + implicit F: Async[F] + ): Resource[F, AsyncPublisher[F, A]] = + Dispatcher.parallel[F](await = false).map { startDispatcher => + new AsyncPublisher(fa, startDispatcher) + } + + private object CanceledStreamPublisherException + extends IllegalStateException( + "This StreamPublisher is not longer accepting subscribers" + ) + with NoStackTrace +} diff --git a/std/shared/src/main/scala/cats/effect/std/flow/AsyncSubscription.scala b/std/shared/src/main/scala/cats/effect/std/flow/AsyncSubscription.scala index e42e6436d4..b43d6c5a51 100644 --- a/std/shared/src/main/scala/cats/effect/std/flow/AsyncSubscription.scala +++ b/std/shared/src/main/scala/cats/effect/std/flow/AsyncSubscription.scala @@ -19,7 +19,7 @@ import java.util.concurrent.atomic.AtomicReference */ private[flow] final class AsyncSubscription[F[_], A] private ( fa: F[A], - subscriber: Subscriber[A], + subscriber: Subscriber[_ >: A], start: AtomicReference[() => Unit], canceled: AtomicReference[() => Unit] )( @@ -111,7 +111,7 @@ private[flow] object AsyncSubscription { // Mostly for testing purposes. def apply[F[_], A]( fa: F[A], - subscriber: Subscriber[A] + subscriber: Subscriber[_ >: A] )( implicit F: Async[F] ): F[AsyncSubscription[F, A]] = @@ -129,7 +129,7 @@ private[flow] object AsyncSubscription { def subscribe[F[_], A]( fa: F[A], - subscriber: Subscriber[A] + subscriber: Subscriber[_ >: A] )( implicit F: Async[F] ): F[Unit] = From 70aff9326e0afc03b3cefc6534133b55d8fa1944 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luis=20Miguel=20Mej=C3=ADa=20Su=C3=A1rez?= Date: Wed, 22 Nov 2023 19:33:46 -0500 Subject: [PATCH 09/17] Add std.flow.toPublisher and subscribeEffect functions --- .../scala/cats/effect/std/flow/package.scala | 48 ++++++++++++++++++- 1 file changed, 46 insertions(+), 2 deletions(-) diff --git a/std/shared/src/main/scala/cats/effect/std/flow/package.scala b/std/shared/src/main/scala/cats/effect/std/flow/package.scala index f44a3310ac..73da77c3e3 100644 --- a/std/shared/src/main/scala/cats/effect/std/flow/package.scala +++ b/std/shared/src/main/scala/cats/effect/std/flow/package.scala @@ -16,10 +16,10 @@ package cats.effect.std -import cats.effect.kernel.Async +import cats.effect.kernel.{Async, Resource} import cats.syntax.all._ -import java.util.concurrent.Flow.Subscriber +import java.util.concurrent.Flow.{Publisher, Subscriber} /** * Implementation of the reactive-streams protocol for cats-effect; based on Java Flow. @@ -127,4 +127,48 @@ package object flow { */ def fromPublisher[F[_]]: syntax.FromPublisherPartiallyApplied[F] = new syntax.FromPublisherPartiallyApplied(dummy = true) + + /** + * Creates a [[Publisher]] from an effect. + * + * The effect is only ran when elements are requested. Closing the [[Resource]] means + * gracefully shutting down all active subscriptions. Thus, no more elements will be + * published. + * + * @note + * This [[Publisher]] can be reused for multiple [[Subscribers]], each [[Subscription]] will + * re-run the effect. + * + * @see + * [[subscribeEffect]] for a simpler version that only requires a [[Subscriber]]. + * + * @param fa + * The effect to transform. + */ + def toPublisher[F[_], A]( + fa: F[A] + )( + implicit F: Async[F] + ): Resource[F, Publisher[A]] = + AsyncPublisher(fa) + + /** + * Allows subscribing a [[Subscriber]] to an effect. + * + * The returned program will run the passed effect, then send the result to the + * [[Subscriber]], and finally complete the subscription. Cancelling this program will + * gracefully cancel the subscription. + * + * @param fa + * the effect that will be consumed by the subscriber. + * @param subscriber + * the [[Subscriber]] that will receive the result of the effect. + */ + def subscribeEffect[F[_], A]( + fa: F[A], + subscriber: Subscriber[_ >: A] + )( + implicit F: Async[F] + ): F[Unit] = + AsyncSubscription.subscribe(fa, subscriber) } From def8f341a6183ee33e55975d4e04e2424721dac3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luis=20Miguel=20Mej=C3=ADa=20Su=C3=A1rez?= Date: Wed, 22 Nov 2023 19:40:06 -0500 Subject: [PATCH 10/17] Add the fa.toPublisher and fa.subscribe extension methods to std.flow.syntax --- .../scala/cats/effect/std/flow/syntax.scala | 44 +++++++++++++++++-- 1 file changed, 41 insertions(+), 3 deletions(-) diff --git a/std/shared/src/main/scala/cats/effect/std/flow/syntax.scala b/std/shared/src/main/scala/cats/effect/std/flow/syntax.scala index e9f9390208..f2ec8371bb 100644 --- a/std/shared/src/main/scala/cats/effect/std/flow/syntax.scala +++ b/std/shared/src/main/scala/cats/effect/std/flow/syntax.scala @@ -14,11 +14,12 @@ * limitations under the License. */ -package cats.effect.std.flow +package cats.effect.std +package flow -import cats.effect.kernel.Async +import cats.effect.kernel.{Async, Resource} -import java.util.concurrent.Flow.Publisher +import java.util.concurrent.Flow.{Publisher, Subscriber} object syntax { implicit final class PublisherOps[A](private val publisher: Publisher[A]) extends AnyVal { @@ -51,6 +52,43 @@ object syntax { fromPublisher[F](publisher) } + implicit final class FOps[F[_], A](private val fa: F[A]) extends AnyVal { + + /** + * Creates a [[Publisher]] from this effect. + * + * The effect is only ran when elements are requested. Closing the [[Resource]] means + * gracefully shutting down all active subscriptions. Thus, no more elements will be + * published. + * + * @note + * The [[Publisher]] can be reused for multiple [[Subscribers]], each [[Subscription]] + * will re-run the effect. + * + * @see + * [[subscribe]] for a simpler version that only requires a [[Subscriber]]. + */ + def toPublisher(implicit F: Async[F]): Resource[F, Publisher[A]] = + flow.toPublisher(fa) + + /** + * Allows subscribing a [[Subscriber]] to this effect. + * + * The returned program will run this effect, then send the result to the [[Subscriber]], + * and finally complete the subscription. Cancelling this program will gracefully cancel the + * subscription. + * + * @param subscriber + * the [[Subscriber]] that will receive the result of this effect. + */ + def subscribeEffect( + subscriber: Subscriber[_ >: A] + )( + implicit F: Async[F] + ): F[Unit] = + flow.subscribeEffect(fa, subscriber) + } + final class FromPublisherPartiallyApplied[F[_]](private val dummy: Boolean) extends AnyVal { def apply[A]( publisher: Publisher[A] From 168130f52c5cdedbd172b46cd1e5493b0d87e22a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luis=20Miguel=20Mej=C3=ADa=20Su=C3=A1rez?= Date: Wed, 22 Nov 2023 19:46:45 -0500 Subject: [PATCH 11/17] Move flow interop to core --- .../main/scala/cats/effect/interop}/flow/AsyncPublisher.scala | 2 +- .../main/scala/cats/effect/interop}/flow/AsyncSubscriber.scala | 2 +- .../scala/cats/effect/interop}/flow/AsyncSubscription.scala | 2 +- .../src/main/scala/cats/effect/interop}/flow/package.scala | 2 +- .../src/main/scala/cats/effect/interop}/flow/syntax.scala | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) rename {std/shared/src/main/scala/cats/effect/std => core/shared/src/main/scala/cats/effect/interop}/flow/AsyncPublisher.scala (98%) rename {std/shared/src/main/scala/cats/effect/std => core/shared/src/main/scala/cats/effect/interop}/flow/AsyncSubscriber.scala (99%) rename {std/shared/src/main/scala/cats/effect/std => core/shared/src/main/scala/cats/effect/interop}/flow/AsyncSubscription.scala (99%) rename {std/shared/src/main/scala/cats/effect/std => core/shared/src/main/scala/cats/effect/interop}/flow/package.scala (99%) rename {std/shared/src/main/scala/cats/effect/std => core/shared/src/main/scala/cats/effect/interop}/flow/syntax.scala (99%) diff --git a/std/shared/src/main/scala/cats/effect/std/flow/AsyncPublisher.scala b/core/shared/src/main/scala/cats/effect/interop/flow/AsyncPublisher.scala similarity index 98% rename from std/shared/src/main/scala/cats/effect/std/flow/AsyncPublisher.scala rename to core/shared/src/main/scala/cats/effect/interop/flow/AsyncPublisher.scala index 4533518c18..2eaf1fd6e2 100644 --- a/std/shared/src/main/scala/cats/effect/std/flow/AsyncPublisher.scala +++ b/core/shared/src/main/scala/cats/effect/interop/flow/AsyncPublisher.scala @@ -14,7 +14,7 @@ * limitations under the License. */ -package cats.effect.std.flow +package cats.effect.interop.flow import cats.effect.kernel.{Async, Resource} import cats.effect.std.Dispatcher diff --git a/std/shared/src/main/scala/cats/effect/std/flow/AsyncSubscriber.scala b/core/shared/src/main/scala/cats/effect/interop/flow/AsyncSubscriber.scala similarity index 99% rename from std/shared/src/main/scala/cats/effect/std/flow/AsyncSubscriber.scala rename to core/shared/src/main/scala/cats/effect/interop/flow/AsyncSubscriber.scala index a6aeb42514..e1f7258a17 100644 --- a/std/shared/src/main/scala/cats/effect/std/flow/AsyncSubscriber.scala +++ b/core/shared/src/main/scala/cats/effect/interop/flow/AsyncSubscriber.scala @@ -14,7 +14,7 @@ * limitations under the License. */ -package cats.effect.std.flow +package cats.effect.interop.flow import cats.effect.kernel.Sync diff --git a/std/shared/src/main/scala/cats/effect/std/flow/AsyncSubscription.scala b/core/shared/src/main/scala/cats/effect/interop/flow/AsyncSubscription.scala similarity index 99% rename from std/shared/src/main/scala/cats/effect/std/flow/AsyncSubscription.scala rename to core/shared/src/main/scala/cats/effect/interop/flow/AsyncSubscription.scala index b43d6c5a51..6010fe59ca 100644 --- a/std/shared/src/main/scala/cats/effect/std/flow/AsyncSubscription.scala +++ b/core/shared/src/main/scala/cats/effect/interop/flow/AsyncSubscription.scala @@ -1,4 +1,4 @@ -package cats.effect.std.flow +package cats.effect.interop.flow import cats.effect.kernel.{Async, Outcome} import cats.effect.kernel.syntax.all._ diff --git a/std/shared/src/main/scala/cats/effect/std/flow/package.scala b/core/shared/src/main/scala/cats/effect/interop/flow/package.scala similarity index 99% rename from std/shared/src/main/scala/cats/effect/std/flow/package.scala rename to core/shared/src/main/scala/cats/effect/interop/flow/package.scala index 73da77c3e3..3eabe178bb 100644 --- a/std/shared/src/main/scala/cats/effect/std/flow/package.scala +++ b/core/shared/src/main/scala/cats/effect/interop/flow/package.scala @@ -14,7 +14,7 @@ * limitations under the License. */ -package cats.effect.std +package cats.effect.interop import cats.effect.kernel.{Async, Resource} import cats.syntax.all._ diff --git a/std/shared/src/main/scala/cats/effect/std/flow/syntax.scala b/core/shared/src/main/scala/cats/effect/interop/flow/syntax.scala similarity index 99% rename from std/shared/src/main/scala/cats/effect/std/flow/syntax.scala rename to core/shared/src/main/scala/cats/effect/interop/flow/syntax.scala index f2ec8371bb..94379b2cdb 100644 --- a/std/shared/src/main/scala/cats/effect/std/flow/syntax.scala +++ b/core/shared/src/main/scala/cats/effect/interop/flow/syntax.scala @@ -14,7 +14,7 @@ * limitations under the License. */ -package cats.effect.std +package cats.effect.interop package flow import cats.effect.kernel.{Async, Resource} From b7f075308242344d6541af94eae8b09945d75e3c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luis=20Miguel=20Mej=C3=ADa=20Su=C3=A1rez?= Date: Wed, 22 Nov 2023 19:51:53 -0500 Subject: [PATCH 12/17] Add AsyncPublisher.unsafe --- .../effect/interop/flow/AsyncPublisher.scala | 50 +++++++++++++++---- 1 file changed, 41 insertions(+), 9 deletions(-) diff --git a/core/shared/src/main/scala/cats/effect/interop/flow/AsyncPublisher.scala b/core/shared/src/main/scala/cats/effect/interop/flow/AsyncPublisher.scala index 2eaf1fd6e2..12a8a182fc 100644 --- a/core/shared/src/main/scala/cats/effect/interop/flow/AsyncPublisher.scala +++ b/core/shared/src/main/scala/cats/effect/interop/flow/AsyncPublisher.scala @@ -16,52 +16,84 @@ package cats.effect.interop.flow +import cats.effect.IO import cats.effect.kernel.{Async, Resource} import cats.effect.std.Dispatcher +import cats.effect.unsafe.IORuntime import java.util.Objects.requireNonNull import java.util.concurrent.Flow.{Publisher, Subscriber, Subscription} +import java.util.concurrent.RejectedExecutionException import scala.util.control.NoStackTrace -private[flow] final class AsyncPublisher[F[_], A] private ( - fa: F[A], - startDispatcher: Dispatcher[F] +private[flow] sealed abstract class AsyncPublisher[F[_], A] private ( + fa: F[A] )( implicit F: Async[F] ) extends Publisher[A] { + protected def runSubscription(subscribe: F[Unit]): Unit + override final def subscribe(subscriber: Subscriber[_ >: A]): Unit = { requireNonNull( subscriber, "The subscriber provided to subscribe must not be null" ) try - startDispatcher.unsafeRunAndForget( + runSubscription( AsyncSubscription.subscribe(fa, subscriber) ) catch { - case _: IllegalStateException => + case _: IllegalStateException | _: RejectedExecutionException => subscriber.onSubscribe(new Subscription { override def cancel(): Unit = () override def request(x$1: Long): Unit = () }) - subscriber.onError(AsyncPublisher.CanceledStreamPublisherException) + subscriber.onError(AsyncPublisher.CanceledAsyncPublisherException) } } } private[flow] object AsyncPublisher { + private final class DispatcherAsyncPublisher[F[_], A]( + fa: F[A], + startDispatcher: Dispatcher[F] + )( + implicit F: Async[F] + ) extends AsyncPublisher[F, A](fa) { + override protected final def runSubscription(subscribe: F[Unit]): Unit = { + startDispatcher.unsafeRunAndForget(subscribe) + } + } + + private final class IORuntimeAsyncPublisher[A]( + ioa: IO[A] + )( + implicit runtime: IORuntime + ) extends AsyncPublisher[IO, A](ioa) { + override protected final def runSubscription(subscribe: IO[Unit]): Unit = { + subscribe.unsafeRunAndForget() + } + } + def apply[F[_], A]( fa: F[A] )( implicit F: Async[F] ): Resource[F, AsyncPublisher[F, A]] = Dispatcher.parallel[F](await = false).map { startDispatcher => - new AsyncPublisher(fa, startDispatcher) + new DispatcherAsyncPublisher(fa, startDispatcher) } - private object CanceledStreamPublisherException + def unsafe[A]( + fa: IO[A] + )( + implicit runtime: IORuntime + ): AsyncPublisher[IO, A] = + new IORuntimeAsyncPublisher(fa) + + private object CanceledAsyncPublisherException extends IllegalStateException( - "This StreamPublisher is not longer accepting subscribers" + "This AsyncPublisher is not longer accepting subscribers" ) with NoStackTrace } From d3d0e368541ea14962e8ea15bef9dd8299faa9af Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luis=20Miguel=20Mej=C3=ADa=20Su=C3=A1rez?= Date: Wed, 22 Nov 2023 19:56:24 -0500 Subject: [PATCH 13/17] Add the cats.effect.interop.flow.unsafeToPublisher function --- .../effect/interop/flow/AsyncPublisher.scala | 4 +-- .../cats/effect/interop/flow/package.scala | 27 +++++++++++++++++++ 2 files changed, 29 insertions(+), 2 deletions(-) diff --git a/core/shared/src/main/scala/cats/effect/interop/flow/AsyncPublisher.scala b/core/shared/src/main/scala/cats/effect/interop/flow/AsyncPublisher.scala index 12a8a182fc..257d43cfd7 100644 --- a/core/shared/src/main/scala/cats/effect/interop/flow/AsyncPublisher.scala +++ b/core/shared/src/main/scala/cats/effect/interop/flow/AsyncPublisher.scala @@ -85,11 +85,11 @@ private[flow] object AsyncPublisher { } def unsafe[A]( - fa: IO[A] + ioa: IO[A] )( implicit runtime: IORuntime ): AsyncPublisher[IO, A] = - new IORuntimeAsyncPublisher(fa) + new IORuntimeAsyncPublisher(ioa) private object CanceledAsyncPublisherException extends IllegalStateException( diff --git a/core/shared/src/main/scala/cats/effect/interop/flow/package.scala b/core/shared/src/main/scala/cats/effect/interop/flow/package.scala index 3eabe178bb..bcee0aa0b6 100644 --- a/core/shared/src/main/scala/cats/effect/interop/flow/package.scala +++ b/core/shared/src/main/scala/cats/effect/interop/flow/package.scala @@ -16,7 +16,9 @@ package cats.effect.interop +import cats.effect.IO import cats.effect.kernel.{Async, Resource} +import cats.effect.unsafe.IORuntime import cats.syntax.all._ import java.util.concurrent.Flow.{Publisher, Subscriber} @@ -140,6 +142,9 @@ package object flow { * re-run the effect. * * @see + * [[unsafeToPublisher]] for an unsafe version that returns a plain [[Publisher]]. + * + * @see * [[subscribeEffect]] for a simpler version that only requires a [[Subscriber]]. * * @param fa @@ -152,6 +157,28 @@ package object flow { ): Resource[F, Publisher[A]] = AsyncPublisher(fa) + /** + * Creates a [[Publisher]] from an effect. + * + * The effect is only ran when elements are requested. + * + * @note + * This [[Publisher]] can be reused for multiple [[Subscribers]], each [[Subscription]] will + * re-run the effect. + * + * @see + * [[toPublisher]] for a safe version that returns a [[Resource]]. + * + * @param fa + * The effect to transform. + */ + def unsafeToPublisher[A]( + ioa: IO[A] + )( + implicit runtime: IORuntime + ): Publisher[A] = + AsyncPublisher.unsafe(ioa) + /** * Allows subscribing a [[Subscriber]] to an effect. * From eb9a0ed5a8cef22d73fa8e1163755f0e2065af26 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luis=20Miguel=20Mej=C3=ADa=20Su=C3=A1rez?= Date: Wed, 22 Nov 2023 20:00:30 -0500 Subject: [PATCH 14/17] Add io.unsafeToPublisher() extension method to interop.flow.syntax --- .../cats/effect/interop/flow/package.scala | 10 +++++----- .../cats/effect/interop/flow/syntax.scala | 20 +++++++++++++++++++ 2 files changed, 25 insertions(+), 5 deletions(-) diff --git a/core/shared/src/main/scala/cats/effect/interop/flow/package.scala b/core/shared/src/main/scala/cats/effect/interop/flow/package.scala index bcee0aa0b6..5ca8a92ec3 100644 --- a/core/shared/src/main/scala/cats/effect/interop/flow/package.scala +++ b/core/shared/src/main/scala/cats/effect/interop/flow/package.scala @@ -158,19 +158,19 @@ package object flow { AsyncPublisher(fa) /** - * Creates a [[Publisher]] from an effect. + * Creates a [[Publisher]] from an [[IO]]. * - * The effect is only ran when elements are requested. + * The [[IO]] is only ran when elements are requested. * * @note * This [[Publisher]] can be reused for multiple [[Subscribers]], each [[Subscription]] will - * re-run the effect. + * re-run the [[IO]]. * * @see * [[toPublisher]] for a safe version that returns a [[Resource]]. * - * @param fa - * The effect to transform. + * @param ioa + * The [[IO]] to transform. */ def unsafeToPublisher[A]( ioa: IO[A] diff --git a/core/shared/src/main/scala/cats/effect/interop/flow/syntax.scala b/core/shared/src/main/scala/cats/effect/interop/flow/syntax.scala index 94379b2cdb..48b8871953 100644 --- a/core/shared/src/main/scala/cats/effect/interop/flow/syntax.scala +++ b/core/shared/src/main/scala/cats/effect/interop/flow/syntax.scala @@ -17,7 +17,9 @@ package cats.effect.interop package flow +import cats.effect.IO import cats.effect.kernel.{Async, Resource} +import cats.effect.unsafe.IORuntime import java.util.concurrent.Flow.{Publisher, Subscriber} @@ -89,6 +91,24 @@ object syntax { flow.subscribeEffect(fa, subscriber) } + implicit final class IOOps[A](private val ioa: IO[A]) extends AnyVal { + + /** + * Creates a [[Publisher]] from this [[IO]]. + * + * The [[IO]] is only ran when elements are requested. + * + * @note + * The [[Publisher]] can be reused for multiple [[Subscribers]], each [[Subscription]] + * will re-run the [[IO]]. + * + * @see + * [[toPublisher]] for a safe version that returns a [[Resource]]. + */ + def unsafeToPublisher()(implicit runtime: IORuntime): Publisher[A] = + flow.unsafeToPublisher(ioa) + } + final class FromPublisherPartiallyApplied[F[_]](private val dummy: Boolean) extends AnyVal { def apply[A]( publisher: Publisher[A] From 8e81c9ac65ed3dfca0df46b7be442d745b07d50c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luis=20Miguel=20Mej=C3=ADa=20Su=C3=A1rez?= Date: Wed, 22 Nov 2023 20:21:33 -0500 Subject: [PATCH 15/17] Add license header to AsyncSubscription.scala --- .../effect/interop/flow/AsyncSubscription.scala | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/core/shared/src/main/scala/cats/effect/interop/flow/AsyncSubscription.scala b/core/shared/src/main/scala/cats/effect/interop/flow/AsyncSubscription.scala index 6010fe59ca..6998855a12 100644 --- a/core/shared/src/main/scala/cats/effect/interop/flow/AsyncSubscription.scala +++ b/core/shared/src/main/scala/cats/effect/interop/flow/AsyncSubscription.scala @@ -1,3 +1,19 @@ +/* + * Copyright 2020-2023 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package cats.effect.interop.flow import cats.effect.kernel.{Async, Outcome} From f6e6f0a839df8a03f6630582f338e210e4b692b3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luis=20Miguel=20Mej=C3=ADa=20Su=C3=A1rez?= Date: Fri, 24 Nov 2023 10:38:24 -0500 Subject: [PATCH 16/17] Make AsyncSubscription.run a def --- .../scala/cats/effect/interop/flow/AsyncSubscription.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/shared/src/main/scala/cats/effect/interop/flow/AsyncSubscription.scala b/core/shared/src/main/scala/cats/effect/interop/flow/AsyncSubscription.scala index 6998855a12..64d9349177 100644 --- a/core/shared/src/main/scala/cats/effect/interop/flow/AsyncSubscription.scala +++ b/core/shared/src/main/scala/cats/effect/interop/flow/AsyncSubscription.scala @@ -54,7 +54,9 @@ private[flow] final class AsyncSubscription[F[_], A] private ( subscriber.onComplete() } - val run: F[Unit] = { + // This is a def rather than a val, because it is only used once. + // And having fields increase the instantiation cost and delay garbage collection. + def run: F[Unit] = { val cancellation = F.asyncCheckAttempt[Unit] { cb => F.delay { // Check if we were already cancelled before calling run. From 270cc6d02009c8e728b2ad48bae349e50519b528 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luis=20Miguel=20Mej=C3=ADa=20Su=C3=A1rez?= Date: Fri, 24 Nov 2023 11:12:35 -0500 Subject: [PATCH 17/17] Wait for Dispatcher in AsyncPublisher --- .../scala/cats/effect/interop/flow/AsyncPublisher.scala | 2 +- .../src/main/scala/cats/effect/interop/flow/package.scala | 7 ++++--- .../src/main/scala/cats/effect/interop/flow/syntax.scala | 7 ++++--- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/core/shared/src/main/scala/cats/effect/interop/flow/AsyncPublisher.scala b/core/shared/src/main/scala/cats/effect/interop/flow/AsyncPublisher.scala index 257d43cfd7..cffd5854c1 100644 --- a/core/shared/src/main/scala/cats/effect/interop/flow/AsyncPublisher.scala +++ b/core/shared/src/main/scala/cats/effect/interop/flow/AsyncPublisher.scala @@ -80,7 +80,7 @@ private[flow] object AsyncPublisher { )( implicit F: Async[F] ): Resource[F, AsyncPublisher[F, A]] = - Dispatcher.parallel[F](await = false).map { startDispatcher => + Dispatcher.parallel[F](await = true).map { startDispatcher => new DispatcherAsyncPublisher(fa, startDispatcher) } diff --git a/core/shared/src/main/scala/cats/effect/interop/flow/package.scala b/core/shared/src/main/scala/cats/effect/interop/flow/package.scala index 5ca8a92ec3..f1a396b2f2 100644 --- a/core/shared/src/main/scala/cats/effect/interop/flow/package.scala +++ b/core/shared/src/main/scala/cats/effect/interop/flow/package.scala @@ -133,9 +133,10 @@ package object flow { /** * Creates a [[Publisher]] from an effect. * - * The effect is only ran when elements are requested. Closing the [[Resource]] means - * gracefully shutting down all active subscriptions. Thus, no more elements will be - * published. + * The effect is only ran when elements are requested. Closing the [[Resource]] means not + * accepting new subscriptions, but waiting for all active ones to finish consuming. Canceling + * the [[Resource.use]] means gracefully shutting down all active subscriptions. Thus, no more + * elements will be published. * * @note * This [[Publisher]] can be reused for multiple [[Subscribers]], each [[Subscription]] will diff --git a/core/shared/src/main/scala/cats/effect/interop/flow/syntax.scala b/core/shared/src/main/scala/cats/effect/interop/flow/syntax.scala index 48b8871953..8f5a120060 100644 --- a/core/shared/src/main/scala/cats/effect/interop/flow/syntax.scala +++ b/core/shared/src/main/scala/cats/effect/interop/flow/syntax.scala @@ -59,9 +59,10 @@ object syntax { /** * Creates a [[Publisher]] from this effect. * - * The effect is only ran when elements are requested. Closing the [[Resource]] means - * gracefully shutting down all active subscriptions. Thus, no more elements will be - * published. + * The effect is only ran when elements are requested. Closing the [[Resource]] means not + * accepting new subscriptions, but waiting for all active ones to finish consuming. + * Canceling the [[Resource.use]] means gracefully shutting down all active subscriptions. + * Thus, no more elements will be published. * * @note * The [[Publisher]] can be reused for multiple [[Subscribers]], each [[Subscription]]