Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 10 additions & 8 deletions src/example.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
topic = 'temp-nar'
subscription = 'temp'
some_other_sub = 'temp'
respond_topic = 'temp-response'
respond_topic = 'test-response'
# respond_topic_object = core.get_topic(topic_name=respond_topic)


Expand Down Expand Up @@ -45,10 +45,11 @@ def publish_messages(topic_name):
def long_running_task(sleep_time):
# Simulate a long-running task
global exec_count
exec_count += 1
if exec_count % random_exec == 0:
# throw an exception
raise Exception('Random Exception')
exec_count += 1
# Throw exceptions at random
# if exec_count % random_exec == 0:
# # throw an exception
# raise Exception('Random Exception')
time.sleep(sleep_time)
sender_obj = MessageSender(topic_name=respond_topic)
sender_obj.send_message({'message': 'Task Completed'})
Expand All @@ -61,12 +62,13 @@ def process(message):
# long_running_thread = threading.Thread(target=long_running_task, args=(message.data['a'],))
# long_running_thread.start()
# long_running_thread.join()
long_running_task(message.data['a'])
# long_running_task(message.data['a'])
long_running_task(5) # 5 seconds for each task
print(f' > Message Completed: {message.data}')

topic_object = core.get_topic(topic_name=topic_name,max_concurrent_messages=2)
topic_object = core.get_topic(topic_name=topic_name,max_concurrent_messages=1)
try:
topic_object.subscribe(subscription=subscription_name, callback=process)
topic_object.subscribe(subscription=subscription_name, callback=process, max_receivable_messages=1)
except Exception as e:
print(e)

Expand Down
4 changes: 2 additions & 2 deletions src/python_ms_core/core/topic/abstract/topic_abstract.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ class TopicAbstract(ABC):
def __init__(self, config=None, topic_name=None): pass

@abstractmethod
def subscribe(self, subscription=None, callback=None): pass
def subscribe(self, subscription=None, callback=None, max_receivable_messages=-1): pass

@abstractmethod
def publish(self, data=None): pass
def publish(self, data=None): pass
9 changes: 8 additions & 1 deletion src/python_ms_core/core/topic/azure_topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def publish(self, data: QueueMessage):
message = QueueMessage.to_dict(data)
self.publisher.send_messages(ServiceBusMessage(json.dumps(message)))

def subscribe(self, subscription: str, callback):
def subscribe(self, subscription: str, callback, max_receivable_messages=-1):

"""
Subscribes to a subscription of the topic and processes incoming messages.
Expand All @@ -75,17 +75,24 @@ def subscribe(self, subscription: str, callback):
callback (function): The callback function to invoke for each message.
"""
self.receiver = self.client.get_subscription_receiver(topic_name=self.topic_name, subscription_name=subscription)
self.receiver.local_received_messages = 0
while True:
try:
to_receive = (self.max_concurrent_messages - self.internal_count)
total_messages_to_recieve_more = max_receivable_messages - self.receiver.local_received_messages
if max_receivable_messages > 0:
to_receive = min(to_receive, total_messages_to_recieve_more)
if to_receive > 0:
messages = self.receiver.receive_messages(max_message_count=to_receive, max_wait_time=self.wait_time_for_message)
if not messages or len(messages) == 0:
continue
self.receiver.local_received_messages += len(messages)
for message in messages:
self.lock_renewal.register(self.receiver, message, max_lock_renewal_duration=self.max_renewal_duration)
execution_task = self.executor.submit(self.internal_callback, message, callback)
execution_task.add_done_callback(lambda x: self.settle_message(x))
if self.receiver.local_received_messages >= max_receivable_messages and max_receivable_messages > 0: # Break if the messages are more than max_receivable_messages
break
else:
time.sleep(self.wait_time_for_message)
except Exception as e:
Expand Down
2 changes: 1 addition & 1 deletion src/python_ms_core/core/topic/local_topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def __init__(self, config=None, topic_name=None):
self.channel.exchange_declare(exchange=self.topic, exchange_type='fanout')

@ExceptionHandler.decorated
def subscribe(self, subscription=None, callback=None):
def subscribe(self, subscription=None, callback=None, max_receivable_messages=-1):

if subscription is not None:
if self.connection.is_open:
Expand Down
2 changes: 1 addition & 1 deletion src/python_ms_core/version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '0.0.23'
__version__ = '0.0.24'
Loading