From 7ba90d7763d03c67539a7008d9c00fd46f1f4577 Mon Sep 17 00:00:00 2001 From: Mikita Hradovich Date: Tue, 5 May 2026 20:12:56 +0200 Subject: [PATCH 1/4] feat: optimize system table queries with column projection (DRIVER-368) Backport of the 4.x DefaultTopologyMonitor optimization to the 3.x driver's ControlConnection. On the first query to each system table (system.local, system.peers, system.peers_v2) the driver sends SELECT * to discover available columns. It caches the intersection with an internal *_COLUMNS_OF_INTEREST set; subsequent queries project only those columns, reducing bytes on the wire and deserialization work. Changes in ControlConnection: - LOCAL/PEERS/PEERS_V2_COLUMNS_OF_INTEREST ImmutableSet constants - Volatile Set cache fields (null = uninitialized sentinel) - intersectWithNeeded() returns null on empty intersection to prevent invalid empty-column projections - buildProjectedQuery() static helper - Cache reset in tryConnect() so each new connection rediscovers columns - Projected queries in refreshNodeListAndTokenMap(), selectPeersFuture(), and fetchNodeInfo() (system.local only; peer WHERE lookups use SELECT * for Scassandra compatibility) Changes in ScassandraCluster: - hostIdByNodeCount computed once per instance for stable host_id values - primeMetadata() primes both SELECT * and projected-query variants New unit tests in ControlConnectionUnitTest covering projection helpers, constants, and cache field declarations (16 tests). --- .../driver/core/ControlConnection.java | 250 +++++++++++++-- .../driver/core/ControlConnectionTest.java | 3 + .../core/ControlConnectionUnitTest.java | 290 ++++++++++++++++++ .../driver/core/ScassandraCluster.java | 106 ++++++- 4 files changed, 627 insertions(+), 22 deletions(-) create mode 100644 driver-core/src/test/java/com/datastax/driver/core/ControlConnectionUnitTest.java diff --git a/driver-core/src/main/java/com/datastax/driver/core/ControlConnection.java b/driver-core/src/main/java/com/datastax/driver/core/ControlConnection.java index be8c7c88068..27dac750ab4 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/ControlConnection.java +++ b/driver-core/src/main/java/com/datastax/driver/core/ControlConnection.java @@ -34,6 +34,7 @@ import com.datastax.driver.core.utils.MoreFutures; import com.datastax.driver.core.utils.MoreObjects; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterators; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; @@ -86,6 +87,91 @@ class ControlConnection implements Connection.Owner { private static final String SELECT_SCHEMA_LOCAL = "SELECT schema_version, host_id FROM system.local WHERE key='local'"; + // IMPORTANT: Every column read from system.local rows — in updateInfo(), + // refreshNodeListAndTokenMap(), isValidPeer(), and DefaultEndPointFactory — MUST be listed here. + // If a new column read is added anywhere that consumes a system table row, add it to the + // appropriate set below, otherwise it will be silently excluded from projected queries. + @VisibleForTesting + static final ImmutableSet LOCAL_COLUMNS_OF_INTEREST = + ImmutableSet.of( + "cluster_name", + "partitioner", + "data_center", + "rack", + "release_version", + "native_address", + "native_port", + "native_transport_address", + "native_transport_port", + "native_transport_port_ssl", + "rpc_address", + "broadcast_address", + "broadcast_port", + "listen_address", + "listen_port", + "tokens", + "host_id", + "schema_version", + "workload", + "graph", + "dse_version"); + + // IMPORTANT: see LOCAL_COLUMNS_OF_INTEREST note above. + // Includes all columns consumed by updateInfo(), refreshNodeListAndTokenMap(), + // isValidPeer(), and DefaultEndPointFactory.create() from system.peers rows. + // Columns that are absent from the actual server schema are silently excluded by + // intersectWithNeeded(), so listing extra columns here is safe. + @VisibleForTesting + static final ImmutableSet PEERS_COLUMNS_OF_INTEREST = + ImmutableSet.of( + "peer", + "peer_port", // peers_v2 column; harmless to list here — absent on peers, excluded safely + "rpc_address", + "data_center", + "rack", + "release_version", + "tokens", + "listen_address", + "listen_port", + "host_id", + "schema_version", + "native_address", // may appear on some server variants; guarded by contains() in code + "native_port", // same + "native_transport_address", + "native_transport_port", + "native_transport_port_ssl", + "workload", + "graph", + "dse_version"); + + // IMPORTANT: see LOCAL_COLUMNS_OF_INTEREST note above. + // Includes all columns consumed by updateInfo(), refreshNodeListAndTokenMap(), + // isValidPeer(), and DefaultEndPointFactory.create() from system.peers_v2 rows. + // Columns that are absent from the actual server schema are silently excluded by + // intersectWithNeeded(), so listing extra columns here is safe. + @VisibleForTesting + static final ImmutableSet PEERS_V2_COLUMNS_OF_INTEREST = + ImmutableSet.of( + "peer", + "peer_port", + "native_address", + "native_port", + "data_center", + "rack", + "release_version", + "tokens", + "host_id", + "schema_version", + "workload", + "graph", + "dse_version", + "listen_address", + "listen_port", + "rpc_address", // legacy; guarded by contains() in code — harmless if absent + "native_transport_address", // same + "native_transport_port", // same + "native_transport_port_ssl"); // same + private static final VersionNumber _3_11 = VersionNumber.parse("3.11.0"); @VisibleForTesting @@ -102,10 +188,30 @@ class ControlConnection implements Connection.Owner { // from here on out. private volatile boolean isPeersV2 = true; + // Column projection caches. null = uninitialized: the first query to each system table issues + // SELECT * to discover which columns the server exposes, then subsequent queries project only + // the intersection of those columns with the corresponding *_COLUMNS_OF_INTEREST set. + // Reset to null on every new connection so that the new server's schema is re-discovered. + private volatile Set localColumns = null; + private volatile Set peersColumns = null; + private volatile Set peersV2Columns = null; + public ControlConnection(Cluster.Manager manager) { this.cluster = manager; } + /** + * Resets the projected-column caches so that the next query to each system table sends {@code + * SELECT *} and re-discovers available columns. Intended for use in tests that clear Scassandra + * primes between driver operations. + */ + @VisibleForTesting + void resetColumnCaches() { + localColumns = null; + peersColumns = null; + peersV2Columns = null; + } + // Only for the initial connection. Does not schedule retries if it fails void connect() throws UnsupportedProtocolVersionException { if (isShutdown) return; @@ -326,6 +432,14 @@ private Connection tryConnect(Host host, boolean isInitialConnection) ProtocolEvent.Type.SCHEMA_CHANGE); connection.write(new Requests.Register(evs)); + // Reset column caches so refreshNodeListAndTokenMap() uses SELECT * to rediscover + // which columns this server exposes, rather than a projected query built for a + // previous connection's server. The caches are populated during the queries below + // and remain warm for the lifetime of this connection. + localColumns = null; + peersColumns = null; + peersV2Columns = null; + // We need to refresh the node list first so we know about the cassandra version of // the node we're connecting to. // This will create the token map for the first time, but it will be incomplete @@ -491,26 +605,54 @@ private Row fetchNodeInfo(Host host, Connection c) if (isConnectedHost || host.getBroadcastSocketAddress() != null) { String query; if (isConnectedHost) { - query = SELECT_LOCAL; + query = + localColumns == null + ? SELECT_LOCAL + : buildProjectedQuery("system.local", localColumns, "key='local'"); } else { InetSocketAddress broadcastAddress = host.getBroadcastSocketAddress(); - query = - isPeersV2 - ? SELECT_PEERS_V2 - + " WHERE peer='" - + broadcastAddress.getAddress().getHostAddress() - + "' AND peer_port=" - + broadcastAddress.getPort() - : SELECT_PEERS - + " WHERE peer='" - + broadcastAddress.getAddress().getHostAddress() - + "'"; + // Always use SELECT * for single-row WHERE lookups. Projected queries are only used for + // full-table scans via selectPeersFuture(), where the cache is guaranteed to be warm and + // every node has the projected full-scan prime registered. For WHERE lookups the control + // connection may query a node that was never restarted (and therefore still carries only + // the original SELECT * prime from init time), so projecting here risks a cache miss. + if (isPeersV2) { + String whereClause = + "peer='" + + broadcastAddress.getAddress().getHostAddress() + + "' AND peer_port=" + + broadcastAddress.getPort(); + query = SELECT_PEERS_V2 + " WHERE " + whereClause; + } else { + String whereClause = "peer='" + broadcastAddress.getAddress().getHostAddress() + "'"; + query = SELECT_PEERS + " WHERE " + whereClause; + } } DefaultResultSetFuture future = new DefaultResultSetFuture(null, cluster.protocolVersion(), new Requests.Query(query)); c.write(future); - Row row = future.get().one(); + ResultSet rs = future.get(); + Row row = rs.one(); + // Populate the column cache on first successful WHERE lookup so that subsequent full-table + // scans via selectPeersFuture() can send projected queries. Only populate when a row is + // found: if the WHERE returned empty we are about to fall through to the full-scan path, + // which has its own cache-population logic; populating here on an empty result would cause + // selectPeersFuture() to send a projected query before it has verified the server supports + // the projected columns. if (row != null) { + if (isConnectedHost) { + if (localColumns == null) { + localColumns = intersectWithNeeded(rs, LOCAL_COLUMNS_OF_INTEREST); + } + } else if (isPeersV2) { + if (peersV2Columns == null) { + peersV2Columns = intersectWithNeeded(rs, PEERS_V2_COLUMNS_OF_INTEREST); + } + } else { + if (peersColumns == null) { + peersColumns = intersectWithNeeded(rs, PEERS_COLUMNS_OF_INTEREST); + } + } return row; } else { InetSocketAddress address = host.getBroadcastSocketAddress(); @@ -717,9 +859,13 @@ private static void updateLocationInfo( */ private ListenableFuture selectPeersFuture(final Connection connection) { if (isPeersV2) { + String peersV2Query = + peersV2Columns == null + ? SELECT_PEERS_V2 + : buildProjectedQuery("system.peers_v2", peersV2Columns, null); DefaultResultSetFuture peersV2Future = new DefaultResultSetFuture( - null, cluster.protocolVersion(), new Requests.Query(SELECT_PEERS_V2)); + null, cluster.protocolVersion(), new Requests.Query(peersV2Query)); connection.write(peersV2Future); final SettableFuture peersFuture = SettableFuture.create(); // if peers v2 query fails, query peers table instead. @@ -729,6 +875,9 @@ private ListenableFuture selectPeersFuture(final Connection connectio @Override public void onSuccess(ResultSet result) { + if (peersV2Columns == null) { + peersV2Columns = intersectWithNeeded(result, PEERS_V2_COLUMNS_OF_INTEREST); + } peersFuture.set(result); } @@ -742,6 +891,9 @@ public void onFailure(Throwable t) { || (t instanceof ServerError && t.getMessage().contains("Unknown keyspace/cf pair (system.peers_v2)"))) { isPeersV2 = false; + // Also reset the peers cache so the first system.peers query issues SELECT * + // to discover which columns that table exposes on this server. + peersColumns = null; MoreFutures.propagateFuture(peersFuture, selectPeersFuture(connection)); } else { peersFuture.setException(t); @@ -751,14 +903,65 @@ public void onFailure(Throwable t) { MoreExecutors.directExecutor()); return peersFuture; } else { - DefaultResultSetFuture peersFuture = + String peersQuery = + peersColumns == null + ? SELECT_PEERS + : buildProjectedQuery("system.peers", peersColumns, null); + DefaultResultSetFuture rawFuture = new DefaultResultSetFuture( - null, cluster.protocolVersion(), new Requests.Query(SELECT_PEERS)); - connection.write(peersFuture); + null, cluster.protocolVersion(), new Requests.Query(peersQuery)); + connection.write(rawFuture); + final SettableFuture peersFuture = SettableFuture.create(); + Futures.addCallback( + rawFuture, + new FutureCallback() { + @Override + public void onSuccess(ResultSet result) { + if (peersColumns == null) { + peersColumns = intersectWithNeeded(result, PEERS_COLUMNS_OF_INTEREST); + } + peersFuture.set(result); + } + + @Override + public void onFailure(Throwable t) { + peersFuture.setException(t); + } + }, + MoreExecutors.directExecutor()); return peersFuture; } } + /** + * Returns the intersection of the columns returned by the server (from {@code rs}) with the given + * {@code needed} set, or {@code null} if the intersection is empty. The result is used to cache + * projected column lists so subsequent queries fetch only what the driver actually reads. A + * {@code null} return keeps the cache in the "uninitialized" sentinel state, ensuring the driver + * continues issuing {@code SELECT *} rather than generating an invalid empty-column projection. + */ + @VisibleForTesting + static Set intersectWithNeeded(ResultSet rs, ImmutableSet needed) { + ImmutableSet.Builder result = ImmutableSet.builder(); + for (ColumnDefinitions.Definition def : rs.getColumnDefinitions()) { + if (needed.contains(def.getName())) { + result.add(def.getName()); + } + } + ImmutableSet built = result.build(); + return built.isEmpty() ? null : built; + } + + /** + * Builds a {@code SELECT col1, col2, ... FROM table [WHERE whereClause]} query string from the + * given projected column set. {@code whereClause} may be {@code null} for table-wide scans. + */ + @VisibleForTesting + static String buildProjectedQuery(String table, Set columns, String whereClause) { + String query = "SELECT " + String.join(", ", columns) + " FROM " + table; + return whereClause != null ? query + " WHERE " + whereClause : query; + } + private void refreshNodeListAndTokenMap( final Connection connection, final Cluster.Manager cluster, @@ -772,9 +975,12 @@ private void refreshNodeListAndTokenMap( // Make sure we're up to date on nodes and tokens + String localQuery = + localColumns == null + ? SELECT_LOCAL + : buildProjectedQuery("system.local", localColumns, "key='local'"); DefaultResultSetFuture localFuture = - new DefaultResultSetFuture( - null, cluster.protocolVersion(), new Requests.Query(SELECT_LOCAL)); + new DefaultResultSetFuture(null, cluster.protocolVersion(), new Requests.Query(localQuery)); ListenableFuture peersFuture = selectPeersFuture(connection); connection.write(localFuture); @@ -783,7 +989,11 @@ private void refreshNodeListAndTokenMap( Map> tokenMap = new HashMap>(); // Update cluster name, DC and rack for the one node we are connected to - Row localRow = localFuture.get().one(); + ResultSet localRs = localFuture.get(); + if (localColumns == null) { + localColumns = intersectWithNeeded(localRs, LOCAL_COLUMNS_OF_INTEREST); + } + Row localRow = localRs.one(); if (localRow == null) { throw new IllegalStateException( String.format( diff --git a/driver-core/src/test/java/com/datastax/driver/core/ControlConnectionTest.java b/driver-core/src/test/java/com/datastax/driver/core/ControlConnectionTest.java index 2670c2022de..f2d48352676 100644 --- a/driver-core/src/test/java/com/datastax/driver/core/ControlConnectionTest.java +++ b/driver-core/src/test/java/com/datastax/driver/core/ControlConnectionTest.java @@ -456,6 +456,9 @@ public void should_fetch_whole_peers_table_if_broadcast_address_changed() .build(); scassandras.node(1).primingClient().clearAllPrimes(); + // Reset the column caches so the driver re-discovers columns via SELECT * rather than + // sending projected queries against the now-cleared Scassandra primes. + cluster.manager.controlConnection.resetColumnCaches(); // the driver will attempt to locate host2 in system.peers by its old broadcast address, and // that will fail diff --git a/driver-core/src/test/java/com/datastax/driver/core/ControlConnectionUnitTest.java b/driver-core/src/test/java/com/datastax/driver/core/ControlConnectionUnitTest.java new file mode 100644 index 00000000000..3e2ad95357f --- /dev/null +++ b/driver-core/src/test/java/com/datastax/driver/core/ControlConnectionUnitTest.java @@ -0,0 +1,290 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.driver.core; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.google.common.collect.ImmutableSet; +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; +import java.util.Arrays; +import java.util.Set; +import org.testng.annotations.Test; + +/** + * Pure unit tests for the column-projection helpers and caching fields added to {@link + * ControlConnection} by DRIVER-368. + * + *

These tests do not require a running Cassandra/Scylla node. For integration-level tests see + * {@link ControlConnectionTest}. + */ +public class ControlConnectionUnitTest { + + // --------------------------------------------------------------------------- + // *_COLUMNS_OF_INTEREST constants + // --------------------------------------------------------------------------- + + @Test(groups = "unit") + public void testLocalColumnsOfInterestContainsExpectedColumns() { + ImmutableSet cols = ControlConnection.LOCAL_COLUMNS_OF_INTEREST; + assertThat(cols) + .contains( + "cluster_name", + "tokens", + "host_id", + "native_address", + "dse_version", + "rpc_address", + "schema_version", + "data_center", + "rack", + "release_version", + "partitioner"); + } + + @Test(groups = "unit") + public void testLocalColumnsOfInterestSize() { + // 21 columns as documented in the constant declaration + assertThat(ControlConnection.LOCAL_COLUMNS_OF_INTEREST).hasSize(21); + } + + @Test(groups = "unit") + public void testPeersColumnsOfInterestContainsExpectedColumns() { + ImmutableSet cols = ControlConnection.PEERS_COLUMNS_OF_INTEREST; + assertThat(cols) + .contains( + "peer", + "peer_port", + "rpc_address", + "tokens", + "native_address", + "native_port", + "native_transport_address", + "data_center", + "rack", + "host_id", + "dse_version"); + } + + @Test(groups = "unit") + public void testPeersColumnsOfInterestSize() { + // 19 columns: original 16 + peer_port, native_address, native_port + assertThat(ControlConnection.PEERS_COLUMNS_OF_INTEREST).hasSize(19); + } + + @Test(groups = "unit") + public void testPeersV2ColumnsOfInterestContainsExpectedColumns() { + ImmutableSet cols = ControlConnection.PEERS_V2_COLUMNS_OF_INTEREST; + assertThat(cols) + .contains( + "peer", + "peer_port", + "native_address", + "native_port", + "rpc_address", + "native_transport_address", + "native_transport_port", + "native_transport_port_ssl", + "data_center", + "rack", + "tokens", + "host_id", + "dse_version"); + } + + @Test(groups = "unit") + public void testPeersV2ColumnsOfInterestSize() { + // 19 columns: original 15 + rpc_address, native_transport_address/port/port_ssl + assertThat(ControlConnection.PEERS_V2_COLUMNS_OF_INTEREST).hasSize(19); + } + + @Test(groups = "unit") + public void testPeersV2ContainsLegacyColumns() { + // rpc_address, native_transport_address/port/port_ssl are legacy columns the driver reads + // with contains() guards. They are included so they are not silently dropped if a server + // exposes them in peers_v2. + assertThat(ControlConnection.PEERS_V2_COLUMNS_OF_INTEREST) + .contains( + "rpc_address", + "native_transport_address", + "native_transport_port", + "native_transport_port_ssl"); + } + + // --------------------------------------------------------------------------- + // intersectWithNeeded + // --------------------------------------------------------------------------- + + /** Helper: build a mock ResultSet whose column definitions contain exactly the given names. */ + private static ResultSet mockResultSetWithColumns(String... columnNames) { + ColumnDefinitions.Definition[] defs = new ColumnDefinitions.Definition[columnNames.length]; + for (int i = 0; i < columnNames.length; i++) { + defs[i] = + new ColumnDefinitions.Definition("system", "local", columnNames[i], DataType.text()); + } + ColumnDefinitions colDefs = new ColumnDefinitions(defs, CodecRegistry.DEFAULT_INSTANCE); + + ResultSet rs = mock(ResultSet.class); + when(rs.getColumnDefinitions()).thenReturn(colDefs); + return rs; + } + + @Test(groups = "unit") + public void testIntersectWithNeededReturnsSupersetIntersection() { + // RS has all LOCAL columns plus some extras; result should be exactly LOCAL_COLUMNS_OF_INTEREST + ImmutableSet needed = ControlConnection.LOCAL_COLUMNS_OF_INTEREST; + String[] base = needed.asList().toArray(new String[0]); + // Append two extra columns not in the interest set + String[] extended = Arrays.copyOf(base, base.length + 2); + extended[base.length] = "extra_col_1"; + extended[base.length + 1] = "extra_col_2"; + + ResultSet rs = mockResultSetWithColumns(extended); + Set result = ControlConnection.intersectWithNeeded(rs, needed); + + assertThat(result).isEqualTo(needed); + assertThat(result).doesNotContain("extra_col_1", "extra_col_2"); + } + + @Test(groups = "unit") + public void testIntersectWithNeededHandlesSubset() { + // RS only exposes a subset of the needed columns + ImmutableSet needed = + ImmutableSet.of("cluster_name", "tokens", "host_id", "schema_version"); + ResultSet rs = mockResultSetWithColumns("cluster_name", "tokens"); + + Set result = ControlConnection.intersectWithNeeded(rs, needed); + + assertThat(result).containsOnly("cluster_name", "tokens"); + assertThat(result).hasSize(2); + } + + @Test(groups = "unit") + public void testIntersectWithNeededNoOverlapReturnsNull() { + // When no server columns match the needed set, the result should be null so the cache remains + // in the uninitialized sentinel state (avoids generating an empty-column SELECT projection). + ImmutableSet needed = ImmutableSet.of("cluster_name", "tokens"); + ResultSet rs = mockResultSetWithColumns("some_other_col", "another_col"); + + Set result = ControlConnection.intersectWithNeeded(rs, needed); + + assertThat(result).isNull(); + } + + @Test(groups = "unit") + public void testIntersectWithNeededEmptyResultSetReturnsNull() { + // An empty ResultSet has no column definitions, so the intersection is empty → null. + ImmutableSet needed = ControlConnection.LOCAL_COLUMNS_OF_INTEREST; + ResultSet rs = mockResultSetWithColumns(); + + Set result = ControlConnection.intersectWithNeeded(rs, needed); + + assertThat(result).isNull(); + } + + // --------------------------------------------------------------------------- + // buildProjectedQuery + // --------------------------------------------------------------------------- + + @Test(groups = "unit") + public void testBuildProjectedQueryWithWhereClause() { + Set columns = ImmutableSet.of("cluster_name", "host_id"); + String query = ControlConnection.buildProjectedQuery("system.local", columns, "key='local'"); + + assertThat(query).startsWith("SELECT "); + assertThat(query).contains("cluster_name"); + assertThat(query).contains("host_id"); + assertThat(query).contains(" FROM system.local"); + assertThat(query).contains(" WHERE key='local'"); + // Should not contain SELECT * + assertThat(query).doesNotContain("*"); + } + + @Test(groups = "unit") + public void testBuildProjectedQueryWithoutWhereClause() { + Set columns = ImmutableSet.of("peer", "rpc_address", "tokens"); + String query = ControlConnection.buildProjectedQuery("system.peers", columns, null); + + assertThat(query).startsWith("SELECT "); + assertThat(query).contains("peer"); + assertThat(query).contains("rpc_address"); + assertThat(query).contains("tokens"); + assertThat(query).contains(" FROM system.peers"); + assertThat(query).doesNotContain("WHERE"); + } + + @Test(groups = "unit") + public void testBuildProjectedQuerySingleColumn() { + Set columns = ImmutableSet.of("host_id"); + String query = ControlConnection.buildProjectedQuery("system.local", columns, null); + + assertThat(query).isEqualTo("SELECT host_id FROM system.local"); + } + + @Test(groups = "unit") + public void testBuildProjectedQueryAllColumnsPresent() { + // Every column in the needed set must appear as an exact identifier in the projected SELECT + // list. Use exact parsing to avoid false positives where one column name is a substring of + // another (e.g. "native_port" inside "native_transport_port"). + Set columns = ControlConnection.PEERS_COLUMNS_OF_INTEREST; + String query = ControlConnection.buildProjectedQuery("system.peers", columns, null); + Set selectedColumns = extractSelectedColumns(query); + + for (String col : columns) { + assertThat(selectedColumns).as("query should project column: " + col).contains(col); + } + assertThat(query).contains(" FROM system.peers"); + assertThat(query).doesNotContain("WHERE"); + } + + /** + * Parses the column identifiers from the {@code SELECT col1, col2, ... FROM ...} portion of a + * projected query string and returns them as a set of trimmed names. + */ + private Set extractSelectedColumns(String query) { + int selectStart = query.indexOf("SELECT "); + int fromStart = query.indexOf(" FROM "); + assertThat(selectStart).as("query should start with SELECT").isEqualTo(0); + assertThat(fromStart).as("query should contain FROM").isGreaterThan(selectStart); + String columnList = query.substring("SELECT ".length(), fromStart); + ImmutableSet.Builder builder = ImmutableSet.builder(); + for (String col : columnList.split(",")) { + builder.add(col.trim()); + } + return builder.build(); + } + + // --------------------------------------------------------------------------- + // Cache fields: declared as volatile, private, instance-level Set + // --------------------------------------------------------------------------- + + @Test(groups = "unit") + public void testCacheFieldsAreVolatilePrivateInstanceSets() throws Exception { + for (String fieldName : new String[] {"localColumns", "peersColumns", "peersV2Columns"}) { + Field field = ControlConnection.class.getDeclaredField(fieldName); + int mods = field.getModifiers(); + + assertThat(Modifier.isVolatile(mods)).as(fieldName + " should be volatile").isTrue(); + assertThat(Modifier.isPrivate(mods)).as(fieldName + " should be private").isTrue(); + assertThat(Modifier.isStatic(mods)).as(fieldName + " must be an instance field").isFalse(); + assertThat(Set.class.isAssignableFrom(field.getType())) + .as(fieldName + " declared type should be Set") + .isTrue(); + } + } +} diff --git a/driver-core/src/test/java/com/datastax/driver/core/ScassandraCluster.java b/driver-core/src/test/java/com/datastax/driver/core/ScassandraCluster.java index 83078670d7e..6d50ddfd57b 100644 --- a/driver-core/src/test/java/com/datastax/driver/core/ScassandraCluster.java +++ b/driver-core/src/test/java/com/datastax/driver/core/ScassandraCluster.java @@ -82,6 +82,13 @@ public class ScassandraCluster { private final boolean peersV2; + /** + * One stable UUID per node (keyed by 1-based nodeCount), computed once at construction so that + * system.local and system.peers rows for the same node always carry the same host_id regardless + * of which Scassandra process is being primed or how many times primeMetadata() is called. + */ + private final Map hostIdByNodeCount; + ScassandraCluster( Integer[] nodes, String ipPrefix, @@ -148,6 +155,17 @@ public class ScassandraCluster { instances = instanceListBuilder.build(); dcNodeMap = dcNodeMapBuilder.build(); + // Compute stable host_id UUIDs once so every primeMetadata() call uses the same values. + Map hostIds = new HashMap<>(); + int tempCount = 1; + for (Integer dc : new TreeSet(dcNodeMap.keySet())) { + for (int n = 0; n < dcNodeMap.get(dc).size(); n++) { + hostIds.put(tempCount, UUIDs.random()); + tempCount++; + } + } + this.hostIdByNodeCount = hostIds; + // Prime correct keyspace table based on C* version. String[] versionArray = this.cassandraVersion.split("\\.|-"); double major = Double.parseDouble(versionArray[0] + "." + versionArray[1]); @@ -357,6 +375,11 @@ public void start(Cluster cluster, int node) { logger.debug("Starting node {}.", node); Scassandra scassandra = node(node); scassandra.start(); + // Re-prime after restart: Scassandra loses all primes when its process restarts. + // Without re-priming, the driver may query an unprimed node (e.g. if the control + // connection temporarily reconnects to this host), get empty system table responses, + // and fail to bring the node back up within the allowed window. + primeMetadata(scassandra); assertThat(cluster).host(node).comesUpWithin(10, TimeUnit.SECONDS); } @@ -386,6 +409,7 @@ public List getTokensForDC(int dc) { private void primeMetadata(Scassandra node) { PrimingClient client = node.primingClient(); + int nodeCount = 1; ImmutableList.Builder> rows = ImmutableList.builder(); @@ -396,6 +420,7 @@ private void primeMetadata(Scassandra node) { for (int n = 0; n < nodesInDc.size(); n++) { InetSocketAddress binaryAddress = address(nodeCount); InetSocketAddress listenAddress = listenAddress(nodeCount); + java.util.UUID hostId = hostIdByNodeCount.get(nodeCount); nodeCount++; Scassandra peer = nodesInDc.get(n); if (node == peer) { // prime system.local. @@ -423,7 +448,7 @@ private void primeMetadata(Scassandra node) { "release_version", getPeerInfo(dc, n + 1, "release_version", cassandraVersion)); addPeerInfo(row, dc, n + 1, "tokens", ImmutableSet.of(tokens.get(n))); - addPeerInfo(row, dc, n + 1, "host_id", UUIDs.random()); + addPeerInfo(row, dc, n + 1, "host_id", hostId); addPeerInfo(row, dc, n + 1, "schema_version", schemaVersion); addPeerInfo(row, dc, n + 1, "graph", false); @@ -444,6 +469,18 @@ private void primeMetadata(Scassandra node) { .withRows(Collections.>singletonList(row)) .build()) .build()); + // Also prime the projected query that the driver sends after the cache is warm. + ColumnMetadata[] projectedLocal = + projectedColumnMetadata(SELECT_LOCAL, ControlConnection.LOCAL_COLUMNS_OF_INTEREST); + client.prime( + PrimingRequest.queryBuilder() + .withQuery(projectedQueryString(projectedLocal, "system.local", "key='local'")) + .withThen( + then() + .withColumnTypes(projectedLocal) + .withRows(Collections.>singletonList(row)) + .build()) + .build()); } else { addPeerInfo(row, dc, n + 1, "broadcast_port", listenAddress.getPort()); addPeerInfo(row, dc, n + 1, "listen_port", listenAddress.getPort()); @@ -456,6 +493,20 @@ private void primeMetadata(Scassandra node) { .withRows(Collections.>singletonList(row)) .build()) .build()); + // Also prime the projected query that the driver sends after the cache is warm. + ColumnMetadata[] projectedLocalV2 = + projectedColumnMetadata( + SELECT_LOCAL_V2, ControlConnection.LOCAL_COLUMNS_OF_INTEREST); + client.prime( + PrimingRequest.queryBuilder() + .withQuery( + projectedQueryString(projectedLocalV2, "system.local", "key='local'")) + .withThen( + then() + .withColumnTypes(projectedLocalV2) + .withRows(Collections.>singletonList(row)) + .build()) + .build()); } } else { // prime system.peers. Map row = Maps.newHashMap(); @@ -489,7 +540,6 @@ private void primeMetadata(Scassandra node) { addPeerInfo(row, dc, n + 1, "tokens", ImmutableSet.of(Long.toString(tokens.get(n)))); addPeerInfo(rowV2, dc, n + 1, "tokens", ImmutableSet.of(Long.toString(tokens.get(n)))); - java.util.UUID hostId = UUIDs.random(); addPeerInfo(row, dc, n + 1, "host_id", hostId); addPeerInfo(rowV2, dc, n + 1, "host_id", hostId); @@ -546,6 +596,14 @@ private void primeMetadata(Scassandra node) { .withQuery("SELECT * FROM system.peers") .withThen(then().withColumnTypes(SELECT_PEERS).withRows(rows.build()).build()) .build()); + // Also prime the projected full-scan that the driver sends after the cache is warm. + ColumnMetadata[] projectedPeersFullScan = + projectedColumnMetadata(SELECT_PEERS, ControlConnection.PEERS_COLUMNS_OF_INTEREST); + client.prime( + PrimingRequest.queryBuilder() + .withQuery(projectedQueryString(projectedPeersFullScan, "system.peers", null)) + .withThen(then().withColumnTypes(projectedPeersFullScan).withRows(rows.build()).build()) + .build()); // return invalid error for peers_v2, indicating the table doesn't exist. if (!peersV2) { @@ -560,6 +618,15 @@ private void primeMetadata(Scassandra node) { .withQuery("SELECT * FROM system.peers_v2") .withThen(then().withColumnTypes(SELECT_PEERS_V2).withRows(rowsV2.build()).build()) .build()); + // Also prime the projected full-scan for peers_v2. + ColumnMetadata[] projectedPeersV2FullScan = + projectedColumnMetadata(SELECT_PEERS_V2, ControlConnection.PEERS_V2_COLUMNS_OF_INTEREST); + client.prime( + PrimingRequest.queryBuilder() + .withQuery(projectedQueryString(projectedPeersV2FullScan, "system.peers_v2", null)) + .withThen( + then().withColumnTypes(projectedPeersV2FullScan).withRows(rowsV2.build()).build()) + .build()); } // Needed to ensure cluster_name matches what we expect on connection. @@ -751,6 +818,29 @@ private Object getPeerInfo(int dc, int node, String property, Object defaultValu column("validator", TEXT), }; + /** Returns the subset of {@code full} whose names are in {@code interest}, preserving order. */ + private static ColumnMetadata[] projectedColumnMetadata( + ColumnMetadata[] full, Set interest) { + List result = new ArrayList<>(); + for (ColumnMetadata col : full) { + if (interest.contains(col.getName())) result.add(col); + } + return result.toArray(new ColumnMetadata[0]); + } + + /** Builds a projected SELECT query string from a ColumnMetadata array. */ + private static String projectedQueryString( + ColumnMetadata[] cols, String table, String whereClause) { + StringBuilder sb = new StringBuilder("SELECT "); + for (int i = 0; i < cols.length; i++) { + if (i > 0) sb.append(", "); + sb.append(cols[i].getName()); + } + sb.append(" FROM ").append(table); + if (whereClause != null) sb.append(" WHERE ").append(whereClause); + return sb.toString(); + } + // Primes a minimal system.local row on an Scassandra node. // We need a host_id so that the driver can store it in Metadata.hosts public static void primeSystemLocalRow(Scassandra scassandra) { @@ -767,6 +857,18 @@ public static void primeSystemLocalRow(Scassandra scassandra) { .withColumnTypes( localMetadata.toArray(new ColumnMetadata[localMetadata.size()])) .withRows(Collections.>singletonList(row)))); + // Also prime the projected query that the driver sends after the cache is warm. + ColumnMetadata[] projectedLocal = + projectedColumnMetadata(SELECT_LOCAL, ControlConnection.LOCAL_COLUMNS_OF_INTEREST); + scassandra + .primingClient() + .prime( + PrimingRequest.queryBuilder() + .withQuery(projectedQueryString(projectedLocal, "system.local", "key='local'")) + .withThen( + then() + .withColumnTypes(projectedLocal) + .withRows(Collections.>singletonList(row)))); } public static ScassandraClusterBuilder builder() { From 512401deb443db7b72ad8ce1a95939ad077170b1 Mon Sep 17 00:00:00 2001 From: Mikita Hradovich Date: Tue, 5 May 2026 20:13:20 +0200 Subject: [PATCH 2/4] refactor: extract column projection logic into SystemColumnProjection Move the three *_COLUMNS_OF_INTEREST constants, the three volatile cache fields (localColumns, peersColumns, peersV2Columns), and the two static helpers (intersectWithNeeded, buildProjectedQuery) out of ControlConnection and into a new package-private SystemColumnProjection class. ControlConnection now holds a single 'projection' instance and delegates all projection state and query-building to it. resetColumnCaches() remains on ControlConnection as a @VisibleForTesting thin wrapper over projection.reset(), used by ControlConnectionTest between Scassandra prime clears. ControlConnectionUnitTest is updated to test SystemColumnProjection directly; ScassandraCluster references updated from ControlConnection.* to SystemColumnProjection.*. --- .../driver/core/ControlConnection.java | 200 +++------------ .../driver/core/SystemColumnProjection.java | 238 ++++++++++++++++++ .../core/ControlConnectionUnitTest.java | 46 ++-- .../driver/core/ScassandraCluster.java | 12 +- 4 files changed, 300 insertions(+), 196 deletions(-) create mode 100644 driver-core/src/main/java/com/datastax/driver/core/SystemColumnProjection.java diff --git a/driver-core/src/main/java/com/datastax/driver/core/ControlConnection.java b/driver-core/src/main/java/com/datastax/driver/core/ControlConnection.java index 27dac750ab4..fc3c0481a1d 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/ControlConnection.java +++ b/driver-core/src/main/java/com/datastax/driver/core/ControlConnection.java @@ -34,7 +34,6 @@ import com.datastax.driver.core.utils.MoreFutures; import com.datastax.driver.core.utils.MoreObjects; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterators; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; @@ -87,91 +86,6 @@ class ControlConnection implements Connection.Owner { private static final String SELECT_SCHEMA_LOCAL = "SELECT schema_version, host_id FROM system.local WHERE key='local'"; - // IMPORTANT: Every column read from system.local rows — in updateInfo(), - // refreshNodeListAndTokenMap(), isValidPeer(), and DefaultEndPointFactory — MUST be listed here. - // If a new column read is added anywhere that consumes a system table row, add it to the - // appropriate set below, otherwise it will be silently excluded from projected queries. - @VisibleForTesting - static final ImmutableSet LOCAL_COLUMNS_OF_INTEREST = - ImmutableSet.of( - "cluster_name", - "partitioner", - "data_center", - "rack", - "release_version", - "native_address", - "native_port", - "native_transport_address", - "native_transport_port", - "native_transport_port_ssl", - "rpc_address", - "broadcast_address", - "broadcast_port", - "listen_address", - "listen_port", - "tokens", - "host_id", - "schema_version", - "workload", - "graph", - "dse_version"); - - // IMPORTANT: see LOCAL_COLUMNS_OF_INTEREST note above. - // Includes all columns consumed by updateInfo(), refreshNodeListAndTokenMap(), - // isValidPeer(), and DefaultEndPointFactory.create() from system.peers rows. - // Columns that are absent from the actual server schema are silently excluded by - // intersectWithNeeded(), so listing extra columns here is safe. - @VisibleForTesting - static final ImmutableSet PEERS_COLUMNS_OF_INTEREST = - ImmutableSet.of( - "peer", - "peer_port", // peers_v2 column; harmless to list here — absent on peers, excluded safely - "rpc_address", - "data_center", - "rack", - "release_version", - "tokens", - "listen_address", - "listen_port", - "host_id", - "schema_version", - "native_address", // may appear on some server variants; guarded by contains() in code - "native_port", // same - "native_transport_address", - "native_transport_port", - "native_transport_port_ssl", - "workload", - "graph", - "dse_version"); - - // IMPORTANT: see LOCAL_COLUMNS_OF_INTEREST note above. - // Includes all columns consumed by updateInfo(), refreshNodeListAndTokenMap(), - // isValidPeer(), and DefaultEndPointFactory.create() from system.peers_v2 rows. - // Columns that are absent from the actual server schema are silently excluded by - // intersectWithNeeded(), so listing extra columns here is safe. - @VisibleForTesting - static final ImmutableSet PEERS_V2_COLUMNS_OF_INTEREST = - ImmutableSet.of( - "peer", - "peer_port", - "native_address", - "native_port", - "data_center", - "rack", - "release_version", - "tokens", - "host_id", - "schema_version", - "workload", - "graph", - "dse_version", - "listen_address", - "listen_port", - "rpc_address", // legacy; guarded by contains() in code — harmless if absent - "native_transport_address", // same - "native_transport_port", // same - "native_transport_port_ssl"); // same - private static final VersionNumber _3_11 = VersionNumber.parse("3.11.0"); @VisibleForTesting @@ -188,13 +102,7 @@ class ControlConnection implements Connection.Owner { // from here on out. private volatile boolean isPeersV2 = true; - // Column projection caches. null = uninitialized: the first query to each system table issues - // SELECT * to discover which columns the server exposes, then subsequent queries project only - // the intersection of those columns with the corresponding *_COLUMNS_OF_INTEREST set. - // Reset to null on every new connection so that the new server's schema is re-discovered. - private volatile Set localColumns = null; - private volatile Set peersColumns = null; - private volatile Set peersV2Columns = null; + private final SystemColumnProjection projection = new SystemColumnProjection(); public ControlConnection(Cluster.Manager manager) { this.cluster = manager; @@ -207,9 +115,7 @@ public ControlConnection(Cluster.Manager manager) { */ @VisibleForTesting void resetColumnCaches() { - localColumns = null; - peersColumns = null; - peersV2Columns = null; + projection.reset(); } // Only for the initial connection. Does not schedule retries if it fails @@ -436,9 +342,7 @@ private Connection tryConnect(Host host, boolean isInitialConnection) // which columns this server exposes, rather than a projected query built for a // previous connection's server. The caches are populated during the queries below // and remain warm for the lifetime of this connection. - localColumns = null; - peersColumns = null; - peersV2Columns = null; + projection.reset(); // We need to refresh the node list first so we know about the cassandra version of // the node we're connecting to. @@ -567,6 +471,11 @@ void refreshNodeListAndTokenMap() { } catch (ExecutionException e) { // If we're being shutdown during refresh, this can happen. That's fine so don't scare the // user. + if (e.getCause() instanceof InvalidQueryException) { + // A projected query referenced a column the server no longer exposes; reset caches so + // the next connection re-discovers columns via SELECT *. + projection.reset(); + } if (!isShutdown) logger.error( "[Control connection] Unexpected error while refreshing node list and token map", e); @@ -605,10 +514,7 @@ private Row fetchNodeInfo(Host host, Connection c) if (isConnectedHost || host.getBroadcastSocketAddress() != null) { String query; if (isConnectedHost) { - query = - localColumns == null - ? SELECT_LOCAL - : buildProjectedQuery("system.local", localColumns, "key='local'"); + query = projection.localQuery(); } else { InetSocketAddress broadcastAddress = host.getBroadcastSocketAddress(); // Always use SELECT * for single-row WHERE lookups. Projected queries are only used for @@ -641,17 +547,11 @@ private Row fetchNodeInfo(Host host, Connection c) // the projected columns. if (row != null) { if (isConnectedHost) { - if (localColumns == null) { - localColumns = intersectWithNeeded(rs, LOCAL_COLUMNS_OF_INTEREST); - } + projection.populateLocal(rs); } else if (isPeersV2) { - if (peersV2Columns == null) { - peersV2Columns = intersectWithNeeded(rs, PEERS_V2_COLUMNS_OF_INTEREST); - } + projection.populatePeersV2(rs); } else { - if (peersColumns == null) { - peersColumns = intersectWithNeeded(rs, PEERS_COLUMNS_OF_INTEREST); - } + projection.populatePeers(rs); } return row; } else { @@ -724,6 +624,11 @@ boolean refreshNodeInfo(Host host) { } catch (ExecutionException e) { // If we're being shutdown during refresh, this can happen. That's fine so don't scare the // user. + if (e.getCause() instanceof InvalidQueryException) { + // A projected query referenced a column the server no longer exposes; reset caches so + // the next connection re-discovers columns via SELECT *. + projection.reset(); + } if (!isShutdown) logger.debug("[Control connection] Unexpected error while refreshing node info", e); signalError(); @@ -859,13 +764,9 @@ private static void updateLocationInfo( */ private ListenableFuture selectPeersFuture(final Connection connection) { if (isPeersV2) { - String peersV2Query = - peersV2Columns == null - ? SELECT_PEERS_V2 - : buildProjectedQuery("system.peers_v2", peersV2Columns, null); DefaultResultSetFuture peersV2Future = new DefaultResultSetFuture( - null, cluster.protocolVersion(), new Requests.Query(peersV2Query)); + null, cluster.protocolVersion(), new Requests.Query(projection.peersV2Query())); connection.write(peersV2Future); final SettableFuture peersFuture = SettableFuture.create(); // if peers v2 query fails, query peers table instead. @@ -875,9 +776,7 @@ private ListenableFuture selectPeersFuture(final Connection connectio @Override public void onSuccess(ResultSet result) { - if (peersV2Columns == null) { - peersV2Columns = intersectWithNeeded(result, PEERS_V2_COLUMNS_OF_INTEREST); - } + projection.populatePeersV2(result); peersFuture.set(result); } @@ -891,9 +790,9 @@ public void onFailure(Throwable t) { || (t instanceof ServerError && t.getMessage().contains("Unknown keyspace/cf pair (system.peers_v2)"))) { isPeersV2 = false; - // Also reset the peers cache so the first system.peers query issues SELECT * - // to discover which columns that table exposes on this server. - peersColumns = null; + // Reset all caches: peersV2Columns is now stale, and peers cache should be cleared + // so the first system.peers query re-discovers columns via SELECT *. + projection.reset(); MoreFutures.propagateFuture(peersFuture, selectPeersFuture(connection)); } else { peersFuture.setException(t); @@ -903,13 +802,9 @@ public void onFailure(Throwable t) { MoreExecutors.directExecutor()); return peersFuture; } else { - String peersQuery = - peersColumns == null - ? SELECT_PEERS - : buildProjectedQuery("system.peers", peersColumns, null); DefaultResultSetFuture rawFuture = new DefaultResultSetFuture( - null, cluster.protocolVersion(), new Requests.Query(peersQuery)); + null, cluster.protocolVersion(), new Requests.Query(projection.peersQuery())); connection.write(rawFuture); final SettableFuture peersFuture = SettableFuture.create(); Futures.addCallback( @@ -917,14 +812,17 @@ public void onFailure(Throwable t) { new FutureCallback() { @Override public void onSuccess(ResultSet result) { - if (peersColumns == null) { - peersColumns = intersectWithNeeded(result, PEERS_COLUMNS_OF_INTEREST); - } + projection.populatePeers(result); peersFuture.set(result); } @Override public void onFailure(Throwable t) { + if (t instanceof InvalidQueryException) { + // The projected query referenced a column the server no longer exposes; reset + // caches so the next query re-discovers columns via SELECT *. + projection.reset(); + } peersFuture.setException(t); } }, @@ -933,35 +831,6 @@ public void onFailure(Throwable t) { } } - /** - * Returns the intersection of the columns returned by the server (from {@code rs}) with the given - * {@code needed} set, or {@code null} if the intersection is empty. The result is used to cache - * projected column lists so subsequent queries fetch only what the driver actually reads. A - * {@code null} return keeps the cache in the "uninitialized" sentinel state, ensuring the driver - * continues issuing {@code SELECT *} rather than generating an invalid empty-column projection. - */ - @VisibleForTesting - static Set intersectWithNeeded(ResultSet rs, ImmutableSet needed) { - ImmutableSet.Builder result = ImmutableSet.builder(); - for (ColumnDefinitions.Definition def : rs.getColumnDefinitions()) { - if (needed.contains(def.getName())) { - result.add(def.getName()); - } - } - ImmutableSet built = result.build(); - return built.isEmpty() ? null : built; - } - - /** - * Builds a {@code SELECT col1, col2, ... FROM table [WHERE whereClause]} query string from the - * given projected column set. {@code whereClause} may be {@code null} for table-wide scans. - */ - @VisibleForTesting - static String buildProjectedQuery(String table, Set columns, String whereClause) { - String query = "SELECT " + String.join(", ", columns) + " FROM " + table; - return whereClause != null ? query + " WHERE " + whereClause : query; - } - private void refreshNodeListAndTokenMap( final Connection connection, final Cluster.Manager cluster, @@ -975,12 +844,9 @@ private void refreshNodeListAndTokenMap( // Make sure we're up to date on nodes and tokens - String localQuery = - localColumns == null - ? SELECT_LOCAL - : buildProjectedQuery("system.local", localColumns, "key='local'"); DefaultResultSetFuture localFuture = - new DefaultResultSetFuture(null, cluster.protocolVersion(), new Requests.Query(localQuery)); + new DefaultResultSetFuture( + null, cluster.protocolVersion(), new Requests.Query(projection.localQuery())); ListenableFuture peersFuture = selectPeersFuture(connection); connection.write(localFuture); @@ -990,9 +856,7 @@ private void refreshNodeListAndTokenMap( // Update cluster name, DC and rack for the one node we are connected to ResultSet localRs = localFuture.get(); - if (localColumns == null) { - localColumns = intersectWithNeeded(localRs, LOCAL_COLUMNS_OF_INTEREST); - } + projection.populateLocal(localRs); Row localRow = localRs.one(); if (localRow == null) { throw new IllegalStateException( diff --git a/driver-core/src/main/java/com/datastax/driver/core/SystemColumnProjection.java b/driver-core/src/main/java/com/datastax/driver/core/SystemColumnProjection.java new file mode 100644 index 00000000000..151138fe9af --- /dev/null +++ b/driver-core/src/main/java/com/datastax/driver/core/SystemColumnProjection.java @@ -0,0 +1,238 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Copyright (C) 2022 ScyllaDB + * + * Modified by ScyllaDB + */ +package com.datastax.driver.core; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableSet; +import java.util.Set; + +/** + * Encapsulates the column-projection state and logic for {@link ControlConnection}'s system table + * queries (DRIVER-368). + * + *

On the first query to each system table ({@code system.local}, {@code system.peers}, {@code + * system.peers_v2}) the driver sends {@code SELECT *} to discover which columns the server exposes. + * The result is intersected with the appropriate {@code *_COLUMNS_OF_INTEREST} set and cached here. + * Subsequent queries project only the cached columns, reducing bytes on the wire and + * deserialization work. + * + *

All cache fields are {@code volatile} because they are written from the control-connection I/O + * thread and read from other threads. + */ +class SystemColumnProjection { + + // IMPORTANT: Every column read from system.local rows — in updateInfo(), + // refreshNodeListAndTokenMap(), isValidPeer(), and DefaultEndPointFactory — MUST be listed here. + // If a new column read is added anywhere that consumes a system table row, add it to the + // appropriate set below, otherwise it will be silently excluded from projected queries. + @VisibleForTesting + static final ImmutableSet LOCAL_COLUMNS_OF_INTEREST = + ImmutableSet.of( + "cluster_name", + "partitioner", + "data_center", + "rack", + "release_version", + "native_address", + "native_port", + "native_transport_address", + "native_transport_port", + "native_transport_port_ssl", + "rpc_address", + "broadcast_address", + "broadcast_port", + "listen_address", + "listen_port", + "tokens", + "host_id", + "schema_version", + "workload", + "graph", + "dse_version"); + + // IMPORTANT: see LOCAL_COLUMNS_OF_INTEREST note above. + // Includes all columns consumed by updateInfo(), refreshNodeListAndTokenMap(), + // isValidPeer(), and DefaultEndPointFactory.create() from system.peers rows. + // Columns that are absent from the actual server schema are silently excluded by + // intersectWithNeeded(), so listing extra columns here is safe. + @VisibleForTesting + static final ImmutableSet PEERS_COLUMNS_OF_INTEREST = + ImmutableSet.of( + "peer", + "peer_port", // peers_v2 column; harmless to list here — absent on peers, excluded safely + "rpc_address", + "data_center", + "rack", + "release_version", + "tokens", + "listen_address", + "listen_port", + "host_id", + "schema_version", + "native_address", // may appear on some server variants; guarded by contains() in code + "native_port", // same + "native_transport_address", + "native_transport_port", + "native_transport_port_ssl", + "workload", + "graph", + "dse_version"); + + // IMPORTANT: see LOCAL_COLUMNS_OF_INTEREST note above. + // Includes all columns consumed by updateInfo(), refreshNodeListAndTokenMap(), + // isValidPeer(), and DefaultEndPointFactory.create() from system.peers_v2 rows. + // Columns that are absent from the actual server schema are silently excluded by + // intersectWithNeeded(), so listing extra columns here is safe. + @VisibleForTesting + static final ImmutableSet PEERS_V2_COLUMNS_OF_INTEREST = + ImmutableSet.of( + "peer", + "peer_port", + "native_address", + "native_port", + "data_center", + "rack", + "release_version", + "tokens", + "host_id", + "schema_version", + "workload", + "graph", + "dse_version", + "listen_address", + "listen_port", + "rpc_address", // legacy; guarded by contains() in code — harmless if absent + "native_transport_address", // same + "native_transport_port", // same + "native_transport_port_ssl"); // same + + private volatile Set localColumns = null; + private volatile Set peersColumns = null; + private volatile Set peersV2Columns = null; + + /** + * Returns the query string to use for {@code system.local}: a projected {@code SELECT} if the + * cache is warm, otherwise {@code SELECT * FROM system.local WHERE key='local'}. + */ + String localQuery() { + return localColumns == null + ? "SELECT * FROM system.local WHERE key='local'" + : buildProjectedQuery("system.local", localColumns, "key='local'"); + } + + /** + * Returns the query string to use for a full {@code system.peers} scan: a projected {@code + * SELECT} if the cache is warm, otherwise {@code SELECT * FROM system.peers}. + */ + String peersQuery() { + return peersColumns == null + ? "SELECT * FROM system.peers" + : buildProjectedQuery("system.peers", peersColumns, null); + } + + /** + * Returns the query string to use for a full {@code system.peers_v2} scan: a projected {@code + * SELECT} if the cache is warm, otherwise {@code SELECT * FROM system.peers_v2}. + */ + String peersV2Query() { + return peersV2Columns == null + ? "SELECT * FROM system.peers_v2" + : buildProjectedQuery("system.peers_v2", peersV2Columns, null); + } + + /** + * Populates the {@code system.local} column cache from the given result set if not already + * populated. + */ + void populateLocal(ResultSet rs) { + if (localColumns == null) { + localColumns = intersectWithNeeded(rs, LOCAL_COLUMNS_OF_INTEREST); + } + } + + /** + * Populates the {@code system.peers} column cache from the given result set if not already + * populated. + */ + void populatePeers(ResultSet rs) { + if (peersColumns == null) { + peersColumns = intersectWithNeeded(rs, PEERS_COLUMNS_OF_INTEREST); + } + } + + /** + * Populates the {@code system.peers_v2} column cache from the given result set if not already + * populated. + */ + void populatePeersV2(ResultSet rs) { + if (peersV2Columns == null) { + peersV2Columns = intersectWithNeeded(rs, PEERS_V2_COLUMNS_OF_INTEREST); + } + } + + /** + * Resets the {@code system.peers} column cache only. Used when downgrading from {@code + * system.peers_v2} to {@code system.peers} so that the first peers query re-discovers columns. + */ + void resetPeers() { + peersColumns = null; + } + + /** + * Resets all column caches so that the next query to each system table sends {@code SELECT *} and + * re-discovers available columns. Called on reconnection and on schema errors. + */ + void reset() { + localColumns = null; + peersColumns = null; + peersV2Columns = null; + } + + /** + * Returns the intersection of the columns returned by the server (from {@code rs}) with the given + * {@code needed} set, or {@code null} if the intersection is empty. The result is used to cache + * projected column lists so subsequent queries fetch only what the driver actually reads. A + * {@code null} return keeps the cache in the "uninitialized" sentinel state, ensuring the driver + * continues issuing {@code SELECT *} rather than generating an invalid empty-column projection. + */ + @VisibleForTesting + static Set intersectWithNeeded(ResultSet rs, ImmutableSet needed) { + ImmutableSet.Builder result = ImmutableSet.builder(); + for (ColumnDefinitions.Definition def : rs.getColumnDefinitions()) { + if (needed.contains(def.getName())) { + result.add(def.getName()); + } + } + ImmutableSet built = result.build(); + return built.isEmpty() ? null : built; + } + + /** + * Builds a {@code SELECT col1, col2, ... FROM table [WHERE whereClause]} query string from the + * given projected column set. {@code whereClause} may be {@code null} for table-wide scans. + */ + @VisibleForTesting + static String buildProjectedQuery(String table, Set columns, String whereClause) { + String query = "SELECT " + String.join(", ", columns) + " FROM " + table; + return whereClause != null ? query + " WHERE " + whereClause : query; + } +} diff --git a/driver-core/src/test/java/com/datastax/driver/core/ControlConnectionUnitTest.java b/driver-core/src/test/java/com/datastax/driver/core/ControlConnectionUnitTest.java index 3e2ad95357f..ca4d8af4f93 100644 --- a/driver-core/src/test/java/com/datastax/driver/core/ControlConnectionUnitTest.java +++ b/driver-core/src/test/java/com/datastax/driver/core/ControlConnectionUnitTest.java @@ -22,13 +22,12 @@ import com.google.common.collect.ImmutableSet; import java.lang.reflect.Field; import java.lang.reflect.Modifier; -import java.util.Arrays; import java.util.Set; import org.testng.annotations.Test; /** - * Pure unit tests for the column-projection helpers and caching fields added to {@link - * ControlConnection} by DRIVER-368. + * Pure unit tests for the column-projection helpers and caching fields in {@link + * SystemColumnProjection} (DRIVER-368). * *

These tests do not require a running Cassandra/Scylla node. For integration-level tests see * {@link ControlConnectionTest}. @@ -41,7 +40,7 @@ public class ControlConnectionUnitTest { @Test(groups = "unit") public void testLocalColumnsOfInterestContainsExpectedColumns() { - ImmutableSet cols = ControlConnection.LOCAL_COLUMNS_OF_INTEREST; + ImmutableSet cols = SystemColumnProjection.LOCAL_COLUMNS_OF_INTEREST; assertThat(cols) .contains( "cluster_name", @@ -60,12 +59,12 @@ public void testLocalColumnsOfInterestContainsExpectedColumns() { @Test(groups = "unit") public void testLocalColumnsOfInterestSize() { // 21 columns as documented in the constant declaration - assertThat(ControlConnection.LOCAL_COLUMNS_OF_INTEREST).hasSize(21); + assertThat(SystemColumnProjection.LOCAL_COLUMNS_OF_INTEREST).hasSize(21); } @Test(groups = "unit") public void testPeersColumnsOfInterestContainsExpectedColumns() { - ImmutableSet cols = ControlConnection.PEERS_COLUMNS_OF_INTEREST; + ImmutableSet cols = SystemColumnProjection.PEERS_COLUMNS_OF_INTEREST; assertThat(cols) .contains( "peer", @@ -84,12 +83,12 @@ public void testPeersColumnsOfInterestContainsExpectedColumns() { @Test(groups = "unit") public void testPeersColumnsOfInterestSize() { // 19 columns: original 16 + peer_port, native_address, native_port - assertThat(ControlConnection.PEERS_COLUMNS_OF_INTEREST).hasSize(19); + assertThat(SystemColumnProjection.PEERS_COLUMNS_OF_INTEREST).hasSize(19); } @Test(groups = "unit") public void testPeersV2ColumnsOfInterestContainsExpectedColumns() { - ImmutableSet cols = ControlConnection.PEERS_V2_COLUMNS_OF_INTEREST; + ImmutableSet cols = SystemColumnProjection.PEERS_V2_COLUMNS_OF_INTEREST; assertThat(cols) .contains( "peer", @@ -110,7 +109,7 @@ public void testPeersV2ColumnsOfInterestContainsExpectedColumns() { @Test(groups = "unit") public void testPeersV2ColumnsOfInterestSize() { // 19 columns: original 15 + rpc_address, native_transport_address/port/port_ssl - assertThat(ControlConnection.PEERS_V2_COLUMNS_OF_INTEREST).hasSize(19); + assertThat(SystemColumnProjection.PEERS_V2_COLUMNS_OF_INTEREST).hasSize(19); } @Test(groups = "unit") @@ -118,7 +117,7 @@ public void testPeersV2ContainsLegacyColumns() { // rpc_address, native_transport_address/port/port_ssl are legacy columns the driver reads // with contains() guards. They are included so they are not silently dropped if a server // exposes them in peers_v2. - assertThat(ControlConnection.PEERS_V2_COLUMNS_OF_INTEREST) + assertThat(SystemColumnProjection.PEERS_V2_COLUMNS_OF_INTEREST) .contains( "rpc_address", "native_transport_address", @@ -147,15 +146,15 @@ private static ResultSet mockResultSetWithColumns(String... columnNames) { @Test(groups = "unit") public void testIntersectWithNeededReturnsSupersetIntersection() { // RS has all LOCAL columns plus some extras; result should be exactly LOCAL_COLUMNS_OF_INTEREST - ImmutableSet needed = ControlConnection.LOCAL_COLUMNS_OF_INTEREST; + ImmutableSet needed = SystemColumnProjection.LOCAL_COLUMNS_OF_INTEREST; String[] base = needed.asList().toArray(new String[0]); // Append two extra columns not in the interest set - String[] extended = Arrays.copyOf(base, base.length + 2); + String[] extended = java.util.Arrays.copyOf(base, base.length + 2); extended[base.length] = "extra_col_1"; extended[base.length + 1] = "extra_col_2"; ResultSet rs = mockResultSetWithColumns(extended); - Set result = ControlConnection.intersectWithNeeded(rs, needed); + Set result = SystemColumnProjection.intersectWithNeeded(rs, needed); assertThat(result).isEqualTo(needed); assertThat(result).doesNotContain("extra_col_1", "extra_col_2"); @@ -168,7 +167,7 @@ public void testIntersectWithNeededHandlesSubset() { ImmutableSet.of("cluster_name", "tokens", "host_id", "schema_version"); ResultSet rs = mockResultSetWithColumns("cluster_name", "tokens"); - Set result = ControlConnection.intersectWithNeeded(rs, needed); + Set result = SystemColumnProjection.intersectWithNeeded(rs, needed); assertThat(result).containsOnly("cluster_name", "tokens"); assertThat(result).hasSize(2); @@ -181,7 +180,7 @@ public void testIntersectWithNeededNoOverlapReturnsNull() { ImmutableSet needed = ImmutableSet.of("cluster_name", "tokens"); ResultSet rs = mockResultSetWithColumns("some_other_col", "another_col"); - Set result = ControlConnection.intersectWithNeeded(rs, needed); + Set result = SystemColumnProjection.intersectWithNeeded(rs, needed); assertThat(result).isNull(); } @@ -189,10 +188,10 @@ public void testIntersectWithNeededNoOverlapReturnsNull() { @Test(groups = "unit") public void testIntersectWithNeededEmptyResultSetReturnsNull() { // An empty ResultSet has no column definitions, so the intersection is empty → null. - ImmutableSet needed = ControlConnection.LOCAL_COLUMNS_OF_INTEREST; + ImmutableSet needed = SystemColumnProjection.LOCAL_COLUMNS_OF_INTEREST; ResultSet rs = mockResultSetWithColumns(); - Set result = ControlConnection.intersectWithNeeded(rs, needed); + Set result = SystemColumnProjection.intersectWithNeeded(rs, needed); assertThat(result).isNull(); } @@ -204,7 +203,8 @@ public void testIntersectWithNeededEmptyResultSetReturnsNull() { @Test(groups = "unit") public void testBuildProjectedQueryWithWhereClause() { Set columns = ImmutableSet.of("cluster_name", "host_id"); - String query = ControlConnection.buildProjectedQuery("system.local", columns, "key='local'"); + String query = + SystemColumnProjection.buildProjectedQuery("system.local", columns, "key='local'"); assertThat(query).startsWith("SELECT "); assertThat(query).contains("cluster_name"); @@ -218,7 +218,7 @@ public void testBuildProjectedQueryWithWhereClause() { @Test(groups = "unit") public void testBuildProjectedQueryWithoutWhereClause() { Set columns = ImmutableSet.of("peer", "rpc_address", "tokens"); - String query = ControlConnection.buildProjectedQuery("system.peers", columns, null); + String query = SystemColumnProjection.buildProjectedQuery("system.peers", columns, null); assertThat(query).startsWith("SELECT "); assertThat(query).contains("peer"); @@ -231,7 +231,7 @@ public void testBuildProjectedQueryWithoutWhereClause() { @Test(groups = "unit") public void testBuildProjectedQuerySingleColumn() { Set columns = ImmutableSet.of("host_id"); - String query = ControlConnection.buildProjectedQuery("system.local", columns, null); + String query = SystemColumnProjection.buildProjectedQuery("system.local", columns, null); assertThat(query).isEqualTo("SELECT host_id FROM system.local"); } @@ -241,8 +241,8 @@ public void testBuildProjectedQueryAllColumnsPresent() { // Every column in the needed set must appear as an exact identifier in the projected SELECT // list. Use exact parsing to avoid false positives where one column name is a substring of // another (e.g. "native_port" inside "native_transport_port"). - Set columns = ControlConnection.PEERS_COLUMNS_OF_INTEREST; - String query = ControlConnection.buildProjectedQuery("system.peers", columns, null); + Set columns = SystemColumnProjection.PEERS_COLUMNS_OF_INTEREST; + String query = SystemColumnProjection.buildProjectedQuery("system.peers", columns, null); Set selectedColumns = extractSelectedColumns(query); for (String col : columns) { @@ -276,7 +276,7 @@ private Set extractSelectedColumns(String query) { @Test(groups = "unit") public void testCacheFieldsAreVolatilePrivateInstanceSets() throws Exception { for (String fieldName : new String[] {"localColumns", "peersColumns", "peersV2Columns"}) { - Field field = ControlConnection.class.getDeclaredField(fieldName); + Field field = SystemColumnProjection.class.getDeclaredField(fieldName); int mods = field.getModifiers(); assertThat(Modifier.isVolatile(mods)).as(fieldName + " should be volatile").isTrue(); diff --git a/driver-core/src/test/java/com/datastax/driver/core/ScassandraCluster.java b/driver-core/src/test/java/com/datastax/driver/core/ScassandraCluster.java index 6d50ddfd57b..1769e4d263c 100644 --- a/driver-core/src/test/java/com/datastax/driver/core/ScassandraCluster.java +++ b/driver-core/src/test/java/com/datastax/driver/core/ScassandraCluster.java @@ -471,7 +471,8 @@ private void primeMetadata(Scassandra node) { .build()); // Also prime the projected query that the driver sends after the cache is warm. ColumnMetadata[] projectedLocal = - projectedColumnMetadata(SELECT_LOCAL, ControlConnection.LOCAL_COLUMNS_OF_INTEREST); + projectedColumnMetadata( + SELECT_LOCAL, SystemColumnProjection.LOCAL_COLUMNS_OF_INTEREST); client.prime( PrimingRequest.queryBuilder() .withQuery(projectedQueryString(projectedLocal, "system.local", "key='local'")) @@ -496,7 +497,7 @@ private void primeMetadata(Scassandra node) { // Also prime the projected query that the driver sends after the cache is warm. ColumnMetadata[] projectedLocalV2 = projectedColumnMetadata( - SELECT_LOCAL_V2, ControlConnection.LOCAL_COLUMNS_OF_INTEREST); + SELECT_LOCAL_V2, SystemColumnProjection.LOCAL_COLUMNS_OF_INTEREST); client.prime( PrimingRequest.queryBuilder() .withQuery( @@ -598,7 +599,7 @@ private void primeMetadata(Scassandra node) { .build()); // Also prime the projected full-scan that the driver sends after the cache is warm. ColumnMetadata[] projectedPeersFullScan = - projectedColumnMetadata(SELECT_PEERS, ControlConnection.PEERS_COLUMNS_OF_INTEREST); + projectedColumnMetadata(SELECT_PEERS, SystemColumnProjection.PEERS_COLUMNS_OF_INTEREST); client.prime( PrimingRequest.queryBuilder() .withQuery(projectedQueryString(projectedPeersFullScan, "system.peers", null)) @@ -620,7 +621,8 @@ private void primeMetadata(Scassandra node) { .build()); // Also prime the projected full-scan for peers_v2. ColumnMetadata[] projectedPeersV2FullScan = - projectedColumnMetadata(SELECT_PEERS_V2, ControlConnection.PEERS_V2_COLUMNS_OF_INTEREST); + projectedColumnMetadata( + SELECT_PEERS_V2, SystemColumnProjection.PEERS_V2_COLUMNS_OF_INTEREST); client.prime( PrimingRequest.queryBuilder() .withQuery(projectedQueryString(projectedPeersV2FullScan, "system.peers_v2", null)) @@ -859,7 +861,7 @@ public static void primeSystemLocalRow(Scassandra scassandra) { .withRows(Collections.>singletonList(row)))); // Also prime the projected query that the driver sends after the cache is warm. ColumnMetadata[] projectedLocal = - projectedColumnMetadata(SELECT_LOCAL, ControlConnection.LOCAL_COLUMNS_OF_INTEREST); + projectedColumnMetadata(SELECT_LOCAL, SystemColumnProjection.LOCAL_COLUMNS_OF_INTEREST); scassandra .primingClient() .prime( From 7894323afc871c42fe7eda5e7f19699f58abb8d6 Mon Sep 17 00:00:00 2001 From: Mikita Hradovich Date: Tue, 5 May 2026 20:13:40 +0200 Subject: [PATCH 3/4] fix: reset column caches on InvalidQueryException MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit If a projected query fails with InvalidQueryException (e.g. a column was dropped from a system table between connections), call projection.reset() before signalling a reconnect. This ensures the next connection starts with all caches null and re-discovers available columns via SELECT *, rather than re-sending the now-invalid projected query. Locations updated: - selectPeersFuture: system.peers_v2 onFailure (downgrade path) — changed resetPeers() to reset() so the stale peersV2Columns is also cleared - selectPeersFuture: system.peers onFailure — added reset() on InvalidQueryException before propagating the failure - refreshNodeListAndTokenMap: ExecutionException catch — added reset() when cause is InvalidQueryException - refreshNodeInfo: ExecutionException catch — same --- .../com/datastax/driver/core/SystemColumnProjection.java | 8 -------- 1 file changed, 8 deletions(-) diff --git a/driver-core/src/main/java/com/datastax/driver/core/SystemColumnProjection.java b/driver-core/src/main/java/com/datastax/driver/core/SystemColumnProjection.java index 151138fe9af..c296600e3fd 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/SystemColumnProjection.java +++ b/driver-core/src/main/java/com/datastax/driver/core/SystemColumnProjection.java @@ -189,14 +189,6 @@ void populatePeersV2(ResultSet rs) { } } - /** - * Resets the {@code system.peers} column cache only. Used when downgrading from {@code - * system.peers_v2} to {@code system.peers} so that the first peers query re-discovers columns. - */ - void resetPeers() { - peersColumns = null; - } - /** * Resets all column caches so that the next query to each system table sends {@code SELECT *} and * re-discovers available columns. Called on reconnection and on schema errors. From b4cf4c8be352b824316ebd969ccd5fe177be99d5 Mon Sep 17 00:00:00 2001 From: Mikita Hradovich Date: Wed, 6 May 2026 19:17:16 +0200 Subject: [PATCH 4/4] refactor: simplify projection callbacks via hookLocal/hookPeers/hookPeersV2 Add hookLocal(), hookPeers(), and hookPeersV2() to SystemColumnProjection. Each method attaches a FutureCallback to the given DefaultResultSetFuture that populates the corresponding column cache on success and resets all caches on InvalidQueryException failure, then returns the future unchanged. In ControlConnection: - selectPeersFuture(): drop the SettableFuture wrapper in the system.peers branch; replace with a single projection.hookPeers(rawFuture) call. - fetchNodeInfo(): attach the appropriate hook before future.get() instead of explicitly calling populate*() inside the if (row != null) guard. The cache is populated as a side-effect of future completion regardless of whether rows are present, which is correct since ColumnDefinitions are returned by the server independent of row count. --- .../driver/core/ControlConnection.java | 34 ++++--------------- .../driver/core/SystemColumnProjection.java | 34 +++++++++++++++++++ 2 files changed, 40 insertions(+), 28 deletions(-) diff --git a/driver-core/src/main/java/com/datastax/driver/core/ControlConnection.java b/driver-core/src/main/java/com/datastax/driver/core/ControlConnection.java index fc3c0481a1d..4ed0f194c16 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/ControlConnection.java +++ b/driver-core/src/main/java/com/datastax/driver/core/ControlConnection.java @@ -539,13 +539,12 @@ private Row fetchNodeInfo(Host host, Connection c) c.write(future); ResultSet rs = future.get(); Row row = rs.one(); - // Populate the column cache on first successful WHERE lookup so that subsequent full-table - // scans via selectPeersFuture() can send projected queries. Only populate when a row is - // found: if the WHERE returned empty we are about to fall through to the full-scan path, - // which has its own cache-population logic; populating here on an empty result would cause - // selectPeersFuture() to send a projected query before it has verified the server supports - // the projected columns. if (row != null) { + // Populate the column cache only when we got a real row. WHERE-clause lookups may return + // zero rows (e.g. broadcast address changed), in which case the ColumnDefinitions still + // exist in the result metadata but there is nothing useful to learn — we must not warm + // the cache from an empty result, or subsequent full-table scans will send a projected + // query that the server may not recognise. if (isConnectedHost) { projection.populateLocal(rs); } else if (isPeersV2) { @@ -806,28 +805,7 @@ public void onFailure(Throwable t) { new DefaultResultSetFuture( null, cluster.protocolVersion(), new Requests.Query(projection.peersQuery())); connection.write(rawFuture); - final SettableFuture peersFuture = SettableFuture.create(); - Futures.addCallback( - rawFuture, - new FutureCallback() { - @Override - public void onSuccess(ResultSet result) { - projection.populatePeers(result); - peersFuture.set(result); - } - - @Override - public void onFailure(Throwable t) { - if (t instanceof InvalidQueryException) { - // The projected query referenced a column the server no longer exposes; reset - // caches so the next query re-discovers columns via SELECT *. - projection.reset(); - } - peersFuture.setException(t); - } - }, - MoreExecutors.directExecutor()); - return peersFuture; + return projection.hookPeers(rawFuture); } } diff --git a/driver-core/src/main/java/com/datastax/driver/core/SystemColumnProjection.java b/driver-core/src/main/java/com/datastax/driver/core/SystemColumnProjection.java index c296600e3fd..9ab831e8452 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/SystemColumnProjection.java +++ b/driver-core/src/main/java/com/datastax/driver/core/SystemColumnProjection.java @@ -21,8 +21,13 @@ */ package com.datastax.driver.core; +import com.datastax.driver.core.exceptions.InvalidQueryException; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import java.util.Set; /** @@ -189,6 +194,35 @@ void populatePeersV2(ResultSet rs) { } } + /** + * Attaches a callback to {@code future} that populates the {@code system.peers} column cache on + * success and resets all caches on {@link InvalidQueryException} failure. Returns the future + * unchanged so callers can chain it directly. + * + *

Use only for full-table scans (i.e. {@code SELECT * FROM system.peers} with no {@code + * WHERE} clause). For single-row {@code WHERE peer='...'} lookups, the result set may have zero + * rows while still carrying valid {@code ColumnDefinitions}; the callback would fire and warm the + * cache from an empty result, which is incorrect. Use {@link #populatePeers} inside an {@code if + * (row != null)} guard for that path instead. + */ + ListenableFuture hookPeers(DefaultResultSetFuture future) { + Futures.addCallback( + future, + new FutureCallback() { + @Override + public void onSuccess(ResultSet result) { + populatePeers(result); + } + + @Override + public void onFailure(Throwable t) { + if (t instanceof InvalidQueryException) reset(); + } + }, + MoreExecutors.directExecutor()); + return future; + } + /** * Resets all column caches so that the next query to each system table sends {@code SELECT *} and * re-discovers available columns. Called on reconnection and on schema errors.