Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions pulsar/asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,31 @@ async def acknowledge_cumulative(
)
await future

async def negative_acknowledge(
self,
message: Union[pulsar.Message, pulsar.MessageId, _pulsar.Message, _pulsar.MessageId]
) -> None:
"""
Acknowledge the failure to process a single message asynchronously.
When a message is "negatively acked" it will be marked for redelivery after
some fixed delay. The delay is configurable when constructing the consumer
with {@link ConsumerConfiguration#setNegativeAckRedeliveryDelayMs}.
This call is not blocking.
Parameters
----------
message:
The received message or message id.
"""
if isinstance(message, pulsar.Message):
msg = message._message
elif isinstance(message, pulsar.MessageId):
msg = message._msg_id
else:
msg = message
await asyncio.to_thread(self._consumer.negative_acknowledge, msg)

async def unsubscribe(self) -> None:
"""
Unsubscribe the current consumer from the topic asynchronously.
Expand Down
12 changes: 12 additions & 0 deletions src/consumer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,16 @@ void Consumer_acknowledgeCumulativeAsync_message_id(Consumer& consumer, const Me
consumer.acknowledgeCumulativeAsync(msgId, callback);
}

void Consumer_negative_acknowledgeAsync(Consumer& consumer, const Message& msg, ResultCallback callback) {
py::gil_scoped_release release;
consumer.negativeAcknowledge(msg);
}

void Consumer_negative_acknowledgeAsync_message_id(Consumer& consumer, const MessageId& msgId, ResultCallback callback) {
py::gil_scoped_release release;
consumer.negativeAcknowledge(msgId);
}

void Consumer_closeAsync(Consumer& consumer, ResultCallback callback) {
py::gil_scoped_release release;
consumer.closeAsync(callback);
Expand Down Expand Up @@ -183,6 +193,8 @@ void export_consumer(py::module_& m) {
.def("acknowledge_async", &Consumer_acknowledgeAsync_message_id)
.def("acknowledge_cumulative_async", &Consumer_acknowledgeCumulativeAsync)
.def("acknowledge_cumulative_async", &Consumer_acknowledgeCumulativeAsync_message_id)
.def("negative_acknowledge_async", &Consumer_negative_acknowledgeAsync)
.def("negative_acknowledge_async", &Consumer_negative_acknowledgeAsync_message_id)
.def("close_async", &Consumer_closeAsync)
.def("unsubscribe_async", &Consumer_unsubscribeAsync)
.def("seek_async", &Consumer_seekAsync)
Expand Down
30 changes: 30 additions & 0 deletions tests/asyncio_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,36 @@ async def test_consumer_individual_acknowledge(self):
msg = await consumer.receive()
self.assertEqual(msg.data(), b'msg-3')

async def test_consumer_negative_acknowledge(self):
topic = f'asyncio-test-consumer-negative-ack-{time.time()}'
sub = 'sub'
consumer = await self._client.subscribe(topic, sub,
consumer_type=pulsar.ConsumerType.Shared,
negative_ack_redelivery_delay_ms=100)

producer = await self._client.create_producer(topic)
await self._prepare_messages(producer)
msgs = []
for _ in range(5):
msg = await consumer.receive()
msgs.append(msg)

await consumer.acknowledge(msgs[1])
await consumer.acknowledge(msgs[3])

await consumer.negative_acknowledge(msgs[0])
await consumer.negative_acknowledge(msgs[2])
await consumer.negative_acknowledge(msgs[4])
await asyncio.sleep(0.2)

received = []
for _ in range(3):
msg = await consumer.receive()
received.append(msg.data())

self.assertEqual(sorted(received), [b'msg-0', b'msg-2', b'msg-4'])
await consumer.close()

async def test_multi_topic_consumer(self):
topics = ['asyncio-test-multi-topic-1', 'asyncio-test-multi-topic-2']
producers = []
Expand Down
Loading