From 3162a25e8ab15c748add2c2db008a86bbd52b5dc Mon Sep 17 00:00:00 2001 From: jdengitw Date: Wed, 19 Nov 2025 07:49:55 +0800 Subject: [PATCH] Update AmqpRequestProcessor.cs Add a semaphore for waiting for new messages to fix the issue that GetMessageAsync leads to high CPU consumption. --- CFX/Transport/AmqpRequestProcessor.cs | 44 ++++++++++++++++----------- 1 file changed, 27 insertions(+), 17 deletions(-) diff --git a/CFX/Transport/AmqpRequestProcessor.cs b/CFX/Transport/AmqpRequestProcessor.cs index 5960f1e4..33622e34 100644 --- a/CFX/Transport/AmqpRequestProcessor.cs +++ b/CFX/Transport/AmqpRequestProcessor.cs @@ -197,6 +197,7 @@ public void PublishToSource(string sourceAddress, IEnumerable messa foreach (CFXEnvelope env in messages) { p.MessageQueue.Enqueue(env); + p.SignalMessageAvailable(); } } @@ -267,6 +268,12 @@ protected void Fire_OnMessageReceivedFromListener(string TargetAddress, CFXEnvel class InternalSourceProcessor : IMessageSource { + /// + /// A semaphore for waiting for new messages. + /// This can fix the issue that GetMessageAsync leads to high CPU consumption. + /// + private SemaphoreSlim messageAvailable = new SemaphoreSlim(0); + public InternalSourceProcessor(AmqpRequestProcessor parentProcessor, string sourceAddress) { this.parentProcessor = parentProcessor; @@ -296,30 +303,31 @@ public void DisposeMessage(ReceiveContext receiveContext, DispositionContext dis public async Task GetMessageAsync(ListenerLink link) { ReceiveContext ctx = null; - - await Task.Run(() => + await messageAvailable.WaitAsync(); + try { - try + if (MessageQueue.Count > 0) { - if (MessageQueue.Count > 0) + Message m; + CFXEnvelope[] envs = MessageQueue.Dequeue(); + if (envs != null && envs.Length > 0) { - Message m; - CFXEnvelope[] envs = MessageQueue.Dequeue(); - if (envs != null && envs.Length > 0) - { - m = AmqpUtilities.MessageFromEnvelope(envs[0], AmqpCFXEndpoint.Codec.Value, parentProcessor.Endpoint.SubjectFormat); - ctx = new ReceiveContext(link, m); - } + m = AmqpUtilities.MessageFromEnvelope(envs[0], AmqpCFXEndpoint.Codec.Value, parentProcessor.Endpoint.SubjectFormat); + ctx = new ReceiveContext(link, m); } } - catch (Exception ex) - { - AppLog.Error(ex); - } - }); - + } + catch (Exception ex) + { + AppLog.Error(ex); + } return ctx; } + + public void SignalMessageAvailable() + { + messageAvailable.Release(); + } } class InternalRequestProcessor : IRequestProcessor @@ -489,6 +497,8 @@ public override void OnMessage(MessageContext messageContext) public override void OnFlow(FlowContext flowContext) { + + } public override void OnDisposition(DispositionContext dispositionContext)