diff --git a/src/example.py b/src/example.py index d2f4782..9fcb768 100644 --- a/src/example.py +++ b/src/example.py @@ -16,7 +16,7 @@ topic = 'temp-nar' subscription = 'temp' some_other_sub = 'temp' -respond_topic = 'temp-response' +respond_topic = 'test-response' # respond_topic_object = core.get_topic(topic_name=respond_topic) @@ -45,10 +45,11 @@ def publish_messages(topic_name): def long_running_task(sleep_time): # Simulate a long-running task global exec_count - exec_count += 1 - if exec_count % random_exec == 0: - # throw an exception - raise Exception('Random Exception') + exec_count += 1 + # Throw exceptions at random + # if exec_count % random_exec == 0: + # # throw an exception + # raise Exception('Random Exception') time.sleep(sleep_time) sender_obj = MessageSender(topic_name=respond_topic) sender_obj.send_message({'message': 'Task Completed'}) @@ -61,12 +62,13 @@ def process(message): # long_running_thread = threading.Thread(target=long_running_task, args=(message.data['a'],)) # long_running_thread.start() # long_running_thread.join() - long_running_task(message.data['a']) + # long_running_task(message.data['a']) + long_running_task(5) # 5 seconds for each task print(f' > Message Completed: {message.data}') - topic_object = core.get_topic(topic_name=topic_name,max_concurrent_messages=2) + topic_object = core.get_topic(topic_name=topic_name,max_concurrent_messages=1) try: - topic_object.subscribe(subscription=subscription_name, callback=process) + topic_object.subscribe(subscription=subscription_name, callback=process, max_receivable_messages=1) except Exception as e: print(e) diff --git a/src/python_ms_core/core/topic/abstract/topic_abstract.py b/src/python_ms_core/core/topic/abstract/topic_abstract.py index b475b1e..3dad05b 100644 --- a/src/python_ms_core/core/topic/abstract/topic_abstract.py +++ b/src/python_ms_core/core/topic/abstract/topic_abstract.py @@ -6,7 +6,7 @@ class TopicAbstract(ABC): def __init__(self, config=None, topic_name=None): pass @abstractmethod - def subscribe(self, subscription=None, callback=None): pass + def subscribe(self, subscription=None, callback=None, max_receivable_messages=-1): pass @abstractmethod - def publish(self, data=None): pass + def publish(self, data=None): pass \ No newline at end of file diff --git a/src/python_ms_core/core/topic/azure_topic.py b/src/python_ms_core/core/topic/azure_topic.py index 5259ebe..deb25c3 100644 --- a/src/python_ms_core/core/topic/azure_topic.py +++ b/src/python_ms_core/core/topic/azure_topic.py @@ -66,7 +66,7 @@ def publish(self, data: QueueMessage): message = QueueMessage.to_dict(data) self.publisher.send_messages(ServiceBusMessage(json.dumps(message))) - def subscribe(self, subscription: str, callback): + def subscribe(self, subscription: str, callback, max_receivable_messages=-1): """ Subscribes to a subscription of the topic and processes incoming messages. @@ -75,17 +75,24 @@ def subscribe(self, subscription: str, callback): callback (function): The callback function to invoke for each message. """ self.receiver = self.client.get_subscription_receiver(topic_name=self.topic_name, subscription_name=subscription) + self.receiver.local_received_messages = 0 while True: try: to_receive = (self.max_concurrent_messages - self.internal_count) + total_messages_to_recieve_more = max_receivable_messages - self.receiver.local_received_messages + if max_receivable_messages > 0: + to_receive = min(to_receive, total_messages_to_recieve_more) if to_receive > 0: messages = self.receiver.receive_messages(max_message_count=to_receive, max_wait_time=self.wait_time_for_message) if not messages or len(messages) == 0: continue + self.receiver.local_received_messages += len(messages) for message in messages: self.lock_renewal.register(self.receiver, message, max_lock_renewal_duration=self.max_renewal_duration) execution_task = self.executor.submit(self.internal_callback, message, callback) execution_task.add_done_callback(lambda x: self.settle_message(x)) + if self.receiver.local_received_messages >= max_receivable_messages and max_receivable_messages > 0: # Break if the messages are more than max_receivable_messages + break else: time.sleep(self.wait_time_for_message) except Exception as e: diff --git a/src/python_ms_core/core/topic/local_topic.py b/src/python_ms_core/core/topic/local_topic.py index 291b18d..c2a53bd 100644 --- a/src/python_ms_core/core/topic/local_topic.py +++ b/src/python_ms_core/core/topic/local_topic.py @@ -32,7 +32,7 @@ def __init__(self, config=None, topic_name=None): self.channel.exchange_declare(exchange=self.topic, exchange_type='fanout') @ExceptionHandler.decorated - def subscribe(self, subscription=None, callback=None): + def subscribe(self, subscription=None, callback=None, max_receivable_messages=-1): if subscription is not None: if self.connection.is_open: diff --git a/src/python_ms_core/version.py b/src/python_ms_core/version.py index 29906fb..071e34f 100644 --- a/src/python_ms_core/version.py +++ b/src/python_ms_core/version.py @@ -1 +1 @@ -__version__ = '0.0.23' \ No newline at end of file +__version__ = '0.0.24' \ No newline at end of file