Skip to content

Commit 59be21b

Browse files
committed
xds: synchronize ext_proc message draining during stream termination
I identified the following synchronization issues by analyzing the potential race conditions between the thread handling external processor responses (or stream completion) and the application thread calling sendMessage(). Specifically, here is how the two major race conditions occurred during state transitions: 1. The isExtProcStreamCompleted() Check Bypass Race In the previous implementation, sendMessage(InputStream message) started with: java if (passThroughMode.get() || isExtProcStreamCompleted()) { super.sendMessage(message); return; } When the external processor stream terminated (e.g., via onCompleted()), the event listener immediately called markExtProcStreamCompleted(). This updated the extProcStreamState atomic reference to COMPLETED before handleFailOpen() ran drainPendingDrainingMessages(). If an application thread called sendMessage() in that exact window, isExtProcStreamCompleted() returned true. The application thread bypassed pendingDrainingMessages and sent the message directly over the wire (super.sendMessage). Shortly after, handleFailOpen() flushed the queue, delivering previously buffered draining messages to the wire after the newer message, resulting in out-of-order (FIFO violation) message delivery. 2. The Queue Flushed / Preemption Race If an application thread entered sendMessage() while the stream was in the DRAINING state, it evaluated: java if (isExtProcStreamDraining()) { pendingDrainingMessages.add(message); return; } If the application thread was preempted right before executing pendingDrainingMessages.add(message), the external processor stream could complete in the background on another thread. E.g., handleFailOpen() would run, invoke drainPendingDrainingMessages() (which found the queue empty), set passThroughMode = true, and finish. When the application thread resumed, it added its message to pendingDrainingMessages. Because drainPendingDrainingMessages() had already executed, that message sat in the queue indefinitely and was dropped. The Solution By wrapping the state checks and queue additions in sendMessage() with synchronized (streamLock), and similarly synchronizing drainPendingDrainingMessages() (where passThroughMode = true is updated atomically with the queue flush), we eliminate both race windows and ensure 100% robust, FIFO-ordered delivery across all stream transitions.
1 parent b36e92f commit 59be21b

1 file changed

Lines changed: 18 additions & 8 deletions

File tree

xds/src/main/java/io/grpc/xds/ExternalProcessorFilter.java

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1344,14 +1344,21 @@ public void sendMessage(InputStream message) {
13441344
return;
13451345
}
13461346

1347-
if (passThroughMode.get() || isExtProcStreamCompleted()) {
1347+
if (passThroughMode.get()) {
13481348
super.sendMessage(message);
13491349
return;
13501350
}
13511351

1352-
if (isExtProcStreamDraining()) {
1353-
pendingDrainingMessages.add(message);
1354-
return;
1352+
synchronized (streamLock) {
1353+
if (passThroughMode.get()) {
1354+
super.sendMessage(message);
1355+
return;
1356+
}
1357+
1358+
if (isExtProcStreamDraining() || isExtProcStreamCompleted()) {
1359+
pendingDrainingMessages.add(message);
1360+
return;
1361+
}
13551362
}
13561363

13571364
if (currentProcessingMode.getRequestBodyMode() == ProcessingMode.BodySendMode.NONE) {
@@ -1495,9 +1502,12 @@ private void handleImmediateResponse(ImmediateResponse immediate, DataPlaneListe
14951502
}
14961503

14971504
private void drainPendingDrainingMessages() {
1498-
InputStream msg;
1499-
while ((msg = pendingDrainingMessages.poll()) != null) {
1500-
super.sendMessage(msg);
1505+
synchronized (streamLock) {
1506+
passThroughMode.set(true);
1507+
InputStream msg;
1508+
while ((msg = pendingDrainingMessages.poll()) != null) {
1509+
super.sendMessage(msg);
1510+
}
15011511
}
15021512
}
15031513

@@ -1722,7 +1732,7 @@ void onExternalBody(ByteString body) {
17221732

17231733
void unblockAfterStreamComplete() {
17241734
proceedWithHeaders();
1725-
dataPlaneClientCall.passThroughMode.set(true);
1735+
dataPlaneClientCall.drainPendingDrainingMessages();
17261736
proceedWithClose();
17271737
}
17281738

0 commit comments

Comments
 (0)