diff --git a/pulsar/asyncio.py b/pulsar/asyncio.py index 5c3178a..064e353 100644 --- a/pulsar/asyncio.py +++ b/pulsar/asyncio.py @@ -320,6 +320,31 @@ async def acknowledge_cumulative( ) await future + async def negative_acknowledge( + self, + message: Union[pulsar.Message, pulsar.MessageId, _pulsar.Message, _pulsar.MessageId] + ) -> None: + """ + Acknowledge the failure to process a single message asynchronously. + + When a message is "negatively acked" it will be marked for redelivery after + some fixed delay. The delay is configurable when constructing the consumer + with {@link ConsumerConfiguration#setNegativeAckRedeliveryDelayMs}. + This call is not blocking. + + Parameters + ---------- + message: + The received message or message id. + """ + if isinstance(message, pulsar.Message): + msg = message._message + elif isinstance(message, pulsar.MessageId): + msg = message._msg_id + else: + msg = message + await asyncio.to_thread(self._consumer.negative_acknowledge, msg) + async def unsubscribe(self) -> None: """ Unsubscribe the current consumer from the topic asynchronously. diff --git a/src/consumer.cc b/src/consumer.cc index f1d7367..fa52720 100644 --- a/src/consumer.cc +++ b/src/consumer.cc @@ -133,6 +133,16 @@ void Consumer_acknowledgeCumulativeAsync_message_id(Consumer& consumer, const Me consumer.acknowledgeCumulativeAsync(msgId, callback); } +void Consumer_negative_acknowledgeAsync(Consumer& consumer, const Message& msg, ResultCallback callback) { + py::gil_scoped_release release; + consumer.negativeAcknowledge(msg); +} + +void Consumer_negative_acknowledgeAsync_message_id(Consumer& consumer, const MessageId& msgId, ResultCallback callback) { + py::gil_scoped_release release; + consumer.negativeAcknowledge(msgId); +} + void Consumer_closeAsync(Consumer& consumer, ResultCallback callback) { py::gil_scoped_release release; consumer.closeAsync(callback); @@ -183,6 +193,8 @@ void export_consumer(py::module_& m) { .def("acknowledge_async", &Consumer_acknowledgeAsync_message_id) .def("acknowledge_cumulative_async", &Consumer_acknowledgeCumulativeAsync) .def("acknowledge_cumulative_async", &Consumer_acknowledgeCumulativeAsync_message_id) + .def("negative_acknowledge_async", &Consumer_negative_acknowledgeAsync) + .def("negative_acknowledge_async", &Consumer_negative_acknowledgeAsync_message_id) .def("close_async", &Consumer_closeAsync) .def("unsubscribe_async", &Consumer_unsubscribeAsync) .def("seek_async", &Consumer_seekAsync) diff --git a/tests/asyncio_test.py b/tests/asyncio_test.py index 048dc43..66ff0fd 100644 --- a/tests/asyncio_test.py +++ b/tests/asyncio_test.py @@ -203,6 +203,36 @@ async def test_consumer_individual_acknowledge(self): msg = await consumer.receive() self.assertEqual(msg.data(), b'msg-3') + async def test_consumer_negative_acknowledge(self): + topic = f'asyncio-test-consumer-negative-ack-{time.time()}' + sub = 'sub' + consumer = await self._client.subscribe(topic, sub, + consumer_type=pulsar.ConsumerType.Shared, + negative_ack_redelivery_delay_ms=100) + + producer = await self._client.create_producer(topic) + await self._prepare_messages(producer) + msgs = [] + for _ in range(5): + msg = await consumer.receive() + msgs.append(msg) + + await consumer.acknowledge(msgs[1]) + await consumer.acknowledge(msgs[3]) + + await consumer.negative_acknowledge(msgs[0]) + await consumer.negative_acknowledge(msgs[2]) + await consumer.negative_acknowledge(msgs[4]) + await asyncio.sleep(0.2) + + received = [] + for _ in range(3): + msg = await consumer.receive() + received.append(msg.data()) + + self.assertEqual(sorted(received), [b'msg-0', b'msg-2', b'msg-4']) + await consumer.close() + async def test_multi_topic_consumer(self): topics = ['asyncio-test-multi-topic-1', 'asyncio-test-multi-topic-2'] producers = []