Skip to content

Commit a22dd96

Browse files
committed
The implementation of the External Processor filter coordinates data across the application thread, the data plane response thread, and the external processor's response thread. To ensure thread safety and compliance with the gRPC contract, the
following synchronization measures were implemented: 1. Thread-Unsafe StreamObserver * Challenge: The gRPC StreamObserver used to send messages to the external processor is not thread-safe. Concurrent calls to its onNext(), onCompleted(), and onError() methods from different threads can corrupt the internal state of the communication channel. Additionally, calling isReady() on the observer while another thread is sending data can lead to race conditions. * Fix: All interactions with the external processor's StreamObserver—including data transmission (onNext), terminal signals (onCompleted, onError), and readiness checks (isReady)—are now protected by the lock object. 2. ClientCall.Listener Serialization Contract * Challenge: gRPC requires that all callbacks to an application's ClientCall.Listener (such as onHeaders, onMessage, and onReady) be strictly serialized. Because these events can be triggered by either the backend server or the external processor, there was a risk of overlapping callbacks. * Fix: The logic that delivers events to the application's Listener is now synchronized using the lock. This ensures that even if multiple threads attempt to "unblock" and deliver buffered metadata or status simultaneously, the application receives them in a single, non-overlapping sequence. 3. Visibility and Consistency of Internal State * Challenge: The filter maintains several internal state variables, such as buffers for response metadata and flags to track the lifecycle of the call. If these are accessed concurrently without synchronization, one thread might act on stale data, potentially leading to duplicate headers or incorrect flow control decisions. * Fix: Access to all internal state and control flags is now guarded by the lock. Furthermore, the flag indicating whether request headers have been processed was marked as volatile to ensure its state is immediately visible across threads during high-frequency checks like sendMessage(). 4. Synchronizing Terminal Signals * Challenge: Closing or faulting the external processor's stream while another thread is still attempting to send data can cause crashes or undefined behavior in the gRPC transport. * Fix: All terminal signals (onCompleted and onError) sent to the external processor's StreamObserver are synchronized with the same lock used for sending data. This ensures that the stream is only terminated after any ongoing data transfers have safely finished.
1 parent 1055c82 commit a22dd96

1 file changed

Lines changed: 94 additions & 56 deletions

File tree

xds/src/main/java/io/grpc/xds/ExternalProcessorFilter.java

Lines changed: 94 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -341,10 +341,11 @@ private static void applyHeaderMutations(Metadata headers, io.envoyproxy.envoy.s
341341
private static class ExtProcClientCall extends SimpleForwardingClientCall<InputStream, InputStream> {
342342
private final ExternalProcessorGrpc.ExternalProcessorStub stub;
343343
private final ExternalProcessor config;
344+
private final Object lock = new Object();
344345
private ClientCallStreamObserver<ProcessingRequest> extProcClientCallRequestObserver;
345346
private ExtProcListener wrappedListener;
346347

347-
private boolean headersSent = false;
348+
private volatile boolean headersSent = false;
348349
private Metadata requestHeaders;
349350
private final java.util.Queue<Runnable> pendingActions = new java.util.concurrent.ConcurrentLinkedQueue<>();
350351
final AtomicBoolean extProcStreamFailed = new AtomicBoolean(false);
@@ -378,7 +379,9 @@ public void onNext(io.envoyproxy.envoy.service.ext_proc.v3.ProcessingResponse re
378379

379380
if (response.getRequestDrain()) {
380381
drainingExtProcStream.set(true);
381-
extProcClientCallRequestObserver.onCompleted(); // Sends half-close to ext_proc
382+
synchronized (lock) {
383+
extProcClientCallRequestObserver.onCompleted(); // Sends half-close to ext_proc
384+
}
382385
return;
383386
}
384387

@@ -402,7 +405,9 @@ else if (response.hasRequestBody()) {
402405
io.grpc.StatusRuntimeException ex = io.grpc.Status.INTERNAL
403406
.withDescription("gRPC message compression not supported in ext_proc")
404407
.asRuntimeException();
405-
extProcClientCallRequestObserver.onError(ex);
408+
synchronized (lock) {
409+
extProcClientCallRequestObserver.onError(ex);
410+
}
406411
onError(ex);
407412
return;
408413
}
@@ -418,14 +423,16 @@ else if (response.hasResponseHeaders()) {
418423
}
419424
// 5. Server Message (Response Body)
420425
else if (response.hasResponseBody()) {
421-
if (response.getResponseBody().hasResponse()
426+
if (response.hasResponseBody().hasResponse()
422427
&& response.getResponseBody().getResponse().hasBodyMutation()
423428
&& response.getResponseBody().getResponse().getBodyMutation().hasStreamedResponse()
424429
&& response.getResponseBody().getResponse().getBodyMutation().getStreamedResponse().getGrpcMessageCompressed()) {
425430
io.grpc.StatusRuntimeException ex = io.grpc.Status.INTERNAL
426431
.withDescription("gRPC message compression not supported in ext_proc")
427432
.asRuntimeException();
428-
extProcClientCallRequestObserver.onError(ex);
433+
synchronized (lock) {
434+
extProcClientCallRequestObserver.onError(ex);
435+
}
429436
onError(ex);
430437
return;
431438
}
@@ -442,7 +449,9 @@ else if (response.hasResponseBody()) {
442449
}
443450
// Finally notify the local app of the completion
444451
wrappedListener.proceedWithClose();
445-
extProcClientCallRequestObserver.onCompleted();
452+
synchronized (lock) {
453+
extProcClientCallRequestObserver.onCompleted();
454+
}
446455
}
447456
}
448457

@@ -470,11 +479,13 @@ public void onCompleted() {
470479

471480
wrappedListener.setStream(extProcClientCallRequestObserver);
472481

473-
extProcClientCallRequestObserver.onNext(io.envoyproxy.envoy.service.ext_proc.v3.ProcessingRequest.newBuilder()
474-
.setRequestHeaders(io.envoyproxy.envoy.service.ext_proc.v3.HttpHeaders.newBuilder()
475-
.setHeaders(toHeaderMap(headers))
476-
.build())
477-
.build());
482+
synchronized (lock) {
483+
extProcClientCallRequestObserver.onNext(io.envoyproxy.envoy.service.ext_proc.v3.ProcessingRequest.newBuilder()
484+
.setRequestHeaders(io.envoyproxy.envoy.service.ext_proc.v3.HttpHeaders.newBuilder()
485+
.setHeaders(toHeaderMap(headers))
486+
.build())
487+
.build());
488+
}
478489

479490
if (config.getObservabilityMode()) {
480491
headersSent = true;
@@ -497,8 +508,10 @@ public boolean isReady() {
497508
return false;
498509
}
499510
if (config.getObservabilityMode()) {
500-
return super.isReady() && extProcClientCallRequestObserver != null
501-
&& extProcClientCallRequestObserver.isReady();
511+
synchronized (lock) {
512+
return super.isReady() && extProcClientCallRequestObserver != null
513+
&& extProcClientCallRequestObserver.isReady();
514+
}
502515
}
503516
return super.isReady();
504517
}
@@ -531,12 +544,14 @@ public void sendMessage(InputStream message) {
531544

532545
try {
533546
byte[] bodyBytes = ByteStreams.toByteArray(message);
534-
extProcClientCallRequestObserver.onNext(io.envoyproxy.envoy.service.ext_proc.v3.ProcessingRequest.newBuilder()
535-
.setRequestBody(io.envoyproxy.envoy.service.ext_proc.v3.HttpBody.newBuilder()
536-
.setBody(com.google.protobuf.ByteString.copyFrom(bodyBytes))
537-
.setEndOfStream(false)
538-
.build())
539-
.build());
547+
synchronized (lock) {
548+
extProcClientCallRequestObserver.onNext(io.envoyproxy.envoy.service.ext_proc.v3.ProcessingRequest.newBuilder()
549+
.setRequestBody(io.envoyproxy.envoy.service.ext_proc.v3.HttpBody.newBuilder()
550+
.setBody(com.google.protobuf.ByteString.copyFrom(bodyBytes))
551+
.setEndOfStream(false)
552+
.build())
553+
.build());
554+
}
540555

541556
if (config.getObservabilityMode()) {
542557
super.sendMessage(new ByteArrayInputStream(bodyBytes));
@@ -554,18 +569,22 @@ public void halfClose() {
554569
}
555570

556571
// Signal end of request body stream to the external processor.
557-
extProcClientCallRequestObserver.onNext(io.envoyproxy.envoy.service.ext_proc.v3.ProcessingRequest.newBuilder()
558-
.setRequestBody(io.envoyproxy.envoy.service.ext_proc.v3.HttpBody.newBuilder()
559-
.setEndOfStreamWithoutMessage(true)
560-
.build())
561-
.build());
572+
synchronized (lock) {
573+
extProcClientCallRequestObserver.onNext(io.envoyproxy.envoy.service.ext_proc.v3.ProcessingRequest.newBuilder()
574+
.setRequestBody(io.envoyproxy.envoy.service.ext_proc.v3.HttpBody.newBuilder()
575+
.setEndOfStreamWithoutMessage(true)
576+
.build())
577+
.build());
578+
}
562579
super.halfClose();
563580
}
564581

565582
@Override
566583
public void cancel(@Nullable String message, @Nullable Throwable cause) {
567-
if (extProcClientCallRequestObserver != null) {
568-
extProcClientCallRequestObserver.onError(Status.CANCELLED.withDescription(message).withCause(cause).asRuntimeException());
584+
synchronized (lock) {
585+
if (extProcClientCallRequestObserver != null) {
586+
extProcClientCallRequestObserver.onError(Status.CANCELLED.withDescription(message).withCause(cause).asRuntimeException());
587+
}
569588
}
570589
super.cancel(message, cause);
571590
}
@@ -605,7 +624,9 @@ private void handleImmediateResponse(io.envoyproxy.envoy.service.ext_proc.v3.Imm
605624
io.grpc.Status status = io.grpc.Status.fromCodeValue(immediate.getGrpcStatus().getStatus());
606625
delegate().cancel("Rejected by ExtProc", null);
607626
listener.onClose(status, new Metadata());
608-
extProcClientCallRequestObserver.onCompleted();
627+
synchronized (lock) {
628+
extProcClientCallRequestObserver.onCompleted();
629+
}
609630
}
610631

611632
private void handleFailOpen(ExtProcListener listener) {
@@ -655,19 +676,28 @@ public void onHeaders(Metadata headers) {
655676
super.onHeaders(headers);
656677
return;
657678
}
658-
this.savedHeaders = headers;
659-
extProcClientCall.extProcClientCallRequestObserver.onNext(ProcessingRequest.newBuilder()
660-
.setResponseHeaders(io.envoyproxy.envoy.service.ext_proc.v3.HttpHeaders.newBuilder()
661-
.setHeaders(toHeaderMap(headers))
662-
.build())
663-
.build());
679+
synchronized (extProcClientCall.lock) {
680+
this.savedHeaders = headers;
681+
extProcClientCall.extProcClientCallRequestObserver.onNext(ProcessingRequest.newBuilder()
682+
.setResponseHeaders(io.envoyproxy.envoy.service.ext_proc.v3.HttpHeaders.newBuilder()
683+
.setHeaders(toHeaderMap(headers))
684+
.build())
685+
.build());
686+
}
664687

665688
if (extProcClientCall.config.getObservabilityMode()) {
666689
super.onHeaders(headers);
667690
}
668691
}
669692

670-
void proceedWithHeaders() { super.onHeaders(savedHeaders); }
693+
void proceedWithHeaders() {
694+
synchronized (extProcClientCall.lock) {
695+
if (savedHeaders != null) {
696+
super.onHeaders(savedHeaders);
697+
savedHeaders = null;
698+
}
699+
}
700+
}
671701

672702
@Override
673703
public void onMessage(InputStream message) {
@@ -679,7 +709,7 @@ public void onMessage(InputStream message) {
679709
try {
680710
byte[] bodyBytes = ByteStreams.toByteArray(message);
681711
sendResponseBodyToExtProc(bodyBytes, false);
682-
712+
683713
if (extProcClientCall.config.getObservabilityMode()) {
684714
super.onMessage(new ByteArrayInputStream(bodyBytes));
685715
}
@@ -703,22 +733,26 @@ public void onClose(io.grpc.Status status, Metadata trailers) {
703733
return;
704734
}
705735

706-
this.savedStatus = status;
707-
this.savedTrailers = trailers;
736+
synchronized (extProcClientCall.lock) {
737+
this.savedStatus = status;
738+
this.savedTrailers = trailers;
708739

709-
// Signal end of response body stream to the external processor.
710-
sendResponseBodyToExtProc(null, true);
740+
// Signal end of response body stream to the external processor.
741+
sendResponseBodyToExtProc(null, true);
711742

712-
// Event 6: Server Trailers with ACTUAL data
713-
extProcClientCall.extProcClientCallRequestObserver.onNext(ProcessingRequest.newBuilder()
714-
.setResponseTrailers(io.envoyproxy.envoy.service.ext_proc.v3.HttpTrailers.newBuilder()
715-
.setTrailers(toHeaderMap(savedTrailers)) // Map the captured trailers here
716-
.build())
717-
.build());
743+
// Event 6: Server Trailers with ACTUAL data
744+
extProcClientCall.extProcClientCallRequestObserver.onNext(ProcessingRequest.newBuilder()
745+
.setResponseTrailers(io.envoyproxy.envoy.service.ext_proc.v3.HttpTrailers.newBuilder()
746+
.setTrailers(toHeaderMap(savedTrailers)) // Map the captured trailers here
747+
.build())
748+
.build());
749+
}
718750

719751
if (extProcClientCall.config.getObservabilityMode()) {
720752
super.onClose(status, trailers);
721-
extProcClientCall.extProcClientCallRequestObserver.onCompleted();
753+
synchronized (extProcClientCall.lock) {
754+
extProcClientCall.extProcClientCallRequestObserver.onCompleted();
755+
}
722756
}
723757
}
724758

@@ -734,16 +768,24 @@ private void sendResponseBodyToExtProc(@Nullable byte[] bodyBytes, boolean endOf
734768
}
735769
bodyBuilder.setEndOfStream(endOfStream);
736770

737-
extProcClientCall.extProcClientCallRequestObserver.onNext(ProcessingRequest.newBuilder()
738-
.setResponseBody(bodyBuilder.build())
739-
.build());
771+
synchronized (extProcClientCall.lock) {
772+
extProcClientCall.extProcClientCallRequestObserver.onNext(ProcessingRequest.newBuilder()
773+
.setResponseBody(bodyBuilder.build())
774+
.build());
775+
}
740776
}
741777

742778
/**
743779
* Called when ExtProc gives the final "OK" for the trailers phase.
744780
*/
745781
void proceedWithClose() {
746-
super.onClose(savedStatus, savedTrailers);
782+
synchronized (extProcClientCall.lock) {
783+
if (savedStatus != null) {
784+
super.onClose(savedStatus, savedTrailers);
785+
savedStatus = null;
786+
savedTrailers = null;
787+
}
788+
}
747789
}
748790

749791
void onExternalBody(com.google.protobuf.ByteString body) {
@@ -755,12 +797,8 @@ void onExternalBody(com.google.protobuf.ByteString body) {
755797
void unblockAfterStreamComplete() {
756798
// This is called when the ext_proc stream is gracefully completed.
757799
// We need to flush any pending state that is waiting for a response from ext_proc.
758-
if (savedHeaders != null) {
759-
proceedWithHeaders();
760-
}
761-
if (savedStatus != null) {
762-
proceedWithClose();
763-
}
800+
proceedWithHeaders();
801+
proceedWithClose();
764802
}
765803
}
766804
}

0 commit comments

Comments
 (0)