Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 27 additions & 17 deletions CFX/Transport/AmqpRequestProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ public void PublishToSource(string sourceAddress, IEnumerable<CFXEnvelope> messa
foreach (CFXEnvelope env in messages)
{
p.MessageQueue.Enqueue(env);
p.SignalMessageAvailable();
}
}

Expand Down Expand Up @@ -267,6 +268,12 @@ protected void Fire_OnMessageReceivedFromListener(string TargetAddress, CFXEnvel

class InternalSourceProcessor : IMessageSource
{
/// <summary>
/// A semaphore for waiting for new messages.
/// This can fix the issue that GetMessageAsync leads to high CPU consumption.
/// </summary>
private SemaphoreSlim messageAvailable = new SemaphoreSlim(0);

public InternalSourceProcessor(AmqpRequestProcessor parentProcessor, string sourceAddress)
{
this.parentProcessor = parentProcessor;
Expand Down Expand Up @@ -296,30 +303,31 @@ public void DisposeMessage(ReceiveContext receiveContext, DispositionContext dis
public async Task<ReceiveContext> GetMessageAsync(ListenerLink link)
{
ReceiveContext ctx = null;

await Task.Run(() =>
await messageAvailable.WaitAsync();
try
{
try
if (MessageQueue.Count > 0)
{
if (MessageQueue.Count > 0)
Message m;
CFXEnvelope[] envs = MessageQueue.Dequeue();
if (envs != null && envs.Length > 0)
{
Message m;
CFXEnvelope[] envs = MessageQueue.Dequeue();
if (envs != null && envs.Length > 0)
{
m = AmqpUtilities.MessageFromEnvelope(envs[0], AmqpCFXEndpoint.Codec.Value, parentProcessor.Endpoint.SubjectFormat);
ctx = new ReceiveContext(link, m);
}
m = AmqpUtilities.MessageFromEnvelope(envs[0], AmqpCFXEndpoint.Codec.Value, parentProcessor.Endpoint.SubjectFormat);
ctx = new ReceiveContext(link, m);
}
}
catch (Exception ex)
{
AppLog.Error(ex);
}
});

}
catch (Exception ex)
{
AppLog.Error(ex);
}
return ctx;
}

public void SignalMessageAvailable()
{
messageAvailable.Release();
}
}

class InternalRequestProcessor : IRequestProcessor
Expand Down Expand Up @@ -489,6 +497,8 @@ public override void OnMessage(MessageContext messageContext)

public override void OnFlow(FlowContext flowContext)
{


}

public override void OnDisposition(DispositionContext dispositionContext)
Expand Down