Listener onCancelled callback not being invoked in catch-up subscriptions #373

@vicopol

🐛 Current behavior

With the Java client, a subscription listener's onCancelled callback is not invoked when the user manually stops a catch-up subscription, or when onEvent throws an exception.

I was trying to reproduce the onCancelled usage shown in the online documentation (https://docs.kurrent.io/clients/java/v1.1/subscriptions.html#handling-dropped-subscriptions).

🔍 Steps to reproduce

The following test times out after 10 seconds on its last await, because onCancelled is never invoked:

@Test
fun `should invoke onCancelled when manually stopping the subscription`() {
    val onEventInvoked = AtomicBoolean(false)
    val onCancelledInvoked = AtomicBoolean(false)

    val listener = object : SubscriptionListener() {
        override fun onEvent(subscription: Subscription, event: ResolvedEvent) {
            onEventInvoked.compareAndSet(false, true)
        }

        override fun onCancelled(subscription: Subscription, exception: Throwable?) {
            onCancelledInvoked.compareAndSet(false, true)
        }
    }

    val streamName = "my-stream-1"

    val subscription = kurrentDBClient.subscribeToStream(streamName, listener).join()

    kurrentDBClient.appendToStream(
        streamName,
        EventData.builderAsJson("eventType", "{}".toByteArray()).build()
    ).join()

    subscription.stop()

    await until { onEventInvoked.get() }
    await until { onCancelledInvoked.get() }
}

The following test likewise times out after 10 seconds on its last await:

@Test
fun `should invoke onCancelled in subscription listener when onEvent fails`() {
    val onEventInvoked = AtomicBoolean(false)
    val onCancelledInvoked = AtomicBoolean(false)

    val listener = object : SubscriptionListener() {
        override fun onEvent(subscription: Subscription, event: ResolvedEvent) {
            onEventInvoked.compareAndSet(false, true)

            throw RuntimeException("failed")
        }

        override fun onCancelled(subscription: Subscription, exception: Throwable?) {
            onCancelledInvoked.compareAndSet(false, true)
        }
    }

    val streamName = "my-stream-2"

    kurrentDBClient.subscribeToStream(streamName, listener).join()

    kurrentDBClient.appendToStream(
        streamName,
        EventData.builderAsJson("eventType", "{}".toByteArray()).build()
    ).join()

    await until { onEventInvoked.get() }
    await until { onCancelledInvoked.get() }
}

Reproducible link

github.com

💭 Expected behavior

The subscription listener's onCancelled callback should be invoked when event handling fails and when the subscription is stopped manually. It would also be useful to have the error logged, so that the failure is visible.
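To make the expected contract concrete, here is a minimal sketch in plain Kotlin. ModelListener and ModelSubscription are hypothetical stand-ins written for this illustration; they are not the kurrentdb-client types and say nothing about how the client is implemented. The sketch assumes that a manual stop reports a null exception (as the nullable parameter in the tests above suggests), that a failing onEvent reports the thrown exception, and that onCancelled fires at most once.

```kotlin
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical stand-in for a listener; NOT the kurrentdb-client SubscriptionListener.
abstract class ModelListener {
    open fun onEvent(event: String) {}
    open fun onCancelled(exception: Throwable?) {}
}

// Hypothetical stand-in for a subscription that dispatches events to a listener.
class ModelSubscription(private val listener: ModelListener) {
    private val cancelled = AtomicBoolean(false)

    // Deliver one event; a failure in the handler cancels the subscription with that cause.
    fun deliver(event: String) {
        if (cancelled.get()) return
        try {
            listener.onEvent(event)
        } catch (e: Exception) {
            cancel(e)
        }
    }

    // Manual stop: cancel without an exception (assumed to be reported as null).
    fun stop() = cancel(null)

    private fun cancel(cause: Throwable?) {
        // compareAndSet ensures onCancelled is invoked at most once.
        if (cancelled.compareAndSet(false, true)) {
            // Log the failure so that it does not go unnoticed.
            if (cause != null) System.err.println("Subscription cancelled: ${cause.message}")
            listener.onCancelled(cause)
        }
    }
}
```

In this model, both tests above would pass: stop() reaches onCancelled with null, and an exception thrown from onEvent reaches onCancelled as the cause and is logged.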

Package version

io.kurrent:kurrentdb-client:1.1.1

KurrentDB Version

docker.kurrent.io/kurrent-lts/kurrentdb:26.0

Connection string

kurrentdb://localhost:2113?tls=false

☁️ Deployment Environment

Single-node (Docker)

Other Deployment Details

No response

Operating system

macOS 15.7.4

Code of Conduct

  • I agree to follow this project's Code of Conduct
