diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelEndpoint.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelEndpoint.java index cd6b386dc8a7..fc82c530fc6f 100644 --- a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelEndpoint.java +++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelEndpoint.java @@ -41,19 +41,27 @@ public interface ChannelEndpoint { String getAddress(); /** - * Returns whether this server is ready to accept RPCs. + * Returns whether this server's channel is in {@code READY} state and can accept location-aware + * RPCs. * - *

A server is considered unhealthy if: + *

Only endpoints in {@code READY} state are eligible for location-aware routing. Endpoints in + * {@code IDLE}, {@code CONNECTING}, {@code TRANSIENT_FAILURE}, or {@code SHUTDOWN} are not + * considered healthy for location-aware routing purposes. * - *

- * - * @return true if the server is healthy and ready to accept RPCs + * @return true if the channel is in READY state */ boolean isHealthy(); + /** + * Returns whether this server's channel is in {@code TRANSIENT_FAILURE} state. + * + *

When an endpoint is in transient failure, it should be reported as a skipped tablet in + * routing hints so the server can refresh the client cache. + * + * @return true if the channel is in TRANSIENT_FAILURE state + */ + boolean isTransientFailure(); + /** * Returns the gRPC channel for making RPCs to this server. * diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelEndpointCache.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelEndpointCache.java index 879ed546f2c2..db2af4902f84 100644 --- a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelEndpointCache.java +++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelEndpointCache.java @@ -54,6 +54,19 @@ public interface ChannelEndpointCache { */ ChannelEndpoint get(String address); + /** + * Returns a cached channel for the given address without creating it. + * + *

Unlike {@link #get(String)}, this method does not create a new endpoint if one does not + * already exist in the cache. This is used by location-aware routing to avoid foreground endpoint + * creation on the request path. + * + * @param address the server address in "host:port" format + * @return the cached channel instance, or null if no endpoint exists for this address + */ + @javax.annotation.Nullable + ChannelEndpoint getIfPresent(String address); + /** * Evicts a server connection from the cache and gracefully shuts down its channel. * diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelFinder.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelFinder.java index 4ced18eb9203..0fefd93fd180 100644 --- a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelFinder.java +++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelFinder.java @@ -22,9 +22,11 @@ import com.google.spanner.v1.CommitRequest; import com.google.spanner.v1.DirectedReadOptions; import com.google.spanner.v1.ExecuteSqlRequest; +import com.google.spanner.v1.Group; import com.google.spanner.v1.Mutation; import com.google.spanner.v1.ReadRequest; import com.google.spanner.v1.RoutingHint; +import com.google.spanner.v1.Tablet; import com.google.spanner.v1.TransactionOptions; import com.google.spanner.v1.TransactionSelector; import java.util.ArrayList; @@ -32,6 +34,7 @@ import java.util.Objects; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicLong; +import javax.annotation.Nullable; /** * Finds a server for a request using location-aware routing metadata. @@ -44,9 +47,16 @@ public final class ChannelFinder { private final AtomicLong databaseId = new AtomicLong(); private final KeyRecipeCache recipeCache = new KeyRecipeCache(); private final KeyRangeCache rangeCache; + @Nullable private final EndpointLifecycleManager lifecycleManager; public ChannelFinder(ChannelEndpointCache endpointCache) { - this.rangeCache = new KeyRangeCache(Objects.requireNonNull(endpointCache)); + this(endpointCache, null); + } + + public ChannelFinder( + ChannelEndpointCache endpointCache, @Nullable EndpointLifecycleManager lifecycleManager) { + this.rangeCache = new KeyRangeCache(Objects.requireNonNull(endpointCache), lifecycleManager); + this.lifecycleManager = lifecycleManager; } void useDeterministicRandom() { @@ -67,6 +77,19 @@ public void update(CacheUpdate update) { recipeCache.addRecipes(update.getKeyRecipes()); } rangeCache.addRanges(update); + + // Notify the lifecycle manager about server addresses so it can create endpoints + // in the background and start probing. + if (lifecycleManager != null) { + for (Group group : update.getGroupList()) { + for (Tablet tablet : group.getTabletsList()) { + String addr = tablet.getServerAddress(); + if (!addr.isEmpty()) { + lifecycleManager.ensureEndpointExists(addr); + } + } + } + } } } diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/EndpointLifecycleManager.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/EndpointLifecycleManager.java new file mode 100644 index 000000000000..7e3a317562ee --- /dev/null +++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/EndpointLifecycleManager.java @@ -0,0 +1,449 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner.spi.v1; + +import com.google.api.core.InternalApi; +import com.google.common.annotations.VisibleForTesting; +import com.google.spanner.v1.GetSessionRequest; +import com.google.spanner.v1.SpannerGrpc; +import io.grpc.CallOptions; +import io.grpc.ClientCall; +import io.grpc.ManagedChannel; +import io.grpc.Metadata; +import io.grpc.Status; +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Manages the lifecycle of location-aware routing endpoints including background probing, traffic + * tracking, and idle eviction. + * + *

This manager is the only component that proactively creates routed replica endpoints. It: + * + *

+ */ +@InternalApi +class EndpointLifecycleManager { + + private static final Logger logger = Logger.getLogger(EndpointLifecycleManager.class.getName()); + + /** Default probe interval: 60 seconds. Keeps channels from drifting into IDLE. */ + @VisibleForTesting static final long DEFAULT_PROBE_INTERVAL_SECONDS = 60; + + /** Default idle eviction threshold: 30 minutes without real traffic. */ + @VisibleForTesting static final Duration DEFAULT_IDLE_EVICTION_DURATION = Duration.ofMinutes(30); + + /** Interval for checking idle eviction: every 5 minutes. */ + private static final long EVICTION_CHECK_INTERVAL_SECONDS = 300; + + /** Timeout for probe RPCs. */ + private static final long PROBE_TIMEOUT_SECONDS = 10; + + /** Per-endpoint lifecycle state. */ + static final class EndpointState { + final String address; + volatile Instant lastProbeAt; + volatile Instant lastRealTrafficAt; + volatile Instant lastReadyAt; + volatile ScheduledFuture probeFuture; + + EndpointState(String address, Instant now) { + this.address = address; + this.lastRealTrafficAt = now; + this.lastProbeAt = null; + this.lastReadyAt = null; + } + } + + private final ChannelEndpointCache endpointCache; + private final Map endpoints = new ConcurrentHashMap<>(); + private final ScheduledExecutorService scheduler; + private final AtomicBoolean isShutdown = new AtomicBoolean(false); + private final long probeIntervalSeconds; + private final Duration idleEvictionDuration; + private final Clock clock; + private final String defaultEndpointAddress; + + private volatile String multiplexedSessionName; + private ScheduledFuture evictionFuture; + + EndpointLifecycleManager(ChannelEndpointCache endpointCache) { + this( + endpointCache, + DEFAULT_PROBE_INTERVAL_SECONDS, + DEFAULT_IDLE_EVICTION_DURATION, + Clock.systemUTC()); + } + + @VisibleForTesting + EndpointLifecycleManager( + ChannelEndpointCache endpointCache, + long probeIntervalSeconds, + Duration idleEvictionDuration, + Clock clock) { + this.endpointCache = endpointCache; + this.probeIntervalSeconds = probeIntervalSeconds; + this.idleEvictionDuration = idleEvictionDuration; + this.clock = clock; + this.defaultEndpointAddress = endpointCache.defaultChannel().getAddress(); + this.scheduler = + Executors.newScheduledThreadPool( + 1, + r -> { + Thread t = new Thread(r, "spanner-endpoint-lifecycle"); + t.setDaemon(true); + return t; + }); + + // Start periodic eviction checks. + this.evictionFuture = + scheduler.scheduleAtFixedRate( + this::checkIdleEviction, + EVICTION_CHECK_INTERVAL_SECONDS, + EVICTION_CHECK_INTERVAL_SECONDS, + TimeUnit.SECONDS); + } + + /** + * Sets the multiplexed session name used for GetSession probes. All probers reuse the same + * session since multiplexed sessions are enabled. + * + *

The session name is updated on every call because the client may rotate multiplexed sessions + * over time. Probes must always use the current session to avoid sending GetSession to a stale, + * deleted session. + */ + void setMultiplexedSessionName(String sessionName) { + if (sessionName == null || sessionName.isEmpty()) { + return; + } + String previous = this.multiplexedSessionName; + this.multiplexedSessionName = sessionName; + if (previous == null) { + logger.log( + Level.FINE, "Lifecycle manager captured session name for probing: {0}", sessionName); + } else if (!previous.equals(sessionName)) { + logger.log( + Level.FINE, + "Lifecycle manager updated session name for probing: {0} -> {1}", + new Object[] {previous, sessionName}); + } + } + + /** Returns the multiplexed session name, or null if not yet captured. */ + String getMultiplexedSessionName() { + return multiplexedSessionName; + } + + /** + * Ensures an endpoint exists for the given address. If the endpoint does not exist, creates it in + * the background and starts probing. If it already exists, this is a no-op. + * + *

This is called from the cache update path when new server addresses appear. + */ + void ensureEndpointExists(String address) { + if (isShutdown.get() || address == null || address.isEmpty()) { + return; + } + // Don't manage the default endpoint. + if (defaultEndpointAddress.equals(address)) { + return; + } + + endpoints.computeIfAbsent( + address, + addr -> { + logger.log(Level.INFO, "Scheduling background endpoint creation for address: {0}", addr); + EndpointState state = new EndpointState(addr, clock.instant()); + scheduler.submit(() -> createAndStartProbing(addr)); + return state; + }); + } + + /** + * Records that real (non-probe) traffic was routed to an endpoint. This refreshes the idle + * eviction timer for this endpoint. + */ + void recordRealTraffic(String address) { + if (address == null || defaultEndpointAddress.equals(address)) { + return; + } + EndpointState state = endpoints.get(address); + if (state != null) { + state.lastRealTrafficAt = clock.instant(); + } + } + + /** Creates an endpoint and starts probing. Runs on the scheduler thread. */ + private void createAndStartProbing(String address) { + if (isShutdown.get()) { + return; + } + try { + endpointCache.get(address); + logger.log(Level.INFO, "Background endpoint creation completed for: {0}", address); + startProbing(address); + } catch (Exception e) { + logger.log( + Level.WARNING, "Failed to create endpoint for address: " + address + ", will retry", e); + // Schedule a retry after one probe interval. + if (!isShutdown.get()) { + scheduler.schedule( + () -> createAndStartProbing(address), probeIntervalSeconds, TimeUnit.SECONDS); + } + } + } + + /** Starts periodic probing for an endpoint. */ + private void startProbing(String address) { + EndpointState state = endpoints.get(address); + if (state == null || isShutdown.get()) { + return; + } + + // Cancel any existing probe schedule. + if (state.probeFuture != null) { + state.probeFuture.cancel(false); + } + + state.probeFuture = + scheduler.scheduleAtFixedRate( + () -> probe(address), 0, probeIntervalSeconds, TimeUnit.SECONDS); + logger.log( + Level.INFO, + "Prober started for endpoint {0} with interval {1}s", + new Object[] {address, probeIntervalSeconds}); + } + + /** Stops probing for an endpoint. */ + private void stopProbing(String address) { + EndpointState state = endpoints.get(address); + if (state != null && state.probeFuture != null) { + state.probeFuture.cancel(false); + state.probeFuture = null; + logger.log(Level.INFO, "Prober stopped for endpoint: {0}", address); + } + } + + /** Sends a GetSession probe to the endpoint. */ + private void probe(String address) { + if (isShutdown.get()) { + return; + } + + String sessionName = multiplexedSessionName; + if (sessionName == null || sessionName.isEmpty()) { + logger.log( + Level.FINE, + "Skipping probe for {0}: multiplexed session name not yet available", + address); + // Even without a session, request a connection to keep the channel from going idle. + ChannelEndpoint endpoint = endpointCache.getIfPresent(address); + if (endpoint != null) { + try { + endpoint.getChannel().getState(true); + } catch (Exception ignored) { + // Best effort. + } + } + return; + } + + ChannelEndpoint endpoint = endpointCache.getIfPresent(address); + if (endpoint == null) { + logger.log(Level.FINE, "Probe skipped for {0}: endpoint not in cache", address); + return; + } + + EndpointState state = endpoints.get(address); + if (state == null) { + return; + } + + ManagedChannel channel = endpoint.getChannel(); + GetSessionRequest request = GetSessionRequest.newBuilder().setName(sessionName).build(); + + try { + ClientCall call = + channel.newCall( + SpannerGrpc.getGetSessionMethod(), + CallOptions.DEFAULT.withDeadlineAfter(PROBE_TIMEOUT_SECONDS, TimeUnit.SECONDS)); + + call.start( + new ClientCall.Listener() { + @Override + public void onMessage(com.google.spanner.v1.Session message) { + state.lastProbeAt = clock.instant(); + if (endpoint.isHealthy()) { + state.lastReadyAt = clock.instant(); + } + logger.log(Level.FINE, "Probe succeeded for endpoint: {0}", address); + } + + @Override + public void onClose(Status status, Metadata trailers) { + state.lastProbeAt = clock.instant(); + if (!status.isOk()) { + logger.log( + Level.WARNING, + "Probe failed for endpoint {0}: {1}", + new Object[] {address, status}); + } + } + }, + new Metadata()); + + call.sendMessage(request); + call.halfClose(); + call.request(1); + } catch (Exception e) { + state.lastProbeAt = clock.instant(); + logger.log(Level.WARNING, "Probe exception for endpoint " + address, e); + } + } + + /** Checks all managed endpoints for idle eviction. */ + @VisibleForTesting + void checkIdleEviction() { + if (isShutdown.get()) { + return; + } + + Instant now = clock.instant(); + List toEvict = new ArrayList<>(); + + for (Map.Entry entry : endpoints.entrySet()) { + String address = entry.getKey(); + EndpointState state = entry.getValue(); + + // Never evict the default endpoint. + if (defaultEndpointAddress.equals(address)) { + continue; + } + + Duration sinceLastRealTraffic = Duration.between(state.lastRealTrafficAt, now); + if (sinceLastRealTraffic.compareTo(idleEvictionDuration) > 0) { + toEvict.add(address); + } + } + + for (String address : toEvict) { + evictEndpoint(address); + } + } + + /** Evicts an endpoint: stops probing, shuts down the channel pool, removes from cache. */ + private void evictEndpoint(String address) { + logger.log( + Level.INFO, + "Evicting idle endpoint {0}: no real traffic for {1}", + new Object[] {address, idleEvictionDuration}); + + stopProbing(address); + endpoints.remove(address); + endpointCache.evict(address); + } + + /** + * Requests that an evicted endpoint be recreated. The endpoint is created in the background and + * probing starts immediately. The endpoint will only become eligible for location-aware routing + * once it reaches READY state. + */ + void requestEndpointRecreation(String address) { + if (isShutdown.get() || address == null || address.isEmpty()) { + return; + } + if (defaultEndpointAddress.equals(address)) { + return; + } + + // Only recreate if not already managed. + if (endpoints.containsKey(address)) { + return; + } + + logger.log(Level.INFO, "Recreating previously evicted endpoint for address: {0}", address); + EndpointState state = new EndpointState(address, clock.instant()); + if (endpoints.putIfAbsent(address, state) == null) { + scheduler.submit(() -> createAndStartProbing(address)); + } + } + + /** Returns whether an endpoint is being actively managed. */ + boolean isManaged(String address) { + return endpoints.containsKey(address); + } + + /** Returns the endpoint state for testing. */ + @VisibleForTesting + EndpointState getEndpointState(String address) { + return endpoints.get(address); + } + + /** Returns the number of managed endpoints. */ + @VisibleForTesting + int managedEndpointCount() { + return endpoints.size(); + } + + /** Shuts down the lifecycle manager and all probing. */ + void shutdown() { + if (!isShutdown.compareAndSet(false, true)) { + return; + } + + logger.log(Level.INFO, "Shutting down endpoint lifecycle manager"); + + if (evictionFuture != null) { + evictionFuture.cancel(false); + } + + for (EndpointState state : endpoints.values()) { + if (state.probeFuture != null) { + state.probeFuture.cancel(false); + } + } + endpoints.clear(); + + scheduler.shutdown(); + try { + if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) { + scheduler.shutdownNow(); + } + } catch (InterruptedException e) { + scheduler.shutdownNow(); + Thread.currentThread().interrupt(); + } + } +} diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GrpcChannelEndpointCache.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GrpcChannelEndpointCache.java index 3ee4d789592e..2b85789489d8 100644 --- a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GrpcChannelEndpointCache.java +++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GrpcChannelEndpointCache.java @@ -33,6 +33,9 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.logging.Level; +import java.util.logging.Logger; +import javax.annotation.Nullable; /** * gRPC implementation of {@link ChannelEndpointCache}. @@ -44,6 +47,8 @@ @InternalApi class GrpcChannelEndpointCache implements ChannelEndpointCache { + private static final Logger logger = Logger.getLogger(GrpcChannelEndpointCache.class.getName()); + /** Timeout for graceful channel shutdown. */ private static final long SHUTDOWN_TIMEOUT_SECONDS = 5; @@ -88,14 +93,24 @@ public ChannelEndpoint get(String address) { // Create a new provider with the same config but different endpoint. // This is thread-safe as withEndpoint() returns a new provider instance. TransportChannelProvider newProvider = createProviderWithAuthorityOverride(addr); - return new GrpcChannelEndpoint(addr, newProvider); + GrpcChannelEndpoint endpoint = new GrpcChannelEndpoint(addr, newProvider); + logger.log(Level.INFO, "Location-aware endpoint created for address: {0}", addr); + return endpoint; } catch (IOException e) { + logger.log( + Level.WARNING, "Failed to create location-aware endpoint for address: " + addr, e); throw SpannerExceptionFactory.newSpannerException( ErrorCode.INTERNAL, "Failed to create channel for address: " + addr, e); } }); } + @Override + @Nullable + public ChannelEndpoint getIfPresent(String address) { + return servers.get(address); + } + private TransportChannelProvider createProviderWithAuthorityOverride(String address) { InstantiatingGrpcChannelProvider endpointProvider = (InstantiatingGrpcChannelProvider) baseProvider.withEndpoint(address); @@ -210,13 +225,38 @@ public boolean isHealthy() { return false; } // Check connectivity state without triggering a connection attempt. + // Only READY channels are considered healthy for location-aware routing. // Some channel implementations don't support getState(), in which case - // we assume the channel is healthy if it's not shutdown/terminated. + // we treat the endpoint as not ready for location-aware routing (defensive). try { ConnectivityState state = channel.getState(false); - return state != ConnectivityState.SHUTDOWN && state != ConnectivityState.TRANSIENT_FAILURE; - } catch (UnsupportedOperationException ignore) { - return true; + boolean ready = state == ConnectivityState.READY; + if (!ready) { + logger.log( + Level.FINE, + "Location-aware endpoint {0} is not ready for location-aware routing, state: {1}", + new Object[] {address, state}); + } + return ready; + } catch (UnsupportedOperationException e) { + logger.log( + Level.WARNING, + "getState(false) unsupported for location-aware endpoint {0}, treating as not ready", + address); + return false; + } + } + + @Override + public boolean isTransientFailure() { + if (channel.isShutdown() || channel.isTerminated()) { + return false; + } + try { + ConnectivityState state = channel.getState(false); + return state == ConnectivityState.TRANSIENT_FAILURE; + } catch (UnsupportedOperationException e) { + return false; } } diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyAwareChannel.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyAwareChannel.java index 59fc03dfd80a..7b4baa24b76d 100644 --- a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyAwareChannel.java +++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyAwareChannel.java @@ -43,6 +43,8 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; import javax.annotation.Nullable; /** @@ -55,6 +57,9 @@ */ @InternalApi final class KeyAwareChannel extends ManagedChannel { + + private static final Logger logger = Logger.getLogger(KeyAwareChannel.class.getName()); + private static final long MAX_TRACKED_READ_ONLY_TRANSACTIONS = 100_000L; private static final String STREAMING_READ_METHOD = "google.spanner.v1.Spanner/StreamingRead"; private static final String STREAMING_SQL_METHOD = @@ -67,6 +72,7 @@ final class KeyAwareChannel extends ManagedChannel { private final ManagedChannel defaultChannel; private final ChannelEndpointCache endpointCache; + @Nullable private final EndpointLifecycleManager lifecycleManager; private final String authority; private final String defaultEndpointAddress; private final Map> channelFinders = @@ -90,6 +96,11 @@ private KeyAwareChannel( this.defaultChannel = endpointCache.defaultChannel().getChannel(); this.defaultEndpointAddress = endpointCache.defaultChannel().getAddress(); this.authority = this.defaultChannel.authority(); + // Only create lifecycle manager for production (non-factory) path. + // Factory path is used by tests with custom caches where background probing + // would interfere with test assertions. + this.lifecycleManager = + (endpointCacheFactory == null) ? new EndpointLifecycleManager(endpointCache) : null; } static KeyAwareChannel create( @@ -118,7 +129,10 @@ private ChannelFinder getOrCreateChannelFinder(String databaseId) { ref = channelFinders.get(databaseId); finder = (ref != null) ? ref.get() : null; if (finder == null) { - finder = new ChannelFinder(endpointCache); + finder = + lifecycleManager != null + ? new ChannelFinder(endpointCache, lifecycleManager) + : new ChannelFinder(endpointCache); channelFinders.put(databaseId, new SoftReference<>(finder)); } } @@ -126,14 +140,36 @@ private ChannelFinder getOrCreateChannelFinder(String databaseId) { return finder; } + /** Captures the session name for probe use and records real traffic to the selected endpoint. */ + private void onRequestRouted( + @Nullable String session, @Nullable ChannelEndpoint selectedEndpoint) { + if (lifecycleManager == null) { + return; + } + // Capture session name for lifecycle manager probes. + if (session != null && !session.isEmpty()) { + lifecycleManager.setMultiplexedSessionName(session); + } + // Record real traffic for idle eviction tracking. + if (selectedEndpoint != null && !defaultEndpointAddress.equals(selectedEndpoint.getAddress())) { + lifecycleManager.recordRealTraffic(selectedEndpoint.getAddress()); + } + } + @Override public ManagedChannel shutdown() { + if (lifecycleManager != null) { + lifecycleManager.shutdown(); + } endpointCache.shutdown(); return this; } @Override public ManagedChannel shutdownNow() { + if (lifecycleManager != null) { + lifecycleManager.shutdown(); + } endpointCache.shutdown(); return this; } @@ -186,7 +222,23 @@ private ChannelEndpoint affinityEndpoint(ByteString transactionId) { if (address == null) { return null; } - return endpointCache.get(address); + // Use non-creating lookup and require READY state for location-aware routing. + ChannelEndpoint endpoint = endpointCache.getIfPresent(address); + if (endpoint == null) { + logger.log( + Level.FINE, + "Affinity endpoint for address {0} not present in cache, falling back to default", + address); + return null; + } + if (!endpoint.isHealthy()) { + logger.log( + Level.FINE, + "Affinity endpoint for address {0} not READY, falling back to default", + address); + return null; + } + return endpoint; } private void clearAffinity(ByteString transactionId) { @@ -411,6 +463,10 @@ public void sendMessage(RequestT message) { selectedEndpoint = endpoint; this.channelFinder = finder; + // Record session name for probing and real traffic for idle eviction. + String session = extractSessionFromMessage(message); + parentChannel.onRequestRouted(session, endpoint); + delegate = endpoint.getChannel().newCall(methodDescriptor, callOptions); if (pendingMessageCompression != null) { delegate.setMessageCompression(pendingMessageCompression); @@ -535,6 +591,22 @@ private void drainPendingRequests() { } } + @Nullable + private static String extractSessionFromMessage(Object message) { + if (message instanceof ReadRequest) { + return ((ReadRequest) message).getSession(); + } else if (message instanceof ExecuteSqlRequest) { + return ((ExecuteSqlRequest) message).getSession(); + } else if (message instanceof BeginTransactionRequest) { + return ((BeginTransactionRequest) message).getSession(); + } else if (message instanceof CommitRequest) { + return ((CommitRequest) message).getSession(); + } else if (message instanceof RollbackRequest) { + return ((RollbackRequest) message).getSession(); + } + return null; + } + void maybeRecordAffinity(ByteString transactionId) { parentChannel.recordAffinity(transactionId, selectedEndpoint, allowDefaultAffinity); } diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyRangeCache.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyRangeCache.java index bdbd495aa58c..5cc28cc651e7 100644 --- a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyRangeCache.java +++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyRangeCache.java @@ -37,12 +37,16 @@ import java.util.TreeMap; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicLong; +import java.util.logging.Level; +import java.util.logging.Logger; import java.util.stream.IntStream; /** Cache for routing information used by location-aware routing. */ @InternalApi public final class KeyRangeCache { + private static final Logger logger = Logger.getLogger(KeyRangeCache.class.getName()); + private static final int MAX_LOCAL_REPLICA_DISTANCE = 5; private static final int DEFAULT_MIN_ENTRIES_FOR_RANDOM_PICK = 1000; @@ -55,6 +59,7 @@ public enum RangeMode { } private final ChannelEndpointCache endpointCache; + @javax.annotation.Nullable private final EndpointLifecycleManager lifecycleManager; private final NavigableMap ranges = new TreeMap<>(ByteString.unsignedLexicographicalComparator()); private final Map groups = new HashMap<>(); @@ -65,7 +70,14 @@ public enum RangeMode { private volatile int minCacheEntriesForRandomPick = DEFAULT_MIN_ENTRIES_FOR_RANDOM_PICK; public KeyRangeCache(ChannelEndpointCache endpointCache) { + this(endpointCache, null); + } + + public KeyRangeCache( + ChannelEndpointCache endpointCache, + @javax.annotation.Nullable EndpointLifecycleManager lifecycleManager) { this.endpointCache = Objects.requireNonNull(endpointCache); + this.lifecycleManager = lifecycleManager; } @VisibleForTesting @@ -469,21 +481,87 @@ private boolean matches(DirectedReadOptions.ReplicaSelection selection) { } } + /** + * Evaluates whether this tablet should be skipped for location-aware routing. + * + *

State-aware skip logic: + * + *

+ */ boolean shouldSkip(RoutingHint.Builder hintBuilder) { - if (skip || serverAddress.isEmpty() || (endpoint != null && !endpoint.isHealthy())) { - RoutingHint.SkippedTablet.Builder skipped = hintBuilder.addSkippedTabletUidBuilder(); - skipped.setTabletUid(tabletUid); - skipped.setIncarnation(incarnation); + // Server-marked skip or no address: always report. + if (skip || serverAddress.isEmpty()) { + addSkippedTablet(hintBuilder); + return true; + } + + // If the cached endpoint's channel has been shut down (e.g. after idle eviction), + // discard the stale reference so we re-lookup from the cache below. + if (endpoint != null && endpoint.getChannel().isShutdown()) { + logger.log( + Level.FINE, + "Tablet {0} at {1}: cached endpoint is shutdown, clearing stale reference", + new Object[] {tabletUid, serverAddress}); + endpoint = null; + } + + // Lookup without creating: location-aware routing should not trigger foreground endpoint + // creation. + if (endpoint == null) { + endpoint = endpointCache.getIfPresent(serverAddress); + } + + // No endpoint exists yet - skip silently, request background recreation so the + // endpoint becomes available for future requests. + if (endpoint == null) { + logger.log( + Level.FINE, + "Tablet {0} at {1}: no endpoint present, skipping silently", + new Object[] {tabletUid, serverAddress}); + if (lifecycleManager != null) { + lifecycleManager.requestEndpointRecreation(serverAddress); + } + return true; + } + + // READY - usable for location-aware routing. + if (endpoint.isHealthy()) { + return false; + } + + // TRANSIENT_FAILURE - skip and report so server can refresh client cache. + if (endpoint.isTransientFailure()) { + logger.log( + Level.INFO, + "Tablet {0} at {1}: endpoint in TRANSIENT_FAILURE, adding to skipped_tablets", + new Object[] {tabletUid, serverAddress}); + addSkippedTablet(hintBuilder); return true; } - return false; + + // IDLE, CONNECTING, SHUTDOWN, or unsupported - skip silently. + logger.log( + Level.FINE, + "Tablet {0} at {1}: endpoint not ready, skipping silently", + new Object[] {tabletUid, serverAddress}); + return true; + } + + private void addSkippedTablet(RoutingHint.Builder hintBuilder) { + RoutingHint.SkippedTablet.Builder skipped = hintBuilder.addSkippedTabletUidBuilder(); + skipped.setTabletUid(tabletUid); + skipped.setIncarnation(incarnation); } ChannelEndpoint pick(RoutingHint.Builder hintBuilder) { hintBuilder.setTabletUid(tabletUid); - if (endpoint == null && !serverAddress.isEmpty()) { - endpoint = endpointCache.get(serverAddress); - } + // Endpoint must already exist and be READY if shouldSkip returned false. return endpoint; } @@ -567,37 +645,18 @@ ChannelEndpoint fillRoutingHint( directedReadOptions.getReplicasCase() != DirectedReadOptions.ReplicasCase.REPLICAS_NOT_SET; - // Fast path: pick a tablet while holding the lock. If the endpoint is already - // cached on the tablet, return it immediately without releasing the lock. - // If the endpoint needs to be created (blocking network dial), release the - // lock first so other threads are not blocked during channel creation. - CachedTablet selected; + // Select a tablet while holding the lock. With state-aware routing, only READY + // endpoints pass shouldSkip(), so the selected tablet always has a cached + // endpoint. No foreground endpoint creation is needed — the lifecycle manager + // creates endpoints in the background. synchronized (this) { - selected = + CachedTablet selected = selectTabletLocked( preferLeader, hasDirectedReadOptions, hintBuilder, directedReadOptions); if (selected == null) { return null; } - if (selected.endpoint != null || selected.serverAddress.isEmpty()) { - return selected.pick(hintBuilder); - } - // Slow path: endpoint not yet created. Capture the address and release the - // lock before calling endpointCache.get(), which may block on network dial. - hintBuilder.setTabletUid(selected.tabletUid); - } - - String serverAddress = selected.serverAddress; - ChannelEndpoint endpoint = endpointCache.get(serverAddress); - - synchronized (this) { - // Only update if the tablet's address hasn't changed since we released the lock. - if (selected.endpoint == null && selected.serverAddress.equals(serverAddress)) { - selected.endpoint = endpoint; - } - // Re-set tabletUid with the latest value in case update() ran concurrently. - hintBuilder.setTabletUid(selected.tabletUid); - return selected.endpoint; + return selected.pick(hintBuilder); } } diff --git a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/ChannelFinderGoldenTest.java b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/ChannelFinderGoldenTest.java index 525313f1ab4e..7e9d2476346b 100644 --- a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/ChannelFinderGoldenTest.java +++ b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/ChannelFinderGoldenTest.java @@ -131,6 +131,12 @@ public ChannelEndpoint get(String address) { return endpoints.computeIfAbsent(address, FakeEndpoint::new); } + @Override + public ChannelEndpoint getIfPresent(String address) { + // Auto-create for golden tests — simulates lifecycle manager having pre-created endpoints. + return endpoints.computeIfAbsent(address, FakeEndpoint::new); + } + @Override public void evict(String address) { endpoints.remove(address); @@ -158,6 +164,11 @@ public boolean isHealthy() { return !unhealthyServers.contains(address); } + @Override + public boolean isTransientFailure() { + return unhealthyServers.contains(address); + } + @Override public ManagedChannel getChannel() { return new ManagedChannel() { diff --git a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/EndpointLifecycleManagerTest.java b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/EndpointLifecycleManagerTest.java new file mode 100644 index 000000000000..24d92dc9c565 --- /dev/null +++ b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/EndpointLifecycleManagerTest.java @@ -0,0 +1,304 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner.spi.v1; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.time.ZoneId; +import org.junit.After; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class EndpointLifecycleManagerTest { + + private EndpointLifecycleManager manager; + + @After + public void tearDown() { + if (manager != null) { + manager.shutdown(); + } + } + + @Test + public void endpointCreationStartsProbing() throws Exception { + KeyRangeCacheTest.FakeEndpointCache cache = new KeyRangeCacheTest.FakeEndpointCache(); + manager = + new EndpointLifecycleManager( + cache, /* probeIntervalSeconds= */ 1, Duration.ofMinutes(30), Clock.systemUTC()); + + manager.ensureEndpointExists("server1"); + + // Wait for background creation. + Thread.sleep(500); + + // Endpoint should be created in the cache. + assertNotNull(cache.getIfPresent("server1")); + + // Should be managed. + assertTrue(manager.isManaged("server1")); + assertNotNull(manager.getEndpointState("server1")); + assertEquals(1, manager.managedEndpointCount()); + } + + @Test + public void duplicateEnsureEndpointExistsIsNoop() throws Exception { + KeyRangeCacheTest.FakeEndpointCache cache = new KeyRangeCacheTest.FakeEndpointCache(); + manager = + new EndpointLifecycleManager( + cache, /* probeIntervalSeconds= */ 60, Duration.ofMinutes(30), Clock.systemUTC()); + + manager.ensureEndpointExists("server1"); + manager.ensureEndpointExists("server1"); + manager.ensureEndpointExists("server1"); + + Thread.sleep(300); + + assertEquals(1, manager.managedEndpointCount()); + } + + @Test + public void defaultEndpointIsNotManaged() { + KeyRangeCacheTest.FakeEndpointCache cache = new KeyRangeCacheTest.FakeEndpointCache(); + manager = + new EndpointLifecycleManager( + cache, /* probeIntervalSeconds= */ 60, Duration.ofMinutes(30), Clock.systemUTC()); + + manager.ensureEndpointExists("default"); + + assertFalse(manager.isManaged("default")); + assertEquals(0, manager.managedEndpointCount()); + } + + @Test + public void probeTrafficDoesNotUpdateLastRealTrafficAt() throws Exception { + KeyRangeCacheTest.FakeEndpointCache cache = new KeyRangeCacheTest.FakeEndpointCache(); + TestClock clock = new TestClock(Instant.now()); + manager = + new EndpointLifecycleManager( + cache, /* probeIntervalSeconds= */ 1, Duration.ofMinutes(30), clock); + + Instant creationTime = clock.instant(); + manager.ensureEndpointExists("server1"); + Thread.sleep(300); + + // Probe traffic should not change lastRealTrafficAt. + EndpointLifecycleManager.EndpointState state = manager.getEndpointState("server1"); + assertNotNull(state); + assertEquals(creationTime, state.lastRealTrafficAt); + } + + @Test + public void realRoutedTrafficUpdatesLastRealTrafficAt() throws Exception { + KeyRangeCacheTest.FakeEndpointCache cache = new KeyRangeCacheTest.FakeEndpointCache(); + TestClock clock = new TestClock(Instant.now()); + manager = + new EndpointLifecycleManager( + cache, /* probeIntervalSeconds= */ 60, Duration.ofMinutes(30), clock); + + manager.ensureEndpointExists("server1"); + Thread.sleep(300); + + Instant before = clock.instant(); + clock.advance(Duration.ofMinutes(5)); + manager.recordRealTraffic("server1"); + + EndpointLifecycleManager.EndpointState state = manager.getEndpointState("server1"); + assertNotNull(state); + assertTrue(state.lastRealTrafficAt.isAfter(before)); + } + + @Test + public void endpointWithOnlyProbeTrafficIsEvictedAfterIdleDuration() throws Exception { + KeyRangeCacheTest.FakeEndpointCache cache = new KeyRangeCacheTest.FakeEndpointCache(); + TestClock clock = new TestClock(Instant.now()); + Duration idleDuration = Duration.ofMinutes(30); + manager = + new EndpointLifecycleManager(cache, /* probeIntervalSeconds= */ 60, idleDuration, clock); + + manager.ensureEndpointExists("server1"); + Thread.sleep(300); + + assertTrue(manager.isManaged("server1")); + + // Advance past idle threshold. + clock.advance(Duration.ofMinutes(31)); + + // Trigger eviction check manually. + manager.checkIdleEviction(); + + // Endpoint should be evicted. + assertFalse(manager.isManaged("server1")); + assertNull(cache.getIfPresent("server1")); + assertEquals(0, manager.managedEndpointCount()); + } + + @Test + public void endpointWithRecentRealTrafficIsNotEvicted() throws Exception { + KeyRangeCacheTest.FakeEndpointCache cache = new KeyRangeCacheTest.FakeEndpointCache(); + TestClock clock = new TestClock(Instant.now()); + Duration idleDuration = Duration.ofMinutes(30); + manager = + new EndpointLifecycleManager(cache, /* probeIntervalSeconds= */ 60, idleDuration, clock); + + manager.ensureEndpointExists("server1"); + Thread.sleep(300); + + // Record real traffic at 20 minutes. + clock.advance(Duration.ofMinutes(20)); + manager.recordRealTraffic("server1"); + + // Advance to 31 minutes (only 11 minutes since last real traffic). + clock.advance(Duration.ofMinutes(11)); + manager.checkIdleEviction(); + + // Should NOT be evicted because last real traffic was 11 minutes ago. + assertTrue(manager.isManaged("server1")); + } + + @Test + public void evictedEndpointIsRecreatedOnDemand() throws Exception { + KeyRangeCacheTest.FakeEndpointCache cache = new KeyRangeCacheTest.FakeEndpointCache(); + TestClock clock = new TestClock(Instant.now()); + Duration idleDuration = Duration.ofMinutes(30); + manager = + new EndpointLifecycleManager(cache, /* probeIntervalSeconds= */ 60, idleDuration, clock); + + manager.ensureEndpointExists("server1"); + Thread.sleep(300); + + // Evict. + clock.advance(Duration.ofMinutes(31)); + manager.checkIdleEviction(); + assertFalse(manager.isManaged("server1")); + + // Recreate. + manager.requestEndpointRecreation("server1"); + Thread.sleep(500); + + assertTrue(manager.isManaged("server1")); + assertNotNull(cache.getIfPresent("server1")); + } + + @Test + public void sessionNameIsUpdatedOnRotation() { + KeyRangeCacheTest.FakeEndpointCache cache = new KeyRangeCacheTest.FakeEndpointCache(); + manager = + new EndpointLifecycleManager( + cache, /* probeIntervalSeconds= */ 60, Duration.ofMinutes(30), Clock.systemUTC()); + + assertNull(manager.getMultiplexedSessionName()); + + manager.setMultiplexedSessionName("projects/p/instances/i/databases/d/sessions/s1"); + assertEquals( + "projects/p/instances/i/databases/d/sessions/s1", manager.getMultiplexedSessionName()); + + // Session rotation: second set should overwrite so probes use the current session. + manager.setMultiplexedSessionName("projects/p/instances/i/databases/d/sessions/s2"); + assertEquals( + "projects/p/instances/i/databases/d/sessions/s2", manager.getMultiplexedSessionName()); + + // Null and empty should not clear. + manager.setMultiplexedSessionName(null); + assertEquals( + "projects/p/instances/i/databases/d/sessions/s2", manager.getMultiplexedSessionName()); + manager.setMultiplexedSessionName(""); + assertEquals( + "projects/p/instances/i/databases/d/sessions/s2", manager.getMultiplexedSessionName()); + } + + @Test + public void shutdownStopsAllProbing() throws Exception { + KeyRangeCacheTest.FakeEndpointCache cache = new KeyRangeCacheTest.FakeEndpointCache(); + manager = + new EndpointLifecycleManager( + cache, /* probeIntervalSeconds= */ 1, Duration.ofMinutes(30), Clock.systemUTC()); + + manager.ensureEndpointExists("server1"); + manager.ensureEndpointExists("server2"); + Thread.sleep(300); + + assertEquals(2, manager.managedEndpointCount()); + + manager.shutdown(); + + assertEquals(0, manager.managedEndpointCount()); + } + + @Test + public void emptyOrNullAddressIsIgnored() { + KeyRangeCacheTest.FakeEndpointCache cache = new KeyRangeCacheTest.FakeEndpointCache(); + manager = + new EndpointLifecycleManager( + cache, /* probeIntervalSeconds= */ 60, Duration.ofMinutes(30), Clock.systemUTC()); + + manager.ensureEndpointExists(null); + manager.ensureEndpointExists(""); + + assertEquals(0, manager.managedEndpointCount()); + } + + @Test + public void recordRealTrafficForDefaultEndpointIsIgnored() { + KeyRangeCacheTest.FakeEndpointCache cache = new KeyRangeCacheTest.FakeEndpointCache(); + manager = + new EndpointLifecycleManager( + cache, /* probeIntervalSeconds= */ 60, Duration.ofMinutes(30), Clock.systemUTC()); + + // Should not throw or create state. + manager.recordRealTraffic("default"); + manager.recordRealTraffic(null); + assertEquals(0, manager.managedEndpointCount()); + } + + /** Test clock that can be advanced manually. */ + private static final class TestClock extends Clock { + private Instant now; + + TestClock(Instant now) { + this.now = now; + } + + void advance(Duration duration) { + now = now.plus(duration); + } + + @Override + public Instant instant() { + return now; + } + + @Override + public ZoneId getZone() { + return ZoneId.of("UTC"); + } + + @Override + public Clock withZone(ZoneId zone) { + return this; + } + } +} diff --git a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GrpcChannelEndpointCacheTest.java b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GrpcChannelEndpointCacheTest.java index 56e6d3cfc2bc..74afec18bfc3 100644 --- a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GrpcChannelEndpointCacheTest.java +++ b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GrpcChannelEndpointCacheTest.java @@ -114,10 +114,13 @@ public void healthReflectsChannelShutdown() throws Exception { GrpcChannelEndpointCache cache = new GrpcChannelEndpointCache(createProvider("localhost:1234")); try { ChannelEndpoint server = cache.get("localhost:1111"); - assertThat(server.isHealthy()).isTrue(); + // Newly created channel is not READY (likely IDLE), so isHealthy is false for location aware. + // isHealthy now requires READY state for location aware routing. + assertThat(server.isHealthy()).isFalse(); server.getChannel().shutdownNow(); assertThat(server.isHealthy()).isFalse(); + assertThat(server.isTransientFailure()).isFalse(); } finally { cache.shutdown(); } diff --git a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyAwareChannelTest.java b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyAwareChannelTest.java index a4919389a85c..b323562a708c 100644 --- a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyAwareChannelTest.java +++ b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyAwareChannelTest.java @@ -196,7 +196,8 @@ public void timeoutOnCommitClearsTransactionAffinity() throws Exception { commitCall.sendMessage( CommitRequest.newBuilder().setSession(SESSION).setTransactionId(transactionId).build()); - assertThat(harness.endpointCache.getCount(DEFAULT_ADDRESS)).isEqualTo(1); + // affinityEndpoint now uses getIfPresent (non-creating), so getCount stays at 0. + assertThat(harness.endpointCache.getCount(DEFAULT_ADDRESS)).isEqualTo(0); @SuppressWarnings("unchecked") RecordingClientCall commitDelegate = @@ -210,7 +211,8 @@ public void timeoutOnCommitClearsTransactionAffinity() throws Exception { rollbackCall.sendMessage( RollbackRequest.newBuilder().setSession(SESSION).setTransactionId(transactionId).build()); - assertThat(harness.endpointCache.getCount(DEFAULT_ADDRESS)).isEqualTo(1); + // Rollback also uses getIfPresent for affinity, so getCount remains 0. + assertThat(harness.endpointCache.getCount(DEFAULT_ADDRESS)).isEqualTo(0); } @Test @@ -1078,6 +1080,16 @@ public ChannelEndpoint get(String address) { return endpoints.computeIfAbsent(address, FakeEndpoint::new); } + @Override + public ChannelEndpoint getIfPresent(String address) { + if (defaultAddress.equals(address)) { + return defaultEndpoint; + } + // Auto-create for integration tests — simulates lifecycle manager having pre-created + // endpoints. + return endpoints.computeIfAbsent(address, FakeEndpoint::new); + } + @Override public void evict(String address) { endpoints.remove(address); @@ -1139,6 +1151,11 @@ public boolean isHealthy() { return true; } + @Override + public boolean isTransientFailure() { + return false; + } + @Override public ManagedChannel getChannel() { return channel; diff --git a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyRangeCacheGoldenTest.java b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyRangeCacheGoldenTest.java index 763a36dbfd64..7fa2874ada5b 100644 --- a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyRangeCacheGoldenTest.java +++ b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyRangeCacheGoldenTest.java @@ -125,6 +125,12 @@ public ChannelEndpoint get(String address) { return endpoints.computeIfAbsent(address, FakeEndpoint::new); } + @Override + public ChannelEndpoint getIfPresent(String address) { + // Auto-create for golden tests — simulates lifecycle manager having pre-created endpoints. + return endpoints.computeIfAbsent(address, FakeEndpoint::new); + } + @Override public void evict(String address) { endpoints.remove(address); @@ -154,6 +160,11 @@ public boolean isHealthy() { return true; } + @Override + public boolean isTransientFailure() { + return false; + } + @Override public ManagedChannel getChannel() { return channel; diff --git a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyRangeCacheTest.java b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyRangeCacheTest.java index 2405aa7a062b..9ba48cb344ec 100644 --- a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyRangeCacheTest.java +++ b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyRangeCacheTest.java @@ -18,6 +18,8 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import com.google.protobuf.ByteString; import com.google.spanner.v1.CacheUpdate; @@ -33,6 +35,7 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -41,53 +44,33 @@ public class KeyRangeCacheTest { @Test - public void skipsUnhealthyTabletAfterItIsCached() { + public void skipsTransientFailureTabletWithSkippedTablet() { FakeEndpointCache endpointCache = new FakeEndpointCache(); KeyRangeCache cache = new KeyRangeCache(endpointCache); - cache.addRanges( - CacheUpdate.newBuilder() - .addRange( - Range.newBuilder() - .setStartKey(bytes("a")) - .setLimitKey(bytes("z")) - .setGroupUid(5) - .setSplitId(1) - .setGeneration(bytes("1"))) - .addGroup( - Group.newBuilder() - .setGroupUid(5) - .setGeneration(bytes("1")) - .setLeaderIndex(0) - .addTablets( - Tablet.newBuilder() - .setTabletUid(1) - .setServerAddress("server1") - .setIncarnation(bytes("1")) - .setDistance(0)) - .addTablets( - Tablet.newBuilder() - .setTabletUid(2) - .setServerAddress("server2") - .setIncarnation(bytes("1")) - .setDistance(0))) - .build()); + cache.addRanges(twoReplicaUpdate()); + // Pre-create endpoints. + endpointCache.get("server1"); + endpointCache.get("server2"); + + // Initial routing works. RoutingHint.Builder initialHint = RoutingHint.newBuilder().setKey(bytes("a")); ChannelEndpoint initialServer = cache.fillRoutingHint( - /* preferLeader= */ false, + false, KeyRangeCache.RangeMode.COVERING_SPLIT, DirectedReadOptions.getDefaultInstance(), initialHint); assertNotNull(initialServer); - endpointCache.setHealthy("server1", false); + // Mark server1 as TRANSIENT_FAILURE. + endpointCache.setState("server1", EndpointHealthState.TRANSIENT_FAILURE); RoutingHint.Builder hint = RoutingHint.newBuilder().setKey(bytes("a")); ChannelEndpoint server = cache.fillRoutingHint( - /* preferLeader= */ false, + false, KeyRangeCache.RangeMode.COVERING_SPLIT, DirectedReadOptions.getDefaultInstance(), hint); @@ -125,6 +108,8 @@ public void shrinkToEvictsRanges() { .setIncarnation(bytes("1")))) .build(); cache.addRanges(update); + // Pre-create endpoint so READY state check passes in shouldSkip. + endpointCache.get("server" + i); } checkContents(cache, numRanges, numRanges); @@ -140,6 +125,425 @@ public void shrinkToEvictsRanges() { checkContents(cache, 0, numRanges); } + @Test + public void readyEndpointIsUsableForLocationAware() { + FakeEndpointCache endpointCache = new FakeEndpointCache(); + KeyRangeCache cache = new KeyRangeCache(endpointCache); + cache.addRanges(singleReplicaUpdate("server1")); + + // Pre-create endpoint so getIfPresent finds it. Default state is READY. + endpointCache.get("server1"); + + RoutingHint.Builder hint = RoutingHint.newBuilder().setKey(bytes("a")); + ChannelEndpoint server = + cache.fillRoutingHint( + false, + KeyRangeCache.RangeMode.COVERING_SPLIT, + DirectedReadOptions.getDefaultInstance(), + hint); + + assertNotNull(server); + assertEquals("server1", server.getAddress()); + assertEquals(0, hint.getSkippedTabletUidCount()); + } + + @Test + public void idleEndpointIsNotUsableForLocationAware() { + FakeEndpointCache endpointCache = new FakeEndpointCache(); + KeyRangeCache cache = new KeyRangeCache(endpointCache); + cache.addRanges(singleReplicaUpdate("server1")); + + // Ensure endpoint exists in cache first. + endpointCache.get("server1"); + endpointCache.setState("server1", EndpointHealthState.IDLE); + + RoutingHint.Builder hint = RoutingHint.newBuilder().setKey(bytes("a")); + ChannelEndpoint server = + cache.fillRoutingHint( + false, + KeyRangeCache.RangeMode.COVERING_SPLIT, + DirectedReadOptions.getDefaultInstance(), + hint); + + // IDLE causes silent skip — falls back to null (default host), no skipped_tablets. + assertNull(server); + assertEquals(0, hint.getSkippedTabletUidCount()); + } + + @Test + public void connectingEndpointIsNotUsableForLocationAware() { + FakeEndpointCache endpointCache = new FakeEndpointCache(); + KeyRangeCache cache = new KeyRangeCache(endpointCache); + cache.addRanges(singleReplicaUpdate("server1")); + + endpointCache.get("server1"); + endpointCache.setState("server1", EndpointHealthState.CONNECTING); + + RoutingHint.Builder hint = RoutingHint.newBuilder().setKey(bytes("a")); + ChannelEndpoint server = + cache.fillRoutingHint( + false, + KeyRangeCache.RangeMode.COVERING_SPLIT, + DirectedReadOptions.getDefaultInstance(), + hint); + + assertNull(server); + assertEquals(0, hint.getSkippedTabletUidCount()); + } + + @Test + public void transientFailureEndpointIsNotUsable() { + FakeEndpointCache endpointCache = new FakeEndpointCache(); + KeyRangeCache cache = new KeyRangeCache(endpointCache); + cache.addRanges(singleReplicaUpdate("server1")); + + endpointCache.get("server1"); + endpointCache.setState("server1", EndpointHealthState.TRANSIENT_FAILURE); + + RoutingHint.Builder hint = RoutingHint.newBuilder().setKey(bytes("a")); + ChannelEndpoint server = + cache.fillRoutingHint( + false, + KeyRangeCache.RangeMode.COVERING_SPLIT, + DirectedReadOptions.getDefaultInstance(), + hint); + + // TRANSIENT_FAILURE: skip with skipped_tablets. + assertNull(server); + assertEquals(1, hint.getSkippedTabletUidCount()); + assertEquals(1L, hint.getSkippedTabletUid(0).getTabletUid()); + } + + @Test + public void unsupportedGetStateTreatedAsNotReady() { + FakeEndpointCache endpointCache = new FakeEndpointCache(); + KeyRangeCache cache = new KeyRangeCache(endpointCache); + cache.addRanges(singleReplicaUpdate("server1")); + + endpointCache.get("server1"); + endpointCache.setState("server1", EndpointHealthState.UNSUPPORTED); + + RoutingHint.Builder hint = RoutingHint.newBuilder().setKey(bytes("a")); + ChannelEndpoint server = + cache.fillRoutingHint( + false, + KeyRangeCache.RangeMode.COVERING_SPLIT, + DirectedReadOptions.getDefaultInstance(), + hint); + + // Unsupported state: skip silently, no skipped_tablets. + assertNull(server); + assertEquals(0, hint.getSkippedTabletUidCount()); + } + + @Test + public void missingEndpointCausesDefaultHostFallbackWithoutSkippedTablet() { + // Endpoint not in cache at all — getIfPresent returns null. + FakeEndpointCache endpointCache = new FakeEndpointCache(); + endpointCache.setCreateOnGet(false); + KeyRangeCache cache = new KeyRangeCache(endpointCache); + cache.addRanges(singleReplicaUpdate("server1")); + + RoutingHint.Builder hint = RoutingHint.newBuilder().setKey(bytes("a")); + ChannelEndpoint server = + cache.fillRoutingHint( + false, + KeyRangeCache.RangeMode.COVERING_SPLIT, + DirectedReadOptions.getDefaultInstance(), + hint); + + assertNull(server); + assertEquals(0, hint.getSkippedTabletUidCount()); + } + + @Test + public void idleEndpointCausesDefaultHostFallbackWithoutSkippedTablet() { + FakeEndpointCache endpointCache = new FakeEndpointCache(); + KeyRangeCache cache = new KeyRangeCache(endpointCache); + cache.addRanges(singleReplicaUpdate("server1")); + + endpointCache.get("server1"); + endpointCache.setState("server1", EndpointHealthState.IDLE); + + RoutingHint.Builder hint = RoutingHint.newBuilder().setKey(bytes("a")); + ChannelEndpoint server = + cache.fillRoutingHint( + false, + KeyRangeCache.RangeMode.COVERING_SPLIT, + DirectedReadOptions.getDefaultInstance(), + hint); + + assertNull(server); + assertEquals(0, hint.getSkippedTabletUidCount()); + } + + @Test + public void connectingEndpointCausesDefaultHostFallbackWithoutSkippedTablet() { + FakeEndpointCache endpointCache = new FakeEndpointCache(); + KeyRangeCache cache = new KeyRangeCache(endpointCache); + cache.addRanges(singleReplicaUpdate("server1")); + + endpointCache.get("server1"); + endpointCache.setState("server1", EndpointHealthState.CONNECTING); + + RoutingHint.Builder hint = RoutingHint.newBuilder().setKey(bytes("a")); + ChannelEndpoint server = + cache.fillRoutingHint( + false, + KeyRangeCache.RangeMode.COVERING_SPLIT, + DirectedReadOptions.getDefaultInstance(), + hint); + + assertNull(server); + assertEquals(0, hint.getSkippedTabletUidCount()); + } + + @Test + public void transientFailureEndpointCausesSkippedTabletPlusDefaultHostFallback() { + FakeEndpointCache endpointCache = new FakeEndpointCache(); + KeyRangeCache cache = new KeyRangeCache(endpointCache); + cache.addRanges(singleReplicaUpdate("server1")); + + endpointCache.get("server1"); + endpointCache.setState("server1", EndpointHealthState.TRANSIENT_FAILURE); + + RoutingHint.Builder hint = RoutingHint.newBuilder().setKey(bytes("a")); + ChannelEndpoint server = + cache.fillRoutingHint( + false, + KeyRangeCache.RangeMode.COVERING_SPLIT, + DirectedReadOptions.getDefaultInstance(), + hint); + + assertNull(server); + assertEquals(1, hint.getSkippedTabletUidCount()); + assertEquals(1L, hint.getSkippedTabletUid(0).getTabletUid()); + } + + @Test + public void oneUnusableReplicaAndOneReadyReplicaUsesReady() { + FakeEndpointCache endpointCache = new FakeEndpointCache(); + KeyRangeCache cache = new KeyRangeCache(endpointCache); + cache.addRanges(twoReplicaUpdate()); + + // Make both endpoints present. + endpointCache.get("server1"); + endpointCache.get("server2"); + + // server1 is IDLE (not ready), server2 is READY. + endpointCache.setState("server1", EndpointHealthState.IDLE); + endpointCache.setState("server2", EndpointHealthState.READY); + + RoutingHint.Builder hint = RoutingHint.newBuilder().setKey(bytes("a")); + ChannelEndpoint server = + cache.fillRoutingHint( + false, + KeyRangeCache.RangeMode.COVERING_SPLIT, + DirectedReadOptions.getDefaultInstance(), + hint); + + assertNotNull(server); + assertEquals("server2", server.getAddress()); + // server1 was IDLE, so no skipped_tablets for it. + assertEquals(0, hint.getSkippedTabletUidCount()); + } + + @Test + public void readyEndpointIsUsedForLocationAware() { + FakeEndpointCache endpointCache = new FakeEndpointCache(); + KeyRangeCache cache = new KeyRangeCache(endpointCache); + cache.addRanges(singleReplicaUpdate("server1")); + + endpointCache.get("server1"); + endpointCache.setState("server1", EndpointHealthState.READY); + + RoutingHint.Builder hint = RoutingHint.newBuilder().setKey(bytes("a")); + ChannelEndpoint server = + cache.fillRoutingHint( + false, + KeyRangeCache.RangeMode.COVERING_SPLIT, + DirectedReadOptions.getDefaultInstance(), + hint); + + assertNotNull(server); + assertEquals("server1", server.getAddress()); + assertEquals(0, hint.getSkippedTabletUidCount()); + } + + @Test + public void transientFailureReplicaSkippedAndReadyReplicaSelected() { + FakeEndpointCache endpointCache = new FakeEndpointCache(); + KeyRangeCache cache = new KeyRangeCache(endpointCache); + cache.addRanges(twoReplicaUpdate()); + + endpointCache.get("server1"); + endpointCache.get("server2"); + + endpointCache.setState("server1", EndpointHealthState.TRANSIENT_FAILURE); + endpointCache.setState("server2", EndpointHealthState.READY); + + RoutingHint.Builder hint = RoutingHint.newBuilder().setKey(bytes("a")); + ChannelEndpoint server = + cache.fillRoutingHint( + false, + KeyRangeCache.RangeMode.COVERING_SPLIT, + DirectedReadOptions.getDefaultInstance(), + hint); + + assertNotNull(server); + assertEquals("server2", server.getAddress()); + // server1 was TRANSIENT_FAILURE, so it should be in skipped_tablets. + assertEquals(1, hint.getSkippedTabletUidCount()); + assertEquals(1L, hint.getSkippedTabletUid(0).getTabletUid()); + } + + // --- Eviction and recreation tests --- + + @Test + public void staleShutdownEndpointIsClearedAndRelookedUp() { + // Bug 1 regression: after idle eviction shuts down a channel, the tablet's cached + // endpoint reference becomes stale. shouldSkip must detect the shutdown channel, + // discard it, and re-lookup from the cache. + FakeEndpointCache endpointCache = new FakeEndpointCache(); + KeyRangeCache cache = new KeyRangeCache(endpointCache); + cache.addRanges(singleReplicaUpdate("server1")); + + // Route once so the tablet caches the endpoint reference. + endpointCache.get("server1"); + RoutingHint.Builder hint1 = RoutingHint.newBuilder().setKey(bytes("a")); + ChannelEndpoint first = + cache.fillRoutingHint( + false, + KeyRangeCache.RangeMode.COVERING_SPLIT, + DirectedReadOptions.getDefaultInstance(), + hint1); + assertNotNull(first); + assertEquals("server1", first.getAddress()); + + // Simulate idle eviction: shut down the channel and evict from cache. + first.getChannel().shutdownNow(); + endpointCache.evict("server1"); + + // Without the fix, the tablet would keep using the stale shutdown endpoint forever. + // With the fix, shouldSkip detects the shutdown, clears it, and re-lookups from cache. + + // Re-create the endpoint (simulating lifecycle manager recreation). + endpointCache.get("server1"); + endpointCache.setState("server1", EndpointHealthState.READY); + + RoutingHint.Builder hint2 = RoutingHint.newBuilder().setKey(bytes("a")); + ChannelEndpoint second = + cache.fillRoutingHint( + false, + KeyRangeCache.RangeMode.COVERING_SPLIT, + DirectedReadOptions.getDefaultInstance(), + hint2); + + // Should find the new READY endpoint. + assertNotNull(second); + assertEquals("server1", second.getAddress()); + assertEquals(0, hint2.getSkippedTabletUidCount()); + } + + @Test + public void missingEndpointTriggersRecreationViaLifecycleManager() { + // Bug 2 regression: when a routing lookup finds no endpoint, it should call + // requestEndpointRecreation so the endpoint becomes available for future requests. + FakeEndpointCache endpointCache = new FakeEndpointCache(); + TrackingLifecycleManager tracking = new TrackingLifecycleManager(endpointCache); + try { + KeyRangeCache cache = new KeyRangeCache(endpointCache, tracking); + cache.addRanges(singleReplicaUpdate("server1")); + + // No endpoint exists in cache. + RoutingHint.Builder hint = RoutingHint.newBuilder().setKey(bytes("a")); + ChannelEndpoint server = + cache.fillRoutingHint( + false, + KeyRangeCache.RangeMode.COVERING_SPLIT, + DirectedReadOptions.getDefaultInstance(), + hint); + + // Should fall back to default. + assertNull(server); + + // Lifecycle manager should have been asked to recreate the endpoint. + assertTrue( + "requestEndpointRecreation should have been called for server1", + tracking.recreationRequested.contains("server1")); + } finally { + tracking.shutdown(); + } + } + + /** Minimal lifecycle manager stub that records recreation requests. */ + private static final class TrackingLifecycleManager extends EndpointLifecycleManager { + final java.util.Set recreationRequested = new java.util.HashSet<>(); + + TrackingLifecycleManager(ChannelEndpointCache cache) { + super(cache); + } + + @Override + void requestEndpointRecreation(String address) { + recreationRequested.add(address); + } + } + + // --- Helper methods --- + + private static CacheUpdate singleReplicaUpdate(String serverAddress) { + return CacheUpdate.newBuilder() + .addRange( + Range.newBuilder() + .setStartKey(bytes("a")) + .setLimitKey(bytes("z")) + .setGroupUid(5) + .setSplitId(1) + .setGeneration(bytes("1"))) + .addGroup( + Group.newBuilder() + .setGroupUid(5) + .setGeneration(bytes("1")) + .setLeaderIndex(0) + .addTablets( + Tablet.newBuilder() + .setTabletUid(1) + .setServerAddress(serverAddress) + .setIncarnation(bytes("1")) + .setDistance(0))) + .build(); + } + + private static CacheUpdate twoReplicaUpdate() { + return CacheUpdate.newBuilder() + .addRange( + Range.newBuilder() + .setStartKey(bytes("a")) + .setLimitKey(bytes("z")) + .setGroupUid(5) + .setSplitId(1) + .setGeneration(bytes("1"))) + .addGroup( + Group.newBuilder() + .setGroupUid(5) + .setGeneration(bytes("1")) + .setLeaderIndex(0) + .addTablets( + Tablet.newBuilder() + .setTabletUid(1) + .setServerAddress("server1") + .setIncarnation(bytes("1")) + .setDistance(0)) + .addTablets( + Tablet.newBuilder() + .setTabletUid(2) + .setServerAddress("server2") + .setIncarnation(bytes("1")) + .setDistance(0))) + .build(); + } + private static void checkContents(KeyRangeCache cache, int expectedSize, int mustBeInCache) { assertEquals(expectedSize, cache.size()); int hitCount = 0; @@ -147,7 +551,7 @@ private static void checkContents(KeyRangeCache cache, int expectedSize, int mus RoutingHint.Builder hint = RoutingHint.newBuilder().setKey(bytes(String.format("%04d", i))); ChannelEndpoint server = cache.fillRoutingHint( - /* preferLeader= */ false, + false, KeyRangeCache.RangeMode.COVERING_SPLIT, DirectedReadOptions.getDefaultInstance(), hint); @@ -166,9 +570,23 @@ private static ByteString bytes(String value) { return ByteString.copyFromUtf8(value); } - private static final class FakeEndpointCache implements ChannelEndpointCache { + // --- Health state for testing --- + + enum EndpointHealthState { + READY, + IDLE, + CONNECTING, + TRANSIENT_FAILURE, + SHUTDOWN, + UNSUPPORTED + } + + // --- Test doubles --- + + static final class FakeEndpointCache implements ChannelEndpointCache { private final Map endpoints = new HashMap<>(); private final FakeEndpoint defaultEndpoint = new FakeEndpoint("default"); + private boolean createOnGet = true; @Override public ChannelEndpoint defaultChannel() { @@ -180,6 +598,12 @@ public ChannelEndpoint get(String address) { return endpoints.computeIfAbsent(address, FakeEndpoint::new); } + @Override + @Nullable + public ChannelEndpoint getIfPresent(String address) { + return endpoints.get(address); + } + @Override public void evict(String address) { endpoints.remove(address); @@ -190,18 +614,28 @@ public void shutdown() { endpoints.clear(); } - void setHealthy(String address, boolean healthy) { + void setCreateOnGet(boolean createOnGet) { + this.createOnGet = createOnGet; + } + + void setState(String address, EndpointHealthState state) { FakeEndpoint endpoint = endpoints.get(address); if (endpoint != null) { - endpoint.setHealthy(healthy); + endpoint.setState(state); } } + + @Deprecated + void setHealthy(String address, boolean healthy) { + setState( + address, healthy ? EndpointHealthState.READY : EndpointHealthState.TRANSIENT_FAILURE); + } } - private static final class FakeEndpoint implements ChannelEndpoint { + static final class FakeEndpoint implements ChannelEndpoint { private final String address; private final ManagedChannel channel = new FakeManagedChannel(); - private boolean healthy = true; + private EndpointHealthState state = EndpointHealthState.READY; FakeEndpoint(String address) { this.address = address; @@ -214,7 +648,12 @@ public String getAddress() { @Override public boolean isHealthy() { - return healthy; + return state == EndpointHealthState.READY; + } + + @Override + public boolean isTransientFailure() { + return state == EndpointHealthState.TRANSIENT_FAILURE; } @Override @@ -222,8 +661,8 @@ public ManagedChannel getChannel() { return channel; } - void setHealthy(boolean healthy) { - this.healthy = healthy; + void setState(EndpointHealthState state) { + this.state = state; } }