forked from luncliff/coroutine
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathc2_socket_echo_udp.cpp
More file actions
176 lines (145 loc) · 5.69 KB
/
c2_socket_echo_udp.cpp
File metadata and controls
176 lines (145 loc) · 5.69 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
//
// Author : github.com/luncliff (luncliff@gmail.com)
// License : CC BY 4.0
//
#include <catch2/catch.hpp>
#include <coroutine/concrt.h>
#include <coroutine/net.h>
#include <coroutine/return.h>
#include <gsl/gsl>
#include "./socket_test.h"
using namespace std;
using namespace gsl;
using namespace std::chrono_literals;
using namespace coro;
using wait_group = concrt::latch;
auto coro_recv_dgram(int64_t sd, sockaddr_in& remote, int64_t& rsz,
wait_group& wg) -> no_return;
auto coro_send_dgram(int64_t sd, const sockaddr_in& remote, int64_t& ssz,
wait_group& wg) -> no_return;
auto echo_incoming_datagram(int64_t sd) -> no_return;
TEST_CASE("socket udp echo test", "[network][socket]") {
load_network_api();
constexpr auto test_service_port = 32771;
addrinfo hint{};
hint.ai_family = AF_INET; // test with ipv4
hint.ai_socktype = SOCK_DGRAM;
hint.ai_protocol = IPPROTO_UDP;
// service socket
int64_t ss = socket_create(hint);
auto d1 = gsl::finally([ss]() { socket_close(ss); });
socket_set_option_reuse_address(ss);
socket_set_option_nonblock(ss);
endpoint_t ep{};
ep.in6.sin6_family = hint.ai_family; // -- ipv6 --
ep.in4.sin_addr.s_addr = htonl(INADDR_ANY); // in6.sin6_addr <- in6addr_any
ep.in4.sin_port = htons(test_service_port); // in6.sin6_port <- htons(port)
socket_bind(ss, ep);
// start service
echo_incoming_datagram(ss);
SECTION("multiple clients") {
constexpr auto max_clients = 4;
array<int64_t, max_clients> clients{};
array<int64_t, max_clients> recv_lengths{};
array<int64_t, max_clients> send_lengths{};
array<sockaddr_in, max_clients> recv_endpoints{};
gsl::index i = 0u;
for (auto sd : socket_create(hint, clients.size())) {
clients[i++] = sd;
socket_set_option_nonblock(sd);
auto local = ep; // copy family and addr
local.in4.sin_port = 0; // system will assign port for the socket
socket_bind(sd, local); // socket must be bound
// before starting I/O operation
}
auto d2 = gsl::finally([&clients]() {
for (auto sd : clients)
socket_close(sd);
});
wait_group wg{max_clients * 2}; // wait group for coroutine sync
// each client will perform
// 1 recv and 1 send
{
// recv packets
// later echo response will be delivered to these coroutines
for (i = 0; i < max_clients; ++i)
coro_recv_dgram(clients[i], recv_endpoints[i], recv_lengths[i],
wg);
// send packets to service address
// in6.sin6_addr <- in6addr_loopback; // for ipv6
ep.in4.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
for (i = 0; i < max_clients; ++i)
coro_send_dgram(clients[i], ep.in4, send_lengths[i], wg);
if constexpr (is_winsock == false) {
// unlike windows api, we have to resume tasks manually
// the library doesn't guarantee they will be fetched at once
// so user have to repeat enough to finish all i/o tasks
auto count = 30;
while (count--)
for (auto task : wait_io_tasks(10ms))
task.resume();
}
}
wg.wait(); // ensure all coroutines are finished
// now, receive coroutines must hold same data
// sent by each client sockets
for (i = 0; i < max_clients; ++i) {
REQUIRE(send_lengths[i] == recv_lengths[i]);
bool equal = memcmp(addressof(ep.in4.sin_addr),
addressof(recv_endpoints[i].sin_addr), //
sizeof(in_addr)) == 0;
REQUIRE(equal);
}
}
// test end
}
auto coro_recv_dgram(int64_t sd, sockaddr_in& remote, int64_t& rsz,
wait_group& wg) -> no_return {
using gsl::byte;
auto d = finally([&wg]() { // ensure noti to wait_group
wg.count_down();
});
io_work_t work{};
array<byte, 1253> storage{};
rsz = co_await recv_from(sd, remote, storage, work);
if (auto errc = work.error()) {
CAPTURE(errc);
FAIL(std::system_category().message(errc));
}
REQUIRE(rsz > 0);
}
auto coro_send_dgram(int64_t sd, const sockaddr_in& remote, int64_t& ssz,
wait_group& wg) -> no_return {
using gsl::byte;
auto d = finally([&wg]() { // ensure noti to wait_group
wg.count_down();
});
io_work_t work{};
array<byte, 782> storage{};
ssz = co_await send_to(sd, remote, storage, work);
if (auto errc = work.error()) {
CAPTURE(errc);
FAIL(std::system_category().message(errc));
}
REQUIRE(ssz == storage.size());
}
auto echo_incoming_datagram(int64_t sd) -> no_return {
using gsl::byte;
io_work_t work{};
io_buffer_t buf{};
int64_t rsz = 0, ssz = 0;
sockaddr_in remote{};
array<byte, 3927> storage{};
while (true) {
rsz = co_await recv_from(sd, remote, buf = storage, work);
if (work.error())
goto OnError;
ssz = co_await send_to(sd, remote, buf = {storage.data(), rsz}, work);
if (work.error())
goto OnError;
}
co_return;
OnError:
auto em = std::system_category().message(work.error());
CAPTURE(em);
}