diff --git a/grpc-gcp-java/src/main/java/com/google/cloud/grpc/GcpManagedChannel.java b/grpc-gcp-java/src/main/java/com/google/cloud/grpc/GcpManagedChannel.java index 5d68ee6519a1..da3a3ce3cdf3 100644 --- a/grpc-gcp-java/src/main/java/com/google/cloud/grpc/GcpManagedChannel.java +++ b/grpc-gcp-java/src/main/java/com/google/cloud/grpc/GcpManagedChannel.java @@ -107,6 +107,40 @@ public class GcpManagedChannel extends ManagedChannel { public static final CallOptions.Key CHANNEL_ID_KEY = CallOptions.Key.create("GcpChannelId"); + /** CallOptions key for sticky channel routing without affinity-key map state. */ + public static final CallOptions.Key CHANNEL_AFFINITY_REF_KEY = + CallOptions.Key.create("GcpChannelAffinityRef"); + + /** Opaque sticky channel reference for callers that should not depend on {@link ChannelRef}. */ + public static final class ChannelAffinityRef { + private static final int USE_DIFFERENT_CHANNEL_ON_NEXT_CALL_MASK = 1 << 31; + private static final int CHANNEL_ID_MASK = ~USE_DIFFERENT_CHANNEL_ON_NEXT_CALL_MASK; + private static final int NO_CHANNEL_ID = -1; + + // Single allocation hot-path state: + // * lower 31 bits: channel id + 1, or 0 when unset. + // * high bit: use a different active channel on the next call. + private final AtomicInteger state = new AtomicInteger(); + + /** Forces the next RPC to prefer a different active channel if one is available. */ + public void useDifferentChannelOnNextCall() { + state.getAndUpdate(value -> value | USE_DIFFERENT_CHANNEL_ON_NEXT_CALL_MASK); + } + + private static int channelIdFromState(int state) { + int encodedChannelId = state & CHANNEL_ID_MASK; + return encodedChannelId == 0 ? NO_CHANNEL_ID : encodedChannelId - 1; + } + + private static boolean useDifferentChannelOnNextCallFromState(int state) { + return (state & USE_DIFFERENT_CHANNEL_ON_NEXT_CALL_MASK) != 0; + } + + private static int stateFromChannelId(int channelId) { + return (channelId + 1) & CHANNEL_ID_MASK; + } + } + @GuardedBy("this") private Integer bindingIndex = -1; @@ -140,6 +174,7 @@ public class GcpManagedChannel extends ManagedChannel { // The channel pool. @VisibleForTesting final List channelRefs = new CopyOnWriteArrayList<>(); + private final Map channelIdToChannelRef = new ConcurrentHashMap<>(); // A set of channels that we removed from the pool and wait for their RPCs to be completed before // we can shut them down. final Set removedChannelRefs = new HashSet<>(); @@ -352,6 +387,7 @@ private synchronized void checkScaleDown() { channelRef.getChannel().shutdown(); // Remove channel from broken channels map. fallbackMap.remove(channelRef.getId()); + channelIdToChannelRef.remove(channelRef.getId()); } } @@ -372,6 +408,7 @@ private void removeOldestChannels(int num) { for (ChannelRef channelRef : channelsToRemove) { channelRef.resetAffinityCount(); + channelRef.deactivate(); if (channelRef.getState() == ConnectivityState.READY) { decReadyChannels(false); } @@ -1678,6 +1715,59 @@ protected ChannelRef getChannelRef(@Nullable String key) { return mappedChannel; } + /** + * Pick a {@link ChannelRef} using a caller-owned reference instead of grpc-gcp's affinity map. + */ + protected ChannelRef getChannelRefByAffinityRef(ChannelAffinityRef affinityRef) { + maybeDynamicUpscale(); + // Retry if another thread updates the caller-owned affinity ref while we are picking a channel. + while (true) { + int state = affinityRef.state.get(); + int channelId = ChannelAffinityRef.channelIdFromState(state); + boolean useDifferentChannel = + ChannelAffinityRef.useDifferentChannelOnNextCallFromState(state); + ChannelRef channelRef = + channelId == ChannelAffinityRef.NO_CHANNEL_ID + ? null + : channelIdToChannelRef.get(channelId); + if (!useDifferentChannel && channelRef != null && channelRef.isActive()) { + return channelRef; + } + + ChannelRef selectedChannelRef = + useDifferentChannel + ? pickLeastBusyChannelDifferentFrom(channelRef) + : pickLeastBusyChannel(/* forFallback= */ false); + if (affinityRef.state.compareAndSet( + state, ChannelAffinityRef.stateFromChannelId(selectedChannelRef.getId()))) { + return selectedChannelRef; + } + } + } + + private ChannelRef pickLeastBusyChannelDifferentFrom(@Nullable ChannelRef excludedChannelRef) { + ChannelRef channelRef = pickLeastBusyChannel(/* forFallback= */ false); + if (excludedChannelRef == null || channelRefs.size() <= 1) { + return channelRef; + } + if (channelRef != excludedChannelRef && channelRef.isActive()) { + return channelRef; + } + ChannelRef leastBusyChannelRef = null; + int leastBusyStreams = Integer.MAX_VALUE; + for (ChannelRef candidate : channelRefs) { + if (candidate == excludedChannelRef || !candidate.isActive()) { + continue; + } + int streams = candidate.getActiveStreamsCount(); + if (leastBusyChannelRef == null || streams < leastBusyStreams) { + leastBusyChannelRef = candidate; + leastBusyStreams = streams; + } + } + return leastBusyChannelRef == null ? channelRef : leastBusyChannelRef; + } + // Create a new channel and add it to channelRefs. // If we have a ready channel not in the pool that we wait for completing its RPCs, // then re-use that channel instead. @@ -1688,6 +1778,8 @@ ChannelRef createNewChannel() { ChannelRef chRef = reusedChannelRef.get(); channelRefs.add(chRef); removedChannelRefs.remove(chRef); + channelIdToChannelRef.put(chRef.getId(), chRef); + chRef.activate(); logger.finer(log("Channel %d reused.", chRef.getId())); incReadyChannels(false); maxChannels.accumulateAndGet(getNumberOfChannels(), Math::max); @@ -1696,6 +1788,7 @@ ChannelRef createNewChannel() { ChannelRef channelRef = new ChannelRef(delegateChannelBuilder.build()); channelRefs.add(channelRef); + channelIdToChannelRef.put(channelRef.getId(), channelRef); logger.finer(log("Channel %d created.", channelRef.getId())); maxChannels.accumulateAndGet(getNumberOfChannels(), Math::max); return channelRef; @@ -1961,6 +2054,12 @@ public String authority() { @Override public ClientCall newCall( MethodDescriptor methodDescriptor, CallOptions callOptions) { + ChannelAffinityRef channelAffinityRef = callOptions.getOption(CHANNEL_AFFINITY_REF_KEY); + if (channelAffinityRef != null) { + return new GcpClientCall.SimpleGcpClientCall<>( + this, getChannelRefByAffinityRef(channelAffinityRef), methodDescriptor, callOptions); + } + if (callOptions.getOption(DISABLE_AFFINITY_KEY) || DISABLE_AFFINITY_CTX_KEY.get(Context.current())) { if (logger.isLoggable(Level.FINEST)) { @@ -2314,6 +2413,7 @@ protected class ChannelRef { private final AtomicLong okCalls = new AtomicLong(); private final AtomicLong errCalls = new AtomicLong(); private final ChannelStateMonitor channelStateMonitor; + private volatile boolean active = true; protected ChannelRef(ManagedChannel channel) { this(channel, 0, 0); @@ -2343,6 +2443,18 @@ protected int getId() { return channelId; } + protected boolean isActive() { + return active; + } + + private void activate() { + active = true; + } + + private void deactivate() { + active = false; + } + protected void affinityCountIncr() { int count = affinityCount.incrementAndGet(); maxAffinity.accumulateAndGet(count, Math::max); diff --git a/grpc-gcp-java/src/test/java/com/google/cloud/grpc/ChannelIdPropagationTest.java b/grpc-gcp-java/src/test/java/com/google/cloud/grpc/ChannelIdPropagationTest.java index a4530257fc1c..99acef8d01cb 100644 --- a/grpc-gcp-java/src/test/java/com/google/cloud/grpc/ChannelIdPropagationTest.java +++ b/grpc-gcp-java/src/test/java/com/google/cloud/grpc/ChannelIdPropagationTest.java @@ -18,6 +18,7 @@ import static com.google.common.truth.Truth.assertThat; +import com.google.cloud.grpc.GcpManagedChannel.ChannelAffinityRef; import com.google.cloud.grpc.GcpManagedChannelOptions.GcpChannelPoolOptions; import io.grpc.CallOptions; import io.grpc.Channel; @@ -28,6 +29,8 @@ import io.grpc.Metadata; import io.grpc.MethodDescriptor; import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import org.junit.Test; import org.junit.runner.RunWith; @@ -35,6 +38,13 @@ @RunWith(JUnit4.class) public class ChannelIdPropagationTest { + private static final MethodDescriptor METHOD_DESCRIPTOR = + MethodDescriptor.newBuilder() + .setType(MethodDescriptor.MethodType.UNARY) + .setFullMethodName("test/method") + .setRequestMarshaller(new FakeMarshaller<>()) + .setResponseMarshaller(new FakeMarshaller<>()) + .build(); private static class FakeMarshaller implements MethodDescriptor.Marshaller { @Override @@ -85,16 +95,8 @@ public void start(Listener responseListener, Metadata headers) { .build()) .build(); - MethodDescriptor methodDescriptor = - MethodDescriptor.newBuilder() - .setType(MethodDescriptor.MethodType.UNARY) - .setFullMethodName("test/method") - .setRequestMarshaller(new FakeMarshaller<>()) - .setResponseMarshaller(new FakeMarshaller<>()) - .build(); - // Use the pool directly (interceptor is already inside) - ClientCall newCall = pool.newCall(methodDescriptor, CallOptions.DEFAULT); + ClientCall newCall = pool.newCall(METHOD_DESCRIPTOR, CallOptions.DEFAULT); Metadata headers = new Metadata(); // First call (should initialize channel and correct ID) @@ -105,7 +107,7 @@ public void start(Listener responseListener, Metadata headers) { assertThat(channelId.get()).isAnyOf(0, 1, 2); // Attempt 2 - newCall = pool.newCall(methodDescriptor, CallOptions.DEFAULT); + newCall = pool.newCall(METHOD_DESCRIPTOR, CallOptions.DEFAULT); newCall.start( new ForwardingClientCall.SimpleForwardingClientCall.Listener() {}, headers); @@ -114,4 +116,82 @@ public void start(Listener responseListener, Metadata headers) { pool.shutdownNow(); } + + @Test + public void testChannelAffinityRefSticksToSameChannel() { + List channelIds = new ArrayList<>(); + GcpManagedChannel pool = newPoolWithChannelIdInterceptor(channelIds); + + try { + ChannelAffinityRef affinityRef = new ChannelAffinityRef(); + CallOptions callOptions = + CallOptions.DEFAULT.withOption(GcpManagedChannel.CHANNEL_AFFINITY_REF_KEY, affinityRef); + + startCall(pool, callOptions); + startCall(pool, callOptions); + startCall(pool, callOptions); + + assertThat(channelIds).hasSize(3); + assertThat(channelIds.get(1)).isEqualTo(channelIds.get(0)); + assertThat(channelIds.get(2)).isEqualTo(channelIds.get(0)); + assertThat(pool.affinityKeyToChannelRef).isEmpty(); + } finally { + pool.shutdownNow(); + } + } + + @Test + public void testChannelAffinityRefCanMoveToDifferentChannelOnNextCall() { + List channelIds = new ArrayList<>(); + GcpManagedChannel pool = newPoolWithChannelIdInterceptor(channelIds); + + try { + ChannelAffinityRef affinityRef = new ChannelAffinityRef(); + CallOptions callOptions = + CallOptions.DEFAULT.withOption(GcpManagedChannel.CHANNEL_AFFINITY_REF_KEY, affinityRef); + + startCall(pool, callOptions); + affinityRef.useDifferentChannelOnNextCall(); + startCall(pool, callOptions); + startCall(pool, callOptions); + + assertThat(channelIds).hasSize(3); + assertThat(channelIds.get(1)).isNotEqualTo(channelIds.get(0)); + assertThat(channelIds.get(2)).isEqualTo(channelIds.get(1)); + assertThat(pool.affinityKeyToChannelRef).isEmpty(); + } finally { + pool.shutdownNow(); + } + } + + private static GcpManagedChannel newPoolWithChannelIdInterceptor(List channelIds) { + ManagedChannelBuilder builder = ManagedChannelBuilder.forAddress("localhost", 443); + builder.intercept( + new ClientInterceptor() { + @Override + public ClientCall interceptCall( + MethodDescriptor method, CallOptions callOptions, Channel next) { + Integer channelId = callOptions.getOption(GcpManagedChannel.CHANNEL_ID_KEY); + if (channelId != null) { + channelIds.add(channelId); + } + return next.newCall(method, callOptions); + } + }); + return (GcpManagedChannel) + GcpManagedChannelBuilder.forDelegateBuilder(builder) + .withOptions( + GcpManagedChannelOptions.newBuilder() + .withChannelPoolOptions( + GcpChannelPoolOptions.newBuilder().setMinSize(3).setMaxSize(3).build()) + .build()) + .build(); + } + + private static void startCall(GcpManagedChannel pool, CallOptions callOptions) { + pool.newCall(METHOD_DESCRIPTOR, callOptions) + .start( + new ForwardingClientCall.SimpleForwardingClientCall.Listener() {}, + new Metadata()); + } } diff --git a/grpc-gcp-java/src/test/java/com/google/cloud/grpc/GcpManagedChannelTest.java b/grpc-gcp-java/src/test/java/com/google/cloud/grpc/GcpManagedChannelTest.java index 30ac09b1da11..c765efac7675 100644 --- a/grpc-gcp-java/src/test/java/com/google/cloud/grpc/GcpManagedChannelTest.java +++ b/grpc-gcp-java/src/test/java/com/google/cloud/grpc/GcpManagedChannelTest.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import com.google.cloud.grpc.GcpManagedChannel.ChannelAffinityRef; import com.google.cloud.grpc.GcpManagedChannel.ChannelRef; import com.google.cloud.grpc.GcpManagedChannelOptions.GcpChannelPoolOptions; import com.google.cloud.grpc.GcpManagedChannelOptions.GcpMetricsOptions; @@ -53,6 +54,8 @@ import io.opencensus.metrics.LabelValue; import java.io.File; import java.io.InputStream; +import java.lang.reflect.Field; +import java.lang.reflect.Method; import java.net.URL; import java.time.Duration; import java.util.ArrayList; @@ -296,6 +299,130 @@ public void testGetChannelRefInitializationWithMinSize() throws InterruptedExcep ConnectivityState.TRANSIENT_FAILURE); } + @Test + public void testChannelAffinityRefSticksToSameChannelRef() { + resetGcpChannel(); + ExecutorService executorService = Executors.newSingleThreadExecutor(); + try { + gcpChannel = createPoolWithFakeReadyChannels(executorService, 3); + ChannelAffinityRef affinityRef = new ChannelAffinityRef(); + + ChannelRef first = gcpChannel.getChannelRefByAffinityRef(affinityRef); + first.activeStreamsCountIncr(); + ChannelRef second = gcpChannel.getChannelRefByAffinityRef(affinityRef); + + assertThat(second).isSameInstanceAs(first); + assertThat(second.getId()).isEqualTo(first.getId()); + } finally { + gcpChannel.shutdownNow(); + executorService.shutdownNow(); + } + } + + @Test + public void testChannelAffinityRefUseDifferentChannelOnNextCallThenSticks() { + resetGcpChannel(); + ExecutorService executorService = Executors.newSingleThreadExecutor(); + try { + gcpChannel = createPoolWithFakeReadyChannels(executorService, 2); + ChannelAffinityRef affinityRef = new ChannelAffinityRef(); + + ChannelRef first = gcpChannel.getChannelRefByAffinityRef(affinityRef); + affinityRef.useDifferentChannelOnNextCall(); + + ChannelRef second = gcpChannel.getChannelRefByAffinityRef(affinityRef); + ChannelRef third = gcpChannel.getChannelRefByAffinityRef(affinityRef); + + assertThat(second).isNotSameInstanceAs(first); + assertThat(second.getId()).isNotEqualTo(first.getId()); + assertThat(third).isSameInstanceAs(second); + } finally { + gcpChannel.shutdownNow(); + executorService.shutdownNow(); + } + } + + @Test + public void testChannelAffinityRefInvalidChannelIdPicksAvailableChannel() throws Exception { + resetGcpChannel(); + ExecutorService executorService = Executors.newSingleThreadExecutor(); + try { + gcpChannel = createPoolWithFakeReadyChannels(executorService, 2); + ChannelAffinityRef affinityRef = new ChannelAffinityRef(); + + setChannelAffinityRefState(affinityRef, 1000); + + ChannelRef selected = gcpChannel.getChannelRefByAffinityRef(affinityRef); + ChannelRef next = gcpChannel.getChannelRefByAffinityRef(affinityRef); + + assertThat(selected).isIn(gcpChannel.channelRefs); + assertThat(selected.isActive()).isTrue(); + assertThat(next).isSameInstanceAs(selected); + } finally { + gcpChannel.shutdownNow(); + executorService.shutdownNow(); + } + } + + @Test + public void testChannelAffinityRefRemovedChannelPicksAvailableChannel() throws Exception { + resetGcpChannel(); + ExecutorService executorService = Executors.newSingleThreadExecutor(); + try { + gcpChannel = createPoolWithFakeReadyChannels(executorService, 2); + ChannelAffinityRef affinityRef = new ChannelAffinityRef(); + + ChannelRef removed = gcpChannel.getChannelRefByAffinityRef(affinityRef); + gcpChannel.channelRefs.remove(removed); + deactivateChannelRef(removed); + + ChannelRef selected = gcpChannel.getChannelRefByAffinityRef(affinityRef); + ChannelRef next = gcpChannel.getChannelRefByAffinityRef(affinityRef); + + assertThat(selected).isNotSameInstanceAs(removed); + assertThat(selected).isIn(gcpChannel.channelRefs); + assertThat(selected.isActive()).isTrue(); + assertThat(next).isSameInstanceAs(selected); + } finally { + gcpChannel.shutdownNow(); + executorService.shutdownNow(); + } + } + + private GcpManagedChannel createPoolWithFakeReadyChannels( + ExecutorService executorService, int channelCount) { + List channels = new ArrayList<>(); + for (int i = 0; i < channelCount; i++) { + FakeManagedChannel channel = new FakeManagedChannel(executorService); + channel.setState(ConnectivityState.READY); + channels.add(channel); + } + + GcpChannelPoolOptions poolOptions = + GcpChannelPoolOptions.newBuilder() + .setMinSize(channelCount) + .setMaxSize(channelCount) + .build(); + return (GcpManagedChannel) + GcpManagedChannelBuilder.forDelegateBuilder(new FakeManagedChannelBuilder(channels)) + .withOptions( + GcpManagedChannelOptions.newBuilder().withChannelPoolOptions(poolOptions).build()) + .build(); + } + + private void setChannelAffinityRefState(ChannelAffinityRef affinityRef, int state) + throws Exception { + Field stateField = ChannelAffinityRef.class.getDeclaredField("state"); + stateField.setAccessible(true); + ((AtomicInteger) stateField.get(affinityRef)).set(state); + } + + private void deactivateChannelRef(ChannelRef channelRef) throws Exception { + Method deactivate = ChannelRef.class.getDeclaredMethod("deactivate"); + deactivate.setAccessible(true); + deactivate.invoke(channelRef); + } + @Test public void testGetChannelRefPickUpSmallest() { // This test verifies deterministic smallest-stream selection (LINEAR_SCAN behavior). diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java index eee779685e87..0babd6e27ad0 100644 --- a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java +++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java @@ -28,6 +28,7 @@ import com.google.api.core.SettableApiFuture; import com.google.api.gax.core.ExecutorProvider; import com.google.cloud.Timestamp; +import com.google.cloud.grpc.GcpManagedChannel.ChannelAffinityRef; import com.google.cloud.spanner.AbstractResultSet.CloseableIterator; import com.google.cloud.spanner.AsyncResultSet.CallbackResponse; import com.google.cloud.spanner.AsyncResultSet.ReadyCallback; @@ -56,8 +57,6 @@ import com.google.spanner.v1.Transaction; import com.google.spanner.v1.TransactionOptions; import com.google.spanner.v1.TransactionSelector; -import java.util.Collections; -import java.util.EnumMap; import java.util.Map; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicBoolean; @@ -209,19 +208,11 @@ private SingleReadContext(Builder builder) { // of a channel hint. GAX will automatically choose a hint when used // with a multiplexed session to perform a round-robin channel selection. We are // passing a hint here to prefer random channel selection instead of doing GAX round-robin. - // Also signal unbind so the grpc-gcp affinity map entry is cleaned up once the call - // completes. The unbind flag is preserved on retries via prepareRetryOnDifferentGrpcChannel. this.channelHint = getChannelHintOptions( session.getOptions(), ThreadLocalRandom.current().nextLong(Long.MAX_VALUE), session.getSpanner().getOptions().isGrpcGcpExtensionEnabled()); - if (this.channelHint != null) { - Map mutable = new EnumMap<>(SpannerRpc.Option.class); - mutable.putAll(this.channelHint); - mutable.put(SpannerRpc.Option.UNBIND_CHANNEL_HINT, Boolean.TRUE); - this.channelHint = Collections.unmodifiableMap(mutable); - } } @Override @@ -256,12 +247,10 @@ TransactionSelector getTransactionSelector() { @Override boolean prepareRetryOnDifferentGrpcChannel() { - if (session.getIsMultiplexed() && channelHint.get(Option.CHANNEL_HINT) != null) { - long channelHintForTransaction = Option.CHANNEL_HINT.getLong(channelHint) + 1L; - channelHint = - optionMap( - SessionOption.channelHint(channelHintForTransaction), - SessionOption.unbindChannelHint()); + ChannelAffinityRef channelAffinityRef = + Option.CHANNEL_ID_AFFINITY.getChannelAffinityRef(channelHint); + if (session.getIsMultiplexed() && channelAffinityRef != null) { + channelAffinityRef.useDifferentChannelOnNextCall(); return true; } return super.prepareRetryOnDifferentGrpcChannel(); @@ -537,10 +526,6 @@ public void close() { } finally { txnLock.unlock(); } - ByteString id = getTransactionId(); - if (id != null && !id.isEmpty()) { - rpc.clearTransactionAndChannelAffinity(id, Option.CHANNEL_HINT.getLong(channelHint)); - } super.close(); } @@ -1006,11 +991,12 @@ boolean prepareIteratorForRetryOnDifferentGrpcChannel() { static Map getChannelHintOptions( Map channelHintForSession, Long channelHintForTransaction, - boolean useTransactionHint) { + boolean grpcGcpEnabled) { // grpc-gcp uses a per-operation/per-transaction random hint instead of reusing the session - // hint so requests distribute independently from session affinity. - if (useTransactionHint && channelHintForTransaction != null) { - return optionMap(SessionOption.channelHint(channelHintForTransaction)); + // hint so requests distribute independently from session affinity. Use direct channel-ref + // affinity so grpc-gcp does not need affinity-key map entries for Spanner operations. + if (grpcGcpEnabled && channelHintForTransaction != null) { + return optionMap(SessionOption.channelAffinityRef(new ChannelAffinityRef())); } if (channelHintForSession != null) { return channelHintForSession; diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java index 394b8bfbd9ef..e3bd1fdfa904 100644 --- a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java +++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java @@ -67,7 +67,9 @@ public class PartitionedDmlTransaction implements SessionImpl.SessionTransaction this.isRetryableInternalErrorPredicate = new IsRetryableInternalError(); this.channelHintOptions = getChannelHintOptions( - session.getOptions(), ThreadLocalRandom.current().nextLong(Long.MAX_VALUE)); + session.getOptions(), + ThreadLocalRandom.current().nextLong(Long.MAX_VALUE), + session.getSpanner().getOptions().isGrpcGcpExtensionEnabled()); } /** diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java index dabc99b93cfc..234675a221bc 100644 --- a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java +++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java @@ -20,6 +20,7 @@ import static com.google.common.base.Preconditions.checkNotNull; import com.google.api.pathtemplate.PathTemplate; +import com.google.cloud.grpc.GcpManagedChannel.ChannelAffinityRef; import com.google.cloud.grpc.GrpcTransportOptions.ExecutorFactory; import com.google.cloud.spanner.spi.v1.SpannerRpc; import com.google.cloud.spanner.spi.v1.SpannerRpc.Option; @@ -89,8 +90,8 @@ static SessionOption channelHint(long hint) { return new SessionOption(SpannerRpc.Option.CHANNEL_HINT, hint); } - static SessionOption unbindChannelHint() { - return new SessionOption(SpannerRpc.Option.UNBIND_CHANNEL_HINT, Boolean.TRUE); + static SessionOption channelAffinityRef(ChannelAffinityRef channelAffinityRef) { + return new SessionOption(SpannerRpc.Option.CHANNEL_ID_AFFINITY, channelAffinityRef); } SpannerRpc.Option rpcOption() { diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java index e14d18fa8bf7..96cebf634f5a 100644 --- a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java +++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java @@ -23,6 +23,7 @@ import com.google.api.core.SettableApiFuture; import com.google.api.gax.rpc.ServerStream; import com.google.cloud.Timestamp; +import com.google.cloud.grpc.GcpManagedChannel.ChannelAffinityRef; import com.google.cloud.spanner.AbstractReadContext.MultiUseReadOnlyTransaction; import com.google.cloud.spanner.AbstractReadContext.SingleReadContext; import com.google.cloud.spanner.AbstractReadContext.SingleUseReadOnlyTransaction; @@ -51,7 +52,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ThreadLocalRandom; import javax.annotation.Nullable; /** @@ -332,8 +332,7 @@ private RequestOptions getRequestOptions(TransactionOption... transactionOptions if (!spanner.getOptions().isGrpcGcpExtensionEnabled()) { return getOptions(); } - return optionMap( - SessionOption.channelHint(ThreadLocalRandom.current().nextLong(Long.MAX_VALUE))); + return optionMap(SessionOption.channelAffinityRef(new ChannelAffinityRef())); } @Override diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java index 9c542bc52365..a147f43af094 100644 --- a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java +++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java @@ -109,7 +109,6 @@ import java.util.concurrent.atomic.AtomicInteger; import javax.annotation.Nonnull; import javax.annotation.Nullable; -import javax.annotation.concurrent.GuardedBy; /** Options for the Cloud Spanner service. */ public class SpannerOptions extends ServiceOptions { @@ -316,8 +315,7 @@ enum TracingFramework { private static final Object lock = new Object(); - @GuardedBy("lock") - private static TracingFramework activeTracingFramework; + private static volatile TracingFramework activeTracingFramework; /** Interface that can be used to provide {@link CallCredentials} to {@link SpannerOptions}. */ public interface CallCredentialsProvider { @@ -2239,12 +2237,8 @@ static void resetActiveTracingFramework() { } public static TracingFramework getActiveTracingFramework() { - synchronized (lock) { - if (activeTracingFramework == null) { - return TracingFramework.OPEN_CENSUS; - } - return activeTracingFramework; - } + TracingFramework framework = activeTracingFramework; + return framework == null ? TracingFramework.OPEN_CENSUS : framework; } /** Disables OpenCensus metrics. Disable OpenCensus metrics before creating Spanner client. */ diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java index ca2412d818e4..16d8784ef174 100644 --- a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java +++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java @@ -62,6 +62,7 @@ import com.google.cloud.RetryHelper; import com.google.cloud.RetryHelper.RetryHelperException; import com.google.cloud.grpc.GcpManagedChannel; +import com.google.cloud.grpc.GcpManagedChannel.ChannelAffinityRef; import com.google.cloud.grpc.GcpManagedChannelBuilder; import com.google.cloud.grpc.GcpManagedChannelOptions; import com.google.cloud.grpc.GcpManagedChannelOptions.GcpChannelPoolOptions; @@ -110,7 +111,6 @@ import com.google.longrunning.GetOperationRequest; import com.google.longrunning.Operation; import com.google.longrunning.OperationsGrpc; -import com.google.protobuf.ByteString; import com.google.protobuf.Empty; import com.google.protobuf.FieldMask; import com.google.protobuf.InvalidProtocolBufferException; @@ -626,23 +626,6 @@ private static GcpManagedChannel extractGrpcGcpChannel(TransportChannel transpor return null; } - @Override - public void clearTransactionAffinity(ByteString transactionId) { - if (keyAwareChannel != null) { - keyAwareChannel.clearTransactionAffinity(transactionId); - } - } - - @Override - public void clearTransactionAndChannelAffinity( - ByteString transactionId, @Nullable Long channelHint) { - if (keyAwareChannel != null) { - keyAwareChannel.clearTransactionAndChannelAffinity(transactionId, channelHint); - return; - } - GrpcGcpAffinityUtil.clearChannelHintAffinity(grpcGcpChannel, channelHint); - } - private static String parseGrpcGcpApiConfig() { try { return Resources.toString( @@ -2154,13 +2137,6 @@ public ApiFuture commitAsync( CommitRequest request, @Nullable Map options) { GrpcCallContext context = newCallContext(options, request.getSession(), request, SpannerGrpc.getCommitMethod(), true); - // Signal grpc-gcp to unbind the affinity key after this call completes. - // Commit is a terminal RPC — no more RPCs will use this transaction's affinity key. - if (this.isGrpcGcpExtensionEnabled) { - context = - context.withCallOptions( - context.getCallOptions().withOption(GcpManagedChannel.UNBIND_AFFINITY_KEY, true)); - } return spannerStub.commitCallable().futureCall(request, context); } @@ -2180,13 +2156,6 @@ public ApiFuture rollbackAsync(RollbackRequest request, @Nullable Map GrpcCallContext newCallContext( boolean routeToLeader) { GrpcCallContext context = this.baseGrpcCallContext; Long affinity = options == null ? null : Option.CHANNEL_HINT.getLong(options); + ChannelAffinityRef channelAffinityRef = + options == null ? null : Option.CHANNEL_ID_AFFINITY.getChannelAffinityRef(options); if (affinity != null) { if (this.isGrpcGcpExtensionEnabled) { - // Set channel affinity in gRPC-GCP. Always use the raw affinity value as the key. - // Cleanup is handled explicitly by unbind on terminal/single-use operations. String affinityKey = String.valueOf(affinity); context = context.withCallOptions( context.getCallOptions().withOption(GcpManagedChannel.AFFINITY_KEY, affinityKey)); - // Check if the caller wants to unbind the affinity key after this call completes. - Boolean unbind = Option.UNBIND_CHANNEL_HINT.get(options); - if (Boolean.TRUE.equals(unbind)) { - context = - context.withCallOptions( - context.getCallOptions().withOption(GcpManagedChannel.UNBIND_AFFINITY_KEY, true)); - } } else { // Set channel affinity in GAX. context = context.withChannelAffinity(affinity.intValue()); } } + if (this.isGrpcGcpExtensionEnabled && channelAffinityRef != null) { + context = + context.withCallOptions( + context + .getCallOptions() + .withOption(GcpManagedChannel.CHANNEL_AFFINITY_REF_KEY, channelAffinityRef)); + } // When grpc-gcp extension with dynamic channel pooling is enabled, the actual channel ID // will be set by RequestIdInterceptor after grpc-gcp selects the channel. // Set to 0 (unknown) here as a placeholder. int requestIdChannel = - (this.isGrpcGcpExtensionEnabled && this.isDynamicChannelPoolEnabled) + (this.isGrpcGcpExtensionEnabled + && (this.isDynamicChannelPoolEnabled || channelAffinityRef != null)) ? 0 : convertToRequestIdChannelNumber(affinity); if (requestId == null) { diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GrpcGcpAffinityUtil.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GrpcGcpAffinityUtil.java deleted file mode 100644 index d4bb41755595..000000000000 --- a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GrpcGcpAffinityUtil.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright 2026 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.google.cloud.spanner.spi.v1; - -import com.google.cloud.grpc.GcpManagedChannel; -import com.google.spanner.v1.ExecuteSqlRequest; -import com.google.spanner.v1.ResultSet; -import com.google.spanner.v1.SpannerGrpc; -import io.grpc.CallOptions; -import io.grpc.ClientCall; -import io.grpc.ManagedChannel; -import javax.annotation.Nullable; - -final class GrpcGcpAffinityUtil { - private GrpcGcpAffinityUtil() {} - - static void clearChannelHintAffinity( - @Nullable ManagedChannel channel, @Nullable Long channelHint) { - if (!(channel instanceof GcpManagedChannel) || channelHint == null) { - return; - } - // TODO: Replace this synthetic call once grpc-gcp exposes a direct API for unbinding - // affinity keys without creating and immediately cancelling a ClientCall. - ClientCall call = - channel.newCall( - SpannerGrpc.getExecuteSqlMethod(), - CallOptions.DEFAULT - .withOption(GcpManagedChannel.AFFINITY_KEY, String.valueOf(channelHint)) - .withOption(GcpManagedChannel.UNBIND_AFFINITY_KEY, true)); - call.cancel("Cloud Spanner transaction closed", null); - } -} diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyAwareChannel.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyAwareChannel.java index 34e43ebca674..16903ad0cfac 100644 --- a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyAwareChannel.java +++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyAwareChannel.java @@ -72,7 +72,6 @@ final class KeyAwareChannel extends ManagedChannel { private static final long MAX_TRACKED_TRANSACTION_AFFINITIES = 100_000L; private static final long TRANSACTION_AFFINITY_TTL_MINUTES = 10L; - private static final long MAX_TRACKED_READ_ONLY_TRANSACTIONS = 100_000L; private static final int CHANNEL_FINDER_CLEANUP_INTERVAL = 1024; private static final String STREAMING_READ_METHOD = "google.spanner.v1.Spanner/StreamingRead"; private static final String STREAMING_SQL_METHOD = @@ -95,11 +94,6 @@ final class KeyAwareChannel extends ManagedChannel { // Bound and age out entries in case application code abandons a transaction // without sending Commit/Rollback or otherwise clearing affinity. private final Cache transactionAffinities; - // Maps read-only transaction IDs to their preferLeader value. - // Strong reads → true (prefer leader), Stale reads → false (any replica). - // Bounded to prevent unbounded growth if application code does not close read-only transactions. - private final Cache readOnlyTxPreferLeader = - CacheBuilder.newBuilder().maximumSize(MAX_TRACKED_READ_ONLY_TRANSACTIONS).build(); private final EndpointOverloadCooldownTracker endpointOverloadCooldowns; private KeyAwareChannel( @@ -359,26 +353,6 @@ private void clearAffinity(ByteString transactionId) { return; } transactionAffinities.invalidate(transactionId); - readOnlyTxPreferLeader.invalidate(transactionId); - } - - void clearTransactionAffinity(ByteString transactionId) { - clearAffinity(transactionId); - } - - void clearTransactionAndChannelAffinity(ByteString transactionId, @Nullable Long channelHint) { - String address = transactionAffinities.asMap().remove(transactionId); - readOnlyTxPreferLeader.invalidate(transactionId); - if (channelHint != null) { - ManagedChannel channel = defaultChannel; - if (address != null) { - ChannelEndpoint endpoint = endpointCache.getIfPresent(address); - if (endpoint != null) { - channel = endpoint.getChannel(); - } - } - GrpcGcpAffinityUtil.clearChannelHintAffinity(channel, channelHint); - } } private void recordEndpointCooldown(@Nullable ChannelEndpoint endpoint) { @@ -418,27 +392,6 @@ boolean isCoolingDown(String address) { return endpointOverloadCooldowns.isCoolingDown(address); } - private boolean isReadOnlyTransaction(ByteString transactionId) { - return transactionId != null - && !transactionId.isEmpty() - && readOnlyTxPreferLeader.getIfPresent(transactionId) != null; - } - - @Nullable - private Boolean readOnlyPreferLeader(ByteString transactionId) { - if (transactionId == null || transactionId.isEmpty()) { - return null; - } - return readOnlyTxPreferLeader.getIfPresent(transactionId); - } - - private void trackReadOnlyTransaction(ByteString transactionId, boolean preferLeader) { - if (transactionId == null || transactionId.isEmpty()) { - return; - } - readOnlyTxPreferLeader.put(transactionId, preferLeader); - } - private void recordAffinity( ByteString transactionId, @Nullable ChannelEndpoint endpoint, boolean allowDefault) { if (transactionId == null || transactionId.isEmpty() || endpoint == null) { @@ -528,8 +481,7 @@ static final class KeyAwareClientCall @Nullable private Boolean pendingMessageCompression; @Nullable private io.grpc.Status cancelledStatus; @Nullable private Metadata cancelledTrailers; - private boolean isReadOnlyBegin; - private boolean readOnlyIsStrong; + private boolean shouldRecordTransactionAffinity; private final Object lock = new Object(); KeyAwareClientCall( @@ -593,7 +545,7 @@ public void sendMessage(RequestT message) { if (message instanceof ReadRequest) { ReadRequest.Builder reqBuilder = ((ReadRequest) message).toBuilder(); - maybeTrackReadOnlyBegin(reqBuilder.getTransaction()); + maybeTrackReadWriteBegin(reqBuilder.getTransaction()); RoutingDecision routing = routeFromRequest(reqBuilder); finder = routing.finder; endpoint = routing.endpoint; @@ -603,7 +555,7 @@ public void sendMessage(RequestT message) { message = (RequestT) reqBuilder.build(); } else if (message instanceof ExecuteSqlRequest) { ExecuteSqlRequest.Builder reqBuilder = ((ExecuteSqlRequest) message).toBuilder(); - maybeTrackReadOnlyBegin(reqBuilder.getTransaction()); + maybeTrackReadWriteBegin(reqBuilder.getTransaction()); RoutingDecision routing = routeFromRequest(reqBuilder); finder = routing.finder; endpoint = routing.endpoint; @@ -623,9 +575,9 @@ public void sendMessage(RequestT message) { endpoint = finder.findServer(reqBuilder, excludedEndpoints); } if (reqBuilder.hasOptions() && reqBuilder.getOptions().hasReadOnly()) { - isReadOnlyBegin = true; - readOnlyIsStrong = reqBuilder.getOptions().getReadOnly().getStrong(); + shouldRecordTransactionAffinity = false; } else { + shouldRecordTransactionAffinity = true; allowDefaultAffinity = true; } message = (RequestT) reqBuilder.build(); @@ -829,11 +781,10 @@ void maybeClearAffinity() { parentChannel.clearAffinity(transactionIdToClear); } - private void maybeTrackReadOnlyBegin(TransactionSelector selector) { + private void maybeTrackReadWriteBegin(TransactionSelector selector) { if (selector.getSelectorCase() == TransactionSelector.SelectorCase.BEGIN - && selector.getBegin().hasReadOnly()) { - isReadOnlyBegin = true; - readOnlyIsStrong = selector.getBegin().getReadOnly().getStrong(); + && !selector.getBegin().hasReadOnly()) { + shouldRecordTransactionAffinity = true; } } @@ -847,24 +798,20 @@ private Predicate excludedEndpoints() { private RoutingDecision routeFromRequest(ReadRequest.Builder reqBuilder) { String databaseId = parentChannel.extractDatabaseIdFromSession(reqBuilder.getSession()); ByteString transactionId = transactionIdFromSelector(reqBuilder.getTransaction()); - // Skip affinity for read-only transactions so each read routes independently. - boolean isReadOnly = parentChannel.isReadOnlyTransaction(transactionId); Predicate excludedEndpoints = excludedEndpoints(); - ChannelEndpoint endpoint = - isReadOnly ? null : parentChannel.affinityEndpoint(transactionId, excludedEndpoints); + ChannelEndpoint endpoint = parentChannel.affinityEndpoint(transactionId, excludedEndpoints); ChannelFinder finder = null; if (databaseId != null) { finder = parentChannel.getOrCreateChannelFinder(databaseId); } boolean preferLeader = preferLeader(reqBuilder.getTransaction()); if (databaseId != null && endpoint == null) { - Boolean preferLeaderOverride = parentChannel.readOnlyPreferLeader(transactionId); - preferLeader = preferLeaderOverride != null ? preferLeaderOverride : preferLeader; - ChannelEndpoint routed = - preferLeaderOverride != null - ? finder.findServer(reqBuilder, preferLeaderOverride, excludedEndpoints) - : finder.findServer(reqBuilder, excludedEndpoints); - endpoint = routed; + // Transaction IDs only pin routing when a read-write affinity entry exists. Otherwise route + // by this request's routing hint without leader bias, as read-only transaction IDs can run + // on any suitable replica. + boolean routePreferLeader = transactionId.isEmpty() && preferLeader; + endpoint = finder.findServer(reqBuilder, routePreferLeader, excludedEndpoints); + preferLeader = routePreferLeader; } return new RoutingDecision( finder, endpoint, databaseId, operationUid(reqBuilder.getRoutingHint()), preferLeader); @@ -873,24 +820,20 @@ private RoutingDecision routeFromRequest(ReadRequest.Builder reqBuilder) { private RoutingDecision routeFromRequest(ExecuteSqlRequest.Builder reqBuilder) { String databaseId = parentChannel.extractDatabaseIdFromSession(reqBuilder.getSession()); ByteString transactionId = transactionIdFromSelector(reqBuilder.getTransaction()); - // Skip affinity for read-only transactions so each query routes independently. - boolean isReadOnly = parentChannel.isReadOnlyTransaction(transactionId); Predicate excludedEndpoints = excludedEndpoints(); - ChannelEndpoint endpoint = - isReadOnly ? null : parentChannel.affinityEndpoint(transactionId, excludedEndpoints); + ChannelEndpoint endpoint = parentChannel.affinityEndpoint(transactionId, excludedEndpoints); ChannelFinder finder = null; if (databaseId != null) { finder = parentChannel.getOrCreateChannelFinder(databaseId); } boolean preferLeader = preferLeader(reqBuilder.getTransaction()); if (databaseId != null && endpoint == null) { - Boolean preferLeaderOverride = parentChannel.readOnlyPreferLeader(transactionId); - preferLeader = preferLeaderOverride != null ? preferLeaderOverride : preferLeader; - ChannelEndpoint routed = - preferLeaderOverride != null - ? finder.findServer(reqBuilder, preferLeaderOverride, excludedEndpoints) - : finder.findServer(reqBuilder, excludedEndpoints); - endpoint = routed; + // Transaction IDs only pin routing when a read-write affinity entry exists. Otherwise route + // by this request's routing hint without leader bias, as read-only transaction IDs can run + // on any suitable replica. + boolean routePreferLeader = transactionId.isEmpty() && preferLeader; + endpoint = finder.findServer(reqBuilder, routePreferLeader, excludedEndpoints); + preferLeader = routePreferLeader; } return new RoutingDecision( finder, endpoint, databaseId, operationUid(reqBuilder.getRoutingHint()), preferLeader); @@ -980,14 +923,8 @@ public void onMessage(ResponseT message) { call.channelFinder.updateAsync(response.getCacheUpdate()); } } - if (transactionId != null) { - if (call.isReadOnlyBegin) { - // Track the read-only transaction so subsequent reads skip affinity - // and route independently based on key-based routing. - call.parentChannel.trackReadOnlyTransaction(transactionId, call.readOnlyIsStrong); - } else if (!call.parentChannel.isReadOnlyTransaction(transactionId)) { - call.maybeRecordAffinity(transactionId); - } + if (transactionId != null && call.shouldRecordTransactionAffinity) { + call.maybeRecordAffinity(transactionId); } super.onMessage(message); } diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java index efb305301d85..282979b522b8 100644 --- a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java +++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java @@ -24,6 +24,7 @@ import com.google.api.gax.rpc.ServerStream; import com.google.api.gax.rpc.StatusCode.Code; import com.google.cloud.ServiceRpc; +import com.google.cloud.grpc.GcpManagedChannel.ChannelAffinityRef; import com.google.cloud.spanner.BackupId; import com.google.cloud.spanner.Restore; import com.google.cloud.spanner.SpannerException; @@ -39,7 +40,6 @@ import com.google.iam.v1.Policy; import com.google.iam.v1.TestIamPermissionsResponse; import com.google.longrunning.Operation; -import com.google.protobuf.ByteString; import com.google.protobuf.Empty; import com.google.protobuf.FieldMask; import com.google.spanner.admin.database.v1.Backup; @@ -82,7 +82,7 @@ public interface SpannerRpc extends ServiceRpc { /** Options passed in {@link SpannerRpc} methods to control how an RPC is issued. */ enum Option { CHANNEL_HINT("Channel Hint"), - UNBIND_CHANNEL_HINT("Unbind Channel Hint"); + CHANNEL_ID_AFFINITY("Channel ID Affinity"); private final String value; @@ -103,6 +103,11 @@ public Long getLong(@Nullable Map options) { return get(options); } + @InternalApi + public ChannelAffinityRef getChannelAffinityRef(@Nullable Map options) { + return get(options); + } + @Override public String toString() { return value; @@ -196,18 +201,6 @@ default RequestIdCreator getRequestIdCreator() { throw new UnsupportedOperationException("Not implemented"); } - /** Clears any client-side affinity associated with the given transaction id. */ - default void clearTransactionAffinity(ByteString transactionId) {} - - /** - * Clears any client-side transaction affinity and transport-level channel affinity associated - * with the given transaction. - */ - default void clearTransactionAndChannelAffinity( - ByteString transactionId, @Nullable Long channelHint) { - clearTransactionAffinity(transactionId); - } - // Instance admin APIs. Paginated listInstanceConfigs(int pageSize, @Nullable String pageToken) throws SpannerException; diff --git a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AbstractReadContextTest.java b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AbstractReadContextTest.java index 8cadf1fa09d8..47ab667975ef 100644 --- a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AbstractReadContextTest.java +++ b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AbstractReadContextTest.java @@ -223,7 +223,7 @@ public void getChannelHintOptionsPrefersSessionHintWhenGrpcGcpDisabled() { } @Test - public void getChannelHintOptionsPrefersTransactionHintWhenGrpcGcpEnabled() { + public void getChannelHintOptionsUsesChannelIdAffinityWhenGrpcGcpEnabled() { Map sessionHint = SessionClient.optionMap(SessionClient.SessionOption.channelHint(7L)); @@ -231,7 +231,8 @@ public void getChannelHintOptionsPrefersTransactionHintWhenGrpcGcpEnabled() { AbstractReadContext.getChannelHintOptions(sessionHint, 11L, true); assertThat(result).isNotSameInstanceAs(sessionHint); - assertEquals(Long.valueOf(11L), Option.CHANNEL_HINT.getLong(result)); + assertThat(result).containsKey(Option.CHANNEL_ID_AFFINITY); + assertThat(result).doesNotContainKey(Option.CHANNEL_HINT); } @Test diff --git a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/PartitionedDmlTransactionTest.java b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/PartitionedDmlTransactionTest.java index c6155f0cbb65..3ff34105e9f7 100644 --- a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/PartitionedDmlTransactionTest.java +++ b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/PartitionedDmlTransactionTest.java @@ -70,6 +70,10 @@ public class PartitionedDmlTransactionTest { @Mock private SessionImpl session; + @Mock private SpannerImpl spanner; + + @Mock private SpannerOptions spannerOptions; + @Mock private Ticker ticker; private PartitionedDmlTransaction tx; @@ -99,7 +103,9 @@ public void setup() { when(session.getName()).thenReturn(sessionId); when(session.getRequestIdCreator()).thenReturn(NoopRequestIdCreator.INSTANCE); when(session.getOptions()).thenReturn(Collections.EMPTY_MAP); - when(session.getRequestIdCreator()).thenReturn(NoopRequestIdCreator.INSTANCE); + when(session.getSpanner()).thenReturn(spanner); + when(spanner.getOptions()).thenReturn(spannerOptions); + when(spannerOptions.isGrpcGcpExtensionEnabled()).thenReturn(false); when(rpc.beginTransaction(any(BeginTransactionRequest.class), anyMap(), eq(true))) .thenReturn(Transaction.newBuilder().setId(txId).build()); diff --git a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/RetryOnDifferentGrpcChannelMockServerTest.java b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/RetryOnDifferentGrpcChannelMockServerTest.java index 0802e31e74a2..8dc5fdf53853 100644 --- a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/RetryOnDifferentGrpcChannelMockServerTest.java +++ b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/RetryOnDifferentGrpcChannelMockServerTest.java @@ -32,14 +32,12 @@ import com.google.spanner.v1.BatchCreateSessionsRequest; import com.google.spanner.v1.BeginTransactionRequest; import com.google.spanner.v1.ExecuteSqlRequest; -import com.google.spanner.v1.SpannerGrpc; import io.grpc.CallOptions; import io.grpc.Channel; import io.grpc.ClientCall; import io.grpc.ClientInterceptor; import io.grpc.Context; import io.grpc.Deadline; -import io.grpc.ManagedChannelBuilder; import io.grpc.MethodDescriptor; import io.grpc.Status; import java.time.Duration; @@ -65,8 +63,8 @@ public class RetryOnDifferentGrpcChannelMockServerTest extends AbstractMockServe /** Tracks the logical affinity keys before grpc-gcp routes the request. */ private static final Map> LOGICAL_AFFINITY_KEYS = new HashMap<>(); - /** Tracks how many calls explicitly request grpc-gcp affinity unbind. */ - private static final Map UNBIND_AFFINITY_CALL_COUNTS = new HashMap<>(); + /** Tracks the actual grpc-gcp channel IDs after grpc-gcp routes the request. */ + private static final Map> ACTUAL_CHANNEL_IDS = new HashMap<>(); @BeforeClass public static void setupAndStartServer() throws Exception { @@ -83,7 +81,7 @@ public static void removeSystemProperty() { @After public void clearRequests() { LOGICAL_AFFINITY_KEYS.clear(); - UNBIND_AFFINITY_CALL_COUNTS.clear(); + ACTUAL_CHANNEL_IDS.clear(); mockSpanner.clearRequests(); mockSpanner.removeAllExecutionTimes(); } @@ -101,19 +99,21 @@ static GrpcInterceptorProvider createAffinityKeyInterceptorProvider() { public ClientCall interceptCall( MethodDescriptor method, CallOptions callOptions, Channel next) { String methodName = method.getFullMethodName(); - // Capture the AFFINITY_KEY before grpc-gcp processes it. + // Capture affinity before grpc-gcp processes it. String affinityKey = callOptions.getOption(GcpManagedChannel.AFFINITY_KEY); - if (affinityKey != null) { + GcpManagedChannel.ChannelAffinityRef channelAffinityRef = + callOptions.getOption(GcpManagedChannel.CHANNEL_AFFINITY_REF_KEY); + String key = + affinityKey != null + ? affinityKey + : channelAffinityRef == null + ? null + : "channel-ref-" + System.identityHashCode(channelAffinityRef); + if (key != null) { synchronized (LOGICAL_AFFINITY_KEYS) { Set keys = LOGICAL_AFFINITY_KEYS.computeIfAbsent(methodName, k -> new HashSet<>()); - keys.add(affinityKey); - } - } - if (Boolean.TRUE.equals( - callOptions.getOption(GcpManagedChannel.UNBIND_AFFINITY_KEY))) { - synchronized (UNBIND_AFFINITY_CALL_COUNTS) { - UNBIND_AFFINITY_CALL_COUNTS.merge(methodName, 1, Integer::sum); + keys.add(key); } } return next.newCall(method, callOptions); @@ -125,7 +125,30 @@ SpannerOptions.Builder createSpannerOptionsBuilder() { return SpannerOptions.newBuilder() .setProjectId("my-project") .setHost(String.format("http://localhost:%d", getPort())) - .setChannelConfigurator(ManagedChannelBuilder::usePlaintext) + .setChannelConfigurator( + builder -> + builder + .usePlaintext() + .intercept( + new ClientInterceptor() { + @Override + public ClientCall interceptCall( + MethodDescriptor method, + CallOptions callOptions, + Channel next) { + Integer channelId = + callOptions.getOption(GcpManagedChannel.CHANNEL_ID_KEY); + if (channelId != null) { + synchronized (ACTUAL_CHANNEL_IDS) { + ACTUAL_CHANNEL_IDS + .computeIfAbsent( + method.getFullMethodName(), k -> new HashSet<>()) + .add(channelId); + } + } + return next.newCall(method, callOptions); + } + })) .setCredentials(NoCredentials.getInstance()) .setInterceptorProvider(createAffinityKeyInterceptorProvider()); } @@ -316,17 +339,12 @@ public void testSingleUseQuery_retriesOnNewChannel() { List requests = mockSpanner.getRequestsOfType(ExecuteSqlRequest.class); // The requests use the same multiplexed session. assertEquals(requests.get(0).getSession(), requests.get(1).getSession()); - // Verify that the retry used 2 distinct logical affinity keys (before grpc-gcp routing). + // Verify that the retry used 2 distinct actual grpc-gcp channels. assertEquals( 2, - LOGICAL_AFFINITY_KEYS + ACTUAL_CHANNEL_IDS .getOrDefault("google.spanner.v1.Spanner/ExecuteStreamingSql", new HashSet<>()) .size()); - assertEquals( - 2, - UNBIND_AFFINITY_CALL_COUNTS - .getOrDefault(SpannerGrpc.getExecuteStreamingSqlMethod().getFullMethodName(), 0) - .intValue()); } @Test @@ -354,17 +372,13 @@ public void testSingleUseQuery_stopsRetrying() { // Verify that the retry mechanism is working (made numChannels requests). int totalRequests = mockSpanner.countRequestsOfType(ExecuteSqlRequest.class); assertEquals(numChannels, totalRequests); - // Verify each attempt used a distinct logical affinity key (before grpc-gcp routing). + // Direct channel-id affinity no longer creates a new logical affinity key for each retry. + // The retry mechanism itself is verified by the request count above. int distinctLogicalKeys = LOGICAL_AFFINITY_KEYS .getOrDefault("google.spanner.v1.Spanner/ExecuteStreamingSql", new HashSet<>()) .size(); - assertEquals(totalRequests, distinctLogicalKeys); - assertEquals( - totalRequests, - UNBIND_AFFINITY_CALL_COUNTS - .getOrDefault(SpannerGrpc.getExecuteStreamingSqlMethod().getFullMethodName(), 0) - .intValue()); + assertTrue(distinctLogicalKeys > 0); } } diff --git a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java index 95e81cf7e6e5..817d09d47b77 100644 --- a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java +++ b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java @@ -321,7 +321,7 @@ public void writeAtLeastOnceWithOptions() throws ParseException { @SuppressWarnings("unchecked") @Test - public void singleUseReadUsesRandomChannelHintWhenGrpcGcpEnabled() { + public void singleUseReadUsesChannelIdAffinityWhenGrpcGcpEnabled() { when(spannerOptions.isGrpcGcpExtensionEnabled()).thenReturn(true); ArgumentCaptor consumer = ArgumentCaptor.forClass(SpannerRpc.ResultStreamConsumer.class); @@ -347,13 +347,12 @@ public void singleUseReadUsesRandomChannelHintWhenGrpcGcpEnabled() { Map readOptions = readOptionsCaptor.getValue(); assertThat(readOptions).isNotSameInstanceAs(options); - assertThat(readOptions).containsKey(SpannerRpc.Option.CHANNEL_HINT); - assertThat(readOptions.get(SpannerRpc.Option.UNBIND_CHANNEL_HINT)).isEqualTo(Boolean.TRUE); + assertThat(readOptions).containsKey(SpannerRpc.Option.CHANNEL_ID_AFFINITY); } @SuppressWarnings("unchecked") @Test - public void multiUseReadOnlyTransactionUsesRandomChannelHintWhenGrpcGcpEnabled() + public void multiUseReadOnlyTransactionUsesChannelIdAffinityWhenGrpcGcpEnabled() throws ParseException { when(spannerOptions.isGrpcGcpExtensionEnabled()).thenReturn(true); ArgumentCaptor> beginOptionsCaptor = @@ -376,13 +375,14 @@ public void multiUseReadOnlyTransactionUsesRandomChannelHintWhenGrpcGcpEnabled() Map beginOptions = beginOptionsCaptor.getValue(); assertThat(beginOptions).isNotSameInstanceAs(options); - assertThat(beginOptions).containsKey(SpannerRpc.Option.CHANNEL_HINT); + assertThat(beginOptions).containsKey(SpannerRpc.Option.CHANNEL_ID_AFFINITY); } @SuppressWarnings("unchecked") @Test - public void multiUseReadOnlyTransactionCloseClearsGrpcGcpAffinityWhenEnabled() - throws ParseException { + public void + multiUseReadOnlyTransactionCloseDoesNotClearGrpcGcpAffinityWhenUsingChannelIdAffinity() + throws ParseException { when(spannerOptions.isGrpcGcpExtensionEnabled()).thenReturn(true); ArgumentCaptor> beginOptionsCaptor = ArgumentCaptor.forClass((Class) Map.class); @@ -402,14 +402,12 @@ public void multiUseReadOnlyTransactionCloseClearsGrpcGcpAffinityWhenEnabled() txn.readRow("Dummy", Key.of(), Collections.singletonList("C")); txn.close(); - Long channelHint = SpannerRpc.Option.CHANNEL_HINT.getLong(beginOptionsCaptor.getValue()); - Mockito.verify(rpc) - .clearTransactionAndChannelAffinity(ByteString.copyFromUtf8("x"), channelHint); + assertThat(beginOptionsCaptor.getValue()).containsKey(SpannerRpc.Option.CHANNEL_ID_AFFINITY); } @SuppressWarnings("unchecked") @Test - public void readWriteTransactionUsesRandomChannelHintWhenGrpcGcpEnabled() { + public void readWriteTransactionUsesChannelIdAffinityWhenGrpcGcpEnabled() { when(spannerOptions.isGrpcGcpExtensionEnabled()).thenReturn(true); ArgumentCaptor> beginOptionsCaptor = ArgumentCaptor.forClass((Class) Map.class); @@ -430,12 +428,12 @@ public void readWriteTransactionUsesRandomChannelHintWhenGrpcGcpEnabled() { Map beginOptions = beginOptionsCaptor.getValue(); assertThat(beginOptions).isNotSameInstanceAs(options); - assertThat(beginOptions).containsKey(SpannerRpc.Option.CHANNEL_HINT); + assertThat(beginOptions).containsKey(SpannerRpc.Option.CHANNEL_ID_AFFINITY); } @SuppressWarnings("unchecked") @Test - public void writeAtLeastOnceUsesRandomChannelHintWhenGrpcGcpEnabled() throws ParseException { + public void writeAtLeastOnceUsesChannelIdAffinityWhenGrpcGcpEnabled() throws ParseException { when(spannerOptions.isGrpcGcpExtensionEnabled()).thenReturn(true); ArgumentCaptor> commitOptionsCaptor = ArgumentCaptor.forClass((Class) Map.class); @@ -450,7 +448,7 @@ public void writeAtLeastOnceUsesRandomChannelHintWhenGrpcGcpEnabled() throws Par Map commitOptions = commitOptionsCaptor.getValue(); assertThat(commitOptions).isNotSameInstanceAs(options); - assertThat(commitOptions).containsKey(SpannerRpc.Option.CHANNEL_HINT); + assertThat(commitOptions).containsKey(SpannerRpc.Option.CHANNEL_ID_AFFINITY); } private static long utcTimeSeconds(int year, int month, int day, int hour, int min, int secs) { diff --git a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionChannelHintTest.java b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionChannelHintTest.java index cdb0039ccd5b..bccc7e7da125 100644 --- a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionChannelHintTest.java +++ b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionChannelHintTest.java @@ -21,6 +21,7 @@ import static com.google.cloud.spanner.MockSpannerTestUtil.READ_ONE_KEY_VALUE_STATEMENT; import static com.google.cloud.spanner.MockSpannerTestUtil.READ_TABLE_NAME; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import com.google.api.gax.grpc.GrpcInterceptorProvider; @@ -40,10 +41,17 @@ import io.grpc.Channel; import io.grpc.ClientCall; import io.grpc.ClientInterceptor; +import io.grpc.Grpc; +import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.Server; +import io.grpc.ServerCall; +import io.grpc.ServerCallHandler; +import io.grpc.ServerInterceptor; +import io.grpc.ServerInterceptors; import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder; import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; @@ -99,6 +107,10 @@ public class TransactionChannelHintTest { private static final Set executeSqlAffinityKeys = ConcurrentHashMap.newKeySet(); private static final Set beginTransactionAffinityKeys = ConcurrentHashMap.newKeySet(); private static final Set streamingReadAffinityKeys = ConcurrentHashMap.newKeySet(); + private static final Set executeSqlRemotePorts = ConcurrentHashMap.newKeySet(); + private static final Set beginTransactionRemotePorts = ConcurrentHashMap.newKeySet(); + private static final Set streamingReadRemotePorts = ConcurrentHashMap.newKeySet(); + private static final Set commitRemotePorts = ConcurrentHashMap.newKeySet(); private static Level originalLogLevel; @BeforeClass @@ -110,13 +122,44 @@ public static void startServer() throws Exception { StatementResult.query(READ_ONE_KEY_VALUE_STATEMENT, READ_ONE_KEY_VALUE_RESULTSET)); address = new InetSocketAddress("localhost", 0); - server = NettyServerBuilder.forAddress(address).addService(mockSpanner).build().start(); + server = + NettyServerBuilder.forAddress(address) + .addService(ServerInterceptors.intercept(mockSpanner, createRemotePortInterceptor())) + .build() + .start(); + } + + private static ServerInterceptor createRemotePortInterceptor() { + return new ServerInterceptor() { + @Override + public ServerCall.Listener interceptCall( + ServerCall call, Metadata headers, ServerCallHandler next) { + SocketAddress remoteAddress = call.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR); + if (remoteAddress instanceof InetSocketAddress) { + int remotePort = ((InetSocketAddress) remoteAddress).getPort(); + String methodName = call.getMethodDescriptor().getFullMethodName(); + if (methodName.equals(SpannerGrpc.getExecuteStreamingSqlMethod().getFullMethodName())) { + executeSqlRemotePorts.add(remotePort); + } + if (methodName.equals(SpannerGrpc.getStreamingReadMethod().getFullMethodName())) { + streamingReadRemotePorts.add(remotePort); + } + if (methodName.equals(SpannerGrpc.getBeginTransactionMethod().getFullMethodName())) { + beginTransactionRemotePorts.add(remotePort); + } + if (methodName.equals(SpannerGrpc.getCommitMethod().getFullMethodName())) { + commitRemotePorts.add(remotePort); + } + } + return next.startCall(call, headers); + } + }; } /** * Creates a client interceptor that captures the logical affinity key before grpc-gcp routes the * request. This allows us to verify that all operations within a transaction use the same logical - * channel affinity, even though the physical channel ID may vary. + * channel affinity. */ private static GrpcInterceptorProvider createAffinityKeyInterceptorProvider() { return () -> @@ -125,20 +168,28 @@ private static GrpcInterceptorProvider createAffinityKeyInterceptorProvider() { @Override public ClientCall interceptCall( MethodDescriptor method, CallOptions callOptions, Channel next) { - // Capture the AFFINITY_KEY before grpc-gcp processes it + // Capture channel affinity before grpc-gcp processes it. String affinityKey = callOptions.getOption(GcpManagedChannel.AFFINITY_KEY); - if (affinityKey != null) { + GcpManagedChannel.ChannelAffinityRef channelAffinityRef = + callOptions.getOption(GcpManagedChannel.CHANNEL_AFFINITY_REF_KEY); + String key = + affinityKey != null + ? affinityKey + : channelAffinityRef == null + ? null + : "channel-ref-" + System.identityHashCode(channelAffinityRef); + if (key != null) { String methodName = method.getFullMethodName(); if (methodName.equals( SpannerGrpc.getExecuteStreamingSqlMethod().getFullMethodName())) { - executeSqlAffinityKeys.add(affinityKey); + executeSqlAffinityKeys.add(key); } if (methodName.equals(SpannerGrpc.getStreamingReadMethod().getFullMethodName())) { - streamingReadAffinityKeys.add(affinityKey); + streamingReadAffinityKeys.add(key); } if (methodName.equals( SpannerGrpc.getBeginTransactionMethod().getFullMethodName())) { - beginTransactionAffinityKeys.add(affinityKey); + beginTransactionAffinityKeys.add(key); } } return next.newCall(method, callOptions); @@ -171,6 +222,10 @@ public void reset() { executeSqlAffinityKeys.clear(); streamingReadAffinityKeys.clear(); beginTransactionAffinityKeys.clear(); + executeSqlRemotePorts.clear(); + streamingReadRemotePorts.clear(); + beginTransactionRemotePorts.clear(); + commitRemotePorts.clear(); } private SpannerOptions createSpannerOptions() { @@ -185,12 +240,23 @@ private SpannerOptions createSpannerOptions() { .setCompressorName("gzip") .setHost("http://" + endpoint) .setCredentials(NoCredentials.getInstance()) + .setNumChannels(4) .setInterceptorProvider(createAffinityKeyInterceptorProvider()) .setSessionPoolOption( SessionPoolOptions.newBuilder().setSkipVerifyingBeginTransactionForMuxRW(true).build()) .build(); } + @SafeVarargs + private static void assertSingleRemoteClientPort(Set... remotePortSets) { + Set allRemotePorts = ConcurrentHashMap.newKeySet(); + for (Set remotePortSet : remotePortSets) { + assertFalse(remotePortSet.isEmpty()); + allRemotePorts.addAll(remotePortSet); + } + assertEquals(1, allRemotePorts.size()); + } + @Test public void testSingleUseReadOnlyTransaction_usesSingleChannelHint() { try (Spanner spanner = createSpannerOptions().getService()) { @@ -235,6 +301,7 @@ public void testReadOnlyTransaction_usesSingleChannelHint() { assertEquals(1, executeSqlAffinityKeys.size()); // BeginTransaction should use a single logical affinity key assertEquals(1, beginTransactionAffinityKeys.size()); + assertSingleRemoteClientPort(beginTransactionRemotePorts, executeSqlRemotePorts); } @Test @@ -255,6 +322,7 @@ public void testReadOnlyTransaction_withTimestampBound_usesSingleChannelHint() { assertEquals(1, executeSqlAffinityKeys.size()); // BeginTransaction should use a single logical affinity key assertEquals(1, beginTransactionAffinityKeys.size()); + assertSingleRemoteClientPort(beginTransactionRemotePorts, executeSqlRemotePorts); } @Test @@ -285,6 +353,7 @@ public void testTransactionManager_usesSingleChannelHint() { } // All ExecuteSql calls within the transaction should use the same logical affinity key assertEquals(1, executeSqlAffinityKeys.size()); + assertSingleRemoteClientPort(executeSqlRemotePorts, commitRemotePorts); } @Test @@ -316,5 +385,6 @@ public void testTransactionRunner_usesSingleChannelHint() { } // All StreamingRead calls within the transaction should use the same logical affinity key assertEquals(1, streamingReadAffinityKeys.size()); + assertSingleRemoteClientPort(streamingReadRemotePorts, commitRemotePorts); } } diff --git a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpcTest.java b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpcTest.java index cdb04816a012..e39d6be2a33c 100644 --- a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpcTest.java +++ b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpcTest.java @@ -42,6 +42,7 @@ import com.google.cloud.NoCredentials; import com.google.cloud.ServiceOptions; import com.google.cloud.grpc.GcpManagedChannel; +import com.google.cloud.grpc.GcpManagedChannel.ChannelAffinityRef; import com.google.cloud.grpc.GcpManagedChannelOptions; import com.google.cloud.grpc.GcpManagedChannelOptions.GcpChannelPoolOptions; import com.google.cloud.grpc.GcpManagedChannelOptions.GcpMetricsOptions; @@ -76,8 +77,6 @@ import com.google.spanner.v1.StructType; import com.google.spanner.v1.StructType.Field; import com.google.spanner.v1.TypeCode; -import io.grpc.CallOptions; -import io.grpc.ClientCall; import io.grpc.Context; import io.grpc.Contexts; import io.grpc.ManagedChannelBuilder; @@ -125,8 +124,6 @@ import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameter; import org.junit.runners.Parameterized.Parameters; -import org.mockito.ArgumentCaptor; -import org.mockito.Mockito; @RunWith(Parameterized.class) public class GapicSpannerRpcTest { @@ -481,7 +478,7 @@ public void testNewCallContextWithNullRequestAndNullMethod() { } @Test - public void testNewCallContextWithGrpcGcpUsesRawAffinityKeyWithoutDcp() { + public void testNewCallContextWithGrpcGcpUsesChannelAffinityRefWithoutDcp() { SpannerOptions options = SpannerOptions.newBuilder() .setProjectId("some-project") @@ -491,8 +488,7 @@ public void testNewCallContextWithGrpcGcpUsesRawAffinityKeyWithoutDcp() { .build(); GapicSpannerRpc rpc = new GapicSpannerRpc(options, false); Map grpcGcpOptions = new HashMap<>(); - grpcGcpOptions.put(Option.CHANNEL_HINT, 7L); - grpcGcpOptions.put(Option.UNBIND_CHANNEL_HINT, Boolean.TRUE); + grpcGcpOptions.put(Option.CHANNEL_ID_AFFINITY, new ChannelAffinityRef()); GrpcCallContext callContext = rpc.newCallContext( @@ -501,15 +497,14 @@ public void testNewCallContextWithGrpcGcpUsesRawAffinityKeyWithoutDcp() { ExecuteSqlRequest.getDefaultInstance(), SpannerGrpc.getExecuteSqlMethod()); - assertEquals("7", callContext.getCallOptions().getOption(GcpManagedChannel.AFFINITY_KEY)); - assertEquals( - Boolean.TRUE, - callContext.getCallOptions().getOption(GcpManagedChannel.UNBIND_AFFINITY_KEY)); + assertNull(callContext.getCallOptions().getOption(GcpManagedChannel.AFFINITY_KEY)); + assertThat(callContext.getCallOptions().getOption(GcpManagedChannel.CHANNEL_AFFINITY_REF_KEY)) + .isNotNull(); rpc.shutdown(); } @Test - public void testNewCallContextWithGrpcGcpUsesRawAffinityKeyWithDcp() { + public void testNewCallContextWithGrpcGcpUsesChannelIdAffinityWithDcp() { SpannerOptions options = SpannerOptions.newBuilder() .setProjectId("some-project") @@ -518,7 +513,7 @@ public void testNewCallContextWithGrpcGcpUsesRawAffinityKeyWithDcp() { .build(); GapicSpannerRpc rpc = new GapicSpannerRpc(options, false); Map grpcGcpOptions = new HashMap<>(); - grpcGcpOptions.put(Option.CHANNEL_HINT, 7L); + grpcGcpOptions.put(Option.CHANNEL_ID_AFFINITY, new ChannelAffinityRef()); GrpcCallContext callContext = rpc.newCallContext( @@ -527,31 +522,12 @@ public void testNewCallContextWithGrpcGcpUsesRawAffinityKeyWithDcp() { ExecuteSqlRequest.getDefaultInstance(), SpannerGrpc.getExecuteSqlMethod()); - assertEquals("7", callContext.getCallOptions().getOption(GcpManagedChannel.AFFINITY_KEY)); + assertNull(callContext.getCallOptions().getOption(GcpManagedChannel.AFFINITY_KEY)); + assertThat(callContext.getCallOptions().getOption(GcpManagedChannel.CHANNEL_AFFINITY_REF_KEY)) + .isNotNull(); rpc.shutdown(); } - @SuppressWarnings("unchecked") - @Test - public void testClearChannelHintAffinityCancelsSyntheticGrpcGcpCall() { - GcpManagedChannel channel = Mockito.mock(GcpManagedChannel.class); - ClientCall call = - Mockito.mock(ClientCall.class); - ArgumentCaptor callOptionsCaptor = ArgumentCaptor.forClass(CallOptions.class); - Mockito.when( - channel.newCall( - Mockito.eq(SpannerGrpc.getExecuteSqlMethod()), callOptionsCaptor.capture())) - .thenReturn(call); - - GrpcGcpAffinityUtil.clearChannelHintAffinity(channel, 7L); - - assertEquals("7", callOptionsCaptor.getValue().getOption(GcpManagedChannel.AFFINITY_KEY)); - assertEquals( - Boolean.TRUE, - callOptionsCaptor.getValue().getOption(GcpManagedChannel.UNBIND_AFFINITY_KEY)); - Mockito.verify(call).cancel("Cloud Spanner transaction closed", null); - } - @Test public void testNewCallContextWithRouteToLeaderHeader() { SpannerOptions options = diff --git a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyAwareChannelTest.java b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyAwareChannelTest.java index 7a9201322c74..af9366020da9 100644 --- a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyAwareChannelTest.java +++ b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyAwareChannelTest.java @@ -1103,37 +1103,6 @@ public void readOnlyTransactionDoesNotRecordAffinity() throws Exception { assertThat(harness.endpointCache.getCount(DEFAULT_ADDRESS)).isEqualTo(0); } - @Test - public void readOnlyTransactionCleanupOnClose() throws Exception { - TestHarness harness = createHarness(); - ByteString transactionId = ByteString.copyFromUtf8("ro-tx-3"); - - // Begin a read-only transaction. - ClientCall beginCall = - harness.channel.newCall(SpannerGrpc.getBeginTransactionMethod(), CallOptions.DEFAULT); - beginCall.start(new CapturingListener(), new Metadata()); - beginCall.sendMessage( - BeginTransactionRequest.newBuilder() - .setSession(SESSION) - .setOptions( - TransactionOptions.newBuilder() - .setReadOnly( - TransactionOptions.ReadOnly.newBuilder() - .setReturnReadTimestamp(true) - .build())) - .build()); - - @SuppressWarnings("unchecked") - RecordingClientCall beginDelegate = - (RecordingClientCall) - harness.defaultManagedChannel.latestCall(); - beginDelegate.emitOnMessage(Transaction.newBuilder().setId(transactionId).build()); - beginDelegate.emitOnClose(Status.OK, new Metadata()); - - // Clear transaction affinity (simulates MultiUseReadOnlyTransaction.close()). - harness.channel.clearTransactionAffinity(transactionId); - } - @Test public void abandonedReadWriteTransactionAffinityExpiresAfterInactivity() throws Exception { FakeTicker ticker = new FakeTicker();