From d436d70129cfdf47de28c690e11a5550204f30d5 Mon Sep 17 00:00:00 2001 From: Rahul Yadav Date: Sat, 4 Apr 2026 09:05:53 +0530 Subject: [PATCH] fix(spanner): enforce READY-only location aware routing and add endpoint lifecycle management Location aware routing previously treated IDLE and CONNECTING channels as healthy, which could send traffic to stale replicas after cache updates. This change tightens endpoint readiness to READY-only, adds state-aware skipped_tablets reporting (TRANSIENT_FAILURE only), and introduces a background lifecycle manager that probes endpoints with GetSession to keep channels warm and evicts idle endpoints after 30 minutes of no real traffic. --- .../cloud/spanner/spi/v1/ChannelEndpoint.java | 24 +- .../spanner/spi/v1/ChannelEndpointCache.java | 13 + .../cloud/spanner/spi/v1/ChannelFinder.java | 25 +- .../spi/v1/EndpointLifecycleManager.java | 449 +++++++++++++++ .../spi/v1/GrpcChannelEndpointCache.java | 50 +- .../cloud/spanner/spi/v1/KeyAwareChannel.java | 76 ++- .../cloud/spanner/spi/v1/KeyRangeCache.java | 125 +++-- .../spi/v1/ChannelFinderGoldenTest.java | 11 + .../spi/v1/EndpointLifecycleManagerTest.java | 304 ++++++++++ .../spi/v1/GrpcChannelEndpointCacheTest.java | 5 +- .../spanner/spi/v1/KeyAwareChannelTest.java | 21 +- .../spi/v1/KeyRangeCacheGoldenTest.java | 11 + .../spanner/spi/v1/KeyRangeCacheTest.java | 519 ++++++++++++++++-- 13 files changed, 1541 insertions(+), 92 deletions(-) create mode 100644 java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/EndpointLifecycleManager.java create mode 100644 java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/EndpointLifecycleManagerTest.java diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelEndpoint.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelEndpoint.java index cd6b386dc8a7..fc82c530fc6f 100644 --- 
a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelEndpoint.java +++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelEndpoint.java @@ -41,19 +41,27 @@ public interface ChannelEndpoint { String getAddress(); /** - * Returns whether this server is ready to accept RPCs. + * Returns whether this server's channel is in {@code READY} state and can accept location-aware + * RPCs. * - *
<p>
A server is considered unhealthy if: + *
<p>
Only endpoints in {@code READY} state are eligible for location-aware routing. Endpoints in + * {@code IDLE}, {@code CONNECTING}, {@code TRANSIENT_FAILURE}, or {@code SHUTDOWN} are not + * considered healthy for location-aware routing purposes. * - *

- * - * @return true if the server is healthy and ready to accept RPCs + * @return true if the channel is in READY state */ boolean isHealthy(); + /** + * Returns whether this server's channel is in {@code TRANSIENT_FAILURE} state. + * + *
<p>
When an endpoint is in transient failure, it should be reported as a skipped tablet in + * routing hints so the server can refresh the client cache. + * + * @return true if the channel is in TRANSIENT_FAILURE state + */ + boolean isTransientFailure(); + /** * Returns the gRPC channel for making RPCs to this server. * diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelEndpointCache.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelEndpointCache.java index 879ed546f2c2..db2af4902f84 100644 --- a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelEndpointCache.java +++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelEndpointCache.java @@ -54,6 +54,19 @@ public interface ChannelEndpointCache { */ ChannelEndpoint get(String address); + /** + * Returns a cached channel for the given address without creating it. + * + *
<p>
Unlike {@link #get(String)}, this method does not create a new endpoint if one does not + * already exist in the cache. This is used by location-aware routing to avoid foreground endpoint + * creation on the request path. + * + * @param address the server address in "host:port" format + * @return the cached channel instance, or null if no endpoint exists for this address + */ + @javax.annotation.Nullable + ChannelEndpoint getIfPresent(String address); + /** * Evicts a server connection from the cache and gracefully shuts down its channel. * diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelFinder.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelFinder.java index 4ced18eb9203..0fefd93fd180 100644 --- a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelFinder.java +++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelFinder.java @@ -22,9 +22,11 @@ import com.google.spanner.v1.CommitRequest; import com.google.spanner.v1.DirectedReadOptions; import com.google.spanner.v1.ExecuteSqlRequest; +import com.google.spanner.v1.Group; import com.google.spanner.v1.Mutation; import com.google.spanner.v1.ReadRequest; import com.google.spanner.v1.RoutingHint; +import com.google.spanner.v1.Tablet; import com.google.spanner.v1.TransactionOptions; import com.google.spanner.v1.TransactionSelector; import java.util.ArrayList; @@ -32,6 +34,7 @@ import java.util.Objects; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicLong; +import javax.annotation.Nullable; /** * Finds a server for a request using location-aware routing metadata. 
@@ -44,9 +47,16 @@ public final class ChannelFinder { private final AtomicLong databaseId = new AtomicLong(); private final KeyRecipeCache recipeCache = new KeyRecipeCache(); private final KeyRangeCache rangeCache; + @Nullable private final EndpointLifecycleManager lifecycleManager; public ChannelFinder(ChannelEndpointCache endpointCache) { - this.rangeCache = new KeyRangeCache(Objects.requireNonNull(endpointCache)); + this(endpointCache, null); + } + + public ChannelFinder( + ChannelEndpointCache endpointCache, @Nullable EndpointLifecycleManager lifecycleManager) { + this.rangeCache = new KeyRangeCache(Objects.requireNonNull(endpointCache), lifecycleManager); + this.lifecycleManager = lifecycleManager; } void useDeterministicRandom() { @@ -67,6 +77,19 @@ public void update(CacheUpdate update) { recipeCache.addRecipes(update.getKeyRecipes()); } rangeCache.addRanges(update); + + // Notify the lifecycle manager about server addresses so it can create endpoints + // in the background and start probing. + if (lifecycleManager != null) { + for (Group group : update.getGroupList()) { + for (Tablet tablet : group.getTabletsList()) { + String addr = tablet.getServerAddress(); + if (!addr.isEmpty()) { + lifecycleManager.ensureEndpointExists(addr); + } + } + } + } } } diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/EndpointLifecycleManager.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/EndpointLifecycleManager.java new file mode 100644 index 000000000000..7e3a317562ee --- /dev/null +++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/EndpointLifecycleManager.java @@ -0,0 +1,449 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner.spi.v1; + +import com.google.api.core.InternalApi; +import com.google.common.annotations.VisibleForTesting; +import com.google.spanner.v1.GetSessionRequest; +import com.google.spanner.v1.SpannerGrpc; +import io.grpc.CallOptions; +import io.grpc.ClientCall; +import io.grpc.ManagedChannel; +import io.grpc.Metadata; +import io.grpc.Status; +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Manages the lifecycle of location-aware routing endpoints including background probing, traffic + * tracking, and idle eviction. + * + *
<p>
This manager is the only component that proactively creates routed replica endpoints. It: + * + *
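<ul> + * <li>Creates endpoints in the background when new server addresses appear in cache updates + * <li>Probes each managed endpoint periodically with GetSession to keep its channel warm + * <li>Tracks real (non-probe) traffic routed to each endpoint + * <li>Evicts endpoints that have received no real traffic for the idle eviction duration + * </ul>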
+ */ +@InternalApi +class EndpointLifecycleManager { + + private static final Logger logger = Logger.getLogger(EndpointLifecycleManager.class.getName()); + + /** Default probe interval: 60 seconds. Keeps channels from drifting into IDLE. */ + @VisibleForTesting static final long DEFAULT_PROBE_INTERVAL_SECONDS = 60; + + /** Default idle eviction threshold: 30 minutes without real traffic. */ + @VisibleForTesting static final Duration DEFAULT_IDLE_EVICTION_DURATION = Duration.ofMinutes(30); + + /** Interval for checking idle eviction: every 5 minutes. */ + private static final long EVICTION_CHECK_INTERVAL_SECONDS = 300; + + /** Timeout for probe RPCs. */ + private static final long PROBE_TIMEOUT_SECONDS = 10; + + /** Per-endpoint lifecycle state. */ + static final class EndpointState { + final String address; + volatile Instant lastProbeAt; + volatile Instant lastRealTrafficAt; + volatile Instant lastReadyAt; + volatile ScheduledFuture probeFuture; + + EndpointState(String address, Instant now) { + this.address = address; + this.lastRealTrafficAt = now; + this.lastProbeAt = null; + this.lastReadyAt = null; + } + } + + private final ChannelEndpointCache endpointCache; + private final Map endpoints = new ConcurrentHashMap<>(); + private final ScheduledExecutorService scheduler; + private final AtomicBoolean isShutdown = new AtomicBoolean(false); + private final long probeIntervalSeconds; + private final Duration idleEvictionDuration; + private final Clock clock; + private final String defaultEndpointAddress; + + private volatile String multiplexedSessionName; + private ScheduledFuture evictionFuture; + + EndpointLifecycleManager(ChannelEndpointCache endpointCache) { + this( + endpointCache, + DEFAULT_PROBE_INTERVAL_SECONDS, + DEFAULT_IDLE_EVICTION_DURATION, + Clock.systemUTC()); + } + + @VisibleForTesting + EndpointLifecycleManager( + ChannelEndpointCache endpointCache, + long probeIntervalSeconds, + Duration idleEvictionDuration, + Clock clock) { + this.endpointCache 
= endpointCache; + this.probeIntervalSeconds = probeIntervalSeconds; + this.idleEvictionDuration = idleEvictionDuration; + this.clock = clock; + this.defaultEndpointAddress = endpointCache.defaultChannel().getAddress(); + this.scheduler = + Executors.newScheduledThreadPool( + 1, + r -> { + Thread t = new Thread(r, "spanner-endpoint-lifecycle"); + t.setDaemon(true); + return t; + }); + + // Start periodic eviction checks. + this.evictionFuture = + scheduler.scheduleAtFixedRate( + this::checkIdleEviction, + EVICTION_CHECK_INTERVAL_SECONDS, + EVICTION_CHECK_INTERVAL_SECONDS, + TimeUnit.SECONDS); + } + + /** + * Sets the multiplexed session name used for GetSession probes. All probers reuse the same + * session since multiplexed sessions are enabled. + * + *
<p>
The session name is updated on every call because the client may rotate multiplexed sessions + * over time. Probes must always use the current session to avoid sending GetSession to a stale, + * deleted session. + */ + void setMultiplexedSessionName(String sessionName) { + if (sessionName == null || sessionName.isEmpty()) { + return; + } + String previous = this.multiplexedSessionName; + this.multiplexedSessionName = sessionName; + if (previous == null) { + logger.log( + Level.FINE, "Lifecycle manager captured session name for probing: {0}", sessionName); + } else if (!previous.equals(sessionName)) { + logger.log( + Level.FINE, + "Lifecycle manager updated session name for probing: {0} -> {1}", + new Object[] {previous, sessionName}); + } + } + + /** Returns the multiplexed session name, or null if not yet captured. */ + String getMultiplexedSessionName() { + return multiplexedSessionName; + } + + /** + * Ensures an endpoint exists for the given address. If the endpoint does not exist, creates it in + * the background and starts probing. If it already exists, this is a no-op. + * + *
<p>
This is called from the cache update path when new server addresses appear. + */ + void ensureEndpointExists(String address) { + if (isShutdown.get() || address == null || address.isEmpty()) { + return; + } + // Don't manage the default endpoint. + if (defaultEndpointAddress.equals(address)) { + return; + } + + endpoints.computeIfAbsent( + address, + addr -> { + logger.log(Level.INFO, "Scheduling background endpoint creation for address: {0}", addr); + EndpointState state = new EndpointState(addr, clock.instant()); + scheduler.submit(() -> createAndStartProbing(addr)); + return state; + }); + } + + /** + * Records that real (non-probe) traffic was routed to an endpoint. This refreshes the idle + * eviction timer for this endpoint. + */ + void recordRealTraffic(String address) { + if (address == null || defaultEndpointAddress.equals(address)) { + return; + } + EndpointState state = endpoints.get(address); + if (state != null) { + state.lastRealTrafficAt = clock.instant(); + } + } + + /** Creates an endpoint and starts probing. Runs on the scheduler thread. */ + private void createAndStartProbing(String address) { + if (isShutdown.get()) { + return; + } + try { + endpointCache.get(address); + logger.log(Level.INFO, "Background endpoint creation completed for: {0}", address); + startProbing(address); + } catch (Exception e) { + logger.log( + Level.WARNING, "Failed to create endpoint for address: " + address + ", will retry", e); + // Schedule a retry after one probe interval. + if (!isShutdown.get()) { + scheduler.schedule( + () -> createAndStartProbing(address), probeIntervalSeconds, TimeUnit.SECONDS); + } + } + } + + /** Starts periodic probing for an endpoint. */ + private void startProbing(String address) { + EndpointState state = endpoints.get(address); + if (state == null || isShutdown.get()) { + return; + } + + // Cancel any existing probe schedule. 
+ if (state.probeFuture != null) { + state.probeFuture.cancel(false); + } + + state.probeFuture = + scheduler.scheduleAtFixedRate( + () -> probe(address), 0, probeIntervalSeconds, TimeUnit.SECONDS); + logger.log( + Level.INFO, + "Prober started for endpoint {0} with interval {1}s", + new Object[] {address, probeIntervalSeconds}); + } + + /** Stops probing for an endpoint. */ + private void stopProbing(String address) { + EndpointState state = endpoints.get(address); + if (state != null && state.probeFuture != null) { + state.probeFuture.cancel(false); + state.probeFuture = null; + logger.log(Level.INFO, "Prober stopped for endpoint: {0}", address); + } + } + + /** Sends a GetSession probe to the endpoint. */ + private void probe(String address) { + if (isShutdown.get()) { + return; + } + + String sessionName = multiplexedSessionName; + if (sessionName == null || sessionName.isEmpty()) { + logger.log( + Level.FINE, + "Skipping probe for {0}: multiplexed session name not yet available", + address); + // Even without a session, request a connection to keep the channel from going idle. + ChannelEndpoint endpoint = endpointCache.getIfPresent(address); + if (endpoint != null) { + try { + endpoint.getChannel().getState(true); + } catch (Exception ignored) { + // Best effort. 
+ } + } + return; + } + + ChannelEndpoint endpoint = endpointCache.getIfPresent(address); + if (endpoint == null) { + logger.log(Level.FINE, "Probe skipped for {0}: endpoint not in cache", address); + return; + } + + EndpointState state = endpoints.get(address); + if (state == null) { + return; + } + + ManagedChannel channel = endpoint.getChannel(); + GetSessionRequest request = GetSessionRequest.newBuilder().setName(sessionName).build(); + + try { + ClientCall call = + channel.newCall( + SpannerGrpc.getGetSessionMethod(), + CallOptions.DEFAULT.withDeadlineAfter(PROBE_TIMEOUT_SECONDS, TimeUnit.SECONDS)); + + call.start( + new ClientCall.Listener() { + @Override + public void onMessage(com.google.spanner.v1.Session message) { + state.lastProbeAt = clock.instant(); + if (endpoint.isHealthy()) { + state.lastReadyAt = clock.instant(); + } + logger.log(Level.FINE, "Probe succeeded for endpoint: {0}", address); + } + + @Override + public void onClose(Status status, Metadata trailers) { + state.lastProbeAt = clock.instant(); + if (!status.isOk()) { + logger.log( + Level.WARNING, + "Probe failed for endpoint {0}: {1}", + new Object[] {address, status}); + } + } + }, + new Metadata()); + + call.sendMessage(request); + call.halfClose(); + call.request(1); + } catch (Exception e) { + state.lastProbeAt = clock.instant(); + logger.log(Level.WARNING, "Probe exception for endpoint " + address, e); + } + } + + /** Checks all managed endpoints for idle eviction. */ + @VisibleForTesting + void checkIdleEviction() { + if (isShutdown.get()) { + return; + } + + Instant now = clock.instant(); + List toEvict = new ArrayList<>(); + + for (Map.Entry entry : endpoints.entrySet()) { + String address = entry.getKey(); + EndpointState state = entry.getValue(); + + // Never evict the default endpoint. 
+ if (defaultEndpointAddress.equals(address)) { + continue; + } + + Duration sinceLastRealTraffic = Duration.between(state.lastRealTrafficAt, now); + if (sinceLastRealTraffic.compareTo(idleEvictionDuration) > 0) { + toEvict.add(address); + } + } + + for (String address : toEvict) { + evictEndpoint(address); + } + } + + /** Evicts an endpoint: stops probing, shuts down the channel pool, removes from cache. */ + private void evictEndpoint(String address) { + logger.log( + Level.INFO, + "Evicting idle endpoint {0}: no real traffic for {1}", + new Object[] {address, idleEvictionDuration}); + + stopProbing(address); + endpoints.remove(address); + endpointCache.evict(address); + } + + /** + * Requests that an evicted endpoint be recreated. The endpoint is created in the background and + * probing starts immediately. The endpoint will only become eligible for location-aware routing + * once it reaches READY state. + */ + void requestEndpointRecreation(String address) { + if (isShutdown.get() || address == null || address.isEmpty()) { + return; + } + if (defaultEndpointAddress.equals(address)) { + return; + } + + // Only recreate if not already managed. + if (endpoints.containsKey(address)) { + return; + } + + logger.log(Level.INFO, "Recreating previously evicted endpoint for address: {0}", address); + EndpointState state = new EndpointState(address, clock.instant()); + if (endpoints.putIfAbsent(address, state) == null) { + scheduler.submit(() -> createAndStartProbing(address)); + } + } + + /** Returns whether an endpoint is being actively managed. */ + boolean isManaged(String address) { + return endpoints.containsKey(address); + } + + /** Returns the endpoint state for testing. */ + @VisibleForTesting + EndpointState getEndpointState(String address) { + return endpoints.get(address); + } + + /** Returns the number of managed endpoints. 
*/ + @VisibleForTesting + int managedEndpointCount() { + return endpoints.size(); + } + + /** Shuts down the lifecycle manager and all probing. */ + void shutdown() { + if (!isShutdown.compareAndSet(false, true)) { + return; + } + + logger.log(Level.INFO, "Shutting down endpoint lifecycle manager"); + + if (evictionFuture != null) { + evictionFuture.cancel(false); + } + + for (EndpointState state : endpoints.values()) { + if (state.probeFuture != null) { + state.probeFuture.cancel(false); + } + } + endpoints.clear(); + + scheduler.shutdown(); + try { + if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) { + scheduler.shutdownNow(); + } + } catch (InterruptedException e) { + scheduler.shutdownNow(); + Thread.currentThread().interrupt(); + } + } +} diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GrpcChannelEndpointCache.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GrpcChannelEndpointCache.java index 3ee4d789592e..2b85789489d8 100644 --- a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GrpcChannelEndpointCache.java +++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GrpcChannelEndpointCache.java @@ -33,6 +33,9 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.logging.Level; +import java.util.logging.Logger; +import javax.annotation.Nullable; /** * gRPC implementation of {@link ChannelEndpointCache}. @@ -44,6 +47,8 @@ @InternalApi class GrpcChannelEndpointCache implements ChannelEndpointCache { + private static final Logger logger = Logger.getLogger(GrpcChannelEndpointCache.class.getName()); + /** Timeout for graceful channel shutdown. 
*/ private static final long SHUTDOWN_TIMEOUT_SECONDS = 5; @@ -88,14 +93,24 @@ public ChannelEndpoint get(String address) { // Create a new provider with the same config but different endpoint. // This is thread-safe as withEndpoint() returns a new provider instance. TransportChannelProvider newProvider = createProviderWithAuthorityOverride(addr); - return new GrpcChannelEndpoint(addr, newProvider); + GrpcChannelEndpoint endpoint = new GrpcChannelEndpoint(addr, newProvider); + logger.log(Level.INFO, "Location-aware endpoint created for address: {0}", addr); + return endpoint; } catch (IOException e) { + logger.log( + Level.WARNING, "Failed to create location-aware endpoint for address: " + addr, e); throw SpannerExceptionFactory.newSpannerException( ErrorCode.INTERNAL, "Failed to create channel for address: " + addr, e); } }); } + @Override + @Nullable + public ChannelEndpoint getIfPresent(String address) { + return servers.get(address); + } + private TransportChannelProvider createProviderWithAuthorityOverride(String address) { InstantiatingGrpcChannelProvider endpointProvider = (InstantiatingGrpcChannelProvider) baseProvider.withEndpoint(address); @@ -210,13 +225,38 @@ public boolean isHealthy() { return false; } // Check connectivity state without triggering a connection attempt. + // Only READY channels are considered healthy for location-aware routing. // Some channel implementations don't support getState(), in which case - // we assume the channel is healthy if it's not shutdown/terminated. + // we treat the endpoint as not ready for location-aware routing (defensive). 
try { ConnectivityState state = channel.getState(false); - return state != ConnectivityState.SHUTDOWN && state != ConnectivityState.TRANSIENT_FAILURE; - } catch (UnsupportedOperationException ignore) { - return true; + boolean ready = state == ConnectivityState.READY; + if (!ready) { + logger.log( + Level.FINE, + "Location-aware endpoint {0} is not ready for location-aware routing, state: {1}", + new Object[] {address, state}); + } + return ready; + } catch (UnsupportedOperationException e) { + logger.log( + Level.WARNING, + "getState(false) unsupported for location-aware endpoint {0}, treating as not ready", + address); + return false; + } + } + + @Override + public boolean isTransientFailure() { + if (channel.isShutdown() || channel.isTerminated()) { + return false; + } + try { + ConnectivityState state = channel.getState(false); + return state == ConnectivityState.TRANSIENT_FAILURE; + } catch (UnsupportedOperationException e) { + return false; } } diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyAwareChannel.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyAwareChannel.java index 59fc03dfd80a..7b4baa24b76d 100644 --- a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyAwareChannel.java +++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyAwareChannel.java @@ -43,6 +43,8 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; import javax.annotation.Nullable; /** @@ -55,6 +57,9 @@ */ @InternalApi final class KeyAwareChannel extends ManagedChannel { + + private static final Logger logger = Logger.getLogger(KeyAwareChannel.class.getName()); + private static final long MAX_TRACKED_READ_ONLY_TRANSACTIONS = 100_000L; private static final String STREAMING_READ_METHOD = 
"google.spanner.v1.Spanner/StreamingRead"; private static final String STREAMING_SQL_METHOD = @@ -67,6 +72,7 @@ final class KeyAwareChannel extends ManagedChannel { private final ManagedChannel defaultChannel; private final ChannelEndpointCache endpointCache; + @Nullable private final EndpointLifecycleManager lifecycleManager; private final String authority; private final String defaultEndpointAddress; private final Map> channelFinders = @@ -90,6 +96,11 @@ private KeyAwareChannel( this.defaultChannel = endpointCache.defaultChannel().getChannel(); this.defaultEndpointAddress = endpointCache.defaultChannel().getAddress(); this.authority = this.defaultChannel.authority(); + // Only create lifecycle manager for production (non-factory) path. + // Factory path is used by tests with custom caches where background probing + // would interfere with test assertions. + this.lifecycleManager = + (endpointCacheFactory == null) ? new EndpointLifecycleManager(endpointCache) : null; } static KeyAwareChannel create( @@ -118,7 +129,10 @@ private ChannelFinder getOrCreateChannelFinder(String databaseId) { ref = channelFinders.get(databaseId); finder = (ref != null) ? ref.get() : null; if (finder == null) { - finder = new ChannelFinder(endpointCache); + finder = + lifecycleManager != null + ? new ChannelFinder(endpointCache, lifecycleManager) + : new ChannelFinder(endpointCache); channelFinders.put(databaseId, new SoftReference<>(finder)); } } @@ -126,14 +140,36 @@ private ChannelFinder getOrCreateChannelFinder(String databaseId) { return finder; } + /** Captures the session name for probe use and records real traffic to the selected endpoint. */ + private void onRequestRouted( + @Nullable String session, @Nullable ChannelEndpoint selectedEndpoint) { + if (lifecycleManager == null) { + return; + } + // Capture session name for lifecycle manager probes. 
+ if (session != null && !session.isEmpty()) { + lifecycleManager.setMultiplexedSessionName(session); + } + // Record real traffic for idle eviction tracking. + if (selectedEndpoint != null && !defaultEndpointAddress.equals(selectedEndpoint.getAddress())) { + lifecycleManager.recordRealTraffic(selectedEndpoint.getAddress()); + } + } + @Override public ManagedChannel shutdown() { + if (lifecycleManager != null) { + lifecycleManager.shutdown(); + } endpointCache.shutdown(); return this; } @Override public ManagedChannel shutdownNow() { + if (lifecycleManager != null) { + lifecycleManager.shutdown(); + } endpointCache.shutdown(); return this; } @@ -186,7 +222,23 @@ private ChannelEndpoint affinityEndpoint(ByteString transactionId) { if (address == null) { return null; } - return endpointCache.get(address); + // Use non-creating lookup and require READY state for location-aware routing. + ChannelEndpoint endpoint = endpointCache.getIfPresent(address); + if (endpoint == null) { + logger.log( + Level.FINE, + "Affinity endpoint for address {0} not present in cache, falling back to default", + address); + return null; + } + if (!endpoint.isHealthy()) { + logger.log( + Level.FINE, + "Affinity endpoint for address {0} not READY, falling back to default", + address); + return null; + } + return endpoint; } private void clearAffinity(ByteString transactionId) { @@ -411,6 +463,10 @@ public void sendMessage(RequestT message) { selectedEndpoint = endpoint; this.channelFinder = finder; + // Record session name for probing and real traffic for idle eviction. 
+ String session = extractSessionFromMessage(message); + parentChannel.onRequestRouted(session, endpoint); + delegate = endpoint.getChannel().newCall(methodDescriptor, callOptions); if (pendingMessageCompression != null) { delegate.setMessageCompression(pendingMessageCompression); @@ -535,6 +591,22 @@ private void drainPendingRequests() { } } + @Nullable + private static String extractSessionFromMessage(Object message) { + if (message instanceof ReadRequest) { + return ((ReadRequest) message).getSession(); + } else if (message instanceof ExecuteSqlRequest) { + return ((ExecuteSqlRequest) message).getSession(); + } else if (message instanceof BeginTransactionRequest) { + return ((BeginTransactionRequest) message).getSession(); + } else if (message instanceof CommitRequest) { + return ((CommitRequest) message).getSession(); + } else if (message instanceof RollbackRequest) { + return ((RollbackRequest) message).getSession(); + } + return null; + } + void maybeRecordAffinity(ByteString transactionId) { parentChannel.recordAffinity(transactionId, selectedEndpoint, allowDefaultAffinity); } diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyRangeCache.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyRangeCache.java index bdbd495aa58c..5cc28cc651e7 100644 --- a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyRangeCache.java +++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyRangeCache.java @@ -37,12 +37,16 @@ import java.util.TreeMap; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicLong; +import java.util.logging.Level; +import java.util.logging.Logger; import java.util.stream.IntStream; /** Cache for routing information used by location-aware routing. 
*/ @InternalApi public final class KeyRangeCache { + private static final Logger logger = Logger.getLogger(KeyRangeCache.class.getName()); + private static final int MAX_LOCAL_REPLICA_DISTANCE = 5; private static final int DEFAULT_MIN_ENTRIES_FOR_RANDOM_PICK = 1000; @@ -55,6 +59,7 @@ public enum RangeMode { } private final ChannelEndpointCache endpointCache; + @javax.annotation.Nullable private final EndpointLifecycleManager lifecycleManager; private final NavigableMap ranges = new TreeMap<>(ByteString.unsignedLexicographicalComparator()); private final Map groups = new HashMap<>(); @@ -65,7 +70,14 @@ public enum RangeMode { private volatile int minCacheEntriesForRandomPick = DEFAULT_MIN_ENTRIES_FOR_RANDOM_PICK; public KeyRangeCache(ChannelEndpointCache endpointCache) { + this(endpointCache, null); + } + + public KeyRangeCache( + ChannelEndpointCache endpointCache, + @javax.annotation.Nullable EndpointLifecycleManager lifecycleManager) { this.endpointCache = Objects.requireNonNull(endpointCache); + this.lifecycleManager = lifecycleManager; } @VisibleForTesting @@ -469,21 +481,87 @@ private boolean matches(DirectedReadOptions.ReplicaSelection selection) { } } + /** + * Evaluates whether this tablet should be skipped for location-aware routing. + * + *
<p>
State-aware skip logic: + * + *
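<ul> + * <li>Server-marked skip or empty server address: skip and report in skipped_tablets + * <li>No endpoint present in the cache: skip silently and request background recreation + * <li>READY: usable for location-aware routing, do not skip + * <li>TRANSIENT_FAILURE: skip and report in skipped_tablets + * <li>IDLE, CONNECTING, SHUTDOWN, or unsupported: skip silently + * </ul>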
+ */ boolean shouldSkip(RoutingHint.Builder hintBuilder) { - if (skip || serverAddress.isEmpty() || (endpoint != null && !endpoint.isHealthy())) { - RoutingHint.SkippedTablet.Builder skipped = hintBuilder.addSkippedTabletUidBuilder(); - skipped.setTabletUid(tabletUid); - skipped.setIncarnation(incarnation); + // Server-marked skip or no address: always report. + if (skip || serverAddress.isEmpty()) { + addSkippedTablet(hintBuilder); + return true; + } + + // If the cached endpoint's channel has been shut down (e.g. after idle eviction), + // discard the stale reference so we re-lookup from the cache below. + if (endpoint != null && endpoint.getChannel().isShutdown()) { + logger.log( + Level.FINE, + "Tablet {0} at {1}: cached endpoint is shutdown, clearing stale reference", + new Object[] {tabletUid, serverAddress}); + endpoint = null; + } + + // Lookup without creating: location-aware routing should not trigger foreground endpoint + // creation. + if (endpoint == null) { + endpoint = endpointCache.getIfPresent(serverAddress); + } + + // No endpoint exists yet - skip silently, request background recreation so the + // endpoint becomes available for future requests. + if (endpoint == null) { + logger.log( + Level.FINE, + "Tablet {0} at {1}: no endpoint present, skipping silently", + new Object[] {tabletUid, serverAddress}); + if (lifecycleManager != null) { + lifecycleManager.requestEndpointRecreation(serverAddress); + } + return true; + } + + // READY - usable for location-aware routing. + if (endpoint.isHealthy()) { + return false; + } + + // TRANSIENT_FAILURE - skip and report so server can refresh client cache. + if (endpoint.isTransientFailure()) { + logger.log( + Level.INFO, + "Tablet {0} at {1}: endpoint in TRANSIENT_FAILURE, adding to skipped_tablets", + new Object[] {tabletUid, serverAddress}); + addSkippedTablet(hintBuilder); return true; } - return false; + + // IDLE, CONNECTING, SHUTDOWN, or unsupported - skip silently. 
+ logger.log( + Level.FINE, + "Tablet {0} at {1}: endpoint not ready, skipping silently", + new Object[] {tabletUid, serverAddress}); + return true; + } + + private void addSkippedTablet(RoutingHint.Builder hintBuilder) { + RoutingHint.SkippedTablet.Builder skipped = hintBuilder.addSkippedTabletUidBuilder(); + skipped.setTabletUid(tabletUid); + skipped.setIncarnation(incarnation); } ChannelEndpoint pick(RoutingHint.Builder hintBuilder) { hintBuilder.setTabletUid(tabletUid); - if (endpoint == null && !serverAddress.isEmpty()) { - endpoint = endpointCache.get(serverAddress); - } + // Endpoint must already exist and be READY if shouldSkip returned false. return endpoint; } @@ -567,37 +645,18 @@ ChannelEndpoint fillRoutingHint( directedReadOptions.getReplicasCase() != DirectedReadOptions.ReplicasCase.REPLICAS_NOT_SET; - // Fast path: pick a tablet while holding the lock. If the endpoint is already - // cached on the tablet, return it immediately without releasing the lock. - // If the endpoint needs to be created (blocking network dial), release the - // lock first so other threads are not blocked during channel creation. - CachedTablet selected; + // Select a tablet while holding the lock. With state-aware routing, only READY + // endpoints pass shouldSkip(), so the selected tablet always has a cached + // endpoint. No foreground endpoint creation is needed — the lifecycle manager + // creates endpoints in the background. synchronized (this) { - selected = + CachedTablet selected = selectTabletLocked( preferLeader, hasDirectedReadOptions, hintBuilder, directedReadOptions); if (selected == null) { return null; } - if (selected.endpoint != null || selected.serverAddress.isEmpty()) { - return selected.pick(hintBuilder); - } - // Slow path: endpoint not yet created. Capture the address and release the - // lock before calling endpointCache.get(), which may block on network dial. 
- hintBuilder.setTabletUid(selected.tabletUid); - } - - String serverAddress = selected.serverAddress; - ChannelEndpoint endpoint = endpointCache.get(serverAddress); - - synchronized (this) { - // Only update if the tablet's address hasn't changed since we released the lock. - if (selected.endpoint == null && selected.serverAddress.equals(serverAddress)) { - selected.endpoint = endpoint; - } - // Re-set tabletUid with the latest value in case update() ran concurrently. - hintBuilder.setTabletUid(selected.tabletUid); - return selected.endpoint; + return selected.pick(hintBuilder); } } diff --git a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/ChannelFinderGoldenTest.java b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/ChannelFinderGoldenTest.java index 525313f1ab4e..7e9d2476346b 100644 --- a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/ChannelFinderGoldenTest.java +++ b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/ChannelFinderGoldenTest.java @@ -131,6 +131,12 @@ public ChannelEndpoint get(String address) { return endpoints.computeIfAbsent(address, FakeEndpoint::new); } + @Override + public ChannelEndpoint getIfPresent(String address) { + // Auto-create for golden tests — simulates lifecycle manager having pre-created endpoints. 
+ return endpoints.computeIfAbsent(address, FakeEndpoint::new); + } + @Override public void evict(String address) { endpoints.remove(address); @@ -158,6 +164,11 @@ public boolean isHealthy() { return !unhealthyServers.contains(address); } + @Override + public boolean isTransientFailure() { + return unhealthyServers.contains(address); + } + @Override public ManagedChannel getChannel() { return new ManagedChannel() { diff --git a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/EndpointLifecycleManagerTest.java b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/EndpointLifecycleManagerTest.java new file mode 100644 index 000000000000..24d92dc9c565 --- /dev/null +++ b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/EndpointLifecycleManagerTest.java @@ -0,0 +1,304 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.google.cloud.spanner.spi.v1; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.time.ZoneId; +import org.junit.After; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class EndpointLifecycleManagerTest { + + private EndpointLifecycleManager manager; + + @After + public void tearDown() { + if (manager != null) { + manager.shutdown(); + } + } + + @Test + public void endpointCreationStartsProbing() throws Exception { + KeyRangeCacheTest.FakeEndpointCache cache = new KeyRangeCacheTest.FakeEndpointCache(); + manager = + new EndpointLifecycleManager( + cache, /* probeIntervalSeconds= */ 1, Duration.ofMinutes(30), Clock.systemUTC()); + + manager.ensureEndpointExists("server1"); + + // Wait for background creation. + Thread.sleep(500); + + // Endpoint should be created in the cache. + assertNotNull(cache.getIfPresent("server1")); + + // Should be managed. 
+ assertTrue(manager.isManaged("server1")); + assertNotNull(manager.getEndpointState("server1")); + assertEquals(1, manager.managedEndpointCount()); + } + + @Test + public void duplicateEnsureEndpointExistsIsNoop() throws Exception { + KeyRangeCacheTest.FakeEndpointCache cache = new KeyRangeCacheTest.FakeEndpointCache(); + manager = + new EndpointLifecycleManager( + cache, /* probeIntervalSeconds= */ 60, Duration.ofMinutes(30), Clock.systemUTC()); + + manager.ensureEndpointExists("server1"); + manager.ensureEndpointExists("server1"); + manager.ensureEndpointExists("server1"); + + Thread.sleep(300); + + assertEquals(1, manager.managedEndpointCount()); + } + + @Test + public void defaultEndpointIsNotManaged() { + KeyRangeCacheTest.FakeEndpointCache cache = new KeyRangeCacheTest.FakeEndpointCache(); + manager = + new EndpointLifecycleManager( + cache, /* probeIntervalSeconds= */ 60, Duration.ofMinutes(30), Clock.systemUTC()); + + manager.ensureEndpointExists("default"); + + assertFalse(manager.isManaged("default")); + assertEquals(0, manager.managedEndpointCount()); + } + + @Test + public void probeTrafficDoesNotUpdateLastRealTrafficAt() throws Exception { + KeyRangeCacheTest.FakeEndpointCache cache = new KeyRangeCacheTest.FakeEndpointCache(); + TestClock clock = new TestClock(Instant.now()); + manager = + new EndpointLifecycleManager( + cache, /* probeIntervalSeconds= */ 1, Duration.ofMinutes(30), clock); + + Instant creationTime = clock.instant(); + manager.ensureEndpointExists("server1"); + Thread.sleep(300); + + // Probe traffic should not change lastRealTrafficAt. 
+ EndpointLifecycleManager.EndpointState state = manager.getEndpointState("server1"); + assertNotNull(state); + assertEquals(creationTime, state.lastRealTrafficAt); + } + + @Test + public void realRoutedTrafficUpdatesLastRealTrafficAt() throws Exception { + KeyRangeCacheTest.FakeEndpointCache cache = new KeyRangeCacheTest.FakeEndpointCache(); + TestClock clock = new TestClock(Instant.now()); + manager = + new EndpointLifecycleManager( + cache, /* probeIntervalSeconds= */ 60, Duration.ofMinutes(30), clock); + + manager.ensureEndpointExists("server1"); + Thread.sleep(300); + + Instant before = clock.instant(); + clock.advance(Duration.ofMinutes(5)); + manager.recordRealTraffic("server1"); + + EndpointLifecycleManager.EndpointState state = manager.getEndpointState("server1"); + assertNotNull(state); + assertTrue(state.lastRealTrafficAt.isAfter(before)); + } + + @Test + public void endpointWithOnlyProbeTrafficIsEvictedAfterIdleDuration() throws Exception { + KeyRangeCacheTest.FakeEndpointCache cache = new KeyRangeCacheTest.FakeEndpointCache(); + TestClock clock = new TestClock(Instant.now()); + Duration idleDuration = Duration.ofMinutes(30); + manager = + new EndpointLifecycleManager(cache, /* probeIntervalSeconds= */ 60, idleDuration, clock); + + manager.ensureEndpointExists("server1"); + Thread.sleep(300); + + assertTrue(manager.isManaged("server1")); + + // Advance past idle threshold. + clock.advance(Duration.ofMinutes(31)); + + // Trigger eviction check manually. + manager.checkIdleEviction(); + + // Endpoint should be evicted. 
+ assertFalse(manager.isManaged("server1")); + assertNull(cache.getIfPresent("server1")); + assertEquals(0, manager.managedEndpointCount()); + } + + @Test + public void endpointWithRecentRealTrafficIsNotEvicted() throws Exception { + KeyRangeCacheTest.FakeEndpointCache cache = new KeyRangeCacheTest.FakeEndpointCache(); + TestClock clock = new TestClock(Instant.now()); + Duration idleDuration = Duration.ofMinutes(30); + manager = + new EndpointLifecycleManager(cache, /* probeIntervalSeconds= */ 60, idleDuration, clock); + + manager.ensureEndpointExists("server1"); + Thread.sleep(300); + + // Record real traffic at 20 minutes. + clock.advance(Duration.ofMinutes(20)); + manager.recordRealTraffic("server1"); + + // Advance to 31 minutes (only 11 minutes since last real traffic). + clock.advance(Duration.ofMinutes(11)); + manager.checkIdleEviction(); + + // Should NOT be evicted because last real traffic was 11 minutes ago. + assertTrue(manager.isManaged("server1")); + } + + @Test + public void evictedEndpointIsRecreatedOnDemand() throws Exception { + KeyRangeCacheTest.FakeEndpointCache cache = new KeyRangeCacheTest.FakeEndpointCache(); + TestClock clock = new TestClock(Instant.now()); + Duration idleDuration = Duration.ofMinutes(30); + manager = + new EndpointLifecycleManager(cache, /* probeIntervalSeconds= */ 60, idleDuration, clock); + + manager.ensureEndpointExists("server1"); + Thread.sleep(300); + + // Evict. + clock.advance(Duration.ofMinutes(31)); + manager.checkIdleEviction(); + assertFalse(manager.isManaged("server1")); + + // Recreate. 
+ manager.requestEndpointRecreation("server1"); + Thread.sleep(500); + + assertTrue(manager.isManaged("server1")); + assertNotNull(cache.getIfPresent("server1")); + } + + @Test + public void sessionNameIsUpdatedOnRotation() { + KeyRangeCacheTest.FakeEndpointCache cache = new KeyRangeCacheTest.FakeEndpointCache(); + manager = + new EndpointLifecycleManager( + cache, /* probeIntervalSeconds= */ 60, Duration.ofMinutes(30), Clock.systemUTC()); + + assertNull(manager.getMultiplexedSessionName()); + + manager.setMultiplexedSessionName("projects/p/instances/i/databases/d/sessions/s1"); + assertEquals( + "projects/p/instances/i/databases/d/sessions/s1", manager.getMultiplexedSessionName()); + + // Session rotation: second set should overwrite so probes use the current session. + manager.setMultiplexedSessionName("projects/p/instances/i/databases/d/sessions/s2"); + assertEquals( + "projects/p/instances/i/databases/d/sessions/s2", manager.getMultiplexedSessionName()); + + // Null and empty should not clear. 
+ manager.setMultiplexedSessionName(null); + assertEquals( + "projects/p/instances/i/databases/d/sessions/s2", manager.getMultiplexedSessionName()); + manager.setMultiplexedSessionName(""); + assertEquals( + "projects/p/instances/i/databases/d/sessions/s2", manager.getMultiplexedSessionName()); + } + + @Test + public void shutdownStopsAllProbing() throws Exception { + KeyRangeCacheTest.FakeEndpointCache cache = new KeyRangeCacheTest.FakeEndpointCache(); + manager = + new EndpointLifecycleManager( + cache, /* probeIntervalSeconds= */ 1, Duration.ofMinutes(30), Clock.systemUTC()); + + manager.ensureEndpointExists("server1"); + manager.ensureEndpointExists("server2"); + Thread.sleep(300); + + assertEquals(2, manager.managedEndpointCount()); + + manager.shutdown(); + + assertEquals(0, manager.managedEndpointCount()); + } + + @Test + public void emptyOrNullAddressIsIgnored() { + KeyRangeCacheTest.FakeEndpointCache cache = new KeyRangeCacheTest.FakeEndpointCache(); + manager = + new EndpointLifecycleManager( + cache, /* probeIntervalSeconds= */ 60, Duration.ofMinutes(30), Clock.systemUTC()); + + manager.ensureEndpointExists(null); + manager.ensureEndpointExists(""); + + assertEquals(0, manager.managedEndpointCount()); + } + + @Test + public void recordRealTrafficForDefaultEndpointIsIgnored() { + KeyRangeCacheTest.FakeEndpointCache cache = new KeyRangeCacheTest.FakeEndpointCache(); + manager = + new EndpointLifecycleManager( + cache, /* probeIntervalSeconds= */ 60, Duration.ofMinutes(30), Clock.systemUTC()); + + // Should not throw or create state. + manager.recordRealTraffic("default"); + manager.recordRealTraffic(null); + assertEquals(0, manager.managedEndpointCount()); + } + + /** Test clock that can be advanced manually. 
*/ + private static final class TestClock extends Clock { + private Instant now; + + TestClock(Instant now) { + this.now = now; + } + + void advance(Duration duration) { + now = now.plus(duration); + } + + @Override + public Instant instant() { + return now; + } + + @Override + public ZoneId getZone() { + return ZoneId.of("UTC"); + } + + @Override + public Clock withZone(ZoneId zone) { + return this; + } + } +} diff --git a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GrpcChannelEndpointCacheTest.java b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GrpcChannelEndpointCacheTest.java index 56e6d3cfc2bc..74afec18bfc3 100644 --- a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GrpcChannelEndpointCacheTest.java +++ b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GrpcChannelEndpointCacheTest.java @@ -114,10 +114,13 @@ public void healthReflectsChannelShutdown() throws Exception { GrpcChannelEndpointCache cache = new GrpcChannelEndpointCache(createProvider("localhost:1234")); try { ChannelEndpoint server = cache.get("localhost:1111"); - assertThat(server.isHealthy()).isTrue(); + // Newly created channel is not READY (likely IDLE), so isHealthy is false for location aware. + // isHealthy now requires READY state for location aware routing. 
+ assertThat(server.isHealthy()).isFalse(); server.getChannel().shutdownNow(); assertThat(server.isHealthy()).isFalse(); + assertThat(server.isTransientFailure()).isFalse(); } finally { cache.shutdown(); } diff --git a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyAwareChannelTest.java b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyAwareChannelTest.java index a4919389a85c..b323562a708c 100644 --- a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyAwareChannelTest.java +++ b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyAwareChannelTest.java @@ -196,7 +196,8 @@ public void timeoutOnCommitClearsTransactionAffinity() throws Exception { commitCall.sendMessage( CommitRequest.newBuilder().setSession(SESSION).setTransactionId(transactionId).build()); - assertThat(harness.endpointCache.getCount(DEFAULT_ADDRESS)).isEqualTo(1); + // affinityEndpoint now uses getIfPresent (non-creating), so getCount stays at 0. + assertThat(harness.endpointCache.getCount(DEFAULT_ADDRESS)).isEqualTo(0); @SuppressWarnings("unchecked") RecordingClientCall commitDelegate = @@ -210,7 +211,8 @@ public void timeoutOnCommitClearsTransactionAffinity() throws Exception { rollbackCall.sendMessage( RollbackRequest.newBuilder().setSession(SESSION).setTransactionId(transactionId).build()); - assertThat(harness.endpointCache.getCount(DEFAULT_ADDRESS)).isEqualTo(1); + // Rollback also uses getIfPresent for affinity, so getCount remains 0. 
+ assertThat(harness.endpointCache.getCount(DEFAULT_ADDRESS)).isEqualTo(0); } @Test @@ -1078,6 +1080,16 @@ public ChannelEndpoint get(String address) { return endpoints.computeIfAbsent(address, FakeEndpoint::new); } + @Override + public ChannelEndpoint getIfPresent(String address) { + if (defaultAddress.equals(address)) { + return defaultEndpoint; + } + // Auto-create for integration tests — simulates lifecycle manager having pre-created + // endpoints. + return endpoints.computeIfAbsent(address, FakeEndpoint::new); + } + @Override public void evict(String address) { endpoints.remove(address); @@ -1139,6 +1151,11 @@ public boolean isHealthy() { return true; } + @Override + public boolean isTransientFailure() { + return false; + } + @Override public ManagedChannel getChannel() { return channel; diff --git a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyRangeCacheGoldenTest.java b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyRangeCacheGoldenTest.java index 763a36dbfd64..7fa2874ada5b 100644 --- a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyRangeCacheGoldenTest.java +++ b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyRangeCacheGoldenTest.java @@ -125,6 +125,12 @@ public ChannelEndpoint get(String address) { return endpoints.computeIfAbsent(address, FakeEndpoint::new); } + @Override + public ChannelEndpoint getIfPresent(String address) { + // Auto-create for golden tests — simulates lifecycle manager having pre-created endpoints. 
+ return endpoints.computeIfAbsent(address, FakeEndpoint::new); + } + @Override public void evict(String address) { endpoints.remove(address); @@ -154,6 +160,11 @@ public boolean isHealthy() { return true; } + @Override + public boolean isTransientFailure() { + return false; + } + @Override public ManagedChannel getChannel() { return channel; diff --git a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyRangeCacheTest.java b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyRangeCacheTest.java index 2405aa7a062b..9ba48cb344ec 100644 --- a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyRangeCacheTest.java +++ b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyRangeCacheTest.java @@ -18,6 +18,8 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import com.google.protobuf.ByteString; import com.google.spanner.v1.CacheUpdate; @@ -33,6 +35,7 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -41,53 +44,33 @@ public class KeyRangeCacheTest { @Test - public void skipsUnhealthyTabletAfterItIsCached() { + public void skipsTransientFailureTabletWithSkippedTablet() { FakeEndpointCache endpointCache = new FakeEndpointCache(); KeyRangeCache cache = new KeyRangeCache(endpointCache); - cache.addRanges( - CacheUpdate.newBuilder() - .addRange( - Range.newBuilder() - .setStartKey(bytes("a")) - .setLimitKey(bytes("z")) - .setGroupUid(5) - .setSplitId(1) - .setGeneration(bytes("1"))) - .addGroup( - Group.newBuilder() - .setGroupUid(5) - .setGeneration(bytes("1")) - .setLeaderIndex(0) - .addTablets( - Tablet.newBuilder() - .setTabletUid(1) - 
.setServerAddress("server1") - .setIncarnation(bytes("1")) - .setDistance(0)) - .addTablets( - Tablet.newBuilder() - .setTabletUid(2) - .setServerAddress("server2") - .setIncarnation(bytes("1")) - .setDistance(0))) - .build()); + cache.addRanges(twoReplicaUpdate()); + // Pre-create endpoints. + endpointCache.get("server1"); + endpointCache.get("server2"); + + // Initial routing works. RoutingHint.Builder initialHint = RoutingHint.newBuilder().setKey(bytes("a")); ChannelEndpoint initialServer = cache.fillRoutingHint( - /* preferLeader= */ false, + false, KeyRangeCache.RangeMode.COVERING_SPLIT, DirectedReadOptions.getDefaultInstance(), initialHint); assertNotNull(initialServer); - endpointCache.setHealthy("server1", false); + // Mark server1 as TRANSIENT_FAILURE. + endpointCache.setState("server1", EndpointHealthState.TRANSIENT_FAILURE); RoutingHint.Builder hint = RoutingHint.newBuilder().setKey(bytes("a")); ChannelEndpoint server = cache.fillRoutingHint( - /* preferLeader= */ false, + false, KeyRangeCache.RangeMode.COVERING_SPLIT, DirectedReadOptions.getDefaultInstance(), hint); @@ -125,6 +108,8 @@ public void shrinkToEvictsRanges() { .setIncarnation(bytes("1")))) .build(); cache.addRanges(update); + // Pre-create endpoint so READY state check passes in shouldSkip. + endpointCache.get("server" + i); } checkContents(cache, numRanges, numRanges); @@ -140,6 +125,425 @@ public void shrinkToEvictsRanges() { checkContents(cache, 0, numRanges); } + @Test + public void readyEndpointIsUsableForLocationAware() { + FakeEndpointCache endpointCache = new FakeEndpointCache(); + KeyRangeCache cache = new KeyRangeCache(endpointCache); + cache.addRanges(singleReplicaUpdate("server1")); + + // Pre-create endpoint so getIfPresent finds it. Default state is READY. 
+ endpointCache.get("server1"); + + RoutingHint.Builder hint = RoutingHint.newBuilder().setKey(bytes("a")); + ChannelEndpoint server = + cache.fillRoutingHint( + false, + KeyRangeCache.RangeMode.COVERING_SPLIT, + DirectedReadOptions.getDefaultInstance(), + hint); + + assertNotNull(server); + assertEquals("server1", server.getAddress()); + assertEquals(0, hint.getSkippedTabletUidCount()); + } + + @Test + public void idleEndpointIsNotUsableForLocationAware() { + FakeEndpointCache endpointCache = new FakeEndpointCache(); + KeyRangeCache cache = new KeyRangeCache(endpointCache); + cache.addRanges(singleReplicaUpdate("server1")); + + // Ensure endpoint exists in cache first. + endpointCache.get("server1"); + endpointCache.setState("server1", EndpointHealthState.IDLE); + + RoutingHint.Builder hint = RoutingHint.newBuilder().setKey(bytes("a")); + ChannelEndpoint server = + cache.fillRoutingHint( + false, + KeyRangeCache.RangeMode.COVERING_SPLIT, + DirectedReadOptions.getDefaultInstance(), + hint); + + // IDLE causes silent skip — falls back to null (default host), no skipped_tablets. 
+ assertNull(server); + assertEquals(0, hint.getSkippedTabletUidCount()); + } + + @Test + public void connectingEndpointIsNotUsableForLocationAware() { + FakeEndpointCache endpointCache = new FakeEndpointCache(); + KeyRangeCache cache = new KeyRangeCache(endpointCache); + cache.addRanges(singleReplicaUpdate("server1")); + + endpointCache.get("server1"); + endpointCache.setState("server1", EndpointHealthState.CONNECTING); + + RoutingHint.Builder hint = RoutingHint.newBuilder().setKey(bytes("a")); + ChannelEndpoint server = + cache.fillRoutingHint( + false, + KeyRangeCache.RangeMode.COVERING_SPLIT, + DirectedReadOptions.getDefaultInstance(), + hint); + + assertNull(server); + assertEquals(0, hint.getSkippedTabletUidCount()); + } + + @Test + public void transientFailureEndpointIsNotUsable() { + FakeEndpointCache endpointCache = new FakeEndpointCache(); + KeyRangeCache cache = new KeyRangeCache(endpointCache); + cache.addRanges(singleReplicaUpdate("server1")); + + endpointCache.get("server1"); + endpointCache.setState("server1", EndpointHealthState.TRANSIENT_FAILURE); + + RoutingHint.Builder hint = RoutingHint.newBuilder().setKey(bytes("a")); + ChannelEndpoint server = + cache.fillRoutingHint( + false, + KeyRangeCache.RangeMode.COVERING_SPLIT, + DirectedReadOptions.getDefaultInstance(), + hint); + + // TRANSIENT_FAILURE: skip with skipped_tablets. 
+ assertNull(server); + assertEquals(1, hint.getSkippedTabletUidCount()); + assertEquals(1L, hint.getSkippedTabletUid(0).getTabletUid()); + } + + @Test + public void unsupportedGetStateTreatedAsNotReady() { + FakeEndpointCache endpointCache = new FakeEndpointCache(); + KeyRangeCache cache = new KeyRangeCache(endpointCache); + cache.addRanges(singleReplicaUpdate("server1")); + + endpointCache.get("server1"); + endpointCache.setState("server1", EndpointHealthState.UNSUPPORTED); + + RoutingHint.Builder hint = RoutingHint.newBuilder().setKey(bytes("a")); + ChannelEndpoint server = + cache.fillRoutingHint( + false, + KeyRangeCache.RangeMode.COVERING_SPLIT, + DirectedReadOptions.getDefaultInstance(), + hint); + + // Unsupported state: skip silently, no skipped_tablets. + assertNull(server); + assertEquals(0, hint.getSkippedTabletUidCount()); + } + + @Test + public void missingEndpointCausesDefaultHostFallbackWithoutSkippedTablet() { + // Endpoint not in cache at all — getIfPresent returns null. 
+ FakeEndpointCache endpointCache = new FakeEndpointCache(); + endpointCache.setCreateOnGet(false); + KeyRangeCache cache = new KeyRangeCache(endpointCache); + cache.addRanges(singleReplicaUpdate("server1")); + + RoutingHint.Builder hint = RoutingHint.newBuilder().setKey(bytes("a")); + ChannelEndpoint server = + cache.fillRoutingHint( + false, + KeyRangeCache.RangeMode.COVERING_SPLIT, + DirectedReadOptions.getDefaultInstance(), + hint); + + assertNull(server); + assertEquals(0, hint.getSkippedTabletUidCount()); + } + + @Test + public void idleEndpointCausesDefaultHostFallbackWithoutSkippedTablet() { + FakeEndpointCache endpointCache = new FakeEndpointCache(); + KeyRangeCache cache = new KeyRangeCache(endpointCache); + cache.addRanges(singleReplicaUpdate("server1")); + + endpointCache.get("server1"); + endpointCache.setState("server1", EndpointHealthState.IDLE); + + RoutingHint.Builder hint = RoutingHint.newBuilder().setKey(bytes("a")); + ChannelEndpoint server = + cache.fillRoutingHint( + false, + KeyRangeCache.RangeMode.COVERING_SPLIT, + DirectedReadOptions.getDefaultInstance(), + hint); + + assertNull(server); + assertEquals(0, hint.getSkippedTabletUidCount()); + } + + @Test + public void connectingEndpointCausesDefaultHostFallbackWithoutSkippedTablet() { + FakeEndpointCache endpointCache = new FakeEndpointCache(); + KeyRangeCache cache = new KeyRangeCache(endpointCache); + cache.addRanges(singleReplicaUpdate("server1")); + + endpointCache.get("server1"); + endpointCache.setState("server1", EndpointHealthState.CONNECTING); + + RoutingHint.Builder hint = RoutingHint.newBuilder().setKey(bytes("a")); + ChannelEndpoint server = + cache.fillRoutingHint( + false, + KeyRangeCache.RangeMode.COVERING_SPLIT, + DirectedReadOptions.getDefaultInstance(), + hint); + + assertNull(server); + assertEquals(0, hint.getSkippedTabletUidCount()); + } + + @Test + public void transientFailureEndpointCausesSkippedTabletPlusDefaultHostFallback() { + FakeEndpointCache endpointCache = new 
FakeEndpointCache(); + KeyRangeCache cache = new KeyRangeCache(endpointCache); + cache.addRanges(singleReplicaUpdate("server1")); + + endpointCache.get("server1"); + endpointCache.setState("server1", EndpointHealthState.TRANSIENT_FAILURE); + + RoutingHint.Builder hint = RoutingHint.newBuilder().setKey(bytes("a")); + ChannelEndpoint server = + cache.fillRoutingHint( + false, + KeyRangeCache.RangeMode.COVERING_SPLIT, + DirectedReadOptions.getDefaultInstance(), + hint); + + assertNull(server); + assertEquals(1, hint.getSkippedTabletUidCount()); + assertEquals(1L, hint.getSkippedTabletUid(0).getTabletUid()); + } + + @Test + public void oneUnusableReplicaAndOneReadyReplicaUsesReady() { + FakeEndpointCache endpointCache = new FakeEndpointCache(); + KeyRangeCache cache = new KeyRangeCache(endpointCache); + cache.addRanges(twoReplicaUpdate()); + + // Make both endpoints present. + endpointCache.get("server1"); + endpointCache.get("server2"); + + // server1 is IDLE (not ready), server2 is READY. + endpointCache.setState("server1", EndpointHealthState.IDLE); + endpointCache.setState("server2", EndpointHealthState.READY); + + RoutingHint.Builder hint = RoutingHint.newBuilder().setKey(bytes("a")); + ChannelEndpoint server = + cache.fillRoutingHint( + false, + KeyRangeCache.RangeMode.COVERING_SPLIT, + DirectedReadOptions.getDefaultInstance(), + hint); + + assertNotNull(server); + assertEquals("server2", server.getAddress()); + // server1 was IDLE, so no skipped_tablets for it. 
+ assertEquals(0, hint.getSkippedTabletUidCount()); + } + + @Test + public void readyEndpointIsUsedForLocationAware() { + FakeEndpointCache endpointCache = new FakeEndpointCache(); + KeyRangeCache cache = new KeyRangeCache(endpointCache); + cache.addRanges(singleReplicaUpdate("server1")); + + endpointCache.get("server1"); + endpointCache.setState("server1", EndpointHealthState.READY); + + RoutingHint.Builder hint = RoutingHint.newBuilder().setKey(bytes("a")); + ChannelEndpoint server = + cache.fillRoutingHint( + false, + KeyRangeCache.RangeMode.COVERING_SPLIT, + DirectedReadOptions.getDefaultInstance(), + hint); + + assertNotNull(server); + assertEquals("server1", server.getAddress()); + assertEquals(0, hint.getSkippedTabletUidCount()); + } + + @Test + public void transientFailureReplicaSkippedAndReadyReplicaSelected() { + FakeEndpointCache endpointCache = new FakeEndpointCache(); + KeyRangeCache cache = new KeyRangeCache(endpointCache); + cache.addRanges(twoReplicaUpdate()); + + endpointCache.get("server1"); + endpointCache.get("server2"); + + endpointCache.setState("server1", EndpointHealthState.TRANSIENT_FAILURE); + endpointCache.setState("server2", EndpointHealthState.READY); + + RoutingHint.Builder hint = RoutingHint.newBuilder().setKey(bytes("a")); + ChannelEndpoint server = + cache.fillRoutingHint( + false, + KeyRangeCache.RangeMode.COVERING_SPLIT, + DirectedReadOptions.getDefaultInstance(), + hint); + + assertNotNull(server); + assertEquals("server2", server.getAddress()); + // server1 was TRANSIENT_FAILURE, so it should be in skipped_tablets. + assertEquals(1, hint.getSkippedTabletUidCount()); + assertEquals(1L, hint.getSkippedTabletUid(0).getTabletUid()); + } + + // --- Eviction and recreation tests --- + + @Test + public void staleShutdownEndpointIsClearedAndRelookedUp() { + // Bug 1 regression: after idle eviction shuts down a channel, the tablet's cached + // endpoint reference becomes stale. 
shouldSkip must detect the shutdown channel, + // discard it, and re-lookup from the cache. + FakeEndpointCache endpointCache = new FakeEndpointCache(); + KeyRangeCache cache = new KeyRangeCache(endpointCache); + cache.addRanges(singleReplicaUpdate("server1")); + + // Route once so the tablet caches the endpoint reference. + endpointCache.get("server1"); + RoutingHint.Builder hint1 = RoutingHint.newBuilder().setKey(bytes("a")); + ChannelEndpoint first = + cache.fillRoutingHint( + false, + KeyRangeCache.RangeMode.COVERING_SPLIT, + DirectedReadOptions.getDefaultInstance(), + hint1); + assertNotNull(first); + assertEquals("server1", first.getAddress()); + + // Simulate idle eviction: shut down the channel and evict from cache. + first.getChannel().shutdownNow(); + endpointCache.evict("server1"); + + // Without the fix, the tablet would keep using the stale shutdown endpoint forever. + // With the fix, shouldSkip detects the shutdown, clears it, and re-lookups from cache. + + // Re-create the endpoint (simulating lifecycle manager recreation). + endpointCache.get("server1"); + endpointCache.setState("server1", EndpointHealthState.READY); + + RoutingHint.Builder hint2 = RoutingHint.newBuilder().setKey(bytes("a")); + ChannelEndpoint second = + cache.fillRoutingHint( + false, + KeyRangeCache.RangeMode.COVERING_SPLIT, + DirectedReadOptions.getDefaultInstance(), + hint2); + + // Should find the new READY endpoint. + assertNotNull(second); + assertEquals("server1", second.getAddress()); + assertEquals(0, hint2.getSkippedTabletUidCount()); + } + + @Test + public void missingEndpointTriggersRecreationViaLifecycleManager() { + // Bug 2 regression: when a routing lookup finds no endpoint, it should call + // requestEndpointRecreation so the endpoint becomes available for future requests. 
+ FakeEndpointCache endpointCache = new FakeEndpointCache(); + TrackingLifecycleManager tracking = new TrackingLifecycleManager(endpointCache); + try { + KeyRangeCache cache = new KeyRangeCache(endpointCache, tracking); + cache.addRanges(singleReplicaUpdate("server1")); + + // No endpoint exists in cache. + RoutingHint.Builder hint = RoutingHint.newBuilder().setKey(bytes("a")); + ChannelEndpoint server = + cache.fillRoutingHint( + false, + KeyRangeCache.RangeMode.COVERING_SPLIT, + DirectedReadOptions.getDefaultInstance(), + hint); + + // Should fall back to default. + assertNull(server); + + // Lifecycle manager should have been asked to recreate the endpoint. + assertTrue( + "requestEndpointRecreation should have been called for server1", + tracking.recreationRequested.contains("server1")); + } finally { + tracking.shutdown(); + } + } + + /** Minimal lifecycle manager stub that records recreation requests. */ + private static final class TrackingLifecycleManager extends EndpointLifecycleManager { + final java.util.Set recreationRequested = new java.util.HashSet<>(); + + TrackingLifecycleManager(ChannelEndpointCache cache) { + super(cache); + } + + @Override + void requestEndpointRecreation(String address) { + recreationRequested.add(address); + } + } + + // --- Helper methods --- + + private static CacheUpdate singleReplicaUpdate(String serverAddress) { + return CacheUpdate.newBuilder() + .addRange( + Range.newBuilder() + .setStartKey(bytes("a")) + .setLimitKey(bytes("z")) + .setGroupUid(5) + .setSplitId(1) + .setGeneration(bytes("1"))) + .addGroup( + Group.newBuilder() + .setGroupUid(5) + .setGeneration(bytes("1")) + .setLeaderIndex(0) + .addTablets( + Tablet.newBuilder() + .setTabletUid(1) + .setServerAddress(serverAddress) + .setIncarnation(bytes("1")) + .setDistance(0))) + .build(); + } + + private static CacheUpdate twoReplicaUpdate() { + return CacheUpdate.newBuilder() + .addRange( + Range.newBuilder() + .setStartKey(bytes("a")) + .setLimitKey(bytes("z")) + 
.setGroupUid(5) + .setSplitId(1) + .setGeneration(bytes("1"))) + .addGroup( + Group.newBuilder() + .setGroupUid(5) + .setGeneration(bytes("1")) + .setLeaderIndex(0) + .addTablets( + Tablet.newBuilder() + .setTabletUid(1) + .setServerAddress("server1") + .setIncarnation(bytes("1")) + .setDistance(0)) + .addTablets( + Tablet.newBuilder() + .setTabletUid(2) + .setServerAddress("server2") + .setIncarnation(bytes("1")) + .setDistance(0))) + .build(); + } + private static void checkContents(KeyRangeCache cache, int expectedSize, int mustBeInCache) { assertEquals(expectedSize, cache.size()); int hitCount = 0; @@ -147,7 +551,7 @@ private static void checkContents(KeyRangeCache cache, int expectedSize, int mus RoutingHint.Builder hint = RoutingHint.newBuilder().setKey(bytes(String.format("%04d", i))); ChannelEndpoint server = cache.fillRoutingHint( - /* preferLeader= */ false, + false, KeyRangeCache.RangeMode.COVERING_SPLIT, DirectedReadOptions.getDefaultInstance(), hint); @@ -166,9 +570,23 @@ private static ByteString bytes(String value) { return ByteString.copyFromUtf8(value); } - private static final class FakeEndpointCache implements ChannelEndpointCache { + // --- Health state for testing --- + + enum EndpointHealthState { + READY, + IDLE, + CONNECTING, + TRANSIENT_FAILURE, + SHUTDOWN, + UNSUPPORTED + } + + // --- Test doubles --- + + static final class FakeEndpointCache implements ChannelEndpointCache { private final Map endpoints = new HashMap<>(); private final FakeEndpoint defaultEndpoint = new FakeEndpoint("default"); + private boolean createOnGet = true; @Override public ChannelEndpoint defaultChannel() { @@ -180,6 +598,12 @@ public ChannelEndpoint get(String address) { return endpoints.computeIfAbsent(address, FakeEndpoint::new); } + @Override + @Nullable + public ChannelEndpoint getIfPresent(String address) { + return endpoints.get(address); + } + @Override public void evict(String address) { endpoints.remove(address); @@ -190,18 +614,28 @@ public void 
shutdown() { endpoints.clear(); } - void setHealthy(String address, boolean healthy) { + void setCreateOnGet(boolean createOnGet) { + this.createOnGet = createOnGet; + } + + void setState(String address, EndpointHealthState state) { FakeEndpoint endpoint = endpoints.get(address); if (endpoint != null) { - endpoint.setHealthy(healthy); + endpoint.setState(state); } } + + @Deprecated + void setHealthy(String address, boolean healthy) { + setState( + address, healthy ? EndpointHealthState.READY : EndpointHealthState.TRANSIENT_FAILURE); + } } - private static final class FakeEndpoint implements ChannelEndpoint { + static final class FakeEndpoint implements ChannelEndpoint { private final String address; private final ManagedChannel channel = new FakeManagedChannel(); - private boolean healthy = true; + private EndpointHealthState state = EndpointHealthState.READY; FakeEndpoint(String address) { this.address = address; @@ -214,7 +648,12 @@ public String getAddress() { @Override public boolean isHealthy() { - return healthy; + return state == EndpointHealthState.READY; + } + + @Override + public boolean isTransientFailure() { + return state == EndpointHealthState.TRANSIENT_FAILURE; } @Override @@ -222,8 +661,8 @@ public ManagedChannel getChannel() { return channel; } - void setHealthy(boolean healthy) { - this.healthy = healthy; + void setState(EndpointHealthState state) { + this.state = state; } }