-
Notifications
You must be signed in to change notification settings - Fork 49
Expand file tree
/
Copy pathWhenRetryingSameMessageMultipleTimes.cs
More file actions
128 lines (108 loc) · 4.86 KB
/
WhenRetryingSameMessageMultipleTimes.cs
File metadata and controls
128 lines (108 loc) · 4.86 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
namespace ServiceControl.MultiInstance.AcceptanceTests.Recoverability
{
using System;
using System.Linq;
using System.Threading.Tasks;
using AcceptanceTesting;
using AcceptanceTesting.EndpointTemplates;
using MessageFailures;
using NServiceBus;
using NServiceBus.AcceptanceTesting;
using NServiceBus.Settings;
using NUnit.Framework;
using ServiceControl.Infrastructure;
using TestSupport;
class WhenRetryingSameMessageMultipleTimes : WhenRetrying
{
/// <summary>
/// The kind of retry the test requests from ServiceControl for a failed message.
/// </summary>
public enum RetryType
{
    /// <summary>Retry the failed message as-is by posting to <c>/api/errors/{id}/retry</c>.</summary>
    NoEdit = 0,

    /// <summary>Retry with a replacement message body by posting to <c>/api/edit/{id}</c>.</summary>
    Edit = 1
}
/// <summary>
/// Sends a <see cref="MyMessage"/> that fails in <see cref="FailureEndpoint"/>, then requests one
/// retry per entry of <paramref name="retryTypes"/>, in order. The scenario completes once every
/// retry has been requested and ServiceControl reports no unresolved failed messages.
/// </summary>
/// <param name="retryTypes">The kinds of retry to request, one per failure of the message.</param>
[Explicit]
[TestCase(new[] { RetryType.NoEdit, RetryType.NoEdit, RetryType.Edit })]
[TestCase(new[] { RetryType.Edit, RetryType.NoEdit, RetryType.Edit })]
[TestCase(new[] { RetryType.NoEdit, RetryType.Edit, RetryType.NoEdit })]
[TestCase(new[] { RetryType.Edit, RetryType.Edit, RetryType.NoEdit })]
public async Task WithMixOfRetryTypes(RetryType[] retryTypes)
{
    // The Edit steps post to /api/edit, so message editing is switched on in the primary settings.
    CustomServiceControlPrimarySettings = settings => { settings.AllowMessageEditing = true; };

    await Define<MyContext>()
        .WithEndpoint<FailureEndpoint>(endpoint =>
            endpoint.When(session => session.SendLocal(new MyMessage())).DoNotFailOnErrorMessages())
        .Done(async c =>
        {
            // All retries requested: done as soon as nothing is left unresolved.
            if (c.RetryCount >= retryTypes.Length)
            {
                var remaining = await GetAllFailedMessage(ServiceControlInstanceName, FailedMessageStatus.Unresolved);
                return !remaining.HasResult;
            }

            var isEdit = retryTypes[c.RetryCount] == RetryType.Edit;

            if (isEdit)
            {
                // Before an edit, take the message id from the single unresolved failure
                // that ServiceControl currently reports.
                var unresolved = await GetAllFailedMessage(ServiceControlInstanceName, FailedMessageStatus.Unresolved);
                if (!unresolved.HasResult)
                {
                    return false; // Nothing has failed yet
                }

                c.MessageId = unresolved.Items.Single().MessageId;
            }

            var failure = await GetFailedMessage(c.UniqueMessageId, ServiceControlInstanceName, FailedMessageStatus.Unresolved);
            if (!failure.HasResult)
            {
                return false; // The expected failure is not visible yet
            }

            var uniqueId = failure.Item.UniqueMessageId;

            if (isEdit)
            {
                // Replace the body and reuse the headers of the most recent processing attempt.
                var editedPayload = new
                {
                    MessageBody = $"{{ \"Name\": \"Hello {c.RetryCount}\" }}",
                    MessageHeaders = failure.Item.ProcessingAttempts[^1].Headers
                };

                await this.Post<object>($"/api/edit/{uniqueId}", editedPayload, null, ServiceControlInstanceName);
            }
            else
            {
                await this.Post<object>($"/api/errors/{uniqueId}/retry", null, null, ServiceControlInstanceName);
            }

            // A pass that issues a retry never completes the scenario; completion is decided
            // by the first branch on a later pass.
            c.RetryCount++;
            return false;
        })
        .Run(TimeSpan.FromMinutes(2));
}
/// <summary>
/// Test endpoint hosting <see cref="MyMessageHandler"/>. It is set up from
/// <c>DefaultServerWithoutAudit</c> with <c>NoRetries()</c> applied to the endpoint configuration.
/// </summary>
class FailureEndpoint : EndpointConfigurationBuilder
{
    public FailureEndpoint()
    {
        EndpointSetup<DefaultServerWithoutAudit>(config => { config.NoRetries(); });
    }

    /// <summary>
    /// Records the incoming message id and the endpoint name on the scenario context, then
    /// throws until the context's <c>RetryCount</c> has reached 3.
    /// </summary>
    public class MyMessageHandler(MyContext testContext, IReadOnlySettings settings)
        : IHandleMessages<MyMessage>
    {
        public Task Handle(MyMessage message, IMessageHandlerContext context)
        {
            // Backslashes in the transport message id are replaced with dashes before it is stored.
            testContext.MessageId = context.MessageId.Replace(@"\", "-");
            testContext.EndpointNameOfReceivingEndpoint = settings.EndpointName();

            // 3 equals the number of entries in every TestCase of WithMixOfRetryTypes, so handling
            // succeeds only once the test has counted three retry requests.
            var retriesExhausted = testContext.RetryCount >= 3;
            if (retriesExhausted)
            {
                Console.Out.WriteLine("Handling message");
                return Task.CompletedTask;
            }

            Console.Out.WriteLine("Throwing exception");
            throw new Exception("Simulated exception");
        }
    }
}
/// <summary>
/// Command the test sends to the endpoint itself (via SendLocal) and that MyMessageHandler handles.
/// </summary>
class MyMessage : ICommand
{
/// <summary>
/// Payload field; the Edit retry posts a replacement body of the form { "Name": "Hello N" }.
/// </summary>
public string Name { get; set; }
}
/// <summary>
/// Scenario state shared between the message handler and the test's done condition.
/// </summary>
class MyContext : ScenarioContext
{
    /// <summary>
    /// Id of the message being tracked. Written by the handler on every delivery and by the
    /// done condition before an Edit retry.
    /// </summary>
    public string MessageId { get; set; }

    /// <summary>Name of the endpoint that received the message, as reported by the handler.</summary>
    public string EndpointNameOfReceivingEndpoint { get; set; }

    /// <summary>
    /// String form of the id that <c>DeterministicGuid.MakeId</c> produces from
    /// <see cref="MessageId"/> and <see cref="EndpointNameOfReceivingEndpoint"/>.
    /// </summary>
    public string UniqueMessageId
    {
        get
        {
            var id = DeterministicGuid.MakeId(MessageId, EndpointNameOfReceivingEndpoint);
            return id.ToString();
        }
    }

    /// <summary>
    /// Number of retry requests the done condition has posted so far; read by the handler
    /// to decide whether to keep failing.
    /// </summary>
    public int RetryCount { get; set; }
}
}
}