diff --git a/core/shared/src/main/scala/cats/effect/interop/flow/AsyncPublisher.scala b/core/shared/src/main/scala/cats/effect/interop/flow/AsyncPublisher.scala new file mode 100644 index 0000000000..cffd5854c1 --- /dev/null +++ b/core/shared/src/main/scala/cats/effect/interop/flow/AsyncPublisher.scala @@ -0,0 +1,99 @@ +/* + * Copyright 2020-2023 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect.interop.flow + +import cats.effect.IO +import cats.effect.kernel.{Async, Resource} +import cats.effect.std.Dispatcher +import cats.effect.unsafe.IORuntime + +import java.util.Objects.requireNonNull +import java.util.concurrent.Flow.{Publisher, Subscriber, Subscription} +import java.util.concurrent.RejectedExecutionException +import scala.util.control.NoStackTrace + +private[flow] sealed abstract class AsyncPublisher[F[_], A] private ( + fa: F[A] +)( + implicit F: Async[F] +) extends Publisher[A] { + protected def runSubscription(subscribe: F[Unit]): Unit + + override final def subscribe(subscriber: Subscriber[_ >: A]): Unit = { + requireNonNull( + subscriber, + "The subscriber provided to subscribe must not be null" + ) + try + runSubscription( + AsyncSubscription.subscribe(fa, subscriber) + ) + catch { + case _: IllegalStateException | _: RejectedExecutionException => + subscriber.onSubscribe(new Subscription { + override def cancel(): Unit = () + override def request(x$1: Long): Unit = () + }) + subscriber.onError(AsyncPublisher.CanceledAsyncPublisherException) + } + } +} + +private[flow] object AsyncPublisher { + private final class DispatcherAsyncPublisher[F[_], A]( + fa: F[A], + startDispatcher: Dispatcher[F] + )( + implicit F: Async[F] + ) extends AsyncPublisher[F, A](fa) { + override protected final def runSubscription(subscribe: F[Unit]): Unit = { + startDispatcher.unsafeRunAndForget(subscribe) + } + } + + private final class IORuntimeAsyncPublisher[A]( + ioa: IO[A] + )( + implicit runtime: IORuntime + ) extends AsyncPublisher[IO, A](ioa) { + override protected final def runSubscription(subscribe: IO[Unit]): Unit = { + subscribe.unsafeRunAndForget() + } + } + + def apply[F[_], A]( + fa: F[A] + )( + implicit F: Async[F] + ): Resource[F, AsyncPublisher[F, A]] = + Dispatcher.parallel[F](await = true).map { startDispatcher => + new DispatcherAsyncPublisher(fa, startDispatcher) + } + + def unsafe[A]( + ioa: IO[A] + )( + implicit runtime: IORuntime + ): AsyncPublisher[IO, A] = + new IORuntimeAsyncPublisher(ioa) + + private object CanceledAsyncPublisherException + extends IllegalStateException( + "This AsyncPublisher is not longer accepting subscribers" + ) + with NoStackTrace +} diff --git a/core/shared/src/main/scala/cats/effect/interop/flow/AsyncSubscriber.scala b/core/shared/src/main/scala/cats/effect/interop/flow/AsyncSubscriber.scala new file mode 100644 index 0000000000..e1f7258a17 --- /dev/null +++ b/core/shared/src/main/scala/cats/effect/interop/flow/AsyncSubscriber.scala @@ -0,0 +1,284 @@ +/* + * Copyright 2020-2023 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect.interop.flow + +import cats.effect.kernel.Sync + +import java.util.Objects.requireNonNull +import java.util.concurrent.Flow.{Subscriber, Subscription} +import java.util.concurrent.atomic.AtomicReference +import scala.util.control.NoStackTrace + +/** + * Implementation of a [[Subscriber]]. + * + * This is used to obtain an effect from an upstream reactive-streams system. + * + * @see + * [[https://github.com/reactive-streams/reactive-streams-jvm#2-subscriber-code]] + */ +private[flow] final class AsyncSubscriber[F[_], A] private ( + cb: Either[Throwable, Option[A]] => Unit, + currentState: AtomicReference[(AsyncSubscriber.State, () => Unit)] +)( + implicit F: Sync[F] +) extends Subscriber[A] { + import AsyncSubscriber.State._ + import AsyncSubscriber.Input._ + import AsyncSubscriber.InvalidStateException + + // Subscriber API. + + /** + * Receives a subscription from the upstream reactive-streams system. + */ + override final def onSubscribe(subscription: Subscription): Unit = { + requireNonNull( + subscription, + "The subscription provided to onSubscribe must not be null" + ) + nextState(input = Subscribe(subscription)) + } + + /** + * Receives the next record from the upstream reactive-streams system. + */ + override final def onNext(a: A): Unit = { + requireNonNull( + a, + "The element provided to onNext must not be null" + ) + nextState(input = Next(a)) + } + + /** + * Called by the upstream reactive-streams system when it fails. + */ + override final def onError(ex: Throwable): Unit = { + requireNonNull( + ex, + "The throwable provided to onError must not be null" + ) + nextState(input = Error(ex)) + } + + /** + * Called by the upstream reactive-streams system when it has finished sending records. + */ + override final def onComplete(): Unit = { + nextState(input = Complete) + } + + // Downstream cancellation. + + /** + * Allow downstream to cancel this subscriber. + */ + val cancel: F[Unit] = + F.delay(nextState(input = Cancel)) + + // Finite state machine. + + /** + * Helper to reduce noise when creating unary functions. + */ + private def run(block: => Unit): () => Unit = () => block + + /** + * Helper to reduce noise when failing with an InvalidStateException. + */ + private def invalidState(operation: String, state: State): Unit = + cb(Left(new InvalidStateException(operation, state))) + + /** + * Runs a single step of the state machine. + */ + private def step(input: Input): State => (State, () => Unit) = + input match { + case Subscribe(s) => { + // When subscribed: + // If we are in the initial state: + // We request a single element. + // Otherwise if we are in the canceled state: + // We cancel the subscription. + // Otherwise: + // We cancel the subscription, + // and attempt to complete the callback with an InvalidStateException. + case Initial => + Waiting(s) -> run { + s.request(1L) + } + + case Canceled => + Canceled -> run { + s.cancel() + } + + case state => + Completed -> run { + s.cancel() + invalidState( + operation = "Received subscription", + state + ) + } + } + + case Next(a) => { + // When the element is received: + // If we are in the waiting state: + // We attempt to complete the callback with the element, + // and cancel the upstream system. + // Otherwise: + // We attempt to complete the callback with an InvalidStateException. + case Waiting(s) => + Completed -> run { + s.cancel() + cb(Right(Some(a.asInstanceOf[A]))) + } + + case state => + Completed -> run { + invalidState( + operation = s"Received element [${a}]", + state + ) + } + } + + case Error(ex) => { + // When an error is received: + // If we are in the waiting state: + // We attempt to complete the callback with the error. + // Otherwise: + // We attempt to complete the callback with an InvalidStateException. + case Waiting(_) => + Completed -> run { + cb(Left(ex)) + } + + case state => + Completed -> run { + invalidState( + operation = s"Received error [${ex}]", + state + ) + } + } + + case Complete => { + // When a completion signal is received: + // If we are in the waiting state: + // We attempt to complete the callback with a None, + // and cancel the upstream system. + // Otherwise: + // We attempt to complete the callback with an InvalidStateException. + case Waiting(s) => + Completed -> run { + s.cancel() + cb(Right(None)) + } + + case state => + Completed -> run { + invalidState( + operation = s"Received completion signal", + state + ) + } + } + + case Cancel => { + // When a downstream cancellation signal is received: + // If we are in the waiting state: + // We cancel the upstream system. + // Otherwise: + // We put ourselves in the Canceled state. + case Waiting(s) => + Canceled -> run { + s.cancel() + } + + case _ => + Canceled -> (() => ()) + } + } + + /** + * Runs the next step of fsm. + * + * This function is concurrent safe, because the reactive-streams specs mention that all the + * on methods are to be called sequentially. Additionally, `Dequeue` and `Next` can't never + * happen concurrently, since they are tied together. Thus, these are the only concurrent + * options and all are covered: + `Subscribe` & `Dequeue`: No matter the order in which they + * are processed, we will end with a request call and a null buffer. + `Error` & `Dequeue`: No + * matter the order in which they are processed, we will complete the callback with the error. + * + cancellation & any other thing: Worst case, we will lose some data that we not longer + * care about; and eventually reach `Terminal`. + */ + private def nextState(input: Input): Unit = { + val (_, effect) = currentState.updateAndGet { + case (state, _) => + step(input)(state) + } + // Only run the effect after the state update took place. + effect() + } +} + +private[flow] object AsyncSubscriber { + def apply[F[_], A]( + cb: Either[Throwable, Option[A]] => Unit + )( + implicit F: Sync[F] + ): F[AsyncSubscriber[F, A]] = + F.delay { + new AsyncSubscriber( + cb, + currentState = new AtomicReference( + (State.Initial, () => ()) + ) + ) + } + + private sealed trait State + private object State { + type State = AsyncSubscriber.State + + case object Initial extends State + final case class Waiting(s: Subscription) extends State + case object Completed extends State + case object Canceled extends State + } + + private sealed trait Input + private object Input { + type Input = AsyncSubscriber.Input + + final case class Subscribe(s: Subscription) extends Input + final case class Next(a: Any) extends Input + final case class Error(ex: Throwable) extends Input + case object Complete extends Input + case object Cancel extends Input + } + + private final class InvalidStateException(operation: String, state: State) + extends IllegalStateException( + s"${operation} in invalid state [${state}]" + ) + with NoStackTrace +} diff --git a/core/shared/src/main/scala/cats/effect/interop/flow/AsyncSubscription.scala b/core/shared/src/main/scala/cats/effect/interop/flow/AsyncSubscription.scala new file mode 100644 index 0000000000..64d9349177 --- /dev/null +++ b/core/shared/src/main/scala/cats/effect/interop/flow/AsyncSubscription.scala @@ -0,0 +1,158 @@ +/* + * Copyright 2020-2023 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect.interop.flow + +import cats.effect.kernel.{Async, Outcome} +import cats.effect.kernel.syntax.all._ +import cats.syntax.all._ + +import java.util.concurrent.CancellationException +import java.util.concurrent.Flow.{Subscription, Subscriber} +import java.util.concurrent.atomic.AtomicReference + +/** + * Implementation of a [[Subscription]]. + * + * This is used by the [[AsyncPublisher]] to send the result of an effect to a downstream + * reactive-streams system. + * + * @see + * [[https://github.com/reactive-streams/reactive-streams-jvm#3-subscription-code]] + */ +private[flow] final class AsyncSubscription[F[_], A] private ( + fa: F[A], + subscriber: Subscriber[_ >: A], + start: AtomicReference[() => Unit], + canceled: AtomicReference[() => Unit] +)( + implicit F: Async[F] +) extends Subscription { + import AsyncSubscription.Sentinel + + // Ensure we are on a terminal state; i.e. call `cancel`, before signaling the subscriber. + private def onError(ex: Throwable): Unit = { + cancel() + subscriber.onError(ex) + } + + private def onComplete(): Unit = { + cancel() + subscriber.onComplete() + } + + // This is a def rather than a val, because it is only used once. + // And having fields increase the instantiation cost and delay garbage collection. + def run: F[Unit] = { + val cancellation = F.asyncCheckAttempt[Unit] { cb => + F.delay { + // Check if we were already cancelled before calling run. + if (!canceled.compareAndSet(Sentinel, () => cb.apply(Either.unit))) { + Either.unit + } else { + Left(Some(F.unit)) + } + } + } + + val waitForRequest = F.async_[Unit] { cb => start.set(() => cb.apply(Either.unit)) } + + (waitForRequest >> fa) + .race(cancellation) + .guaranteeCase { + case Outcome.Succeeded(result) => + result.flatMap { + // The effect finished normally. + case Left(a) => + F.delay { + subscriber.onNext(a) + onComplete() + } + + case Right(()) => + // The effect was canceled by downstream. + F.unit + } + + case Outcome.Errored(ex) => + // The effect failed with an error. + F.delay(onError(ex)) + + case Outcome.Canceled() => + // The effect was canceled by upstream. + F.delay(onError(ex = new CancellationException("AsyncSubscription.run was canceled"))) + } + .void + } + + override final def cancel(): Unit = { + val cancelCB = canceled.getAndSet(null) + if (cancelCB ne null) { + cancelCB.apply() + } + } + + override final def request(n: Long): Unit = { + // First, confirm we are not yet cancelled. + if (canceled.get() ne null) { + // Second, ensure we were requested a positive number of elements. + if (n <= 0) { + // Otherwise, we raise an error according to the spec. + onError(ex = new IllegalArgumentException(s"Invalid number of elements [${n}]")) + } else { + // Then, we attempt to complete the start callback. + val startCB = start.getAndSet(null) + if (startCB ne null) { + startCB.apply() + } + } + } + } +} + +private[flow] object AsyncSubscription { + private final val Sentinel = () => () + + // Mostly for testing purposes. + def apply[F[_], A]( + fa: F[A], + subscriber: Subscriber[_ >: A] + )( + implicit F: Async[F] + ): F[AsyncSubscription[F, A]] = + F.delay { + val start = new AtomicReference(Sentinel) + val canceled = new AtomicReference(Sentinel) + + new AsyncSubscription( + fa, + subscriber, + start, + canceled + ) + } + + def subscribe[F[_], A]( + fa: F[A], + subscriber: Subscriber[_ >: A] + )( + implicit F: Async[F] + ): F[Unit] = + apply(fa, subscriber).flatMap { subscription => + F.delay(subscriber.onSubscribe(subscription)) >> + subscription.run + } +} diff --git a/core/shared/src/main/scala/cats/effect/interop/flow/package.scala b/core/shared/src/main/scala/cats/effect/interop/flow/package.scala new file mode 100644 index 0000000000..f1a396b2f2 --- /dev/null +++ b/core/shared/src/main/scala/cats/effect/interop/flow/package.scala @@ -0,0 +1,202 @@ +/* + * Copyright 2020-2023 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect.interop + +import cats.effect.IO +import cats.effect.kernel.{Async, Resource} +import cats.effect.unsafe.IORuntime +import cats.syntax.all._ + +import java.util.concurrent.Flow.{Publisher, Subscriber} + +/** + * Implementation of the reactive-streams protocol for cats-effect; based on Java Flow. + * + * @example + * {{{ + * import cats.effect.{IO, Resource} + * import std.flow.syntax._ scala> + * import java.util.concurrent.Flow.Publisher + * import cats.effect.unsafe.implicits.global + * + * val upstream: IO[Int] = IO.pure(1) + * val publisher: Resource[IO, Publisher[Int]] = upstream.toPublisher + * val downstream: IO[Int] = publisher.use(_.toEffect[IO]) + * downstream.unsafeRunSync() + * // res0: Int = 1 + * }}} + * + * @see + * [[java.util.concurrent.Flow]] + */ +package object flow { + + /** + * Creates an effect from a `subscribe` function; analogous to a `Publisher`, but effectual. + * + * This effect will request a single item from the [[Publisher]] and then cancel it. The + * return value is an [[Option]] because the [[Publisher]] may complete without providing a + * single value. + * + * This function is useful when you actually need to provide a subscriber to a third-party. + * + * @example + * {{{ + * import cats.effect.IO + * import java.util.concurrent.Flow.{Publisher, Subscriber} + * + * def thirdPartyLibrary(subscriber: Subscriber[Int]): Unit = { + * val somePublisher: Publisher[Int] = ??? + * somePublisher.subscribe(subscriber) + * } + * + * // Interop with the third party library. + * cats.effect.std.flow.fromPublisher[IO, Int] { subscriber => + * IO.println("Subscribing!") >> + * IO.delay(thirdPartyLibrary(subscriber)) >> + * IO.println("Subscribed!") + * } + * res0: IO[Int] = IO(..) + * }}} + * + * @note + * The subscribe function will not be executed until the effect is run. + * + * @see + * the overload that only requires a [[Publisher]]. + * + * @param subscribe + * The effectual function that will be used to initiate the consumption process, it receives + * a [[Subscriber]] that should be used to subscribe to a [[Publisher]]. The `subscribe` + * operation must be called exactly once. + */ + def fromPublisher[F[_], A]( + subscribe: Subscriber[A] => F[Unit] + )( + implicit F: Async[F] + ): F[Option[A]] = + F.async { cb => + AsyncSubscriber(cb).flatMap { subscriber => + subscribe(subscriber).as( + Some(subscriber.cancel) + ) + } + } + + /** + * Creates an effect from a [[Publisher]]. + * + * This effect will request a single item from the [[Publisher]] and then cancel it. The + * return value is an [[Option]] because the [[Publisher]] may complete without providing a + * single value. + * + * @example + * {{{ + * import cats.effect.IO + * import java.util.concurrent.Flow.Publisher + * + * def getThirdPartyPublisher(): Publisher[Int] = ??? + * + * // Interop with the third party library. + * IO.delay(getThirdPartyPublisher()).flatMap { publisher => + * cats.effect.std.flow.fromPublisher[IO](publisher) + * } + * res0: IO[Int] = IO(..) + * }}} + * + * @note + * The [[Publisher]] will not receive a [[Subscriber]] until the effect is run. + * + * @see + * the `toEffect` extension method added to [[Publisher]]. + * + * @param publisher + * The [[Publisher]] to consume. + */ + def fromPublisher[F[_]]: syntax.FromPublisherPartiallyApplied[F] = + new syntax.FromPublisherPartiallyApplied(dummy = true) + + /** + * Creates a [[Publisher]] from an effect. + * + * The effect is only ran when elements are requested. Closing the [[Resource]] means not + * accepting new subscriptions, but waiting for all active ones to finish consuming. Canceling + * the [[Resource.use]] means gracefully shutting down all active subscriptions. Thus, no more + * elements will be published. + * + * @note + * This [[Publisher]] can be reused for multiple [[Subscribers]], each [[Subscription]] will + * re-run the effect. + * + * @see + * [[unsafeToPublisher]] for an unsafe version that returns a plain [[Publisher]]. + * + * @see + * [[subscribeEffect]] for a simpler version that only requires a [[Subscriber]]. + * + * @param fa + * The effect to transform. + */ + def toPublisher[F[_], A]( + fa: F[A] + )( + implicit F: Async[F] + ): Resource[F, Publisher[A]] = + AsyncPublisher(fa) + + /** + * Creates a [[Publisher]] from an [[IO]]. + * + * The [[IO]] is only ran when elements are requested. + * + * @note + * This [[Publisher]] can be reused for multiple [[Subscribers]], each [[Subscription]] will + * re-run the [[IO]]. + * + * @see + * [[toPublisher]] for a safe version that returns a [[Resource]]. + * + * @param ioa + * The [[IO]] to transform. + */ + def unsafeToPublisher[A]( + ioa: IO[A] + )( + implicit runtime: IORuntime + ): Publisher[A] = + AsyncPublisher.unsafe(ioa) + + /** + * Allows subscribing a [[Subscriber]] to an effect. + * + * The returned program will run the passed effect, then send the result to the + * [[Subscriber]], and finally complete the subscription. Cancelling this program will + * gracefully cancel the subscription. + * + * @param fa + * the effect that will be consumed by the subscriber. + * @param subscriber + * the [[Subscriber]] that will receive the result of the effect. + */ + def subscribeEffect[F[_], A]( + fa: F[A], + subscriber: Subscriber[_ >: A] + )( + implicit F: Async[F] + ): F[Unit] = + AsyncSubscription.subscribe(fa, subscriber) +} diff --git a/core/shared/src/main/scala/cats/effect/interop/flow/syntax.scala b/core/shared/src/main/scala/cats/effect/interop/flow/syntax.scala new file mode 100644 index 0000000000..8f5a120060 --- /dev/null +++ b/core/shared/src/main/scala/cats/effect/interop/flow/syntax.scala @@ -0,0 +1,121 @@ +/* + * Copyright 2020-2023 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect.interop +package flow + +import cats.effect.IO +import cats.effect.kernel.{Async, Resource} +import cats.effect.unsafe.IORuntime + +import java.util.concurrent.Flow.{Publisher, Subscriber} + +object syntax { + implicit final class PublisherOps[A](private val publisher: Publisher[A]) extends AnyVal { + + /** + * Creates an effect from this [[Publisher]]. + * + * This effect will request a single item from the [[Publisher]] and then cancel it. The + * return value is an [[Option]] because the [[Publisher]] may complete without providing a + * single value. + * + * @example + * {{{ + * import cats.effect.IO + * import java.util.concurrent.Flow.Publisher + * + * def getThirdPartyPublisher(): Publisher[Int] = ??? + * + * // Interop with the third party library. + * IO.delay(getThirdPartyPublisher()).flatMap { publisher => + * publisher.toEffect[IO] + * } + * res0: IO[Int] = IO(..) + * }}} + * + * @note + * The [[Publisher]] will not receive a [[Subscriber]] until the effect is run. + */ + def toEffect[F[_]](implicit F: Async[F]): F[Option[A]] = + fromPublisher[F](publisher) + } + + implicit final class FOps[F[_], A](private val fa: F[A]) extends AnyVal { + + /** + * Creates a [[Publisher]] from this effect. + * + * The effect is only ran when elements are requested. Closing the [[Resource]] means not + * accepting new subscriptions, but waiting for all active ones to finish consuming. + * Canceling the [[Resource.use]] means gracefully shutting down all active subscriptions. + * Thus, no more elements will be published. + * + * @note + * The [[Publisher]] can be reused for multiple [[Subscribers]], each [[Subscription]] + * will re-run the effect. + * + * @see + * [[subscribe]] for a simpler version that only requires a [[Subscriber]]. + */ + def toPublisher(implicit F: Async[F]): Resource[F, Publisher[A]] = + flow.toPublisher(fa) + + /** + * Allows subscribing a [[Subscriber]] to this effect. + * + * The returned program will run this effect, then send the result to the [[Subscriber]], + * and finally complete the subscription. Cancelling this program will gracefully cancel the + * subscription. + * + * @param subscriber + * the [[Subscriber]] that will receive the result of this effect. + */ + def subscribeEffect( + subscriber: Subscriber[_ >: A] + )( + implicit F: Async[F] + ): F[Unit] = + flow.subscribeEffect(fa, subscriber) + } + + implicit final class IOOps[A](private val ioa: IO[A]) extends AnyVal { + + /** + * Creates a [[Publisher]] from this [[IO]]. + * + * The [[IO]] is only ran when elements are requested. + * + * @note + * The [[Publisher]] can be reused for multiple [[Subscribers]], each [[Subscription]] + * will re-run the [[IO]]. + * + * @see + * [[toPublisher]] for a safe version that returns a [[Resource]]. + */ + def unsafeToPublisher()(implicit runtime: IORuntime): Publisher[A] = + flow.unsafeToPublisher(ioa) + } + + final class FromPublisherPartiallyApplied[F[_]](private val dummy: Boolean) extends AnyVal { + def apply[A]( + publisher: Publisher[A] + )( + implicit F: Async[F] + ): F[Option[A]] = + fromPublisher[F, A] { subscriber => F.delay(publisher.subscribe(subscriber)) } + } +}