diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/config/DefaultDriverOption.java b/core/src/main/java/com/datastax/oss/driver/api/core/config/DefaultDriverOption.java index c5a482c5d50..cbc2703458f 100644 --- a/core/src/main/java/com/datastax/oss/driver/api/core/config/DefaultDriverOption.java +++ b/core/src/main/java/com/datastax/oss/driver/api/core/config/DefaultDriverOption.java @@ -1119,7 +1119,27 @@ public enum DefaultDriverOption implements DriverOption { * *

Value type: boolean */ - CLIENT_ROUTES_SHARD_AWARENESS_ENABLED("advanced.client-routes.shard-awarness-enabled"); + CLIENT_ROUTES_SHARD_AWARENESS_ENABLED("advanced.client-routes.shard-awarness-enabled"), + + /** + * Whether the driver may fall back to a direct connection when no client route is available for a + * node. + * + *

When {@code true} (the default), nodes that have no matching entry in {@code + * system.client_routes} — or whose proxy address cannot be reached or resolved — are contacted + * directly using their broadcast address. This preserves backward-compatible mixed proxy/direct + * topologies where some nodes are behind the private endpoint and others are not. + * + *

When {@code false}, the driver never falls back to a direct connection. Any node without a + * reachable route is treated as unreachable: it stays DOWN and the reconnection loop retries + * until a {@code CLIENT_ROUTES_CHANGE} event publishes the route. Note that setting this to + * {@code false} does not actively close existing direct connections; connection pools + * that were established before the flag was applied may continue to operate until naturally + * recycled. + * + *

Value type: boolean + */ + CLIENT_ROUTES_DIRECT_CONNECTION_FALLBACK("advanced.client-routes.direct-connection-fallback"); private final String path; diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/config/OptionsMap.java b/core/src/main/java/com/datastax/oss/driver/api/core/config/OptionsMap.java index 60190fc1cce..e715ac94c30 100644 --- a/core/src/main/java/com/datastax/oss/driver/api/core/config/OptionsMap.java +++ b/core/src/main/java/com/datastax/oss/driver/api/core/config/OptionsMap.java @@ -400,6 +400,7 @@ protected static void fillWithDriverDefaults(OptionsMap map) { // values) with no sensible scalar default, analogous to how CONFIG_RELOAD_INTERVAL is omitted. map.put(TypedDriverOption.CLIENT_ROUTES_NATIVE_TRANSPORT_PORT, 9042); map.put(TypedDriverOption.CLIENT_ROUTES_SHARD_AWARENESS_ENABLED, false); + map.put(TypedDriverOption.CLIENT_ROUTES_DIRECT_CONNECTION_FALLBACK, true); } @Immutable diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/config/TypedDriverOption.java b/core/src/main/java/com/datastax/oss/driver/api/core/config/TypedDriverOption.java index db5edb5b947..1060efe4cb3 100644 --- a/core/src/main/java/com/datastax/oss/driver/api/core/config/TypedDriverOption.java +++ b/core/src/main/java/com/datastax/oss/driver/api/core/config/TypedDriverOption.java @@ -954,6 +954,15 @@ public String toString() { new TypedDriverOption<>( DefaultDriverOption.CLIENT_ROUTES_SHARD_AWARENESS_ENABLED, GenericType.BOOLEAN); + /** + * Whether the driver may fall back to a direct connection when no client route is available for a + * node. When {@code true} (default), nodes without a reachable route are contacted directly via + * their broadcast address. When {@code false}, no fallback is attempted and the node stays DOWN. + */ + public static final TypedDriverOption CLIENT_ROUTES_DIRECT_CONNECTION_FALLBACK = + new TypedDriverOption<>( + DefaultDriverOption.CLIENT_ROUTES_DIRECT_CONNECTION_FALLBACK, GenericType.BOOLEAN); + private static Iterable> introspectBuiltInValues() { try { ImmutableList.Builder> result = ImmutableList.builder(); diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/ClientRoutesEndPoint.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/ClientRoutesEndPoint.java index 15d825b2efc..18eb2c682fa 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/ClientRoutesEndPoint.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/ClientRoutesEndPoint.java @@ -27,12 +27,17 @@ import java.net.SocketAddress; import java.util.Objects; import java.util.UUID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class ClientRoutesEndPoint implements EndPoint { + private static final Logger LOG = LoggerFactory.getLogger(ClientRoutesEndPoint.class); + private final UUID hostId; private final ClientRoutesTopologyMonitor topologyMonitor; private final String metricPrefix; @NonNull private final EndPoint fallbackEndPoint; + private final boolean directConnectionFallback; /** * @param topologyMonitor the topology monitor used to resolve the endpoint address on demand. @@ -40,20 +45,24 @@ public class ClientRoutesEndPoint implements EndPoint { * @param broadcastInetAddress the node's broadcast address (from system.peers or system.local), * used to build a stable metric prefix. May be {@code null} if the address could not be * determined, in which case the hostId is used as the metric prefix instead. - * @param fallbackEndPoint the default endpoint to fall back to when {@code - * topologyMonitor.resolve()} returns {@code null}, i.e. when this node is not accessed via a - * cloud private endpoint. Must not be {@code null}. + * @param fallbackEndPoint the endpoint to use when {@code topologyMonitor.resolve()} returns + * {@code null} and {@code directConnectionFallback} is {@code true}. Always required. + * @param directConnectionFallback when {@code true}, {@link #resolve()} falls back to {@code + * fallbackEndPoint} if no client route is found. When {@code false}, throws instead, keeping + * the node DOWN until a route is published. */ public ClientRoutesEndPoint( @NonNull ClientRoutesTopologyMonitor topologyMonitor, @NonNull UUID hostId, @Nullable InetAddress broadcastInetAddress, - @NonNull EndPoint fallbackEndPoint) { + @NonNull EndPoint fallbackEndPoint, + boolean directConnectionFallback) { this.topologyMonitor = Objects.requireNonNull(topologyMonitor, "Topology monitor cannot be null"); this.hostId = Objects.requireNonNull(hostId, "HOST uuid cannot be null"); this.fallbackEndPoint = Objects.requireNonNull(fallbackEndPoint, "Fallback endpoint cannot be null"); + this.directConnectionFallback = directConnectionFallback; this.metricPrefix = buildMetricPrefix(broadcastInetAddress, hostId); } @@ -73,7 +82,24 @@ public SocketAddress resolve() { } catch (IOException e) { throw new UncheckedIOException("DNS resolution failed for host_id=" + hostId, e); } - return fallbackEndPoint.resolve(); + if (directConnectionFallback) { + // Default (backward-compatible) mode: fall back to the node's broadcast address. + // This supports mixed proxy/direct topologies where some nodes are behind the private + // endpoint and others are reached directly. + return fallbackEndPoint.resolve(); + } + // direct-connection-fallback=false: the driver must not bypass the proxy infrastructure. + // The node will remain DOWN and the reconnection loop will retry until a + // CLIENT_ROUTES_CHANGE event populates the route. + LOG.warn( + "No client route entry found for host_id={}. " + + "The node will remain DOWN until a route is published via CLIENT_ROUTES_CHANGE.", + hostId); + throw new IllegalStateException( + "No client route entry found for host_id=" + + hostId + + ". Direct connection fallback is disabled" + + " (advanced.client-routes.direct-connection-fallback = false)."); } @Override diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/ClientRoutesTopologyMonitor.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/ClientRoutesTopologyMonitor.java index 569637e0873..756ea45c083 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/ClientRoutesTopologyMonitor.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/ClientRoutesTopologyMonitor.java @@ -82,6 +82,7 @@ public class ClientRoutesTopologyMonitor extends DefaultTopologyMonitor { private final String logPrefix; private final AtomicReference> resolvedRoutesCache; private final boolean useSSL; + private final boolean directConnectionFallback; private volatile boolean closed = false; private final AtomicInteger consecutiveEmptyResults = new AtomicInteger(0); @@ -147,6 +148,11 @@ public ClientRoutesTopologyMonitor( this.logPrefix = context.getSessionName(); this.resolvedRoutesCache = new AtomicReference<>(Collections.emptyMap()); this.useSSL = context.getSslEngineFactory().isPresent(); + this.directConnectionFallback = + context + .getConfig() + .getDefaultProfile() + .getBoolean(DefaultDriverOption.CLIENT_ROUTES_DIRECT_CONNECTION_FALLBACK); } @Override @@ -459,14 +465,15 @@ protected EndPoint buildNodeEndPoint( UUID hostId = row.getUuid("host_id"); if (hostId == null) { LOG.warn( - "[{}] host_id is null in system row for address {} — cannot assign a client route. " - + "This may indicate corrupted system tables. " - + "Falling back to default endpoint resolution.", + "[{}] host_id is null in system row for address {} — cannot build a client-routes" + + " endpoint. This may indicate corrupted system tables. The node will be ignored.", logPrefix, broadcastRpcAddress); - return super.buildNodeEndPoint(row, broadcastRpcAddress, localEndPoint); + throw new IllegalStateException( + "host_id is null in system row for address " + + broadcastRpcAddress + + "; cannot build a ClientRoutesEndPoint without a host_id"); } - EndPoint fallback = super.buildNodeEndPoint(row, broadcastRpcAddress, localEndPoint); InetAddress broadcastInetAddress = null; if (broadcastRpcAddress != null) { broadcastInetAddress = broadcastRpcAddress.getAddress(); @@ -477,7 +484,9 @@ protected EndPoint buildNodeEndPoint( if (broadcastInetAddress == null) { broadcastInetAddress = row.getInetAddress("peer"); } - return new ClientRoutesEndPoint(this, hostId, broadcastInetAddress, fallback); + EndPoint fallback = super.buildNodeEndPoint(row, broadcastRpcAddress, localEndPoint); + return new ClientRoutesEndPoint( + this, hostId, broadcastInetAddress, fallback, directConnectionFallback); } /** diff --git a/core/src/main/resources/reference.conf b/core/src/main/resources/reference.conf index 92c5b98118f..6b4206386e7 100644 --- a/core/src/main/resources/reference.conf +++ b/core/src/main/resources/reference.conf @@ -1173,6 +1173,21 @@ datastax-java-driver { # Default: false shard-awarness-enabled = false + # When true (default), nodes that have no matching entry in system.client_routes — or whose + # proxy address cannot be reached or resolved — are contacted directly using their broadcast + # address. This preserves backward-compatible mixed proxy/direct topologies where some nodes + # are behind the private endpoint and others are not. + # + # When false, the driver never falls back to a direct connection. Any node without a reachable + # route stays DOWN and the reconnection loop retries until a CLIENT_ROUTES_CHANGE event + # publishes the route. Note that setting this to false does NOT actively close existing direct + # connections; pools established before this flag was applied may continue operating until + # naturally recycled. + # + # Required: no + # Default: true + direct-connection-fallback = true + } # Whether to resolve the addresses passed to `basic.contact-points`. diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/ClientRoutesEndPointTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/ClientRoutesEndPointTest.java index f31dd2861ed..a13142ef2e1 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/ClientRoutesEndPointTest.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/ClientRoutesEndPointTest.java @@ -47,22 +47,39 @@ public void should_resolve_via_topology_monitor() throws UnknownHostException { when(topologyMonitor.resolve(hostId)).thenReturn(expected); ClientRoutesEndPoint ep = - new ClientRoutesEndPoint(topologyMonitor, hostId, null, fallbackEndPoint); + new ClientRoutesEndPoint(topologyMonitor, hostId, null, fallbackEndPoint, true); assertThat(ep.resolve()).isEqualTo(expected); } @Test - public void should_fallback_when_resolve_returns_null() throws UnknownHostException { + public void should_throw_when_direct_connection_fallback_disabled_and_no_route() + throws UnknownHostException { UUID hostId = UUID.randomUUID(); - InetSocketAddress fallbackAddr = new InetSocketAddress("10.0.0.1", 9042); when(topologyMonitor.resolve(hostId)).thenReturn(null); - when(fallbackEndPoint.resolve()).thenReturn(fallbackAddr); ClientRoutesEndPoint ep = - new ClientRoutesEndPoint(topologyMonitor, hostId, null, fallbackEndPoint); + new ClientRoutesEndPoint(topologyMonitor, hostId, null, fallbackEndPoint, false); - assertThat(ep.resolve()).isEqualTo(fallbackAddr); + assertThatThrownBy(ep::resolve) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("No client route entry found") + .hasMessageContaining(hostId.toString()) + .hasMessageContaining("direct-connection-fallback"); + } + + @Test + public void should_fall_back_to_broadcast_when_direct_connection_fallback_enabled() + throws UnknownHostException { + UUID hostId = UUID.randomUUID(); + InetSocketAddress fallbackAddress = new InetSocketAddress("10.0.0.1", 9042); + when(topologyMonitor.resolve(hostId)).thenReturn(null); + when(fallbackEndPoint.resolve()).thenReturn(fallbackAddress); + + ClientRoutesEndPoint ep = + new ClientRoutesEndPoint(topologyMonitor, hostId, null, fallbackEndPoint, true); + + assertThat(ep.resolve()).isEqualTo(fallbackAddress); } @Test @@ -71,7 +88,7 @@ public void should_wrap_io_exceptions_in_unchecked_io_exception() throws Unknown when(topologyMonitor.resolve(hostId)).thenThrow(new UnknownHostException("no-such-host")); ClientRoutesEndPoint ep = - new ClientRoutesEndPoint(topologyMonitor, hostId, null, fallbackEndPoint); + new ClientRoutesEndPoint(topologyMonitor, hostId, null, fallbackEndPoint, true); assertThatThrownBy(ep::resolve) .isInstanceOf(UncheckedIOException.class) @@ -86,7 +103,7 @@ public void should_reflect_route_changes_on_subsequent_resolve() throws UnknownH when(topologyMonitor.resolve(hostId)).thenReturn(addr1); ClientRoutesEndPoint ep = - new ClientRoutesEndPoint(topologyMonitor, hostId, null, fallbackEndPoint); + new ClientRoutesEndPoint(topologyMonitor, hostId, null, fallbackEndPoint, true); assertThat(ep.resolve()).isEqualTo(addr1); @@ -102,9 +119,9 @@ public void should_reflect_route_changes_on_subsequent_resolve() throws UnknownH public void should_be_equal_when_same_host_id() { UUID hostId = UUID.randomUUID(); ClientRoutesEndPoint ep1 = - new ClientRoutesEndPoint(topologyMonitor, hostId, null, fallbackEndPoint); + new ClientRoutesEndPoint(topologyMonitor, hostId, null, fallbackEndPoint, true); ClientRoutesEndPoint ep2 = - new ClientRoutesEndPoint(topologyMonitor, hostId, null, fallbackEndPoint); + new ClientRoutesEndPoint(topologyMonitor, hostId, null, fallbackEndPoint, true); assertThat(ep1).isEqualTo(ep2); assertThat(ep1.hashCode()).isEqualTo(ep2.hashCode()); @@ -113,9 +130,9 @@ public void should_be_equal_when_same_host_id() { @Test public void should_not_be_equal_when_different_host_id() { ClientRoutesEndPoint ep1 = - new ClientRoutesEndPoint(topologyMonitor, UUID.randomUUID(), null, fallbackEndPoint); + new ClientRoutesEndPoint(topologyMonitor, UUID.randomUUID(), null, fallbackEndPoint, true); ClientRoutesEndPoint ep2 = - new ClientRoutesEndPoint(topologyMonitor, UUID.randomUUID(), null, fallbackEndPoint); + new ClientRoutesEndPoint(topologyMonitor, UUID.randomUUID(), null, fallbackEndPoint, true); assertThat(ep1).isNotEqualTo(ep2); } @@ -124,7 +141,7 @@ public void should_not_be_equal_when_different_host_id() { public void should_not_be_equal_to_non_client_routes_endpoint() { UUID hostId = UUID.randomUUID(); ClientRoutesEndPoint ep = - new ClientRoutesEndPoint(topologyMonitor, hostId, null, fallbackEndPoint); + new ClientRoutesEndPoint(topologyMonitor, hostId, null, fallbackEndPoint, true); assertThat(ep).isNotEqualTo("not an endpoint"); assertThat(ep).isNotEqualTo(null); @@ -136,7 +153,7 @@ public void should_not_be_equal_to_non_client_routes_endpoint() { public void should_use_host_id_as_metric_prefix_when_address_is_null() { UUID hostId = UUID.randomUUID(); ClientRoutesEndPoint ep = - new ClientRoutesEndPoint(topologyMonitor, hostId, null, fallbackEndPoint); + new ClientRoutesEndPoint(topologyMonitor, hostId, null, fallbackEndPoint, true); assertThat(ep.asMetricPrefix()).isEqualTo(hostId.toString()); } @@ -146,7 +163,7 @@ public void should_format_ipv4_metric_prefix() throws Exception { UUID hostId = UUID.randomUUID(); InetAddress ipv4 = InetAddress.getByAddress(new byte[] {10, 0, 0, 1}); ClientRoutesEndPoint ep = - new ClientRoutesEndPoint(topologyMonitor, hostId, ipv4, fallbackEndPoint); + new ClientRoutesEndPoint(topologyMonitor, hostId, ipv4, fallbackEndPoint, true); assertThat(ep.asMetricPrefix()).isEqualTo("10_0_0_1_" + hostId); } @@ -157,7 +174,7 @@ public void should_format_ipv6_metric_prefix() throws Exception { InetAddress ipv6 = InetAddress.getByAddress(new byte[] {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1}); ClientRoutesEndPoint ep = - new ClientRoutesEndPoint(topologyMonitor, hostId, ipv6, fallbackEndPoint); + new ClientRoutesEndPoint(topologyMonitor, hostId, ipv6, fallbackEndPoint, true); // IPv6 keeps colons (consistent with DefaultEndPoint), dots replaced by underscores assertThat(ep.asMetricPrefix()).isEqualTo("0:0:0:0:0:0:0:1_" + hostId); @@ -169,7 +186,7 @@ public void should_format_ipv6_metric_prefix() throws Exception { public void should_return_host_id_as_string() { UUID hostId = UUID.randomUUID(); ClientRoutesEndPoint ep = - new ClientRoutesEndPoint(topologyMonitor, hostId, null, fallbackEndPoint); + new ClientRoutesEndPoint(topologyMonitor, hostId, null, fallbackEndPoint, true); assertThat(ep.toString()).isEqualTo("ClientRoutesEndPoint(" + hostId + ")"); } diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/ClientRoutesTopologyMonitorTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/ClientRoutesTopologyMonitorTest.java index a1ba4617ef5..3e37ac22eef 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/ClientRoutesTopologyMonitorTest.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/ClientRoutesTopologyMonitorTest.java @@ -38,7 +38,6 @@ import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableMap; import edu.umd.cs.findbugs.annotations.NonNull; import java.net.InetSocketAddress; -import java.net.SocketAddress; import java.net.UnknownHostException; import java.time.Duration; import java.util.ArrayList; @@ -143,6 +142,8 @@ public void setup() { when(defaultProfile.getDuration(DefaultDriverOption.CONTROL_CONNECTION_TIMEOUT)) .thenReturn(Duration.ofSeconds(5)); when(defaultProfile.getBoolean(DefaultDriverOption.RECONNECT_ON_INIT)).thenReturn(false); + when(defaultProfile.getBoolean(DefaultDriverOption.CLIENT_ROUTES_DIRECT_CONNECTION_FALLBACK)) + .thenReturn(true); when(context.getSslEngineFactory()).thenReturn(Optional.empty()); ClientRoutesConfig config = ClientRoutesConfig.builder() @@ -163,6 +164,21 @@ private void initHandler() { handler.init(); } + /** + * Creates a fresh handler with the given {@code directConnectionFallback} setting, sharing all + * other context stubs from {@link #setup()}. + */ + private TestableClientRoutesTopologyMonitor createHandlerWithDirectConnectionFallback( + boolean directConnectionFallback) { + when(defaultProfile.getBoolean(DefaultDriverOption.CLIENT_ROUTES_DIRECT_CONNECTION_FALLBACK)) + .thenReturn(directConnectionFallback); + ClientRoutesConfig config = + ClientRoutesConfig.builder() + .addEndpoint(new ClientRouteProxy(connectionId, "host1")) + .build(); + return new TestableClientRoutesTopologyMonitor(context, config); + } + // ---- resolve() ------------------------------------------------------- @Test @@ -1041,19 +1057,16 @@ public void should_not_propagate_exception_when_query_fails() throws Exception { // ---- buildNodeEndPoint fallback ----------------------------------------- @Test - public void should_build_default_endpoint_when_host_id_is_null() { - // row.getUuid("host_id") returns null, triggering the hostId == null - // branch in buildNodeEndPoint which delegates to super.buildNodeEndPoint(). + public void should_throw_when_host_id_is_null() { + // row.getUuid("host_id") returns null — with client routes configured, the driver + // must not fall back to direct broadcast address, so buildNodeEndPoint throws. AdminRow row = Mockito.mock(AdminRow.class); when(row.getUuid("host_id")).thenReturn(null); - when(row.contains("peer")).thenReturn(false); // local-node row → super returns localEndPoint EndPoint localEndPoint = Mockito.mock(EndPoint.class); - EndPoint result = handler.buildNodeEndPoint(row, null, localEndPoint); - - // hostId == null branch → super.buildNodeEndPoint() is called → returns localEndPoint - assertThat(result).isNotInstanceOf(ClientRoutesEndPoint.class); - assertThat(result).isSameAs(localEndPoint); + assertThatThrownBy(() -> handler.buildNodeEndPoint(row, null, localEndPoint)) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("host_id is null"); } @Test @@ -1065,7 +1078,6 @@ public void should_build_client_routes_endpoint_when_host_id_non_null() { UUID hostId = UUID.randomUUID(); AdminRow row = Mockito.mock(AdminRow.class); when(row.getUuid("host_id")).thenReturn(hostId); - when(row.contains("peer")).thenReturn(false); EndPoint localEndPoint = Mockito.mock(EndPoint.class); EndPoint result = handler.buildNodeEndPoint(row, null, localEndPoint); @@ -1138,24 +1150,44 @@ hostId1, new ClientRouteRecord(hostId1, "127.0.0.1", 9042), } @Test - public void should_resolve_to_fallback_when_no_route_for_host_id() { - // Simulates a node that is not accessed via PrivateLink (no route in cache for its host_id). - // resolve() must return the regular endpoint address (the fallback), not throw. + public void should_throw_when_no_route_for_host_id_and_direct_connection_fallback_disabled() { + // With direct-connection-fallback=false: resolve() must throw instead of falling back to + // broadcast address, to prevent the driver from bypassing proxy infrastructure. + TestableClientRoutesTopologyMonitor noFallbackHandler = + createHandlerWithDirectConnectionFallback(false); + UUID hostId = UUID.randomUUID(); + AdminRow row = Mockito.mock(AdminRow.class); + when(row.getUuid("host_id")).thenReturn(hostId); + EndPoint localEndPoint = Mockito.mock(EndPoint.class); + + EndPoint endpoint = noFallbackHandler.buildNodeEndPoint(row, null, localEndPoint); + assertThat(endpoint).isInstanceOf(ClientRoutesEndPoint.class); + + // Cache is empty (no route for this host_id) → must throw, not fall back + assertThatThrownBy(endpoint::resolve) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("No client route entry found") + .hasMessageContaining(hostId.toString()); + } + + @Test + public void should_fall_back_to_broadcast_when_no_route_and_direct_connection_fallback_enabled() + throws Exception { + // With direct-connection-fallback=true (default): resolve() delegates to the fallback endpoint + // when no route exists, supporting mixed proxy/direct topologies. UUID hostId = UUID.randomUUID(); - InetSocketAddress fallbackAddress = new InetSocketAddress("127.0.0.99", 9999); AdminRow row = Mockito.mock(AdminRow.class); when(row.getUuid("host_id")).thenReturn(hostId); - when(row.contains("peer")).thenReturn(false); EndPoint localEndPoint = Mockito.mock(EndPoint.class); - when(localEndPoint.resolve()).thenReturn(fallbackAddress); + InetSocketAddress directAddress = new InetSocketAddress("10.0.0.1", 9042); + when(localEndPoint.resolve()).thenReturn(directAddress); + // handler uses directConnectionFallback=true (set in setup()) EndPoint endpoint = handler.buildNodeEndPoint(row, null, localEndPoint); assertThat(endpoint).isInstanceOf(ClientRoutesEndPoint.class); - // Cache is empty (no PrivateLink route) → resolves to the regular endpoint address - SocketAddress resolved = ((ClientRoutesEndPoint) endpoint).resolve(); - assertThat(resolved).isEqualTo(fallbackAddress); - Mockito.verify(localEndPoint).resolve(); + // Cache is empty → falls back to the direct broadcast address + assertThat(endpoint.resolve()).isEqualTo(directAddress); } // ---- savePort() --------------------------------------------------------