Skip to content

Queue.take.timeout(...) loses elements on cancelation #4571

@TomasMikula

Description

@TomasMikula

While queue.take.timeout(...) by itself does not lose elements on timeout, it does lose elements when the fiber running it is canceled.

Minimized code

import cats.effect.std.Queue
import cats.effect.{ExitCode, IO, IOApp}

import scala.concurrent.TimeoutException
import scala.concurrent.duration.DurationInt

object QueueTakeTimeoutCancelTest1 extends IOApp {
  val delay = 1.microsecond

  override def run(args: List[String]): IO[ExitCode] =
    Queue
      .unbounded[IO, Int]
      .flatMap { queue =>
        val check: IO[Unit] =
          for {
            // producer will enqueue an element after the given `delay`
            _ <- (IO.sleep(delay) *> queue.offer(42)).start

            // consumer will wait for an elem for up to the given `delay`
            elem <- queue.take
              .timeout(delay)
              .timeout(delay) // THIS ONE CAUSES TROUBLE (without this line, the test passes)
              .recover { case _: TimeoutException => -1 }

            _ <- IO.whenA(elem == -1) {
              // Timed out, the element must not have been dequeued.
              // 1 second should be plenty of time to dequeue it (TimeoutException means element was lost)
              queue.take.timeout(1.second)
                .adaptError { case _: TimeoutException => new RuntimeException("Element was lost!") }
                .void
            }
          } yield ()

        check.replicateA_(10000)
      }
      .as(ExitCode.Success)
}

Output

java.lang.RuntimeException: Element was lost!
	at QueueTakeTimeoutCancelTest1$$anonfun$$nestedInanonfun$run$4$1.applyOrElse(QueueTakeTimeoutCancelTest1.scala:29)
	at QueueTakeTimeoutCancelTest1$$anonfun$$nestedInanonfun$run$4$1.applyOrElse(QueueTakeTimeoutCancelTest1.scala:29)
	at scala.PartialFunction$AndThen.applyOrElse(PartialFunction.scala:299)
	at timeout @ QueueTakeTimeoutCancelTest1$.$anonfun$run$2(QueueTakeTimeoutCancelTest1.scala:21)
	at main$ @ QueueTakeTimeoutCancelTest1$.main(QueueTakeTimeoutCancelTest1.scala:7)
	at main$ @ QueueTakeTimeoutCancelTest1$.main(QueueTakeTimeoutCancelTest1.scala:7)

More realistic example

This example uses explicit cancelation (instead of nested timeouts) of a consumer fiber that's calling queue.take.timeout(...).

This mimics my actual use case, namely a consumer that:

  • blocks for a limited time when attempting to dequeue an element
  • never loses a dequeued element
  • can itself be canceled from outside, incl. when waiting on the queue
import cats.effect.kernel.{Deferred, Outcome}
import cats.effect.std.Queue
import cats.effect.{ExitCode, IO, IOApp}

import scala.concurrent.TimeoutException
import scala.concurrent.duration.DurationInt

object QueueTakeTimeoutCancelTest2 extends IOApp {

  override def run(args: List[String]): IO[ExitCode] =
    Queue
      .unbounded[IO, Int]
      .flatMap { queue =>
        val check: IO[Unit] =
          for {
            sink <- Deferred[IO, Int]

            // Goal:
            //  - Make sure that whenever an elem is dequeued, it is handled (at least via release).
            //  - Consumer must be cancelable, incl. when in `.take.timeout(...)`.
            consumer =
              IO.bracketFull(acquire = poll =>
                poll(queue.take.timeout(1.day)) // for the sake of this test, wait effectively forever
              )(
                use = elem => sink.complete(elem)
              )(
                release = (elem, _) => sink.complete(-elem).void
              )

            fib <- consumer.start

            // produce an element
            _ <- queue.offer(42)

            // and quickly cancel the consumer
            _ <- fib.cancel

            // inspect the outcome
            oc <- fib.join
            _ <- oc match {
              case Outcome.Succeeded(_) =>
                // succeeded to dequeue before cancel, check the value
                sink.tryGet.map(x => assert(x == Some(42)))
              case Outcome.Canceled() =>
                // Consumer was canceled.
                sink.tryGet.flatMap {
                  case Some(i) =>
                    // Consumer managed to dequeue the elem and it was not lost.
                    // sanity-check the value
                    IO.raiseWhen(i != 42 && i != -42) { new AssertionError(s"Unexpected value $i") }
                  case None =>
                    // Assuming consumer does not lose elements, the element must not have been dequeued.
                    // 1 second should be plenty of time to dequeue it (TimeoutException means element was lost)
                    queue.take.timeout(1.second)
                      .adaptError { case _: TimeoutException => new RuntimeException("Element was lost!") }
                      .void
                }
              case Outcome.Errored(e) =>
                IO.raiseError(new AssertionError("should never happen", e))
            }
          } yield ()

        check.replicateA_(10000)
      }
      .as(ExitCode.Success)
}

Output

java.lang.RuntimeException: Element was lost!
	at QueueTakeTimeoutCancelTest2$$anonfun$$nestedInanonfun$run$11$1.applyOrElse(QueueTakeTimeoutCancelTest2.scala:55)
	at QueueTakeTimeoutCancelTest2$$anonfun$$nestedInanonfun$run$11$1.applyOrElse(QueueTakeTimeoutCancelTest2.scala:55)
	at scala.PartialFunction$AndThen.applyOrElse(PartialFunction.scala:299)
	at timeout @ QueueTakeTimeoutCancelTest2$.$anonfun$run$3(QueueTakeTimeoutCancelTest2.scala:23)
	at main$ @ QueueTakeTimeoutCancelTest2$.main(QueueTakeTimeoutCancelTest2.scala:8)
	at main$ @ QueueTakeTimeoutCancelTest2$.main(QueueTakeTimeoutCancelTest2.scala:8)

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions