diff --git a/core/src/main/java/io/grpc/util/MirroringInterceptor.java b/core/src/main/java/io/grpc/util/MirroringInterceptor.java new file mode 100644 index 00000000000..851af694c5d --- /dev/null +++ b/core/src/main/java/io/grpc/util/MirroringInterceptor.java @@ -0,0 +1,116 @@ +/* + * Copyright 2025 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.util; + +import com.google.common.base.Preconditions; +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; +import io.grpc.ClientInterceptor; +import io.grpc.ForwardingClientCall; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; +import java.util.concurrent.Executor; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * A ClientInterceptor that mirrors calls to a shadow channel. + * Designed to support Unary, Client-Streaming, Server-Streaming, and Bidi calls. + */ +public final class MirroringInterceptor implements ClientInterceptor { + private static final Logger logger = Logger.getLogger(MirroringInterceptor.class.getName()); + + private final Channel mirrorChannel; + private final Executor executor; + + public MirroringInterceptor(Channel mirrorChannel, Executor executor) { + this.mirrorChannel = Preconditions.checkNotNull(mirrorChannel, "mirrorChannel"); + this.executor = Preconditions.checkNotNull(executor, "executor"); + } + + @Override + public ClientCall interceptCall( + MethodDescriptor method, CallOptions callOptions, Channel next) { + + return new ForwardingClientCall.SimpleForwardingClientCall( + next.newCall(method, callOptions)) { + + private ClientCall mirrorCall; + + @Override + public void start(Listener responseListener, Metadata headers) { + // 1. Capture and copy headers immediately (thread-safe for the executor) + final Metadata mirrorHeaders = new Metadata(); + mirrorHeaders.merge(headers); + + executor.execute(() -> { + try { + // 2. Initialize the shadow call once per stream + mirrorCall = mirrorChannel.newCall(method, callOptions); + mirrorCall.start(new ClientCall.Listener() {}, mirrorHeaders); + } catch (Exception e) { + logger.log(Level.WARNING, "Failed to start mirror call", e); + } + }); + super.start(responseListener, headers); + } + + @Override + public void sendMessage(ReqT message) { + executor.execute(() -> { + if (mirrorCall != null) { + try { + mirrorCall.sendMessage(message); + } catch (Exception e) { + logger.log(Level.WARNING, "Mirroring message failed", e); + } + } + }); + super.sendMessage(message); + } + + @Override + public void halfClose() { + executor.execute(() -> { + if (mirrorCall != null) { + try { + mirrorCall.halfClose(); + } catch (Exception e) { + logger.log(Level.WARNING, "Mirroring halfClose failed", e); + } + } + }); + super.halfClose(); + } + + @Override + public void cancel(String message, Throwable cause) { + executor.execute(() -> { + if (mirrorCall != null) { + try { + mirrorCall.cancel(message, cause); + } catch (Exception e) { + logger.log(Level.WARNING, "Mirroring cancel failed", e); + } + } + }); + super.cancel(message, cause); + } + }; + } +} \ No newline at end of file diff --git a/inprocess/src/test/java/io/grpc/inprocess/MirroringInterceptorTest.java b/inprocess/src/test/java/io/grpc/inprocess/MirroringInterceptorTest.java new file mode 100644 index 00000000000..d81a5aa6904 --- /dev/null +++ b/inprocess/src/test/java/io/grpc/inprocess/MirroringInterceptorTest.java @@ -0,0 +1,140 @@ +/* + * Copyright 2025 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.inprocess; + +import static org.junit.Assert.assertTrue; + +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; +import io.grpc.ClientInterceptors; +import io.grpc.ManagedChannel; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; +import io.grpc.ServerCall; +import io.grpc.ServerServiceDefinition; +import io.grpc.Status; +import io.grpc.testing.GrpcCleanupRule; +import io.grpc.util.MirroringInterceptor; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import org.junit.Rule; +import org.junit.Test; + +public class MirroringInterceptorTest { + @Rule public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule(); + + private static final MethodDescriptor.Marshaller MARSHALLER = + new MethodDescriptor.Marshaller() { + @Override + public java.io.InputStream stream(String value) { + return new java.io.ByteArrayInputStream(value.getBytes(StandardCharsets.UTF_8)); + } + + @Override + public String parse(java.io.InputStream stream) { + return "response"; + } + }; + + private final MethodDescriptor method = + MethodDescriptor.newBuilder() + .setType(MethodDescriptor.MethodType.UNARY) + .setFullMethodName("test/Method") + .setRequestMarshaller(MARSHALLER) + .setResponseMarshaller(MARSHALLER) + .build(); + + @Test + public void unaryCallIsMirroredWithHeaders() throws Exception { + CountDownLatch mirrorLatch = new CountDownLatch(1); + Metadata.Key testKey = + Metadata.Key.of("test-header", Metadata.ASCII_STRING_MARSHALLER); + AtomicBoolean mirrorHeaderVerified = new AtomicBoolean(false); + + // 1. Setup Mirror Server - IMPORTANT: It must CLOSE the call + String mirrorName = InProcessServerBuilder.generateName(); + grpcCleanup.register( + InProcessServerBuilder.forName(mirrorName) + .directExecutor() + .addService( + ServerServiceDefinition.builder("test") + .addMethod( + method, + (call, headers) -> { + if ("shadow-value".equals(headers.get(testKey))) { + mirrorHeaderVerified.set(true); + } + mirrorLatch.countDown(); + + // CRITICAL: Close the call so the channel can shut down + call.sendHeaders(new Metadata()); + call.close(Status.OK, new Metadata()); + return new ServerCall.Listener() {}; + }) + .build()) + .build() + .start()); + + // 2. Setup Primary Server - Also must CLOSE the call + String primaryName = InProcessServerBuilder.generateName(); + grpcCleanup.register( + InProcessServerBuilder.forName(primaryName) + .directExecutor() + .addService( + ServerServiceDefinition.builder("test") + .addMethod( + method, + (call, headers) -> { + call.sendHeaders(new Metadata()); + call.close(Status.OK, new Metadata()); + return new ServerCall.Listener() {}; + }) + .build()) + .build() + .start()); + + ManagedChannel mirrorChannel = + grpcCleanup.register(InProcessChannelBuilder.forName(mirrorName).build()); + ManagedChannel primaryChannel = + grpcCleanup.register(InProcessChannelBuilder.forName(primaryName).build()); + + // Use direct executor to keep the mirror call on the same thread + java.util.concurrent.Executor directExecutor = Runnable::run; + + Channel interceptedChannel = + ClientInterceptors.intercept( + primaryChannel, new MirroringInterceptor(mirrorChannel, directExecutor)); + + // 3. Trigger call with Metadata + Metadata headers = new Metadata(); + headers.put(testKey, "shadow-value"); + + ClientCall call = interceptedChannel.newCall(method, CallOptions.DEFAULT); + call.start(new ClientCall.Listener() {}, headers); + call.sendMessage("hello"); + call.halfClose(); + + // 4. Assertions + assertTrue("Mirror server was not reached", mirrorLatch.await(1, TimeUnit.SECONDS)); + assertTrue( + "Headers were not correctly mirrored to shadow service", mirrorHeaderVerified.get()); + System.out.println("FULL MIRRORING SUCCESSFUL!"); + } +} \ No newline at end of file