From 619dcc0465cdbe3998d42a53bc0251da8c2e0263 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luis=20Miguel=20Mej=C3=ADa=20Su=C3=A1rez?= Date: Sat, 3 Aug 2024 12:26:25 -0500 Subject: [PATCH 1/8] Add Stack interface --- .../main/scala/cats/effect/std/Stack.scala | 128 ++++++++++++++++++ 1 file changed, 128 insertions(+) create mode 100644 std/shared/src/main/scala/cats/effect/std/Stack.scala diff --git a/std/shared/src/main/scala/cats/effect/std/Stack.scala b/std/shared/src/main/scala/cats/effect/std/Stack.scala new file mode 100644 index 0000000000..2fc2805edf --- /dev/null +++ b/std/shared/src/main/scala/cats/effect/std/Stack.scala @@ -0,0 +1,128 @@ +/* + * Copyright 2020-2024 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats +package effect +package std + +import cats.effect.kernel.{Async, GenConcurrent, Sync} + +/** + * A purely functional, concurrent data structure which allows insertion and retrieval of + * elements of type `A` in a last-in-first-out (LIFO) manner. + * + * The [[Stack#push]] operation never blocks and will always succeed. + * + * The [[Stack#pop]] operation semantically blocks when the `Stack` is empty, [[Stack#tryPop]] + * allow for use cases which want to avoid blocking a fiber. + * + * The [[Stack#peek]] operation never blocks and will always succeed, it would however not + * remove the element from the `Stack`, and there is no guarantee that a consequent `pop` would + * return the same element. + */ +abstract class Stack[F[_], A] { self => + + /** + * Pushes the given element to the top of the `Stack`. + * + * @param a + * the element to push at the top of the `Stack`. + */ + def push(a: A): F[Unit] + + /** + * Pushes the given elements to the top of the `Stack`, the last element will be the final + * top. + * + * @param as + * the elements to push at the top of the `Stack`. + */ + def pushN(as: A*): F[Unit] + + /** + * Takes the top element of `Stack`, if there is none it will semantically block until one is + * made available. If multiple fibers are waiting for an element, they will be served in order + * of arrival. + */ + def pop: F[A] + + /** + * Tries ta take the top element of `Stack`, if there is none it will return `None`. + */ + def tryPop: F[Option[A]] + + /** + * Returns the top element of the `Stack`, if there is any, without removing it. + * + * @note + * In a concurrent scenario, there is no guarantee that a `peek` followed by a `pop` or + * `tryPop` would return the same element. + */ + def peek: F[Option[A]] + + /** + * Returns the number of elements currently present in the `Stack`. + * + * @note + * In a concurrent scenario, this value must be considered stale immediately after returning + * it. There is no guarantee that doing a `pop` after seeing a value bigger than `0` will + * not block. + */ + def size: F[Int] + + /** + * Modifies the context in which this `Stack` is executed using the natural transformation + * `f`. + * + * @return + * a `Stack` in the new context obtained by mapping the current one using `f`. + */ + final def mapK[G[_]](f: F ~> G): Stack[G, A] = + new Stack[G, A] { + override def push(a: A): G[Unit] = + f(self.push(a)) + + override def pushN(as: A*): G[Unit] = + f(self.pushN(as: _*)) + + override def pop: G[A] = + f(self.pop) + + override def tryPop: G[Option[A]] = + f(self.tryPop) + + override def peek: G[Option[A]] = + f(self.peek) + + override def size: G[Int] = + f(self.size) + } +} + +object Stack { + + /** + * Creates a new `Stack`. + */ + def apply[F[_], A](implicit F: GenConcurrent[F, _]): F[Stack[F, A]] = + ??? + + /** + * Creates a new `Stack`. Like `apply` but initializes state using another effect constructor. + */ + def in[F[_], G[_], A](implicit F: Sync[F], G: Async[G]): F[Stack[G, A]] = + ??? +} From 712e8e4895c79c1f2dc750e800647a11ffc9e50e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luis=20Miguel=20Mej=C3=ADa=20Su=C3=A1rez?= Date: Sat, 3 Aug 2024 12:51:51 -0500 Subject: [PATCH 2/8] Add Stack docs --- docs/std/stack.md | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) create mode 100644 docs/std/stack.md diff --git a/docs/std/stack.md b/docs/std/stack.md new file mode 100644 index 0000000000..4914fcd2ab --- /dev/null +++ b/docs/std/stack.md @@ -0,0 +1,27 @@ +--- +id: stack +title: Stack +--- + +A `Stack` is a concurrent data structure which allows insertion and retrieval of +elements of in a last-in-first-out (LIFO) manner. + +```scala +trait Stack[F[_], A] { + def push(a: A): F[Unit] + + def pushN(as: A*): F[Unit] + + def pop: F[A] + + def tryPop: F[Option[A]] + + def peek: F[Option[A]] +} +``` + +* `push`: Pushes an element to the top of the `Stack`, never blocks and will always succeed. +* `pushN`: Pushes many element sto the top of the `Stack`, the last element will be the final top, never blocks and will always succeed. +* `pop`: Retrieves the top element from the `Stack`, semantically blocks when the `Stack` is empty. +* `tryPop`: Similar to `pop` but rather than blocking, when empty will return `None`. +* `peek` Similar to `tryPop` but would not remove the element from the `Stack`. There is no guarantee that a consequent `pop`, `tryPop`, or `peek` would return the same element due to concurrency. From e7b3a833381fb28bc2d9bcd0f11012577924e54c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luis=20Miguel=20Mej=C3=ADa=20Su=C3=A1rez?= Date: Sat, 3 Aug 2024 14:31:24 -0500 Subject: [PATCH 3/8] Add Stack tests --- .../scala/cats/effect/std/StackSpec.scala | 235 ++++++++++++++++++ 1 file changed, 235 insertions(+) create mode 100644 tests/shared/src/test/scala/cats/effect/std/StackSpec.scala diff --git a/tests/shared/src/test/scala/cats/effect/std/StackSpec.scala b/tests/shared/src/test/scala/cats/effect/std/StackSpec.scala new file mode 100644 index 0000000000..56ea027741 --- /dev/null +++ b/tests/shared/src/test/scala/cats/effect/std/StackSpec.scala @@ -0,0 +1,235 @@ +/* + * Copyright 2020-2024 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats +package effect +package std + +import cats.arrow.FunctionK +import cats.syntax.all._ + +import org.specs2.specification.core.Fragments + +import scala.concurrent.duration._ + +final class StackSpec extends BaseSpec with DetectPlatform { + + final override def executionTimeout = 2.minutes + + "ConcurrentStack" should { + tests(Stack.apply[IO, Int]) + } + + "Stack with dual constructors" should { + tests(Stack.in[IO, IO, Int]) + } + + "MapK'd Stack" should { + tests(Stack[IO, Int].map(_.mapK[IO](FunctionK.id))) + } + + def tests(stack: IO[Stack[IO, Int]]): Fragments = { + "push and retrieve elements in LIFO order" in real { + val p = for { + s <- stack + _ <- s.push(0) + _ <- s.push(1) + _ <- List.range(start = 2, end = 7).traverse_(n => s.push(n)) + _ <- s.pushN(7, 8, 9, 10) + _ <- s.pushN(List.range(start = 11, end = 15): _*) + result <- s.pop.replicateA(15) + } yield result + + p mustEqual List.range(start = 0, end = 15).reverse + } + + "allow intercalate pushes and pops respecting LIFO order" in real { + val p = for { + s <- stack + _ <- s.push(0) + r1 <- s.pop + _ <- s.push(1) + _ <- s.push(3) + _ <- s.push(5) + r2 <- s.pop + r3 <- s.pop + _ <- s.push(10) + r4 <- s.pop + r5 <- s.pop + } yield List(r1, r2, r3, r4, r5) + + p mustEqual List(0, 5, 3, 10, 1) + } + + "block pop if empty" in ticked { implicit ticker => + val p = stack.flatMap(s => s.pop.void) + + p must nonTerminate + } + + "unblock pop with a push" in ticked { implicit ticker => + val p = stack.flatMap { s => + ( + s.pop, + (IO.sleep(1.second) >> s.push(1)) + ).parTupled + } + + p must completeAs((1, ())) + } + + "blocked fibers must be released in FIFO order (multiple pushes, elements in FIFO order)" in ticked { + implicit ticker => + val numbers = List.range(start = 1, end = 10) + val p = stack.flatMap { s => + val popAll = numbers.parTraverse { i => IO.sleep(i.millis) >> s.pop } + + val pushAll = IO.sleep(100.millis) >> numbers.traverse_(s.push) + + (popAll, pushAll).parTupled + } + + p must completeAs((numbers, ())) + } + + "blocked fibers must be released in FIFO order (pushN, elements in LIFO orden)" in ticked { + implicit ticker => + val numbers = List.range(start = 1, end = 10) + val p = stack.flatMap { s => + val popAll = numbers.parTraverse { i => IO.sleep(i.millis) >> s.pop } + + val pushAll = IO.sleep(100.millis) >> s.pushN(numbers: _*) + + (popAll, pushAll).parTupled + } + + p must completeAs((numbers.reverse, ())) + } + + "cancelling a blocked pop must remove it from waiting queue" in ticked { implicit ticker => + val p = for { + s <- stack + f1 <- s.pop.start + _ <- IO.sleep(1.second) + _ <- f1.cancel + f2 <- s.pop.start + _ <- IO.sleep(1.second) + f3 <- s.pop.start + _ <- IO.sleep(1.second) + f4 <- s.pop.start + _ <- IO.sleep(1.second) + f5 <- s.pop.start + _ <- IO.sleep(1.second) + f6 <- s.pop.start + _ <- IO.sleep(1.second) + f7 <- s.pop.start + _ <- IO.sleep(1.second) + _ <- f2.cancel + _ <- s.push(1) + r3 <- f3.joinWithNever + _ <- s.push(3) + r4 <- f4.joinWithNever + _ <- f6.cancel + _ <- s.push(5) + _ <- s.push(10) + r5 <- f5.joinWithNever + r7 <- f7.joinWithNever + } yield List(r3, r4, r5, r7) + + p must completeAs(List(1, 3, 5, 10)) + } + + "tryPop must not block if empty" in real { + val p = for { + s <- stack + r1 <- s.tryPop + _ <- s.push(3) + r2 <- s.tryPop + } yield List(r1, r2) + + p mustEqual List(None, Some(3)) + } + + "peek must not block and must not remove the element" in real { + val p = for { + s <- stack + r1 <- s.peek + _ <- s.push(1) + _ <- s.push(3) + r2 <- s.peek + r3 <- s.peek + r4 <- s.tryPop + r5 <- s.peek + } yield List(r1, r2, r3, r4, r5) + + p mustEqual List(None, Some(3), Some(3), Some(3), Some(1)) + } + + "size must be consistent in a non concurrent scenario" in real { + val p = for { + s <- stack + r1 <- s.size + _ <- s.push(1) + r2 <- s.size + _ <- s.pushN(2, 3, 4, 5) + r3 <- s.size + _ <- s.pop + _ <- s.pop + r4 <- s.size + } yield List(r1, r2, r3, r4) + + p mustEqual List(0, 1, 5, 3) + } + + "used concurrently" in ticked { implicit ticker => + val numbers = List.range(start = 0, end = 10) + val p = stack.flatMap { s => + ( + s.pop.parReplicateA(numbers.size).map(_.sorted), + numbers.parTraverse_(s.push) + ).parTupled + } + + p must completeAs((numbers, ())) + } + + "not lost elements when concurrently canceling a pop with a push" in ticked { + implicit timer => + val p = (stack, IO.deferred[Either[Int, Option[Int]]]).flatMapN { + case (s, df) => + val left = + IO.uncancelable(poll => poll(s.pop).flatMap(n => df.complete(Left(n)))).void + val right = + s.tryPop.flatMap(on => df.complete(Right(on))).void + + left.start.flatMap { f => + ( + IO.sleep(10.millis) >> f.cancel, + IO.sleep(10.millis) >> s.push(1) + ).parTupled >> f.joinWithUnit + } >> right >> df.get + } + + List.fill(if (isJVM) 1000 else 5)(p).forallM { result => + result.map { + case Left(1) => true + case Right(Some(1)) => true + case _ => false + } + } must completeAs(true) + } + } +} From 022511ab11e18c5974e3eff3a2efd4a0cd24165e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luis=20Miguel=20Mej=C3=ADa=20Su=C3=A1rez?= Date: Mon, 25 Nov 2024 13:55:42 -0500 Subject: [PATCH 4/8] Add Stack implementation --- .../main/scala/cats/effect/std/Stack.scala | 140 ++++++++++++++++-- 1 file changed, 128 insertions(+), 12 deletions(-) diff --git a/std/shared/src/main/scala/cats/effect/std/Stack.scala b/std/shared/src/main/scala/cats/effect/std/Stack.scala index 2fc2805edf..444dbc4fa0 100644 --- a/std/shared/src/main/scala/cats/effect/std/Stack.scala +++ b/std/shared/src/main/scala/cats/effect/std/Stack.scala @@ -18,7 +18,8 @@ package cats package effect package std -import cats.effect.kernel.{Async, GenConcurrent, Sync} +import cats.effect.kernel._ +import cats.syntax.all._ /** * A purely functional, concurrent data structure which allows insertion and retrieval of @@ -38,19 +39,19 @@ abstract class Stack[F[_], A] { self => /** * Pushes the given element to the top of the `Stack`. * - * @param a + * @param element * the element to push at the top of the `Stack`. */ - def push(a: A): F[Unit] + def push(element: A): F[Unit] /** * Pushes the given elements to the top of the `Stack`, the last element will be the final * top. * - * @param as + * @param elements * the elements to push at the top of the `Stack`. */ - def pushN(as: A*): F[Unit] + def pushN(elements: A*): F[Unit] /** * Takes the top element of `Stack`, if there is none it will semantically block until one is @@ -92,11 +93,11 @@ abstract class Stack[F[_], A] { self => */ final def mapK[G[_]](f: F ~> G): Stack[G, A] = new Stack[G, A] { - override def push(a: A): G[Unit] = - f(self.push(a)) + override def push(element: A): G[Unit] = + f(self.push(element)) - override def pushN(as: A*): G[Unit] = - f(self.pushN(as: _*)) + override def pushN(elements: A*): G[Unit] = + f(self.pushN(elements: _*)) override def pop: G[A] = f(self.pop) @@ -117,12 +118,127 @@ object Stack { /** * Creates a new `Stack`. */ - def apply[F[_], A](implicit F: GenConcurrent[F, _]): F[Stack[F, A]] = - ??? + def apply[F[_], A](implicit F: Concurrent[F]): F[Stack[F, A]] = + // Initialize the state with an empty stack. + Ref.of[F, StackState[F, A]](StackState.empty).map(state => new ConcurrentImpl(state)) /** * Creates a new `Stack`. Like `apply` but initializes state using another effect constructor. */ def in[F[_], G[_], A](implicit F: Sync[F], G: Async[G]): F[Stack[G, A]] = - ??? + // Initialize the state with an empty stack. + Ref.in[F, G, StackState[G, A]](StackState.empty).map(state => new ConcurrentImpl(state)) + + private final case class StackState[F[_], A]( + elements: List[A], + waiters: collection.immutable.Queue[Deferred[F, A]] + ) { + type CopyResult = StackState[F, A] + type ModifyResult[R] = (CopyResult, R) + + def push(element: A)(implicit F: Concurrent[F]): ModifyResult[F[Unit]] = + waiters.dequeueOption match { + case Some((waiter, remainingWaiters)) => + this.copy(waiters = remainingWaiters) -> waiter.complete(element).void + + case None => + this.copy(elements = element :: this.elements) -> F.unit + } + + def pushN(elements: Seq[A])(implicit F: Concurrent[F]): ModifyResult[F[Unit]] = + if (this.waiters.isEmpty) + // If there are no waiters we just push all the elements in reverse order. + this.copy(elements = this.elements.prependedAll(elements.reverseIterator)) -> F.unit + else { + // Otherwise, if there is at least one waiter, we take all we can. + val (remaining, waitersToNotify) = + elements.reverse.align(this.waiters).partitionMap(_.unwrap) + + // We notify all the waiters we could take. + val notifyWaiters = waitersToNotify.traverse_ { + case (element, waiter) => + waiter.complete(element).void + } + + // The remaining elements are either all elements, or all waiters. + val newState = remaining.parTraverse(_.toEitherNec) match { + case Left(remainingElements) => + // If only elements remained, then we preserve all the pending waiters, + // and set the Stack elements as the remaining ones. + // This is safe because the remaining elements are already in the correct order, + // and since there was at least one waiter then we can assume there were not pending elements. + this.copy(elements = remainingElements.toList) + + case Right(remainingWaiters) => + // If only waiters remained, then we create a new Queue from them. + this.copy(waiters = collection.immutable.Queue.from(remainingWaiters)) + } + + newState -> notifyWaiters + } + + def pop( + waiter: Deferred[F, A], + poll: Poll[F] + )( + implicit F: Concurrent[F] + ): ModifyResult[F[A]] = + elements match { + case head :: tail => + this.copy(elements = tail) -> F.pure(head) + + case Nil => + this.copy(waiters = waiters.enqueue(waiter)) -> poll(waiter.get) + } + + def removeWaiter(waiter: Deferred[F, A]): CopyResult = + this.copy(waiters = this.waiters.filterNot(_ eq waiter)) + + def tryPop: ModifyResult[Option[A]] = + elements match { + case head :: tail => + this.copy(elements = tail) -> Some(head) + + case Nil => + this -> None + } + } + + private object StackState { + def empty[F[_], A]: StackState[F, A] = StackState( + elements = List.empty, + waiters = collection.immutable.Queue.empty + ) + } + + private final class ConcurrentImpl[F[_], A]( + state: Ref[F, StackState[F, A]] + )( + implicit F: Concurrent[F] + ) extends Stack[F, A] { + override def push(element: A): F[Unit] = + F.uncancelable(_ => state.flatModify(_.push(element))) + + override def pushN(elements: A*): F[Unit] = + F.uncancelable(_ => state.flatModify(_.pushN(elements))) + + override final val pop: F[A] = + F.uncancelable { poll => + for { + waiter <- Deferred[F, A] + wait <- state.modify(_.pop(waiter, poll)) + waitCancelledFinalizer = state.update(_.removeWaiter(waiter)) + result <- F.onCancel(wait, waitCancelledFinalizer) + } yield result + } + + override final val tryPop: F[Option[A]] = + F.uncancelable(_ => state.modify(_.tryPop)) + + override final val peek: F[Option[A]] = + state.get.map(_.elements.headOption) + + override final val size: F[Int] = + state.get.map(_.elements.size) + } } From 91ade26d7ddbacdd766a112fcc94e39d564e891a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luis=20Miguel=20Mej=C3=ADa=20Su=C3=A1rez?= Date: Tue, 26 Nov 2024 00:38:59 -0500 Subject: [PATCH 5/8] Fix losing elements with concurrent cancellations (push & pop) --- .../main/scala/cats/effect/std/Stack.scala | 65 ++++++++++++++----- 1 file changed, 47 insertions(+), 18 deletions(-) diff --git a/std/shared/src/main/scala/cats/effect/std/Stack.scala b/std/shared/src/main/scala/cats/effect/std/Stack.scala index 444dbc4fa0..72b7bcd4f4 100644 --- a/std/shared/src/main/scala/cats/effect/std/Stack.scala +++ b/std/shared/src/main/scala/cats/effect/std/Stack.scala @@ -136,13 +136,13 @@ object Stack { type CopyResult = StackState[F, A] type ModifyResult[R] = (CopyResult, R) - def push(element: A)(implicit F: Concurrent[F]): ModifyResult[F[Unit]] = + def push(element: A)(implicit F: Concurrent[F]): ModifyResult[F[Boolean]] = waiters.dequeueOption match { case Some((waiter, remainingWaiters)) => - this.copy(waiters = remainingWaiters) -> waiter.complete(element).void + this.copy(waiters = remainingWaiters) -> waiter.complete(element) case None => - this.copy(elements = element :: this.elements) -> F.unit + this.copy(elements = element :: this.elements) -> F.pure(true) } def pushN(elements: Seq[A])(implicit F: Concurrent[F]): ModifyResult[F[Unit]] = @@ -177,18 +177,13 @@ object Stack { newState -> notifyWaiters } - def pop( - waiter: Deferred[F, A], - poll: Poll[F] - )( - implicit F: Concurrent[F] - ): ModifyResult[F[A]] = + def pop(waiter: Deferred[F, A]): ModifyResult[Option[A]] = elements match { case head :: tail => - this.copy(elements = tail) -> F.pure(head) + this.copy(elements = tail) -> Some(head) case Nil => - this.copy(waiters = waiters.enqueue(waiter)) -> poll(waiter.get) + this.copy(waiters = waiters.enqueue(waiter)) -> None } def removeWaiter(waiter: Deferred[F, A]): CopyResult = @@ -217,19 +212,53 @@ object Stack { implicit F: Concurrent[F] ) extends Stack[F, A] { override def push(element: A): F[Unit] = - F.uncancelable(_ => state.flatModify(_.push(element))) + F.uncancelable { _ => + // Try to push an element to the Stack. + state.flatModify(_.push(element)).flatMap { + case true => + // If it worked we finish the process. + F.unit + + case false => + // If it failed, we retry. + this.push(element) + } + } override def pushN(elements: A*): F[Unit] = F.uncancelable(_ => state.flatModify(_.pushN(elements))) override final val pop: F[A] = F.uncancelable { poll => - for { - waiter <- Deferred[F, A] - wait <- state.modify(_.pop(waiter, poll)) - waitCancelledFinalizer = state.update(_.removeWaiter(waiter)) - result <- F.onCancel(wait, waitCancelledFinalizer) - } yield result + Deferred[F, A].flatMap { waiter => + // Try to pop the head of the Stack. + state.modify(_.pop(waiter)).flatMap { + case Some(head) => + // If there is one, we simply return it. + F.pure(head) + + case None => + // If there wasn't one, + // we already added our waiter at the end of the waiters queue. + // We then need to wait for it to be completed. + // However, we may be cancelled while waiting for that. + // If we are cancelled, then we will try to invalidate our waiter: + val waitCancelledFinalizer = waiter.complete(null.asInstanceOf[A]).flatMap { + case true => + // If we managed to invalidate our waiter, + // we try to remove it from the waiters queue. + state.update(_.removeWaiter(waiter)).void + + case false => + // But, if we didn't managed to invalidate it. + // Then, that means we managed to receive a pushed element. + // Thus, we have to push it again to avoid it getting lost. + waiter.get.flatMap(element => this.push(element)) + } + + F.onCancel(poll(waiter.get), waitCancelledFinalizer) + } + } } override final val tryPop: F[Option[A]] = From 32d36c763106134a5d3da61e135b2e3294103b45 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luis=20Miguel=20Mej=C3=ADa=20Su=C3=A1rez?= Date: Tue, 26 Nov 2024 00:53:05 -0500 Subject: [PATCH 6/8] Add 'Stack should not lost elements when concurrently canceling multiple pops with a pushN' test --- .../scala/cats/effect/std/StackSpec.scala | 29 +++++++++++++++++-- 1 file changed, 26 insertions(+), 3 deletions(-) diff --git a/tests/shared/src/test/scala/cats/effect/std/StackSpec.scala b/tests/shared/src/test/scala/cats/effect/std/StackSpec.scala index 56ea027741..f655fee9fd 100644 --- a/tests/shared/src/test/scala/cats/effect/std/StackSpec.scala +++ b/tests/shared/src/test/scala/cats/effect/std/StackSpec.scala @@ -208,7 +208,7 @@ final class StackSpec extends BaseSpec with DetectPlatform { "not lost elements when concurrently canceling a pop with a push" in ticked { implicit timer => - val p = (stack, IO.deferred[Either[Int, Option[Int]]]).flatMapN { + val task = (stack, IO.deferred[Either[Int, Option[Int]]]).flatMapN { case (s, df) => val left = IO.uncancelable(poll => poll(s.pop).flatMap(n => df.complete(Left(n)))).void @@ -223,13 +223,36 @@ final class StackSpec extends BaseSpec with DetectPlatform { } >> right >> df.get } - List.fill(if (isJVM) 1000 else 5)(p).forallM { result => + val p = List.fill(if (isJVM) 1000 else 5)(task).forallM { result => result.map { case Left(1) => true case Right(Some(1)) => true case _ => false } - } must completeAs(true) + } + + p must completeAs(true) + } + + "not lost elements when concurrently canceling multiple pops with a pushN" in ticked { + implicit timer => + val numbers = List.range(start = 0, end = 10) + + val task = for { + s <- stack + fibers <- s.pop.option.start.replicateA(5) + _ <- ( + IO.sleep(10.millis) >> fibers.parTraverse_(_.cancel), + IO.sleep(10.millis) >> s.pushN(numbers: _*) + ).parTupled + popedElements <- fibers.traverseFilter(_.joinWith(IO.none)) + remainingElements <- s.pop.replicateA(numbers.size - popedElements.size) + } yield (popedElements ++ remainingElements).sorted + + val p = + List.fill(if (isJVM) 1000 else 5)(task).forallM(result => result.map(_ == numbers)) + + p must completeAs(true) } } } From 0fb4739bcc9f83d05bb158488d68553ac246076d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luis=20Miguel=20Mej=C3=ADa=20Su=C3=A1rez?= Date: Tue, 26 Nov 2024 01:13:25 -0500 Subject: [PATCH 7/8] Fix losing elements with concurrent cancellations (pushN) --- .../main/scala/cats/effect/std/Stack.scala | 30 +++++++++++++++---- 1 file changed, 24 insertions(+), 6 deletions(-) diff --git a/std/shared/src/main/scala/cats/effect/std/Stack.scala b/std/shared/src/main/scala/cats/effect/std/Stack.scala index 72b7bcd4f4..458b0d0ddb 100644 --- a/std/shared/src/main/scala/cats/effect/std/Stack.scala +++ b/std/shared/src/main/scala/cats/effect/std/Stack.scala @@ -145,19 +145,26 @@ object Stack { this.copy(elements = element :: this.elements) -> F.pure(true) } - def pushN(elements: Seq[A])(implicit F: Concurrent[F]): ModifyResult[F[Unit]] = + def pushN(elements: Seq[A])(implicit F: Concurrent[F]): ModifyResult[F[Seq[A]]] = if (this.waiters.isEmpty) // If there are no waiters we just push all the elements in reverse order. - this.copy(elements = this.elements.prependedAll(elements.reverseIterator)) -> F.unit + this.copy( + elements = this.elements.prependedAll(elements.reverseIterator) + ) -> F.pure(Seq.empty) else { // Otherwise, if there is at least one waiter, we take all we can. val (remaining, waitersToNotify) = elements.reverse.align(this.waiters).partitionMap(_.unwrap) - // We notify all the waiters we could take. - val notifyWaiters = waitersToNotify.traverse_ { + // We try to notify all the waiters we could take. + val notifyWaiters = waitersToNotify.traverseFilter { case (element, waiter) => - waiter.complete(element).void + waiter.complete(element).map { + // If the waiter was successfully awaken, we remove the element from the Stack. + case true => None + // Otherwise, we preserve the element for retrying the push. + case false => Some(element) + } } // The remaining elements are either all elements, or all waiters. @@ -226,7 +233,18 @@ object Stack { } override def pushN(elements: A*): F[Unit] = - F.uncancelable(_ => state.flatModify(_.pushN(elements))) + F.uncancelable { _ => + // If elements is empty, do nothing. + if (elements.isEmpty) F.unit + // Optimize for the singleton case. + else if (elements.sizeIs == 1) this.push(elements.head) + else + // Otherwise try to push all the elements at once. + state.flatModify(_.pushN(elements)).flatMap { failedElements => + // For the elements we failed to push, we retry. + this.pushN(failedElements: _*) + } + } override final val pop: F[A] = F.uncancelable { poll => From aad9656691b9a5989478335d9aaab1a01c4bc535 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luis=20Miguel=20Mej=C3=ADa=20Su=C3=A1rez?= Date: Tue, 26 Nov 2024 09:28:36 -0500 Subject: [PATCH 8/8] Fix pushN retry elements order. --- std/shared/src/main/scala/cats/effect/std/Stack.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/std/shared/src/main/scala/cats/effect/std/Stack.scala b/std/shared/src/main/scala/cats/effect/std/Stack.scala index 458b0d0ddb..39d7f21319 100644 --- a/std/shared/src/main/scala/cats/effect/std/Stack.scala +++ b/std/shared/src/main/scala/cats/effect/std/Stack.scala @@ -242,7 +242,7 @@ object Stack { // Otherwise try to push all the elements at once. state.flatModify(_.pushN(elements)).flatMap { failedElements => // For the elements we failed to push, we retry. - this.pushN(failedElements: _*) + this.pushN(failedElements.reverse: _*) } }