From c5720fa0db5ce42e9ad6a3862345109f8a006abf Mon Sep 17 00:00:00 2001 From: Dmitry Kropachev Date: Tue, 12 May 2026 22:39:27 -0400 Subject: [PATCH] Start consider SERIAL SELECT as LWT queries Driver did not consider SERIAL SELECT as LWT and therefore routed them as regular queries causing LWT congestion. Fix is to consider consistency when RequestRoutingMethod is calculated. --- .../driver/api/core/cql/SimpleStatement.java | 7 +- .../driver/api/core/cql/StatementBuilder.java | 11 +- .../driver/internal/core/cql/Conversions.java | 2 +- .../core/cql/DefaultBatchStatement.java | 10 +- .../core/cql/DefaultBoundStatement.java | 22 +- .../core/cql/DefaultPreparedStatement.java | 16 +- .../core/cql/DefaultSimpleStatement.java | 14 +- .../core/cql/RequestRoutingTypeAccessor.java | 27 ++ .../BasicLoadBalancingPolicy.java | 77 ++++- .../api/core/cql/StatementBuilderTest.java | 85 +++++ .../core/cql/DefaultBatchStatementTest.java | 12 + .../cql/DefaultPreparedStatementTest.java | 112 +++++++ ...asicLoadBalancingPolicyDcFailoverTest.java | 41 +++ ...LoadBalancingPolicyRequestRoutingTest.java | 300 ++++++++++++++++++ .../LWTLoadBalancingMultiDcIT.java | 66 ++++ .../metadata/DefaultMetadataTabletMapIT.java | 60 +++- 16 files changed, 829 insertions(+), 33 deletions(-) create mode 100644 core/src/main/java/com/datastax/oss/driver/internal/core/cql/RequestRoutingTypeAccessor.java create mode 100644 core/src/test/java/com/datastax/oss/driver/internal/core/cql/DefaultPreparedStatementTest.java diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/cql/SimpleStatement.java b/core/src/main/java/com/datastax/oss/driver/api/core/cql/SimpleStatement.java index 20f17fa716e..2489254ed13 100644 --- a/core/src/main/java/com/datastax/oss/driver/api/core/cql/SimpleStatement.java +++ b/core/src/main/java/com/datastax/oss/driver/api/core/cql/SimpleStatement.java @@ -20,7 +20,6 @@ import com.datastax.oss.driver.api.core.CqlIdentifier; import com.datastax.oss.driver.api.core.CqlSession; import com.datastax.oss.driver.api.core.DefaultProtocolVersion; -import com.datastax.oss.driver.api.core.RequestRoutingType; import com.datastax.oss.driver.api.core.context.DriverContext; import com.datastax.oss.driver.api.core.session.Request; import com.datastax.oss.driver.internal.core.cql.DefaultSimpleStatement; @@ -86,7 +85,7 @@ static SimpleStatement newInstance(@NonNull String cqlQuery) { null, null, Statement.NO_NOW_IN_SECONDS, - RequestRoutingType.REGULAR); + null); } /** @@ -121,7 +120,7 @@ static SimpleStatement newInstance( null, null, Statement.NO_NOW_IN_SECONDS, - RequestRoutingType.REGULAR); + null); } /** @@ -153,7 +152,7 @@ static SimpleStatement newInstance( null, null, Statement.NO_NOW_IN_SECONDS, - RequestRoutingType.REGULAR); + null); } /** diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/cql/StatementBuilder.java b/core/src/main/java/com/datastax/oss/driver/api/core/cql/StatementBuilder.java index 9894dd9c813..6f2dcf174ec 100644 --- a/core/src/main/java/com/datastax/oss/driver/api/core/cql/StatementBuilder.java +++ b/core/src/main/java/com/datastax/oss/driver/api/core/cql/StatementBuilder.java @@ -23,6 +23,7 @@ import com.datastax.oss.driver.api.core.config.DriverExecutionProfile; import com.datastax.oss.driver.api.core.metadata.Node; import com.datastax.oss.driver.api.core.metadata.token.Token; +import com.datastax.oss.driver.internal.core.cql.RequestRoutingTypeAccessor; import com.datastax.oss.driver.internal.core.util.RoutingKey; import com.datastax.oss.protocol.internal.util.collection.NullAllowingImmutableMap; import edu.umd.cs.findbugs.annotations.NonNull; @@ -89,7 +90,15 @@ protected StatementBuilder(StatementT template) { this.timeout = template.getTimeout(); this.node = template.getNode(); this.nowInSeconds = template.getNowInSeconds(); - this.requestRoutingType = template.getRequestRoutingType(); + this.requestRoutingType = getConfiguredRequestRoutingType(template); + } + + @Nullable + private RequestRoutingType getConfiguredRequestRoutingType(StatementT template) { + if (template instanceof RequestRoutingTypeAccessor) { + return ((RequestRoutingTypeAccessor) template).getConfiguredRequestRoutingType(); + } + return template.getRequestRoutingType(); } /** @see Statement#setExecutionProfileName(String) */ diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/Conversions.java b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/Conversions.java index 0a864293b0d..dd11ee596e9 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/Conversions.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/Conversions.java @@ -430,7 +430,7 @@ public static DefaultPreparedStatement toPreparedStatement( context.getProtocolVersion(), lwtInfo != null && lwtInfo.isLwt(response.variablesMetadata.flags) ? RequestRoutingType.LWT - : RequestRoutingType.REGULAR); + : null); } public static ColumnDefinitions toColumnDefinitions( diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/DefaultBatchStatement.java b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/DefaultBatchStatement.java index cde8d91e4c9..af95829756f 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/DefaultBatchStatement.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/DefaultBatchStatement.java @@ -49,7 +49,7 @@ import org.slf4j.LoggerFactory; @Immutable -public class DefaultBatchStatement implements BatchStatement { +public class DefaultBatchStatement implements BatchStatement, RequestRoutingTypeAccessor { private static final Logger LOG = LoggerFactory.getLogger(DefaultBatchStatement.class); private final BatchType batchType; @@ -857,6 +857,8 @@ public BatchStatement setNowInSeconds(int newNowInSeconds) { public RequestRoutingType getRequestRoutingType() { if (Objects.nonNull(requestRoutingType)) { return requestRoutingType; + } else if (consistencyLevel != null && consistencyLevel.isSerial()) { + return RequestRoutingType.LWT; } else if (Objects.isNull( cachedStatementsRequestRoutingType)) { // Immutability of the statement list and statements // allows us to cache the result @@ -870,6 +872,12 @@ public RequestRoutingType getRequestRoutingType() { return cachedStatementsRequestRoutingType; } + @Nullable + @Override + public RequestRoutingType getConfiguredRequestRoutingType() { + return requestRoutingType; + } + @NonNull @Override public BatchStatement setRequestRoutingType(RequestRoutingType requestRoutingType) { diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/DefaultBoundStatement.java b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/DefaultBoundStatement.java index 2c3ad902f39..509d7a90a76 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/DefaultBoundStatement.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/DefaultBoundStatement.java @@ -44,11 +44,10 @@ import java.util.Arrays; import java.util.List; import java.util.Map; -import java.util.Objects; import net.jcip.annotations.Immutable; @Immutable -public class DefaultBoundStatement implements BoundStatement { +public class DefaultBoundStatement implements BoundStatement, RequestRoutingTypeAccessor { private final PreparedStatement preparedStatement; private final ColumnDefinitions variableDefinitions; @@ -805,9 +804,22 @@ public BoundStatement setNowInSeconds(int newNowInSeconds) { @Nullable @Override public RequestRoutingType getRequestRoutingType() { - return Objects.nonNull(requestRoutingType) - ? requestRoutingType - : preparedStatement.getRequestRoutingType(); + if (requestRoutingType != null) { + return requestRoutingType; + } + if (consistencyLevel != null && consistencyLevel.isSerial()) { + return RequestRoutingType.LWT; + } + if (preparedStatement instanceof RequestRoutingTypeAccessor) { + return ((RequestRoutingTypeAccessor) preparedStatement).getConfiguredRequestRoutingType(); + } + return preparedStatement.getRequestRoutingType(); + } + + @Nullable + @Override + public RequestRoutingType getConfiguredRequestRoutingType() { + return requestRoutingType; } @NonNull diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/DefaultPreparedStatement.java b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/DefaultPreparedStatement.java index 754a89ac228..652e3f50af7 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/DefaultPreparedStatement.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/DefaultPreparedStatement.java @@ -58,7 +58,7 @@ import org.slf4j.LoggerFactory; @ThreadSafe -public class DefaultPreparedStatement implements PreparedStatement { +public class DefaultPreparedStatement implements PreparedStatement, RequestRoutingTypeAccessor { private static final Logger LOGGER = LoggerFactory.getLogger(DefaultPreparedStatement.class); private static final Splitter SPACE_SPLITTER = Splitter.onPattern("\\s+"); private static final Splitter COMMA_SPLITTER = Splitter.onPattern(","); @@ -196,6 +196,20 @@ public boolean isLWT() { @Nullable @Override public RequestRoutingType getRequestRoutingType() { + if (requestRoutingType != null) { + return requestRoutingType; + } + + if (consistencyLevelForBoundStatements != null + && consistencyLevelForBoundStatements.isSerial()) { + return RequestRoutingType.LWT; + } + return null; + } + + @Nullable + @Override + public RequestRoutingType getConfiguredRequestRoutingType() { return requestRoutingType; } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/DefaultSimpleStatement.java b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/DefaultSimpleStatement.java index 0268689d86f..39e308987fd 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/DefaultSimpleStatement.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/DefaultSimpleStatement.java @@ -42,7 +42,7 @@ import net.jcip.annotations.Immutable; @Immutable -public class DefaultSimpleStatement implements SimpleStatement { +public class DefaultSimpleStatement implements SimpleStatement, RequestRoutingTypeAccessor { private final String query; private final List positionalValues; @@ -776,6 +776,18 @@ public SimpleStatement setNowInSeconds(int newNowInSeconds) { @Nullable @Override public RequestRoutingType getRequestRoutingType() { + if (requestRoutingType != null) { + return requestRoutingType; + } + if (consistencyLevel != null && consistencyLevel.isSerial()) { + return RequestRoutingType.LWT; + } + return null; + } + + @Nullable + @Override + public RequestRoutingType getConfiguredRequestRoutingType() { return requestRoutingType; } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/RequestRoutingTypeAccessor.java b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/RequestRoutingTypeAccessor.java new file mode 100644 index 00000000000..07e17e50da6 --- /dev/null +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/RequestRoutingTypeAccessor.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.oss.driver.internal.core.cql; + +import com.datastax.oss.driver.api.core.RequestRoutingType; +import edu.umd.cs.findbugs.annotations.Nullable; + +/** Internal hook to distinguish stored routing type from consistency-inferred routing type. */ +public interface RequestRoutingTypeAccessor { + @Nullable + RequestRoutingType getConfiguredRequestRoutingType(); +} diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/BasicLoadBalancingPolicy.java b/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/BasicLoadBalancingPolicy.java index 2cd581237b1..f0e1cb3cf45 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/BasicLoadBalancingPolicy.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/BasicLoadBalancingPolicy.java @@ -189,13 +189,37 @@ public RequestRoutingMethod getRequestRoutingMethod(@Nullable Request request) { if (request == null) { return RequestRoutingMethod.REGULAR; } - if (request.getRequestRoutingType() == RequestRoutingType.LWT) { + RequestRoutingType requestRoutingType = request.getRequestRoutingType(); + if (requestRoutingType == RequestRoutingType.LWT + || (requestRoutingType == null && hasSerialConsistency(request))) { return lwtRequestRoutingMethod; } else { return RequestRoutingMethod.REGULAR; } } + private boolean hasSerialConsistency(@NonNull Request request) { + if (!(request instanceof Statement)) { + return false; + } + + return getEffectiveConsistency((Statement) request).isSerial(); + } + + @NonNull + private Optional getRequestProfile(@NonNull Request request) { + DriverExecutionProfile requestProfile = request.getExecutionProfile(); + if (requestProfile != null) { + return Optional.of(requestProfile); + } + + String profileName = request.getExecutionProfileName(); + if (profileName != null && !profileName.isEmpty()) { + return Optional.of(context.getConfig().getProfile(profileName)); + } + return Optional.of(profile); + } + /** * Returns the local datacenter name, if known; empty otherwise. * @@ -362,11 +386,20 @@ protected Queue newQueryPlanPreserveReplicas( for (Object obj : getLiveNodes().dc(null).toArray()) { allNodes.add((Node) obj); } + replicas = filterNodesIn(replicas, new LinkedHashSet<>(allNodes)); queryPlan.addAll(replicas); addRotatedNonReplicas(queryPlan, allNodes, replicas, request); } else { - // With local DC: prioritize local, then remote - Map> nodesByDc = getAllNodesByDc(); + boolean includeRemoteDcs = isDcFailoverAllowedForRequest(request); + Map> nodesByDc = + includeRemoteDcs + ? getAllNodesByDc() + : Collections.singletonMap(localDc, dcNodeList(localDc)); + Set liveNodesForPlan = + nodesByDc.values().stream() + .flatMap(List::stream) + .collect(Collectors.toCollection(LinkedHashSet::new)); + replicas = filterNodesIn(replicas, liveNodesForPlan); addReplicasByDc(queryPlan, replicas, localDc); addNonReplicasByDc(queryPlan, nodesByDc, replicas, localDc, request); } @@ -374,6 +407,10 @@ protected Queue newQueryPlanPreserveReplicas( return new SimpleQueryPlan(queryPlan.toArray()); } + private List filterNodesIn(List nodes, Set nodesToKeep) { + return nodes.stream().filter(nodesToKeep::contains).collect(Collectors.toList()); + } + /** Collect all live nodes grouped by DC, with preferred remote DCs ordered first. */ private Map> getAllNodesByDc() { Map> nodesByDc = new LinkedHashMap<>(); @@ -537,15 +574,8 @@ protected Queue maybeAddDcFailover(@Nullable Request request, @NonNull Que if (maxNodesPerRemoteDc <= 0 || localDc == null) { return local; } - if (!allowDcFailoverForLocalCl && request instanceof Statement) { - Statement statement = (Statement) request; - ConsistencyLevel consistency = statement.getConsistencyLevel(); - if (consistency == null) { - consistency = defaultConsistencyLevel; - } - if (consistency.isDcLocal()) { - return local; - } + if (!isDcFailoverAllowedForRequest(request)) { + return local; } if (preferredRemoteDcs.isEmpty()) { return new CompositeQueryPlan(local, buildRemoteQueryPlanAll()); @@ -553,6 +583,29 @@ protected Queue maybeAddDcFailover(@Nullable Request request, @NonNull Que return new CompositeQueryPlan(local, buildRemoteQueryPlanPreferred()); } + private boolean isDcFailoverAllowedForRequest(@Nullable Request request) { + if (!allowDcFailoverForLocalCl && request instanceof Statement) { + return !getEffectiveConsistency((Statement) request).isDcLocal(); + } + return true; + } + + @NonNull + private ConsistencyLevel getEffectiveConsistency(@NonNull Statement statement) { + ConsistencyLevel consistency = statement.getConsistencyLevel(); + if (consistency != null) { + return consistency; + } + + return getRequestProfile(statement) + .map( + requestProfile -> + context + .getConsistencyLevelRegistry() + .nameToLevel(requestProfile.getString(DefaultDriverOption.REQUEST_CONSISTENCY))) + .orElse(defaultConsistencyLevel); + } + private QueryPlan buildRemoteQueryPlanAll() { return new LazyQueryPlan() { diff --git a/core/src/test/java/com/datastax/oss/driver/api/core/cql/StatementBuilderTest.java b/core/src/test/java/com/datastax/oss/driver/api/core/cql/StatementBuilderTest.java index a10208645fd..286f3f8db1a 100644 --- a/core/src/test/java/com/datastax/oss/driver/api/core/cql/StatementBuilderTest.java +++ b/core/src/test/java/com/datastax/oss/driver/api/core/cql/StatementBuilderTest.java @@ -21,9 +21,15 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import com.datastax.oss.driver.api.core.DefaultConsistencyLevel; +import com.datastax.oss.driver.api.core.DefaultProtocolVersion; +import com.datastax.oss.driver.api.core.RequestRoutingType; +import com.datastax.oss.driver.api.core.type.codec.registry.CodecRegistry; +import com.datastax.oss.driver.internal.core.cql.DefaultBoundStatement; import com.datastax.oss.driver.shaded.guava.common.base.Charsets; import edu.umd.cs.findbugs.annotations.NonNull; import java.nio.ByteBuffer; +import java.util.Collections; import org.junit.Test; public class StatementBuilderTest { @@ -103,4 +109,83 @@ public void should_match_set_routing_key_vararg() { builderStmt = builder.setRoutingKey(buff2, buff1).build(); assertThat(expectedStmt.getRoutingKey()).isNotEqualTo(builderStmt.getRoutingKey()); } + + @Test + public void should_not_copy_inferred_simple_routing_type_as_explicit() { + SimpleStatement serialStatement = + SimpleStatement.builder("select * from test.foo") + .setConsistencyLevel(DefaultConsistencyLevel.LOCAL_SERIAL) + .build(); + + assertThat(serialStatement.getRequestRoutingType()).isEqualTo(RequestRoutingType.LWT); + + SimpleStatement regularStatement = + SimpleStatement.builder(serialStatement) + .setConsistencyLevel(DefaultConsistencyLevel.ONE) + .build(); + + assertThat(regularStatement.getRequestRoutingType()).isNull(); + } + + @Test + public void should_not_copy_inferred_bound_routing_type_as_explicit() { + BoundStatement serialStatement = newRegularBoundStatement(DefaultConsistencyLevel.LOCAL_SERIAL); + + assertThat(serialStatement.getRequestRoutingType()).isEqualTo(RequestRoutingType.LWT); + + BoundStatement regularStatement = + new BoundStatementBuilder(serialStatement) + .setConsistencyLevel(DefaultConsistencyLevel.ONE) + .build(); + + assertThat(regularStatement.getRequestRoutingType()).isEqualTo(RequestRoutingType.REGULAR); + } + + @Test + public void should_not_copy_inferred_batch_routing_type_as_explicit() { + BatchStatement serialStatement = + BatchStatement.builder(BatchType.LOGGED) + .setConsistencyLevel(DefaultConsistencyLevel.LOCAL_SERIAL) + .build(); + + assertThat(serialStatement.getRequestRoutingType()).isEqualTo(RequestRoutingType.LWT); + + BatchStatement regularStatement = + BatchStatement.builder(serialStatement) + .setConsistencyLevel(DefaultConsistencyLevel.ONE) + .build(); + + assertThat(regularStatement.getRequestRoutingType()).isEqualTo(RequestRoutingType.REGULAR); + } + + private BoundStatement newRegularBoundStatement(DefaultConsistencyLevel consistencyLevel) { + PreparedStatement preparedStatement = mock(PreparedStatement.class); + ColumnDefinitions variableDefinitions = mock(ColumnDefinitions.class); + when(preparedStatement.isLWT()).thenReturn(false); + when(preparedStatement.getRequestRoutingType()).thenReturn(RequestRoutingType.REGULAR); + when(preparedStatement.getVariableDefinitions()).thenReturn(variableDefinitions); + return new DefaultBoundStatement( + preparedStatement, + variableDefinitions, + new ByteBuffer[0], + null, + null, + null, + null, + null, + Collections.emptyMap(), + null, + false, + Statement.NO_DEFAULT_TIMESTAMP, + null, + Integer.MIN_VALUE, + consistencyLevel, + null, + null, + CodecRegistry.DEFAULT, + DefaultProtocolVersion.DEFAULT, + null, + Statement.NO_NOW_IN_SECONDS, + null); + } } diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/cql/DefaultBatchStatementTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/cql/DefaultBatchStatementTest.java index 3f38ddaf3cb..c62426eeffc 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/cql/DefaultBatchStatementTest.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/cql/DefaultBatchStatementTest.java @@ -103,6 +103,18 @@ public void should_not_issue_log_warn_if_statement_have_no_consistency_level_set verify(logger.appender, times(0)).doAppend(logger.loggingEventCaptor.capture()); } + @Test + public void should_not_infer_lwt_status_from_serial_consistency_level_option() { + BatchStatement batch = + BatchStatement.builder(BatchType.LOGGED) + .addStatement(SimpleStatement.newInstance("UPDATE foo SET v = ? WHERE pk = ?", 1, 1)) + .setSerialConsistencyLevel(DefaultConsistencyLevel.LOCAL_SERIAL) + .build(); + + assertThat(batch.getRequestRoutingType()).isEqualTo(RequestRoutingType.REGULAR); + assertThat(batch.isLWT()).isFalse(); + } + @Test public void should_infer_lwt_status() { // SELECT is not allowed in practice but is sufficient for unit testing diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/cql/DefaultPreparedStatementTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/cql/DefaultPreparedStatementTest.java new file mode 100644 index 00000000000..7ec6232fea7 --- /dev/null +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/cql/DefaultPreparedStatementTest.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.oss.driver.internal.core.cql; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.datastax.oss.driver.api.core.ConsistencyLevel; +import com.datastax.oss.driver.api.core.DefaultConsistencyLevel; +import com.datastax.oss.driver.api.core.DefaultProtocolVersion; +import com.datastax.oss.driver.api.core.RequestRoutingType; +import com.datastax.oss.driver.api.core.cql.BoundStatement; +import com.datastax.oss.driver.api.core.cql.ColumnDefinitions; +import com.datastax.oss.driver.api.core.type.codec.registry.CodecRegistry; +import com.datastax.oss.protocol.internal.util.Bytes; +import java.util.Collections; +import org.junit.Test; + +public class DefaultPreparedStatementTest { + + @Test + public void should_not_keep_inferred_routing_type_after_bound_consistency_override() { + DefaultPreparedStatement preparedStatement = + newPreparedStatement(DefaultConsistencyLevel.LOCAL_SERIAL, null, null); + + assertThat(preparedStatement.getRequestRoutingType()).isEqualTo(RequestRoutingType.LWT); + + BoundStatement boundStatement = + preparedStatement.bind().setConsistencyLevel(DefaultConsistencyLevel.ONE); + + assertThat(boundStatement.getConsistencyLevel()).isEqualTo(DefaultConsistencyLevel.ONE); + assertThat(boundStatement.getRequestRoutingType()).isNull(); + } + + @Test + public void should_not_infer_routing_type_from_prepared_serial_consistency_level_option() { + DefaultPreparedStatement preparedStatement = + newPreparedStatement(null, DefaultConsistencyLevel.LOCAL_SERIAL, null); + + assertThat(preparedStatement.getRequestRoutingType()).isNull(); + assertThat(preparedStatement.bind().getRequestRoutingType()).isNull(); + } + + @Test + public void should_not_infer_routing_type_from_bound_serial_consistency_level_override() { + DefaultPreparedStatement preparedStatement = newPreparedStatement(null, null, null); + + BoundStatement boundStatement = + preparedStatement.bind().setSerialConsistencyLevel(DefaultConsistencyLevel.LOCAL_SERIAL); + + assertThat(boundStatement.getRequestRoutingType()).isNull(); + } + + @Test + public void should_keep_detected_lwt_routing_type_after_bound_consistency_override() { + DefaultPreparedStatement preparedStatement = + newPreparedStatement(DefaultConsistencyLevel.LOCAL_SERIAL, null, RequestRoutingType.LWT); + + BoundStatement boundStatement = + preparedStatement.bind().setConsistencyLevel(DefaultConsistencyLevel.ONE); + + assertThat(boundStatement.getRequestRoutingType()).isEqualTo(RequestRoutingType.LWT); + } + + private DefaultPreparedStatement newPreparedStatement( + ConsistencyLevel consistencyLevel, + ConsistencyLevel serialConsistencyLevel, + RequestRoutingType requestRoutingType) { + ColumnDefinitions variableDefinitions = + DefaultColumnDefinitions.valueOf(Collections.emptyList()); + return new DefaultPreparedStatement( + Bytes.fromHexString("0x"), + "SELECT * FROM test.foo WHERE pk = ?", + variableDefinitions, + Collections.emptyList(), + null, + null, + null, + null, + Collections.emptyMap(), + null, + null, + null, + null, + null, + Collections.emptyMap(), + null, + null, + null, + Integer.MIN_VALUE, + consistencyLevel, + serialConsistencyLevel, + false, + CodecRegistry.DEFAULT, + DefaultProtocolVersion.DEFAULT, + requestRoutingType); + } +} diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/loadbalancing/BasicLoadBalancingPolicyDcFailoverTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/loadbalancing/BasicLoadBalancingPolicyDcFailoverTest.java index 4ba2c3829ce..6e8360942c9 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/loadbalancing/BasicLoadBalancingPolicyDcFailoverTest.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/loadbalancing/BasicLoadBalancingPolicyDcFailoverTest.java @@ -33,8 +33,10 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import com.datastax.oss.driver.api.core.DefaultConsistencyLevel; import com.datastax.oss.driver.api.core.config.DefaultDriverOption; import com.datastax.oss.driver.api.core.config.DriverExecutionProfile; +import com.datastax.oss.driver.api.core.cql.SimpleStatement; import com.datastax.oss.driver.api.core.metadata.Node; import com.datastax.oss.driver.internal.core.metadata.DefaultEndPoint; import com.datastax.oss.driver.internal.core.metadata.DefaultNode; @@ -57,6 +59,45 @@ public class BasicLoadBalancingPolicyDcFailoverTest extends BasicLoadBalancingPo @Mock protected DefaultNode node8; @Mock protected DefaultNode node9; + @Test + public void should_not_add_remote_nodes_for_preserve_routing_with_local_serial_consistency() { + when(defaultProfile.getString( + DefaultDriverOption.LOAD_BALANCING_DEFAULT_LWT_REQUEST_ROUTING_METHOD)) + .thenReturn("PRESERVE_REPLICA_ORDER"); + policy = createAndInitPolicy(); + SimpleStatement statement = + SimpleStatement.newInstance("SELECT * FROM ks.foo") + .setConsistencyLevel(DefaultConsistencyLevel.LOCAL_SERIAL) + .setRoutingKeyspace(KEYSPACE) + .setRoutingKey(ROUTING_KEY); + when(tokenMap.getReplicasList(KEYSPACE, null, ROUTING_KEY)) + .thenReturn(ImmutableList.of(node7, node1, node2)); + + assertThat(policy.newQueryPlan(statement, session)) + .containsOnlyElementsOf(policy.getLiveNodes().dc("dc1")); + } + + @Test + public void should_ignore_down_replicas_for_preserve_routing_with_local_serial_consistency() { + when(defaultProfile.getString( + DefaultDriverOption.LOAD_BALANCING_DEFAULT_LWT_REQUEST_ROUTING_METHOD)) + .thenReturn("PRESERVE_REPLICA_ORDER"); + policy = createAndInitPolicy(); + SimpleStatement statement = + SimpleStatement.newInstance("SELECT * FROM ks.foo") + .setConsistencyLevel(DefaultConsistencyLevel.LOCAL_SERIAL) + .setRoutingKeyspace(KEYSPACE) + .setRoutingKey(ROUTING_KEY); + when(tokenMap.getReplicasList(KEYSPACE, null, ROUTING_KEY)) + .thenReturn(ImmutableList.of(node7, node1, node2)); + + for (Node node : ImmutableList.copyOf(policy.getLiveNodes().dc("dc1"))) { + policy.onDown(node); + } + + assertThat(policy.newQueryPlan(statement, session)).isEmpty(); + } + @Test @Override public void should_prioritize_single_replica() { diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicyRequestRoutingTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicyRequestRoutingTest.java index 4877659092f..43d14326ba4 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicyRequestRoutingTest.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicyRequestRoutingTest.java @@ -25,17 +25,27 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import com.datastax.oss.driver.api.core.CqlIdentifier; +import com.datastax.oss.driver.api.core.DefaultConsistencyLevel; import com.datastax.oss.driver.api.core.RequestRoutingType; import com.datastax.oss.driver.api.core.config.DefaultDriverOption; import com.datastax.oss.driver.api.core.config.DriverExecutionProfile; +import com.datastax.oss.driver.api.core.cql.BatchStatement; +import com.datastax.oss.driver.api.core.cql.BatchType; +import com.datastax.oss.driver.api.core.cql.BoundStatement; +import com.datastax.oss.driver.api.core.cql.ColumnDefinitions; +import com.datastax.oss.driver.api.core.cql.PreparedStatement; +import com.datastax.oss.driver.api.core.cql.SimpleStatement; +import com.datastax.oss.driver.api.core.cql.Statement; import com.datastax.oss.driver.api.core.metadata.Metadata; import com.datastax.oss.driver.api.core.metadata.Node; import com.datastax.oss.driver.api.core.metadata.TokenMap; import com.datastax.oss.driver.api.core.metadata.token.Token; import com.datastax.oss.driver.api.core.session.Request; +import com.datastax.oss.driver.internal.core.cql.DefaultBoundStatement; import com.datastax.oss.driver.internal.core.loadbalancing.BasicLoadBalancingPolicy.RequestRoutingMethod; import com.datastax.oss.driver.internal.core.session.DefaultSession; import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableList; @@ -43,6 +53,7 @@ import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableSet; import com.datastax.oss.protocol.internal.util.Bytes; import java.nio.ByteBuffer; +import java.util.Collections; import java.util.Optional; import java.util.Queue; import java.util.UUID; @@ -165,6 +176,259 @@ public void should_dispatch_to_regular_query_plan_when_request_is_regular() { assertThat(plan2).containsExactlyInAnyOrder(node1, node2, node3); } + @Test + public void + should_dispatch_to_preserve_query_plan_when_simple_local_serial_select_and_config_preserve() { + // Given + initPolicy("PRESERVE_REPLICA_ORDER"); + SimpleStatement statement = + SimpleStatement.builder( + "SELECT * FROM unique_key_value " + + "WHERE unique_key=? AND unique_value=? AND context=?") + .setConsistencyLevel(DefaultConsistencyLevel.LOCAL_SERIAL) + .setRoutingKeyspace(KEYSPACE) + .setRoutingKey(ROUTING_KEY) + .build(); + given(tokenMap.getReplicasList(KEYSPACE, null, ROUTING_KEY)) + .willReturn(ImmutableList.of(node1, node2)); + + // When + Queue plan = policy.newQueryPlan(statement, session); + + // Then + assertThat(statement.isLWT()).isFalse(); + assertThat(statement.getRequestRoutingType()).isEqualTo(RequestRoutingType.LWT); + assertThat(policy.getRequestRoutingMethod(statement)) + .isEqualTo(RequestRoutingMethod.PRESERVE_REPLICA_ORDER); + assertThat(plan).containsExactly(node1, node2, node3); + } + + @Test + public void + should_dispatch_to_preserve_query_plan_when_new_instance_local_serial_select_and_config_preserve() { + // Given + initPolicy("PRESERVE_REPLICA_ORDER"); + SimpleStatement statement = + SimpleStatement.newInstance( + "SELECT * FROM unique_key_value " + + "WHERE unique_key=? AND unique_value=? AND context=?") + .setConsistencyLevel(DefaultConsistencyLevel.LOCAL_SERIAL) + .setRoutingKeyspace(KEYSPACE) + .setRoutingKey(ROUTING_KEY); + given(tokenMap.getReplicasList(KEYSPACE, null, ROUTING_KEY)) + .willReturn(ImmutableList.of(node1, node2)); + + // When + Queue plan = policy.newQueryPlan(statement, session); + + // Then + assertThat(statement.isLWT()).isFalse(); + assertThat(statement.getRequestRoutingType()).isEqualTo(RequestRoutingType.LWT); + assertThat(policy.getRequestRoutingMethod(statement)) + .isEqualTo(RequestRoutingMethod.PRESERVE_REPLICA_ORDER); + assertThat(plan).containsExactly(node1, node2, node3); + } + + @Test + public void + should_dispatch_to_preserve_query_plan_when_bound_local_serial_select_and_config_preserve() { + // Given + initPolicy("PRESERVE_REPLICA_ORDER"); + BoundStatement statement = newRegularBoundStatementWithLocalSerialConsistency(); + given(tokenMap.getReplicasList(KEYSPACE, null, ROUTING_KEY)) + .willReturn(ImmutableList.of(node1, node2)); + + // When + Queue plan = policy.newQueryPlan(statement, session); + + // Then + assertThat(statement.isLWT()).isFalse(); + assertThat(statement.getRequestRoutingType()).isEqualTo(RequestRoutingType.LWT); + assertThat(policy.getRequestRoutingMethod(statement)) + .isEqualTo(RequestRoutingMethod.PRESERVE_REPLICA_ORDER); + assertThat(plan).containsExactly(node1, node2, node3); + } + + @Test + public void + should_dispatch_to_regular_query_plan_when_simple_has_only_serial_consistency_level_option() { + // Given + initPolicy("PRESERVE_REPLICA_ORDER"); + SimpleStatement statement = + SimpleStatement.builder( + "SELECT * FROM unique_key_value " + + "WHERE unique_key=? AND unique_value=? AND context=?") + .setSerialConsistencyLevel(DefaultConsistencyLevel.LOCAL_SERIAL) + .setRoutingKeyspace(KEYSPACE) + .setRoutingKey(ROUTING_KEY) + .build(); + given(tokenMap.getReplicasList(KEYSPACE, null, ROUTING_KEY)) + .willReturn(ImmutableList.of(node1, node2)); + + // When + Queue plan = policy.newQueryPlan(statement, session); + + // Then + assertThat(statement.getRequestRoutingType()).isNull(); + assertThat(policy.getRequestRoutingMethod(statement)).isEqualTo(RequestRoutingMethod.REGULAR); + assertThat(plan).containsExactlyInAnyOrder(node1, node2, node3); + } + + @Test + public void + should_dispatch_to_regular_query_plan_when_bound_has_only_serial_consistency_level_option() { + // Given + initPolicy("PRESERVE_REPLICA_ORDER"); + BoundStatement statement = newRegularBoundStatement(null, DefaultConsistencyLevel.LOCAL_SERIAL); + given(tokenMap.getReplicasList(KEYSPACE, null, ROUTING_KEY)) + .willReturn(ImmutableList.of(node1, node2)); + + // When + Queue plan = policy.newQueryPlan(statement, session); + + // Then + assertThat(statement.getRequestRoutingType()).isEqualTo(RequestRoutingType.REGULAR); + assertThat(policy.getRequestRoutingMethod(statement)).isEqualTo(RequestRoutingMethod.REGULAR); + assertThat(plan).containsExactlyInAnyOrder(node1, node2, node3); + } + + @Test + public void + should_dispatch_to_regular_query_plan_when_batch_has_only_serial_consistency_level_option() { + // Given + initPolicy("PRESERVE_REPLICA_ORDER"); + BatchStatement statement = + BatchStatement.builder(BatchType.LOGGED) + .addStatement(SimpleStatement.newInstance("UPDATE foo SET v = ? WHERE pk = ?", 1, 1)) + .setSerialConsistencyLevel(DefaultConsistencyLevel.LOCAL_SERIAL) + .setRoutingKeyspace(KEYSPACE) + .setRoutingKey(ROUTING_KEY) + .build(); + given(tokenMap.getReplicasList(KEYSPACE, null, ROUTING_KEY)) + .willReturn(ImmutableList.of(node1, node2)); + + // When + Queue plan = policy.newQueryPlan(statement, session); + + // Then + assertThat(statement.getRequestRoutingType()).isEqualTo(RequestRoutingType.REGULAR); + assertThat(policy.getRequestRoutingMethod(statement)).isEqualTo(RequestRoutingMethod.REGULAR); + assertThat(plan).containsExactlyInAnyOrder(node1, node2, node3); + } + + @Test + public void should_dispatch_to_regular_query_plan_when_local_serial_select_and_config_regular() { + // Given + initPolicy("REGULAR"); + SimpleStatement statement = + SimpleStatement.builder( + "SELECT * FROM unique_key_value " + + "WHERE unique_key=? AND unique_value=? AND context=?") + .setConsistencyLevel(DefaultConsistencyLevel.LOCAL_SERIAL) + .setRoutingKeyspace(KEYSPACE) + .setRoutingKey(ROUTING_KEY) + .build(); + given(tokenMap.getReplicasList(KEYSPACE, null, ROUTING_KEY)) + .willReturn(ImmutableList.of(node1, node2)); + + // When + Queue plan = policy.newQueryPlan(statement, session); + + // Then + assertThat(statement.isLWT()).isFalse(); + assertThat(statement.getRequestRoutingType()).isEqualTo(RequestRoutingType.LWT); + assertThat(policy.getRequestRoutingMethod(statement)).isEqualTo(RequestRoutingMethod.REGULAR); + assertThat(plan).containsExactlyInAnyOrder(node1, node2, node3); + } + + @Test + public void should_dispatch_to_preserve_query_plan_when_profile_has_local_serial_consistency() { + // Given + initPolicy("PRESERVE_REPLICA_ORDER"); + DriverExecutionProfile serialProfile = mock(DriverExecutionProfile.class); + when(serialProfile.getString(DefaultDriverOption.REQUEST_CONSISTENCY)) + .thenReturn("LOCAL_SERIAL"); + SimpleStatement statement = + SimpleStatement.builder( + "SELECT * FROM unique_key_value " + + "WHERE unique_key=? AND unique_value=? AND context=?") + .setExecutionProfile(serialProfile) + .setRoutingKeyspace(KEYSPACE) + .setRoutingKey(ROUTING_KEY) + .build(); + given(tokenMap.getReplicasList(KEYSPACE, null, ROUTING_KEY)) + .willReturn(ImmutableList.of(node1, node2)); + + // When + Queue plan = policy.newQueryPlan(statement, session); + + // Then + assertThat(statement.getConsistencyLevel()).isNull(); + assertThat(statement.getRequestRoutingType()).isNull(); + assertThat(policy.getRequestRoutingMethod(statement)) + .isEqualTo(RequestRoutingMethod.PRESERVE_REPLICA_ORDER); + assertThat(plan).containsExactly(node1, node2, node3); + } + + @Test + public void + should_dispatch_to_preserve_query_plan_when_default_profile_has_local_serial_consistency() { + // Given + initPolicy("PRESERVE_REPLICA_ORDER"); + when(defaultProfile.getString(DefaultDriverOption.REQUEST_CONSISTENCY)) + .thenReturn("LOCAL_SERIAL"); + SimpleStatement statement = + SimpleStatement.builder( + "SELECT * FROM unique_key_value " + + "WHERE unique_key=? AND unique_value=? AND context=?") + .setRoutingKeyspace(KEYSPACE) + .setRoutingKey(ROUTING_KEY) + .build(); + given(tokenMap.getReplicasList(KEYSPACE, null, ROUTING_KEY)) + .willReturn(ImmutableList.of(node1, node2)); + + // When + Queue plan = policy.newQueryPlan(statement, session); + + // Then + assertThat(statement.getConsistencyLevel()).isNull(); + assertThat(statement.getRequestRoutingType()).isNull(); + assertThat(policy.getRequestRoutingMethod(statement)) + .isEqualTo(RequestRoutingMethod.PRESERVE_REPLICA_ORDER); + assertThat(plan).containsExactly(node1, node2, node3); + } + + @Test + public void + should_dispatch_to_preserve_query_plan_when_profile_name_has_local_serial_consistency() { + // Given + initPolicy("PRESERVE_REPLICA_ORDER"); + DriverExecutionProfile serialProfile = mock(DriverExecutionProfile.class); + when(config.getProfile("serial")).thenReturn(serialProfile); + when(serialProfile.getString(DefaultDriverOption.REQUEST_CONSISTENCY)) + .thenReturn("LOCAL_SERIAL"); + SimpleStatement statement = + SimpleStatement.builder( + "SELECT * FROM unique_key_value " + + "WHERE unique_key=? AND unique_value=? AND context=?") + .setExecutionProfileName("serial") + .setRoutingKeyspace(KEYSPACE) + .setRoutingKey(ROUTING_KEY) + .build(); + given(tokenMap.getReplicasList(KEYSPACE, null, ROUTING_KEY)) + .willReturn(ImmutableList.of(node1, node2)); + + // When + Queue plan = policy.newQueryPlan(statement, session); + + // Then + assertThat(statement.getConsistencyLevel()).isNull(); + assertThat(statement.getRequestRoutingType()).isNull(); + assertThat(policy.getRequestRoutingMethod(statement)) + .isEqualTo(RequestRoutingMethod.PRESERVE_REPLICA_ORDER); + assertThat(plan).containsExactly(node1, node2, node3); + } + @Test public void should_dispatch_to_preserve_query_plan_when_lwt_and_config_preserve() { // Given @@ -186,6 +450,42 @@ public void should_dispatch_to_preserve_query_plan_when_lwt_and_config_preserve( assertThat(plan3).containsExactly(node2, node1, node3); } + private BoundStatement newRegularBoundStatementWithLocalSerialConsistency() { + return newRegularBoundStatement(DefaultConsistencyLevel.LOCAL_SERIAL, null); + } + + private BoundStatement newRegularBoundStatement( + DefaultConsistencyLevel consistencyLevel, DefaultConsistencyLevel serialConsistencyLevel) { + PreparedStatement preparedStatement = mock(PreparedStatement.class); + ColumnDefinitions variableDefinitions = mock(ColumnDefinitions.class); + when(preparedStatement.isLWT()).thenReturn(false); + when(preparedStatement.getRequestRoutingType()).thenReturn(RequestRoutingType.REGULAR); + when(preparedStatement.getVariableDefinitions()).thenReturn(variableDefinitions); + return new DefaultBoundStatement( + preparedStatement, + variableDefinitions, + new ByteBuffer[0], + null, + null, + KEYSPACE, + ROUTING_KEY, + null, + Collections.emptyMap(), + null, + false, + Statement.NO_DEFAULT_TIMESTAMP, + null, + Integer.MIN_VALUE, + consistencyLevel, + serialConsistencyLevel, + null, + null, + null, + null, + Statement.NO_NOW_IN_SECONDS, + null); + } + @Test public void should_dispatch_to_regular_query_plan_when_lwt_but_config_regular() { // Given diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/core/loadbalancing/LWTLoadBalancingMultiDcIT.java b/integration-tests/src/test/java/com/datastax/oss/driver/core/loadbalancing/LWTLoadBalancingMultiDcIT.java index 3e7d4de2b27..50ab832523c 100644 --- a/integration-tests/src/test/java/com/datastax/oss/driver/core/loadbalancing/LWTLoadBalancingMultiDcIT.java +++ b/integration-tests/src/test/java/com/datastax/oss/driver/core/loadbalancing/LWTLoadBalancingMultiDcIT.java @@ -23,9 +23,11 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; +import static org.junit.Assume.assumeTrue; import com.datastax.oss.driver.api.core.CqlIdentifier; import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.DefaultConsistencyLevel; import com.datastax.oss.driver.api.core.ProtocolVersion; import com.datastax.oss.driver.api.core.RequestRoutingType; import com.datastax.oss.driver.api.core.Version; @@ -33,8 +35,10 @@ import com.datastax.oss.driver.api.core.cql.BatchStatement; import com.datastax.oss.driver.api.core.cql.BatchStatementBuilder; import com.datastax.oss.driver.api.core.cql.BatchType; +import com.datastax.oss.driver.api.core.cql.BoundStatement; import com.datastax.oss.driver.api.core.cql.PreparedStatement; import com.datastax.oss.driver.api.core.cql.ResultSet; +import com.datastax.oss.driver.api.core.cql.SimpleStatement; import com.datastax.oss.driver.api.core.metadata.Node; import com.datastax.oss.driver.api.core.metadata.TokenMap; import com.datastax.oss.driver.api.core.type.codec.TypeCodecs; @@ -57,6 +61,7 @@ public class LWTLoadBalancingMultiDcIT { private static final String LOCAL_DC = "dc1"; private static final String KEYSPACE = "test"; + private static final String LOCAL_SERIAL_PROFILE = "local-serial"; private static final CustomCcmRule CCM_RULE = CustomCcmRule.builder().withNodes(2, 1).build(); // 2 nodes in DC1, 1 node in DC2 @@ -67,7 +72,12 @@ public class LWTLoadBalancingMultiDcIT { .withConfigLoader( SessionUtils.configLoaderBuilder() .withString(DefaultDriverOption.LOAD_BALANCING_LOCAL_DATACENTER, LOCAL_DC) + .withString( + DefaultDriverOption.LOAD_BALANCING_DEFAULT_LWT_REQUEST_ROUTING_METHOD, + "PRESERVE_REPLICA_ORDER") .withDuration(DefaultDriverOption.REQUEST_TIMEOUT, Duration.ofSeconds(30)) + .startProfile(LOCAL_SERIAL_PROFILE) + .withString(DefaultDriverOption.REQUEST_CONSISTENCY, "LOCAL_SERIAL") .build()) .build(); @@ -76,6 +86,7 @@ public class LWTLoadBalancingMultiDcIT { public static final int FIRST_TEST_PARTITION_KEY = 4242; public static final int SECOND_TEST_PARTITION_KEY = 4343; + public static final int THIRD_TEST_PARTITION_KEY = 4444; public static final int NUM_TEST_ITERATIONS = 30; @BeforeClass @@ -207,4 +218,59 @@ public void should_route_lwt_batch_to_local_dc_replicas() { assertThat(coordinators).isSubsetOf(localReplicas); assertThat(coordinatorDcs).containsOnly(LOCAL_DC); } + + @Test + public void should_route_prepared_local_serial_simple_select_as_lwt() { + assumeTrue(CcmBridge.isDistributionOf(BackendType.SCYLLA)); + + CqlSession session = SESSION_RULE.session(); + SimpleStatement simpleSelect = + SimpleStatement.builder("SELECT * FROM test.foo WHERE pk = ? AND ck = ?") + .setConsistencyLevel(DefaultConsistencyLevel.LOCAL_SERIAL) + .build(); + PreparedStatement select = session.prepare(simpleSelect); + BoundStatement statement = select.bind(THIRD_TEST_PARTITION_KEY, 0); + + assertThat(simpleSelect.isLWT()).isFalse(); + assertThat(simpleSelect.getRequestRoutingType()).isEqualTo(RequestRoutingType.LWT); + assertThat(simpleSelect.getConsistencyLevel()).isEqualTo(DefaultConsistencyLevel.LOCAL_SERIAL); + + assertThat(select.getRequestRoutingType()).isEqualTo(RequestRoutingType.LWT); + + assertThat(statement.getRequestRoutingType()).isEqualTo(RequestRoutingType.LWT); + assertThat(statement.getRoutingKeyspace()).isEqualTo(CqlIdentifier.fromCql(KEYSPACE)); + assertThat(statement.getRoutingKey()).isNotNull(); + assertThat(statement.getConsistencyLevel()).isEqualTo(DefaultConsistencyLevel.LOCAL_SERIAL); + + ResultSet result = session.execute(statement); + assertThat(result.getExecutionInfo().getCoordinator()).isNotNull(); + } + + @Test + public void should_route_prepared_profiled_local_serial_simple_select_with_lwt_policy() { + assumeTrue(CcmBridge.isDistributionOf(BackendType.SCYLLA)); + + CqlSession session = SESSION_RULE.session(); + SimpleStatement simpleSelect = + SimpleStatement.builder("SELECT * FROM test.foo WHERE pk = ? AND ck = ?") + .setExecutionProfileName(LOCAL_SERIAL_PROFILE) + .build(); + PreparedStatement select = session.prepare(simpleSelect); + BoundStatement statement = select.bind(THIRD_TEST_PARTITION_KEY, 0); + + assertThat(simpleSelect.isLWT()).isFalse(); + assertThat(simpleSelect.getRequestRoutingType()).isNull(); + assertThat(simpleSelect.getConsistencyLevel()).isNull(); + + assertThat(select.getRequestRoutingType()).isNull(); + + assertThat(statement.getRequestRoutingType()).isNull(); + assertThat(statement.getExecutionProfileName()).isEqualTo(LOCAL_SERIAL_PROFILE); + assertThat(statement.getRoutingKeyspace()).isEqualTo(CqlIdentifier.fromCql(KEYSPACE)); + assertThat(statement.getRoutingKey()).isNotNull(); + assertThat(statement.getConsistencyLevel()).isNull(); + + ResultSet result = session.execute(statement); + assertThat(result.getExecutionInfo().getCoordinator()).isNotNull(); + } } diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/core/metadata/DefaultMetadataTabletMapIT.java b/integration-tests/src/test/java/com/datastax/oss/driver/core/metadata/DefaultMetadataTabletMapIT.java index 9a01a8c5164..2395e55f5c0 100644 --- a/integration-tests/src/test/java/com/datastax/oss/driver/core/metadata/DefaultMetadataTabletMapIT.java +++ b/integration-tests/src/test/java/com/datastax/oss/driver/core/metadata/DefaultMetadataTabletMapIT.java @@ -4,6 +4,9 @@ import com.datastax.oss.driver.api.core.CqlIdentifier; import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.DefaultConsistencyLevel; +import com.datastax.oss.driver.api.core.RequestRoutingType; +import com.datastax.oss.driver.api.core.Version; import com.datastax.oss.driver.api.core.config.DefaultDriverOption; import com.datastax.oss.driver.api.core.cql.PreparedStatement; import com.datastax.oss.driver.api.core.cql.ResultSet; @@ -14,6 +17,7 @@ import com.datastax.oss.driver.api.core.metadata.Tablet; import com.datastax.oss.driver.api.testinfra.ScyllaOnly; import com.datastax.oss.driver.api.testinfra.ScyllaRequirement; +import com.datastax.oss.driver.api.testinfra.ccm.CcmBridge; import com.datastax.oss.driver.api.testinfra.ccm.CustomCcmRule; import com.datastax.oss.driver.api.testinfra.session.SessionRule; import com.datastax.oss.driver.api.testinfra.session.SessionUtils; @@ -25,6 +29,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListSet; @@ -49,7 +54,10 @@ public class DefaultMetadataTabletMapIT { private static final Logger LOG = LoggerFactory.getLogger(DefaultMetadataTabletMapIT.class); private static final CustomCcmRule CCM_RULE = CustomCcmRule.builder() - .withNodes(2) + // Drop nodes back to 2 once https://github.com/scylladb/scylladb/issues/29874 is fixed + // After 2026.1 Scylla does not send tablet hint on a wrong-shard for LWT queries + // 3rd node makes one node completely incorrect, that is when Scylla sends tablet hint + .withNodes(3) .withCassandraConfiguration( "experimental_features", "['consistent-topology-changes','tablets']") .build(); @@ -67,6 +75,8 @@ public class DefaultMetadataTabletMapIT { private static final int INITIAL_TABLETS = 32; private static final int QUERIES = 1600; private static final int REPLICATION_FACTOR = 2; + private static final Version SCYLLA_LWT_TABLETS_SUPPORT_VERSION = + Objects.requireNonNull(Version.parse("2026.1")); private static final String KEYSPACE_NAME = "tabletsTest"; private static final String TABLE_NAME = "tabletsTable"; private static final String CREATE_KEYSPACE_QUERY = @@ -120,6 +130,14 @@ public class DefaultMetadataTabletMapIT { private static final SimpleStatement STMT_SELECT_CK_CONCRETE = buildStatement("SELECT pk, ck FROM %s.%s WHERE pk = ? AND ck = 1"); + private static final SimpleStatement STMT_SELECT_LOCAL_SERIAL = + buildStatement("SELECT pk,ck FROM %s.%s WHERE pk = ? AND ck = ?") + .setConsistencyLevel(DefaultConsistencyLevel.LOCAL_SERIAL); + + private static final SimpleStatement STMT_SELECT_SERIAL = + buildStatement("SELECT pk,ck FROM %s.%s WHERE pk = ? AND ck = ?") + .setConsistencyLevel(DefaultConsistencyLevel.SERIAL); + private static final SimpleStatement STMT_UPDATE = buildStatement("UPDATE %s.%s SET val = 1 WHERE pk = ? AND ck = ?"); @@ -195,6 +213,9 @@ public void every_statement_should_deliver_tablet_info() { statements.put("SELECT_CONCRETE_PREPARED", s -> s.prepare(STMT_SELECT_CONCRETE).bind()); statements.put("SELECT_PK_CONCRETE_PREPARED", s -> s.prepare(STMT_SELECT_PK_CONCRETE).bind(2)); statements.put("SELECT_CK_CONCRETE_PREPARED", s -> s.prepare(STMT_SELECT_CK_CONCRETE).bind(2)); + statements.put( + "SELECT_LOCAL_SERIAL_PREPARED", s -> s.prepare(STMT_SELECT_LOCAL_SERIAL).bind(2, 2)); + statements.put("SELECT_SERIAL_PREPARED", s -> s.prepare(STMT_SELECT_SERIAL).bind(2, 2)); statements.put("INSERT_CONCRETE", s -> STMT_INSERT_CONCRETE); statements.put("INSERT_PREPARED", s -> s.prepare(STMT_INSERT).bind(2, 2)); statements.put("INSERT_NO_KS_PREPARED", s -> s.prepare(STMT_INSERT_NO_KS).bind(2, 2)); @@ -227,8 +248,9 @@ public void every_statement_should_deliver_tablet_info() { // Scylla does not return tablet info for queries with PK built into query continue; } - if (stmtEntry.getKey().contains("LWT")) { - // LWT is not yet supported by scylla on tables with tablets + if ((stmtEntry.getKey().contains("LWT") || stmtEntry.getKey().contains("SERIAL")) + && !isLWTTabletsSupported()) { + // LWT is supported on tables with tablets starting with Scylla 2026.1. continue; } if (sessionEntry.getKey().equals("REGULAR") && stmtEntry.getKey().contains("NO_KS")) { @@ -256,6 +278,17 @@ public void every_statement_should_deliver_tablet_info() { ex.addSuppressed(e); throw ex; } + + if (stmtEntry.getKey().contains("SERIAL")) { + if (stmt.getRequestRoutingType() != RequestRoutingType.LWT) { + testErrors.add( + String.format( + "Statement %s on session %s is routed as regular query", + stmtEntry.getKey(), sessionEntry.getKey())); + continue; + } + } + try { if (!executeOnAllHostsAndReturnIfResultHasTabletsInfo(session, stmt)) { testErrors.add( @@ -296,7 +329,7 @@ public void every_statement_should_deliver_tablet_info() { } @Test - public void should_receive_each_tablet_exactly_once() { + public void should_receive_all_tablets_and_stop_receiving_tablet_info() { int counter = 0; try (CqlSession session = CqlSession.builder().addContactEndPoints(CCM_RULE.getContactPoints()).build()) { @@ -306,7 +339,7 @@ public void should_receive_each_tablet_exactly_once() { counter++; } } - Assert.assertEquals(INITIAL_TABLETS, counter); + assertReceivedAtLeastOnePayloadPerTablet(counter); assertSessionTabletMapIsFilled(session); } @@ -322,8 +355,8 @@ public void should_receive_each_tablet_exactly_once() { LOG.debug("Ran first set of queries"); - // With enough queries we should hit a wrong node for each tablet exactly once. - Assert.assertEquals(INITIAL_TABLETS, counter); + // With enough queries we should hit a wrong node for each tablet at least once. + assertReceivedAtLeastOnePayloadPerTablet(counter); assertSessionTabletMapIsFilled(session); // All tablet information should be available by now (unless for some reason cluster did sth @@ -342,6 +375,13 @@ public void should_receive_each_tablet_exactly_once() { } } + private static void assertReceivedAtLeastOnePayloadPerTablet(int payloadsCount) { + Assert.assertTrue( + String.format( + "Expected at least %d tablet payloads, got %d", INITIAL_TABLETS, payloadsCount), + payloadsCount >= INITIAL_TABLETS); + } + private static boolean waitSessionLearnedTabletInfo(CqlSession session) { try { await() @@ -354,6 +394,12 @@ private static boolean waitSessionLearnedTabletInfo(CqlSession session) { } } + private static boolean isLWTTabletsSupported() { + return CcmBridge.getScyllaVersion() + .map(version -> version.compareTo(SCYLLA_LWT_TABLETS_SUPPORT_VERSION) >= 0) + .orElse(false); + } + private static boolean checkIfRoutedProperly(CqlSession session, Statement stmt) { // DefaultLoadBalancingPolicy suppose to prioritize nodes from replica list randomly shuffling // them