diff --git a/driver-core/src/main/java/com/datastax/driver/core/ControlConnection.java b/driver-core/src/main/java/com/datastax/driver/core/ControlConnection.java index be8c7c88068..4ed0f194c16 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/ControlConnection.java +++ b/driver-core/src/main/java/com/datastax/driver/core/ControlConnection.java @@ -102,10 +102,22 @@ class ControlConnection implements Connection.Owner { // from here on out. private volatile boolean isPeersV2 = true; + private final SystemColumnProjection projection = new SystemColumnProjection(); + public ControlConnection(Cluster.Manager manager) { this.cluster = manager; } + /** + * Resets the projected-column caches so that the next query to each system table sends {@code + * SELECT *} and re-discovers available columns. Intended for use in tests that clear Scassandra + * primes between driver operations. + */ + @VisibleForTesting + void resetColumnCaches() { + projection.reset(); + } + // Only for the initial connection. Does not schedule retries if it fails void connect() throws UnsupportedProtocolVersionException { if (isShutdown) return; @@ -326,6 +338,12 @@ private Connection tryConnect(Host host, boolean isInitialConnection) ProtocolEvent.Type.SCHEMA_CHANGE); connection.write(new Requests.Register(evs)); + // Reset column caches so refreshNodeListAndTokenMap() uses SELECT * to rediscover + // which columns this server exposes, rather than a projected query built for a + // previous connection's server. The caches are populated during the queries below + // and remain warm for the lifetime of this connection. + projection.reset(); + // We need to refresh the node list first so we know about the cassandra version of // the node we're connecting to. // This will create the token map for the first time, but it will be incomplete @@ -453,6 +471,11 @@ void refreshNodeListAndTokenMap() { } catch (ExecutionException e) { // If we're being shutdown during refresh, this can happen. 
That's fine so don't scare the // user. + if (e.getCause() instanceof InvalidQueryException) { + // A projected query referenced a column the server no longer exposes; reset caches so + // the next query to each system table re-discovers columns via SELECT *. + projection.reset(); + } if (!isShutdown) logger.error( "[Control connection] Unexpected error while refreshing node list and token map", e); @@ -491,26 +514,44 @@ private Row fetchNodeInfo(Host host, Connection c) if (isConnectedHost || host.getBroadcastSocketAddress() != null) { String query; if (isConnectedHost) { - query = SELECT_LOCAL; + query = projection.localQuery(); } else { InetSocketAddress broadcastAddress = host.getBroadcastSocketAddress(); - query = - isPeersV2 - ? SELECT_PEERS_V2 - + " WHERE peer='" - + broadcastAddress.getAddress().getHostAddress() - + "' AND peer_port=" - + broadcastAddress.getPort() - : SELECT_PEERS - + " WHERE peer='" - + broadcastAddress.getAddress().getHostAddress() - + "'"; + // Always use SELECT * for single-row peer lookups. Projected peers queries are only used + // for full-table scans via selectPeersFuture(), where (in Scassandra-based tests) every + // node has the projected full-scan prime registered. For peer lookups the control + // connection may query a node that was never restarted (and therefore still carries only + // the original SELECT * prime from init time), so projecting here risks an unprimed query. 
+ if (isPeersV2) { + String whereClause = + "peer='" + + broadcastAddress.getAddress().getHostAddress() + + "' AND peer_port=" + + broadcastAddress.getPort(); + query = SELECT_PEERS_V2 + " WHERE " + whereClause; + } else { + String whereClause = "peer='" + broadcastAddress.getAddress().getHostAddress() + "'"; + query = SELECT_PEERS + " WHERE " + whereClause; + } } DefaultResultSetFuture future = new DefaultResultSetFuture(null, cluster.protocolVersion(), new Requests.Query(query)); c.write(future); - Row row = future.get().one(); + ResultSet rs = future.get(); + Row row = rs.one(); if (row != null) { + // Populate the column cache only when we got a real row. WHERE-clause lookups may return + // zero rows (e.g. broadcast address changed), in which case the ColumnDefinitions still + // exist in the result metadata but there is nothing useful to learn — we must not warm + // the cache from an empty result, or subsequent full-table scans will send a projected + // query that the server may not recognise. + if (isConnectedHost) { + projection.populateLocal(rs); + } else if (isPeersV2) { + projection.populatePeersV2(rs); + } else { + projection.populatePeers(rs); + } return row; } else { InetSocketAddress address = host.getBroadcastSocketAddress(); @@ -582,6 +623,11 @@ boolean refreshNodeInfo(Host host) { } catch (ExecutionException e) { // If we're being shutdown during refresh, this can happen. That's fine so don't scare the // user. + if (e.getCause() instanceof InvalidQueryException) { + // A projected query referenced a column the server no longer exposes; reset caches so + // the next query to each system table re-discovers columns via SELECT *. 
+ projection.reset(); + } if (!isShutdown) logger.debug("[Control connection] Unexpected error while refreshing node info", e); signalError(); @@ -719,7 +765,7 @@ private ListenableFuture selectPeersFuture(final Connection connectio if (isPeersV2) { DefaultResultSetFuture peersV2Future = new DefaultResultSetFuture( - null, cluster.protocolVersion(), new Requests.Query(SELECT_PEERS_V2)); + null, cluster.protocolVersion(), new Requests.Query(projection.peersV2Query())); connection.write(peersV2Future); final SettableFuture peersFuture = SettableFuture.create(); // if peers v2 query fails, query peers table instead. @@ -729,6 +775,7 @@ private ListenableFuture selectPeersFuture(final Connection connectio @Override public void onSuccess(ResultSet result) { + projection.populatePeersV2(result); peersFuture.set(result); } @@ -742,6 +789,9 @@ public void onFailure(Throwable t) { || (t instanceof ServerError && t.getMessage().contains("Unknown keyspace/cf pair (system.peers_v2)"))) { isPeersV2 = false; + // Reset all caches: peersV2Columns is now stale, and peers cache should be cleared + // so the first system.peers query re-discovers columns via SELECT *. 
+ projection.reset(); MoreFutures.propagateFuture(peersFuture, selectPeersFuture(connection)); } else { peersFuture.setException(t); @@ -751,11 +801,11 @@ public void onFailure(Throwable t) { MoreExecutors.directExecutor()); return peersFuture; } else { - DefaultResultSetFuture peersFuture = + DefaultResultSetFuture rawFuture = new DefaultResultSetFuture( - null, cluster.protocolVersion(), new Requests.Query(SELECT_PEERS)); - connection.write(peersFuture); - return peersFuture; + null, cluster.protocolVersion(), new Requests.Query(projection.peersQuery())); + connection.write(rawFuture); + return projection.hookPeers(rawFuture); } } @@ -774,7 +824,7 @@ private void refreshNodeListAndTokenMap( DefaultResultSetFuture localFuture = new DefaultResultSetFuture( - null, cluster.protocolVersion(), new Requests.Query(SELECT_LOCAL)); + null, cluster.protocolVersion(), new Requests.Query(projection.localQuery())); ListenableFuture peersFuture = selectPeersFuture(connection); connection.write(localFuture); @@ -783,7 +833,9 @@ private void refreshNodeListAndTokenMap( Map> tokenMap = new HashMap>(); // Update cluster name, DC and rack for the one node we are connected to - Row localRow = localFuture.get().one(); + ResultSet localRs = localFuture.get(); + projection.populateLocal(localRs); + Row localRow = localRs.one(); if (localRow == null) { throw new IllegalStateException( String.format( diff --git a/driver-core/src/main/java/com/datastax/driver/core/SystemColumnProjection.java b/driver-core/src/main/java/com/datastax/driver/core/SystemColumnProjection.java new file mode 100644 index 00000000000..9ab831e8452 --- /dev/null +++ b/driver-core/src/main/java/com/datastax/driver/core/SystemColumnProjection.java @@ -0,0 +1,264 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Copyright (C) 2022 ScyllaDB + * + * Modified by ScyllaDB + */ +package com.datastax.driver.core; + +import com.datastax.driver.core.exceptions.InvalidQueryException; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; +import java.util.Set; + +/** + * Encapsulates the column-projection state and logic for {@link ControlConnection}'s system table + * queries (DRIVER-368). + * + *

<p>On the first query to each system table ({@code system.local}, {@code system.peers}, {@code + * system.peers_v2}) the driver sends {@code SELECT *} to discover which columns the server exposes. + * The result is intersected with the appropriate {@code *_COLUMNS_OF_INTEREST} set and cached here. + * Subsequent {@code system.local} reads and full peers-table scans project only the cached + * columns, reducing bytes on the wire and deserialization work. + * + *

<p>All cache fields are {@code volatile} because they are written from the control-connection I/O + * thread and read from other threads. + */ +class SystemColumnProjection { + + // IMPORTANT: Every column read from system.local rows — in updateInfo(), + // refreshNodeListAndTokenMap(), isValidPeer(), and DefaultEndPointFactory — MUST be listed here. + // If a new column read is added anywhere that consumes a system table row, add it to the + // appropriate set below, otherwise it will be silently excluded from projected queries. + @VisibleForTesting + static final ImmutableSet LOCAL_COLUMNS_OF_INTEREST = + ImmutableSet.of( + "cluster_name", + "partitioner", + "data_center", + "rack", + "release_version", + "native_address", + "native_port", + "native_transport_address", + "native_transport_port", + "native_transport_port_ssl", + "rpc_address", + "broadcast_address", + "broadcast_port", + "listen_address", + "listen_port", + "tokens", + "host_id", + "schema_version", + "workload", + "graph", + "dse_version"); + + // IMPORTANT: see LOCAL_COLUMNS_OF_INTEREST note above. + // Includes all columns consumed by updateInfo(), refreshNodeListAndTokenMap(), + // isValidPeer(), and DefaultEndPointFactory.create() from system.peers rows. + // Columns that are absent from the actual server schema are silently excluded by + // intersectWithNeeded(), so listing extra columns here is safe. 
+ @VisibleForTesting + static final ImmutableSet PEERS_COLUMNS_OF_INTEREST = + ImmutableSet.of( + "peer", + "peer_port", // peers_v2 column; harmless to list here — absent on peers, excluded safely + "rpc_address", + "data_center", + "rack", + "release_version", + "tokens", + "listen_address", + "listen_port", + "host_id", + "schema_version", + "native_address", // may appear on some server variants; guarded by contains() in code + "native_port", // same + "native_transport_address", + "native_transport_port", + "native_transport_port_ssl", + "workload", + "graph", + "dse_version"); + + // IMPORTANT: see LOCAL_COLUMNS_OF_INTEREST note above. + // Includes all columns consumed by updateInfo(), refreshNodeListAndTokenMap(), + // isValidPeer(), and DefaultEndPointFactory.create() from system.peers_v2 rows. + // Columns that are absent from the actual server schema are silently excluded by + // intersectWithNeeded(), so listing extra columns here is safe. + @VisibleForTesting + static final ImmutableSet PEERS_V2_COLUMNS_OF_INTEREST = + ImmutableSet.of( + "peer", + "peer_port", + "native_address", + "native_port", + "data_center", + "rack", + "release_version", + "tokens", + "host_id", + "schema_version", + "workload", + "graph", + "dse_version", + "listen_address", + "listen_port", + "rpc_address", // legacy; guarded by contains() in code — harmless if absent + "native_transport_address", // same + "native_transport_port", // same + "native_transport_port_ssl"); // same + + private volatile Set localColumns = null; + private volatile Set peersColumns = null; + private volatile Set peersV2Columns = null; + + /** + * Returns the query string to use for {@code system.local}: a projected {@code SELECT} if the + * cache is warm, otherwise {@code SELECT * FROM system.local WHERE key='local'}. + */ + String localQuery() { + return localColumns == null + ? 
"SELECT * FROM system.local WHERE key='local'" + : buildProjectedQuery("system.local", localColumns, "key='local'"); + } + + /** + * Returns the query string to use for a full {@code system.peers} scan: a projected {@code + * SELECT} if the cache is warm, otherwise {@code SELECT * FROM system.peers}. + */ + String peersQuery() { + return peersColumns == null + ? "SELECT * FROM system.peers" + : buildProjectedQuery("system.peers", peersColumns, null); + } + + /** + * Returns the query string to use for a full {@code system.peers_v2} scan: a projected {@code + * SELECT} if the cache is warm, otherwise {@code SELECT * FROM system.peers_v2}. + */ + String peersV2Query() { + return peersV2Columns == null + ? "SELECT * FROM system.peers_v2" + : buildProjectedQuery("system.peers_v2", peersV2Columns, null); + } + + /** + * Populates the {@code system.local} column cache from the given result set if not already + * populated. + */ + void populateLocal(ResultSet rs) { + if (localColumns == null) { + localColumns = intersectWithNeeded(rs, LOCAL_COLUMNS_OF_INTEREST); + } + } + + /** + * Populates the {@code system.peers} column cache from the given result set if not already + * populated. + */ + void populatePeers(ResultSet rs) { + if (peersColumns == null) { + peersColumns = intersectWithNeeded(rs, PEERS_COLUMNS_OF_INTEREST); + } + } + + /** + * Populates the {@code system.peers_v2} column cache from the given result set if not already + * populated. + */ + void populatePeersV2(ResultSet rs) { + if (peersV2Columns == null) { + peersV2Columns = intersectWithNeeded(rs, PEERS_V2_COLUMNS_OF_INTEREST); + } + } + + /** + * Attaches a callback to {@code future} that populates the {@code system.peers} column cache on + * success and resets all caches on {@link InvalidQueryException} failure. Returns the future + * unchanged so callers can chain it directly. + * + *

<p>Use only for full-table scans, i.e. a {@code system.peers} query ({@code SELECT *} or + * projected) with no {@code WHERE} clause. For single-row {@code WHERE peer='...'} lookups, the + * result set may have zero rows while still carrying valid {@code ColumnDefinitions}; the + * callback would fire and warm the cache from an empty result, which is incorrect. Use {@link + * #populatePeers} inside an {@code if (row != null)} guard for that path instead. + */ + ListenableFuture hookPeers(DefaultResultSetFuture future) { + Futures.addCallback( + future, + new FutureCallback() { + @Override + public void onSuccess(ResultSet result) { + populatePeers(result); + } + + @Override + public void onFailure(Throwable t) { + if (t instanceof InvalidQueryException) reset(); + } + }, + MoreExecutors.directExecutor()); + return future; + } + + /** + * Resets all column caches so that the next query to each system table sends {@code SELECT *} and + * re-discovers available columns. Called on reconnection and on schema errors. + */ + void reset() { + localColumns = null; + peersColumns = null; + peersV2Columns = null; + } + + /** + * Returns the intersection of the columns returned by the server (from {@code rs}) with the given + * {@code needed} set, or {@code null} if the intersection is empty. The result is used to cache + * projected column lists so subsequent queries fetch only what the driver actually reads. A + * {@code null} return keeps the cache in the "uninitialized" sentinel state, ensuring the driver + * continues issuing {@code SELECT *} rather than generating an invalid empty-column projection. + */ + @VisibleForTesting + static Set intersectWithNeeded(ResultSet rs, ImmutableSet needed) { + ImmutableSet.Builder result = ImmutableSet.builder(); + for (ColumnDefinitions.Definition def : rs.getColumnDefinitions()) { + if (needed.contains(def.getName())) { + result.add(def.getName()); + } + } + ImmutableSet built = result.build(); + return built.isEmpty() ? 
null : built; + } + + /** + * Builds a {@code SELECT col1, col2, ... FROM table [WHERE whereClause]} query string from the + * given projected column set. {@code whereClause} may be {@code null} for table-wide scans. + */ + @VisibleForTesting + static String buildProjectedQuery(String table, Set columns, String whereClause) { + String query = "SELECT " + String.join(", ", columns) + " FROM " + table; + return whereClause != null ? query + " WHERE " + whereClause : query; + } +} diff --git a/driver-core/src/test/java/com/datastax/driver/core/ControlConnectionTest.java b/driver-core/src/test/java/com/datastax/driver/core/ControlConnectionTest.java index 2670c2022de..f2d48352676 100644 --- a/driver-core/src/test/java/com/datastax/driver/core/ControlConnectionTest.java +++ b/driver-core/src/test/java/com/datastax/driver/core/ControlConnectionTest.java @@ -456,6 +456,9 @@ public void should_fetch_whole_peers_table_if_broadcast_address_changed() .build(); scassandras.node(1).primingClient().clearAllPrimes(); + // Reset the column caches so the driver re-discovers columns via SELECT * rather than + // sending projected queries against the now-cleared Scassandra primes. + cluster.manager.controlConnection.resetColumnCaches(); // the driver will attempt to locate host2 in system.peers by its old broadcast address, and // that will fail diff --git a/driver-core/src/test/java/com/datastax/driver/core/ControlConnectionUnitTest.java b/driver-core/src/test/java/com/datastax/driver/core/ControlConnectionUnitTest.java new file mode 100644 index 00000000000..ca4d8af4f93 --- /dev/null +++ b/driver-core/src/test/java/com/datastax/driver/core/ControlConnectionUnitTest.java @@ -0,0 +1,290 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.driver.core; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.google.common.collect.ImmutableSet; +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; +import java.util.Set; +import org.testng.annotations.Test; + +/** + * Pure unit tests for the column-projection helpers and caching fields in {@link + * SystemColumnProjection} (DRIVER-368). + * + *

These tests do not require a running Cassandra/Scylla node. For integration-level tests see + * {@link ControlConnectionTest}. + */ +public class ControlConnectionUnitTest { + + // --------------------------------------------------------------------------- + // *_COLUMNS_OF_INTEREST constants + // --------------------------------------------------------------------------- + + @Test(groups = "unit") + public void testLocalColumnsOfInterestContainsExpectedColumns() { + ImmutableSet cols = SystemColumnProjection.LOCAL_COLUMNS_OF_INTEREST; + assertThat(cols) + .contains( + "cluster_name", + "tokens", + "host_id", + "native_address", + "dse_version", + "rpc_address", + "schema_version", + "data_center", + "rack", + "release_version", + "partitioner"); + } + + @Test(groups = "unit") + public void testLocalColumnsOfInterestSize() { + // 21 columns as documented in the constant declaration + assertThat(SystemColumnProjection.LOCAL_COLUMNS_OF_INTEREST).hasSize(21); + } + + @Test(groups = "unit") + public void testPeersColumnsOfInterestContainsExpectedColumns() { + ImmutableSet cols = SystemColumnProjection.PEERS_COLUMNS_OF_INTEREST; + assertThat(cols) + .contains( + "peer", + "peer_port", + "rpc_address", + "tokens", + "native_address", + "native_port", + "native_transport_address", + "data_center", + "rack", + "host_id", + "dse_version"); + } + + @Test(groups = "unit") + public void testPeersColumnsOfInterestSize() { + // 19 columns: original 16 + peer_port, native_address, native_port + assertThat(SystemColumnProjection.PEERS_COLUMNS_OF_INTEREST).hasSize(19); + } + + @Test(groups = "unit") + public void testPeersV2ColumnsOfInterestContainsExpectedColumns() { + ImmutableSet cols = SystemColumnProjection.PEERS_V2_COLUMNS_OF_INTEREST; + assertThat(cols) + .contains( + "peer", + "peer_port", + "native_address", + "native_port", + "rpc_address", + "native_transport_address", + "native_transport_port", + "native_transport_port_ssl", + "data_center", + "rack", + "tokens", + 
"host_id", + "dse_version"); + } + + @Test(groups = "unit") + public void testPeersV2ColumnsOfInterestSize() { + // 19 columns: original 15 + rpc_address, native_transport_address/port/port_ssl + assertThat(SystemColumnProjection.PEERS_V2_COLUMNS_OF_INTEREST).hasSize(19); + } + + @Test(groups = "unit") + public void testPeersV2ContainsLegacyColumns() { + // rpc_address, native_transport_address/port/port_ssl are legacy columns the driver reads + // with contains() guards. They are included so they are not silently dropped if a server + // exposes them in peers_v2. + assertThat(SystemColumnProjection.PEERS_V2_COLUMNS_OF_INTEREST) + .contains( + "rpc_address", + "native_transport_address", + "native_transport_port", + "native_transport_port_ssl"); + } + + // --------------------------------------------------------------------------- + // intersectWithNeeded + // --------------------------------------------------------------------------- + + /** Helper: build a mock ResultSet whose column definitions contain exactly the given names. */ + private static ResultSet mockResultSetWithColumns(String... 
columnNames) { + ColumnDefinitions.Definition[] defs = new ColumnDefinitions.Definition[columnNames.length]; + for (int i = 0; i < columnNames.length; i++) { + defs[i] = + new ColumnDefinitions.Definition("system", "local", columnNames[i], DataType.text()); + } + ColumnDefinitions colDefs = new ColumnDefinitions(defs, CodecRegistry.DEFAULT_INSTANCE); + + ResultSet rs = mock(ResultSet.class); + when(rs.getColumnDefinitions()).thenReturn(colDefs); + return rs; + } + + @Test(groups = "unit") + public void testIntersectWithNeededReturnsSupersetIntersection() { + // RS has all LOCAL columns plus some extras; result should be exactly LOCAL_COLUMNS_OF_INTEREST + ImmutableSet needed = SystemColumnProjection.LOCAL_COLUMNS_OF_INTEREST; + String[] base = needed.asList().toArray(new String[0]); + // Append two extra columns not in the interest set + String[] extended = java.util.Arrays.copyOf(base, base.length + 2); + extended[base.length] = "extra_col_1"; + extended[base.length + 1] = "extra_col_2"; + + ResultSet rs = mockResultSetWithColumns(extended); + Set result = SystemColumnProjection.intersectWithNeeded(rs, needed); + + assertThat(result).isEqualTo(needed); + assertThat(result).doesNotContain("extra_col_1", "extra_col_2"); + } + + @Test(groups = "unit") + public void testIntersectWithNeededHandlesSubset() { + // RS only exposes a subset of the needed columns + ImmutableSet needed = + ImmutableSet.of("cluster_name", "tokens", "host_id", "schema_version"); + ResultSet rs = mockResultSetWithColumns("cluster_name", "tokens"); + + Set result = SystemColumnProjection.intersectWithNeeded(rs, needed); + + assertThat(result).containsOnly("cluster_name", "tokens"); + assertThat(result).hasSize(2); + } + + @Test(groups = "unit") + public void testIntersectWithNeededNoOverlapReturnsNull() { + // When no server columns match the needed set, the result should be null so the cache remains + // in the uninitialized sentinel state (avoids generating an empty-column SELECT projection). 
+ ImmutableSet needed = ImmutableSet.of("cluster_name", "tokens"); + ResultSet rs = mockResultSetWithColumns("some_other_col", "another_col"); + + Set result = SystemColumnProjection.intersectWithNeeded(rs, needed); + + assertThat(result).isNull(); + } + + @Test(groups = "unit") + public void testIntersectWithNeededEmptyResultSetReturnsNull() { + // A ResultSet with no column definitions (not merely zero rows) has an empty intersection → null. + ImmutableSet needed = SystemColumnProjection.LOCAL_COLUMNS_OF_INTEREST; + ResultSet rs = mockResultSetWithColumns(); + + Set result = SystemColumnProjection.intersectWithNeeded(rs, needed); + + assertThat(result).isNull(); + } + + // --------------------------------------------------------------------------- + // buildProjectedQuery + // --------------------------------------------------------------------------- + + @Test(groups = "unit") + public void testBuildProjectedQueryWithWhereClause() { + Set columns = ImmutableSet.of("cluster_name", "host_id"); + String query = + SystemColumnProjection.buildProjectedQuery("system.local", columns, "key='local'"); + + assertThat(query).startsWith("SELECT "); + assertThat(query).contains("cluster_name"); + assertThat(query).contains("host_id"); + assertThat(query).contains(" FROM system.local"); + assertThat(query).contains(" WHERE key='local'"); + // Should not contain SELECT * + assertThat(query).doesNotContain("*"); + } + + @Test(groups = "unit") + public void testBuildProjectedQueryWithoutWhereClause() { + Set columns = ImmutableSet.of("peer", "rpc_address", "tokens"); + String query = SystemColumnProjection.buildProjectedQuery("system.peers", columns, null); + + assertThat(query).startsWith("SELECT "); + assertThat(query).contains("peer"); + assertThat(query).contains("rpc_address"); + assertThat(query).contains("tokens"); + assertThat(query).contains(" FROM system.peers"); + assertThat(query).doesNotContain("WHERE"); + } + + @Test(groups = "unit") + public void 
testBuildProjectedQuerySingleColumn() { + Set columns = ImmutableSet.of("host_id"); + String query = SystemColumnProjection.buildProjectedQuery("system.local", columns, null); + + assertThat(query).isEqualTo("SELECT host_id FROM system.local"); + } + + @Test(groups = "unit") + public void testBuildProjectedQueryAllColumnsPresent() { + // Every column in the needed set must appear as an exact identifier in the projected SELECT + // list. Use exact parsing to avoid false positives where one column name is a substring of + // another (e.g. "native_port" inside "native_transport_port"). + Set columns = SystemColumnProjection.PEERS_COLUMNS_OF_INTEREST; + String query = SystemColumnProjection.buildProjectedQuery("system.peers", columns, null); + Set selectedColumns = extractSelectedColumns(query); + + for (String col : columns) { + assertThat(selectedColumns).as("query should project column: " + col).contains(col); + } + assertThat(query).contains(" FROM system.peers"); + assertThat(query).doesNotContain("WHERE"); + } + + /** + * Parses the column identifiers from the {@code SELECT col1, col2, ... FROM ...} portion of a + * projected query string and returns them as a set of trimmed names. 
+ */ + private Set extractSelectedColumns(String query) { + int selectStart = query.indexOf("SELECT "); + int fromStart = query.indexOf(" FROM "); + assertThat(selectStart).as("query should start with SELECT").isEqualTo(0); + assertThat(fromStart).as("query should contain FROM").isGreaterThan(selectStart); + String columnList = query.substring("SELECT ".length(), fromStart); + ImmutableSet.Builder builder = ImmutableSet.builder(); + for (String col : columnList.split(",")) { + builder.add(col.trim()); + } + return builder.build(); + } + + // --------------------------------------------------------------------------- + // Cache fields: declared as volatile, private, instance-level Set + // --------------------------------------------------------------------------- + + @Test(groups = "unit") + public void testCacheFieldsAreVolatilePrivateInstanceSets() throws Exception { + for (String fieldName : new String[] {"localColumns", "peersColumns", "peersV2Columns"}) { + Field field = SystemColumnProjection.class.getDeclaredField(fieldName); + int mods = field.getModifiers(); + + assertThat(Modifier.isVolatile(mods)).as(fieldName + " should be volatile").isTrue(); + assertThat(Modifier.isPrivate(mods)).as(fieldName + " should be private").isTrue(); + assertThat(Modifier.isStatic(mods)).as(fieldName + " must be an instance field").isFalse(); + assertThat(Set.class.isAssignableFrom(field.getType())) + .as(fieldName + " declared type should be Set") + .isTrue(); + } + } +} diff --git a/driver-core/src/test/java/com/datastax/driver/core/ScassandraCluster.java b/driver-core/src/test/java/com/datastax/driver/core/ScassandraCluster.java index 83078670d7e..1769e4d263c 100644 --- a/driver-core/src/test/java/com/datastax/driver/core/ScassandraCluster.java +++ b/driver-core/src/test/java/com/datastax/driver/core/ScassandraCluster.java @@ -82,6 +82,13 @@ public class ScassandraCluster { private final boolean peersV2; + /** + * One stable UUID per node (keyed by 1-based nodeCount), 
computed once at construction so that + * system.local and system.peers rows for the same node always carry the same host_id regardless + * of which Scassandra process is being primed or how many times primeMetadata() is called. + */ + private final Map hostIdByNodeCount; + ScassandraCluster( Integer[] nodes, String ipPrefix, @@ -148,6 +155,17 @@ public class ScassandraCluster { instances = instanceListBuilder.build(); dcNodeMap = dcNodeMapBuilder.build(); + // Compute stable host_id UUIDs once so every primeMetadata() call uses the same values. + Map hostIds = new HashMap<>(); + int tempCount = 1; + for (Integer dc : new TreeSet(dcNodeMap.keySet())) { + for (int n = 0; n < dcNodeMap.get(dc).size(); n++) { + hostIds.put(tempCount, UUIDs.random()); + tempCount++; + } + } + this.hostIdByNodeCount = hostIds; + // Prime correct keyspace table based on C* version. String[] versionArray = this.cassandraVersion.split("\\.|-"); double major = Double.parseDouble(versionArray[0] + "." + versionArray[1]); @@ -357,6 +375,11 @@ public void start(Cluster cluster, int node) { logger.debug("Starting node {}.", node); Scassandra scassandra = node(node); scassandra.start(); + // Re-prime after restart: Scassandra loses all primes when its process restarts. + // Without re-priming, the driver may query an unprimed node (e.g. if the control + // connection temporarily reconnects to this host), get empty system table responses, + // and fail to bring the node back up within the allowed window. 
+ primeMetadata(scassandra); assertThat(cluster).host(node).comesUpWithin(10, TimeUnit.SECONDS); } @@ -386,6 +409,7 @@ public List getTokensForDC(int dc) { private void primeMetadata(Scassandra node) { PrimingClient client = node.primingClient(); + int nodeCount = 1; ImmutableList.Builder> rows = ImmutableList.builder(); @@ -396,6 +420,7 @@ private void primeMetadata(Scassandra node) { for (int n = 0; n < nodesInDc.size(); n++) { InetSocketAddress binaryAddress = address(nodeCount); InetSocketAddress listenAddress = listenAddress(nodeCount); + java.util.UUID hostId = hostIdByNodeCount.get(nodeCount); nodeCount++; Scassandra peer = nodesInDc.get(n); if (node == peer) { // prime system.local. @@ -423,7 +448,7 @@ private void primeMetadata(Scassandra node) { "release_version", getPeerInfo(dc, n + 1, "release_version", cassandraVersion)); addPeerInfo(row, dc, n + 1, "tokens", ImmutableSet.of(tokens.get(n))); - addPeerInfo(row, dc, n + 1, "host_id", UUIDs.random()); + addPeerInfo(row, dc, n + 1, "host_id", hostId); addPeerInfo(row, dc, n + 1, "schema_version", schemaVersion); addPeerInfo(row, dc, n + 1, "graph", false); @@ -444,6 +469,19 @@ private void primeMetadata(Scassandra node) { .withRows(Collections.>singletonList(row)) .build()) .build()); + // Also prime the projected query that the driver sends after the cache is warm. 
+ ColumnMetadata[] projectedLocal = + projectedColumnMetadata( + SELECT_LOCAL, SystemColumnProjection.LOCAL_COLUMNS_OF_INTEREST); + client.prime( + PrimingRequest.queryBuilder() + .withQuery(projectedQueryString(projectedLocal, "system.local", "key='local'")) + .withThen( + then() + .withColumnTypes(projectedLocal) + .withRows(Collections.>singletonList(row)) + .build()) + .build()); } else { addPeerInfo(row, dc, n + 1, "broadcast_port", listenAddress.getPort()); addPeerInfo(row, dc, n + 1, "listen_port", listenAddress.getPort()); @@ -456,6 +494,20 @@ private void primeMetadata(Scassandra node) { .withRows(Collections.>singletonList(row)) .build()) .build()); + // Also prime the projected query that the driver sends after the cache is warm. + ColumnMetadata[] projectedLocalV2 = + projectedColumnMetadata( + SELECT_LOCAL_V2, SystemColumnProjection.LOCAL_COLUMNS_OF_INTEREST); + client.prime( + PrimingRequest.queryBuilder() + .withQuery( + projectedQueryString(projectedLocalV2, "system.local", "key='local'")) + .withThen( + then() + .withColumnTypes(projectedLocalV2) + .withRows(Collections.>singletonList(row)) + .build()) + .build()); } } else { // prime system.peers. Map row = Maps.newHashMap(); @@ -489,7 +541,6 @@ private void primeMetadata(Scassandra node) { addPeerInfo(row, dc, n + 1, "tokens", ImmutableSet.of(Long.toString(tokens.get(n)))); addPeerInfo(rowV2, dc, n + 1, "tokens", ImmutableSet.of(Long.toString(tokens.get(n)))); - java.util.UUID hostId = UUIDs.random(); addPeerInfo(row, dc, n + 1, "host_id", hostId); addPeerInfo(rowV2, dc, n + 1, "host_id", hostId); @@ -546,6 +597,14 @@ private void primeMetadata(Scassandra node) { .withQuery("SELECT * FROM system.peers") .withThen(then().withColumnTypes(SELECT_PEERS).withRows(rows.build()).build()) .build()); + // Also prime the projected full-scan that the driver sends after the cache is warm. 
+ ColumnMetadata[] projectedPeersFullScan = + projectedColumnMetadata(SELECT_PEERS, SystemColumnProjection.PEERS_COLUMNS_OF_INTEREST); + client.prime( + PrimingRequest.queryBuilder() + .withQuery(projectedQueryString(projectedPeersFullScan, "system.peers", null)) + .withThen(then().withColumnTypes(projectedPeersFullScan).withRows(rows.build()).build()) + .build()); // return invalid error for peers_v2, indicating the table doesn't exist. if (!peersV2) { @@ -560,6 +619,16 @@ private void primeMetadata(Scassandra node) { .withQuery("SELECT * FROM system.peers_v2") .withThen(then().withColumnTypes(SELECT_PEERS_V2).withRows(rowsV2.build()).build()) .build()); + // Also prime the projected full-scan for peers_v2. + ColumnMetadata[] projectedPeersV2FullScan = + projectedColumnMetadata( + SELECT_PEERS_V2, SystemColumnProjection.PEERS_V2_COLUMNS_OF_INTEREST); + client.prime( + PrimingRequest.queryBuilder() + .withQuery(projectedQueryString(projectedPeersV2FullScan, "system.peers_v2", null)) + .withThen( + then().withColumnTypes(projectedPeersV2FullScan).withRows(rowsV2.build()).build()) + .build()); } // Needed to ensure cluster_name matches what we expect on connection. @@ -751,6 +820,29 @@ private Object getPeerInfo(int dc, int node, String property, Object defaultValu column("validator", TEXT), }; + /** Returns the subset of {@code full} whose names are in {@code interest}, preserving order. */ + private static ColumnMetadata[] projectedColumnMetadata( + ColumnMetadata[] full, Set interest) { + List result = new ArrayList<>(); + for (ColumnMetadata col : full) { + if (interest.contains(col.getName())) result.add(col); + } + return result.toArray(new ColumnMetadata[0]); + } + + /** Builds a projected SELECT query string from a ColumnMetadata array. 
*/
+  private static String projectedQueryString(
+      ColumnMetadata[] cols, String table, String whereClause) {
+    // Column names are emitted in array order, separated by ", ".
+    StringBuilder query = new StringBuilder("SELECT ");
+    String separator = "";
+    for (ColumnMetadata column : cols) {
+      query.append(separator).append(column.getName());
+      separator = ", ";
+    }
+    query.append(" FROM ").append(table);
+    // A null whereClause means no WHERE part is appended at all.
+    if (whereClause != null) {
+      query.append(" WHERE ").append(whereClause);
+    }
+    return query.toString();
+  }
+
// Primes a minimal system.local row on an Scassandra node. // We need a host_id so that the driver can store it in Metadata.hosts public static void primeSystemLocalRow(Scassandra scassandra) { @@ -767,6 +859,18 @@ public static void primeSystemLocalRow(Scassandra scassandra) { .withColumnTypes( localMetadata.toArray(new ColumnMetadata[localMetadata.size()])) .withRows(Collections.>singletonList(row)))); + // Also prime the projected query that the driver sends after the cache is warm. + ColumnMetadata[] projectedLocal = + projectedColumnMetadata(SELECT_LOCAL, SystemColumnProjection.LOCAL_COLUMNS_OF_INTEREST); + scassandra + .primingClient() + .prime( + PrimingRequest.queryBuilder() + .withQuery(projectedQueryString(projectedLocal, "system.local", "key='local'")) + .withThen( + then() + .withColumnTypes(projectedLocal) + .withRows(Collections.>singletonList(row)))); } public static ScassandraClusterBuilder builder() {