Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,22 @@ class ControlConnection implements Connection.Owner {
// from here on out.
private volatile boolean isPeersV2 = true;

private final SystemColumnProjection projection = new SystemColumnProjection();

public ControlConnection(Cluster.Manager manager) {
this.cluster = manager;
}

/**
* Resets the projected-column caches so that the next query to each system table sends {@code
* SELECT *} and re-discovers available columns. Intended for use in tests that clear Scassandra
* primes between driver operations.
*/
@VisibleForTesting
void resetColumnCaches() {
Comment thread
dkropachev marked this conversation as resolved.
projection.reset();
}

// Only for the initial connection. Does not schedule retries if it fails
void connect() throws UnsupportedProtocolVersionException {
if (isShutdown) return;
Expand Down Expand Up @@ -326,6 +338,12 @@ private Connection tryConnect(Host host, boolean isInitialConnection)
ProtocolEvent.Type.SCHEMA_CHANGE);
connection.write(new Requests.Register(evs));

// Reset column caches so refreshNodeListAndTokenMap() uses SELECT * to rediscover
// which columns this server exposes, rather than a projected query built for a
// previous connection's server. The caches are populated during the queries below
// and remain warm for the lifetime of this connection.
projection.reset();

// We need to refresh the node list first so we know about the cassandra version of
// the node we're connecting to.
// This will create the token map for the first time, but it will be incomplete
Expand Down Expand Up @@ -453,6 +471,11 @@ void refreshNodeListAndTokenMap() {
} catch (ExecutionException e) {
// If we're being shutdown during refresh, this can happen. That's fine so don't scare the
// user.
if (e.getCause() instanceof InvalidQueryException) {
// A projected query referenced a column the server no longer exposes; reset caches so
// the next connection re-discovers columns via SELECT *.
projection.reset();
}
if (!isShutdown)
logger.error(
"[Control connection] Unexpected error while refreshing node list and token map", e);
Expand Down Expand Up @@ -491,26 +514,44 @@ private Row fetchNodeInfo(Host host, Connection c)
if (isConnectedHost || host.getBroadcastSocketAddress() != null) {
String query;
if (isConnectedHost) {
query = SELECT_LOCAL;
query = projection.localQuery();
} else {
InetSocketAddress broadcastAddress = host.getBroadcastSocketAddress();
query =
isPeersV2
? SELECT_PEERS_V2
+ " WHERE peer='"
+ broadcastAddress.getAddress().getHostAddress()
+ "' AND peer_port="
+ broadcastAddress.getPort()
: SELECT_PEERS
+ " WHERE peer='"
+ broadcastAddress.getAddress().getHostAddress()
+ "'";
// Always use SELECT * for single-row WHERE lookups. Projected queries are only used for
// full-table scans via selectPeersFuture(), where the cache is guaranteed to be warm and
// every node has the projected full-scan prime registered. For WHERE lookups the control
// connection may query a node that was never restarted (and therefore still carries only
// the original SELECT * prime from init time), so projecting here risks a cache miss.
if (isPeersV2) {
String whereClause =
"peer='"
+ broadcastAddress.getAddress().getHostAddress()
+ "' AND peer_port="
+ broadcastAddress.getPort();
query = SELECT_PEERS_V2 + " WHERE " + whereClause;
} else {
String whereClause = "peer='" + broadcastAddress.getAddress().getHostAddress() + "'";
query = SELECT_PEERS + " WHERE " + whereClause;
}
}
DefaultResultSetFuture future =
new DefaultResultSetFuture(null, cluster.protocolVersion(), new Requests.Query(query));
c.write(future);
Row row = future.get().one();
ResultSet rs = future.get();
Row row = rs.one();
if (row != null) {
// Populate the column cache only when we got a real row. WHERE-clause lookups may return
// zero rows (e.g. broadcast address changed), in which case the ColumnDefinitions still
// exist in the result metadata but there is nothing useful to learn — we must not warm
// the cache from an empty result, or subsequent full-table scans will send a projected
// query that the server may not recognise.
if (isConnectedHost) {
projection.populateLocal(rs);
} else if (isPeersV2) {
projection.populatePeersV2(rs);
} else {
projection.populatePeers(rs);
}
return row;
} else {
InetSocketAddress address = host.getBroadcastSocketAddress();
Expand Down Expand Up @@ -582,6 +623,11 @@ boolean refreshNodeInfo(Host host) {
} catch (ExecutionException e) {
// If we're being shutdown during refresh, this can happen. That's fine so don't scare the
// user.
if (e.getCause() instanceof InvalidQueryException) {
// A projected query referenced a column the server no longer exposes; reset caches so
// the next connection re-discovers columns via SELECT *.
projection.reset();
}
if (!isShutdown)
logger.debug("[Control connection] Unexpected error while refreshing node info", e);
signalError();
Expand Down Expand Up @@ -719,7 +765,7 @@ private ListenableFuture<ResultSet> selectPeersFuture(final Connection connectio
if (isPeersV2) {
DefaultResultSetFuture peersV2Future =
new DefaultResultSetFuture(
null, cluster.protocolVersion(), new Requests.Query(SELECT_PEERS_V2));
null, cluster.protocolVersion(), new Requests.Query(projection.peersV2Query()));
connection.write(peersV2Future);
final SettableFuture<ResultSet> peersFuture = SettableFuture.create();
// if peers v2 query fails, query peers table instead.
Expand All @@ -729,6 +775,7 @@ private ListenableFuture<ResultSet> selectPeersFuture(final Connection connectio

@Override
public void onSuccess(ResultSet result) {
projection.populatePeersV2(result);
peersFuture.set(result);
}

Expand All @@ -742,6 +789,9 @@ public void onFailure(Throwable t) {
|| (t instanceof ServerError
&& t.getMessage().contains("Unknown keyspace/cf pair (system.peers_v2)"))) {
isPeersV2 = false;
// Reset all caches: peersV2Columns is now stale, and peers cache should be cleared
// so the first system.peers query re-discovers columns via SELECT *.
projection.reset();
MoreFutures.propagateFuture(peersFuture, selectPeersFuture(connection));
} else {
peersFuture.setException(t);
Expand All @@ -751,11 +801,11 @@ public void onFailure(Throwable t) {
MoreExecutors.directExecutor());
return peersFuture;
} else {
DefaultResultSetFuture peersFuture =
DefaultResultSetFuture rawFuture =
new DefaultResultSetFuture(
null, cluster.protocolVersion(), new Requests.Query(SELECT_PEERS));
connection.write(peersFuture);
return peersFuture;
null, cluster.protocolVersion(), new Requests.Query(projection.peersQuery()));
connection.write(rawFuture);
return projection.hookPeers(rawFuture);
}
}

Expand All @@ -774,7 +824,7 @@ private void refreshNodeListAndTokenMap(

DefaultResultSetFuture localFuture =
new DefaultResultSetFuture(
null, cluster.protocolVersion(), new Requests.Query(SELECT_LOCAL));
null, cluster.protocolVersion(), new Requests.Query(projection.localQuery()));
ListenableFuture<ResultSet> peersFuture = selectPeersFuture(connection);
connection.write(localFuture);

Expand All @@ -783,7 +833,9 @@ private void refreshNodeListAndTokenMap(
Map<Host, Set<Token>> tokenMap = new HashMap<Host, Set<Token>>();

// Update cluster name, DC and rack for the one node we are connected to
Row localRow = localFuture.get().one();
ResultSet localRs = localFuture.get();
projection.populateLocal(localRs);
Row localRow = localRs.one();
if (localRow == null) {
throw new IllegalStateException(
String.format(
Expand Down
Loading
Loading