-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathSBReceiver.cs
More file actions
111 lines (99 loc) · 3.53 KB
/
SBReceiver.cs
File metadata and controls
111 lines (99 loc) · 3.53 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
using Azure.Messaging.ServiceBus;
namespace ServiceBusPeekMessagesBug;
internal class SBReceiver : IDisposable
{
private readonly CancellationTokenSource _cancellationTokenSource;
private readonly HashSet<string> _processedMessages;
private readonly string _serviceBusConnectionString;
private readonly string _subscriptionName;
private readonly PeriodicTimer _timer;
private readonly Task _timerTask;
private readonly string _topicName;
private readonly bool _withFix;
private ServiceBusClient _client;
private ServiceBusReceiver _serviceBusReceiver;
public SBReceiver(string serviceBusConnectionString, string topicName, string subscriptionName, bool withFix)
{
_serviceBusConnectionString = serviceBusConnectionString;
_topicName = topicName;
_processedMessages = new HashSet<string>();
_cancellationTokenSource = new CancellationTokenSource();
_subscriptionName = subscriptionName;
_withFix = withFix;
_timer = new PeriodicTimer(TimeSpan.FromMilliseconds(1000));
_timerTask = HandleTimerAsync();
}
public void Dispose()
{
_cancellationTokenSource?.Cancel();
_timerTask?.Wait();
_timerTask?.Dispose();
_timer?.Dispose();
_cancellationTokenSource?.Dispose();
}
private async Task HandleTimerAsync()
{
try
{
while (await _timer.WaitForNextTickAsync(_cancellationTokenSource.Token))
try
{
await GetMessages();
}
catch (Exception e)
{
Console.WriteLine(e);
}
}
catch (Exception exc)
{
Console.WriteLine(exc.ToString());
}
}
private async Task GetMessages()
{
if (_client?.IsClosed != false) _client = new ServiceBusClient(_serviceBusConnectionString);
if (_serviceBusReceiver?.IsClosed != false)
{
ServiceBusReceiverOptions options = new()
{
ReceiveMode = ServiceBusReceiveMode.PeekLock,
SubQueue = SubQueue.None,
PrefetchCount = 110
};
_serviceBusReceiver = _client.CreateReceiver(_topicName, _subscriptionName, options);
}
try
{
while (true)
{
var messages =
await _serviceBusReceiver.PeekMessagesAsync(500);
if (messages == null || messages.Count == 0)
{
if (_withFix)
{
await _serviceBusReceiver.CloseAsync();
_serviceBusReceiver = null;
}
break;
}
foreach (var serviceBusReceivedMessage in messages)
{
if (_processedMessages.Contains(serviceBusReceivedMessage.MessageId))
{
Console.WriteLine(
$"\t\tReceived already received message, ID: {serviceBusReceivedMessage.MessageId}");
continue;
}
_processedMessages.Add(serviceBusReceivedMessage.MessageId);
Console.WriteLine($"Received message with ID: {serviceBusReceivedMessage.MessageId}, total of {_processedMessages.Count}");
}
}
}
catch (Exception e)
{
Console.WriteLine(e);
}
}
}