-
Notifications
You must be signed in to change notification settings - Fork 107
Expand file tree
/
Copy pathUnixStreamSocketTest.java
More file actions
199 lines (166 loc) · 6.93 KB
/
UnixStreamSocketTest.java
File metadata and controls
199 lines (166 loc) · 6.93 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
package com.timgroup.statsd;
import java.util.logging.Logger;
import org.junit.After;
import org.junit.Assume;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
import java.io.File;
import java.nio.file.Files;
import static org.hamcrest.CoreMatchers.anyOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.hasItem;
import static org.junit.Assert.assertEquals;
public class UnixStreamSocketTest implements StatsDClientErrorHandler {
private static File tmpFolder;
private static NonBlockingStatsDClient client;
private static NonBlockingStatsDClient clientAggregate;
private static DummyStatsDServer server;
private static File socketFile;
private volatile Exception lastException = new Exception();
private static Logger log = Logger.getLogger(StatsDClientErrorHandler.class.getName());
public synchronized void handle(Exception exception) {
log.info("Got exception: " + exception);
lastException = exception;
}
synchronized boolean lastExceptionMessageContains(String s) {
String msg = lastException.getMessage();
return msg != null && msg.contains(s);
}
@BeforeClass
public static void supportedOnly() throws IOException {
Assume.assumeTrue(TestHelpers.isUdsAvailable());
}
@Before
public void start() throws IOException {
tmpFolder = Files.createTempDirectory(System.getProperty("java-dsd-test")).toFile();
tmpFolder.deleteOnExit();
socketFile = new File(tmpFolder, "socket.sock");
socketFile.deleteOnExit();
server = new UnixStreamSocketDummyStatsDServer(socketFile.toString());
client = new NonBlockingStatsDClientBuilder().prefix("my.prefix")
.address("unixstream://" + socketFile.getPath())
.port(0)
.queueSize(1)
.timeout(500) // non-zero timeout to ensure exception triggered if socket buffer full.
.connectionTimeout(500)
.socketBufferSize(1024 * 1024)
.enableAggregation(false)
.errorHandler(this)
.originDetectionEnabled(false)
.build();
clientAggregate = new NonBlockingStatsDClientBuilder().prefix("my.prefix")
.address("unixstream://" + socketFile.getPath())
.port(0)
.queueSize(1)
.timeout(500) // non-zero timeout to ensure exception triggered if socket buffer full.
.connectionTimeout(500)
.socketBufferSize(1024 * 1024)
.enableAggregation(false)
.errorHandler(this)
.originDetectionEnabled(false)
.build();
}
@After
public void stop() throws Exception {
client.stop();
clientAggregate.stop();
server.close();
}
@Test
public void assert_default_uds_size() throws Exception {
assertEquals(client.statsDProcessor.bufferPool.getBufferSize(), NonBlockingStatsDClient.DEFAULT_UDS_MAX_PACKET_SIZE_BYTES);
}
@Test(timeout = 5000L)
public void sends_to_statsd() throws Exception {
for(long i = 0; i < 5 ; i++) {
client.gauge("mycount", i);
server.waitForMessage();
String expected = String.format("my.prefix.mycount:%d|g", i);
assertThat(server.messagesReceived(), contains(expected));
server.clear();
}
assertThat(lastException.getMessage(), nullValue());
}
@Test(timeout = 10000L)
public void resist_dsd_restart() throws Exception {
// Send one metric, check that it works.
client.gauge("mycount", 10);
server.waitForMessage();
assertThat(server.messagesReceived(), contains("my.prefix.mycount:10|g"));
server.clear();
assertThat(lastException.getMessage(), nullValue());
// Close the server, client should throw an IOException
server.close();
while(!lastExceptionMessageContains("Connection refused")) {
client.gauge("mycount", 20);
Thread.sleep(10);
}
// Delete the socket file, client should throw an IOException
lastException = new Exception();
socketFile.delete();
client.gauge("mycount", 21);
while(!lastExceptionMessageContains("No such file or directory")) {
Thread.sleep(10);
}
// Re-open the server, next send should work OK
DummyStatsDServer server2;
server2 = new UnixStreamSocketDummyStatsDServer(socketFile.toString());
lastException = new Exception();
client.gauge("mycount", 30);
server2.waitForMessage();
assertThat(server2.messagesReceived(), hasItem("my.prefix.mycount:30|g"));
server2.clear();
assertThat(lastException.getMessage(), nullValue());
server2.close();
}
@Test(timeout = 10000L)
public void resist_dsd_timeout() throws Exception {
client.gauge("mycount", 10);
server.waitForMessage();
assertThat(server.messagesReceived(), contains("my.prefix.mycount:10|g"));
server.clear();
assertThat(lastException.getMessage(), nullValue());
// Freeze the server to simulate dsd being overwhelmed
server.freeze();
while (lastException.getMessage() == null) {
client.gauge("mycount", 20);
}
String excMessage = "Write timed out";
assertThat(lastException.getMessage(), containsString(excMessage));
// Make sure we recover after we resume listening
server.clear();
server.unfreeze();
// Now make sure we can receive gauges with 30
while (!server.messagesReceived().contains("my.prefix.mycount:30|g")) {
server.clear();
client.gauge("mycount", 30);
server.waitForMessage();
}
assertThat(server.messagesReceived(), hasItem("my.prefix.mycount:30|g"));
server.clear();
}
@Test(timeout = 5000L)
public void stream_uds_has_uds_buffer_size() throws Exception {
final NonBlockingStatsDClient client = new NonBlockingStatsDClientBuilder()
.prefix("my.prefix")
.address("unixstream:///foo")
.containerID("fake-container-id")
.build();
assertEquals(client.statsDProcessor.bufferPool.getBufferSize(), NonBlockingStatsDClient.DEFAULT_UDS_MAX_PACKET_SIZE_BYTES);
}
@Test(timeout = 5000L)
public void max_packet_size_override() throws Exception {
final NonBlockingStatsDClient client = new NonBlockingStatsDClientBuilder()
.prefix("my.prefix")
.address("unixstream:///foo")
.containerID("fake-container-id")
.maxPacketSizeBytes(576)
.build();
assertEquals(client.statsDProcessor.bufferPool.getBufferSize(), 576);
}
}