Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.DefaultProtocolVersion;
import com.datastax.oss.driver.api.core.RequestRoutingType;
import com.datastax.oss.driver.api.core.context.DriverContext;
import com.datastax.oss.driver.api.core.session.Request;
import com.datastax.oss.driver.internal.core.cql.DefaultSimpleStatement;
Expand Down Expand Up @@ -86,7 +85,7 @@ static SimpleStatement newInstance(@NonNull String cqlQuery) {
null,
null,
Statement.NO_NOW_IN_SECONDS,
RequestRoutingType.REGULAR);
null);
}

/**
Expand Down Expand Up @@ -121,7 +120,7 @@ static SimpleStatement newInstance(
null,
null,
Statement.NO_NOW_IN_SECONDS,
RequestRoutingType.REGULAR);
null);
}

/**
Expand Down Expand Up @@ -153,7 +152,7 @@ static SimpleStatement newInstance(
null,
null,
Statement.NO_NOW_IN_SECONDS,
RequestRoutingType.REGULAR);
null);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.api.core.metadata.token.Token;
import com.datastax.oss.driver.internal.core.cql.RequestRoutingTypeAccessor;
import com.datastax.oss.driver.internal.core.util.RoutingKey;
import com.datastax.oss.protocol.internal.util.collection.NullAllowingImmutableMap;
import edu.umd.cs.findbugs.annotations.NonNull;
Expand Down Expand Up @@ -89,7 +90,15 @@ protected StatementBuilder(StatementT template) {
this.timeout = template.getTimeout();
this.node = template.getNode();
this.nowInSeconds = template.getNowInSeconds();
this.requestRoutingType = template.getRequestRoutingType();
this.requestRoutingType = getConfiguredRequestRoutingType(template);
}

@Nullable
private RequestRoutingType getConfiguredRequestRoutingType(StatementT template) {
if (template instanceof RequestRoutingTypeAccessor) {
return ((RequestRoutingTypeAccessor) template).getConfiguredRequestRoutingType();
}
return template.getRequestRoutingType();
}

/** @see Statement#setExecutionProfileName(String) */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ public static DefaultPreparedStatement toPreparedStatement(
context.getProtocolVersion(),
lwtInfo != null && lwtInfo.isLwt(response.variablesMetadata.flags)
? RequestRoutingType.LWT
: RequestRoutingType.REGULAR);
: null);
Comment thread
dkropachev marked this conversation as resolved.
}

public static ColumnDefinitions toColumnDefinitions(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
import org.slf4j.LoggerFactory;

@Immutable
public class DefaultBatchStatement implements BatchStatement {
public class DefaultBatchStatement implements BatchStatement, RequestRoutingTypeAccessor {
private static final Logger LOG = LoggerFactory.getLogger(DefaultBatchStatement.class);

private final BatchType batchType;
Expand Down Expand Up @@ -857,6 +857,8 @@ public BatchStatement setNowInSeconds(int newNowInSeconds) {
public RequestRoutingType getRequestRoutingType() {
if (Objects.nonNull(requestRoutingType)) {
return requestRoutingType;
} else if (consistencyLevel != null && consistencyLevel.isSerial()) {
return RequestRoutingType.LWT;
} else if (Objects.isNull(
cachedStatementsRequestRoutingType)) { // Immutability of the statement list and statements
// allows us to cache the result
Expand All @@ -870,6 +872,12 @@ public RequestRoutingType getRequestRoutingType() {
return cachedStatementsRequestRoutingType;
}

@Nullable
@Override
public RequestRoutingType getConfiguredRequestRoutingType() {
return requestRoutingType;
}

@NonNull
@Override
public BatchStatement setRequestRoutingType(RequestRoutingType requestRoutingType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,10 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import net.jcip.annotations.Immutable;

@Immutable
public class DefaultBoundStatement implements BoundStatement {
public class DefaultBoundStatement implements BoundStatement, RequestRoutingTypeAccessor {

private final PreparedStatement preparedStatement;
private final ColumnDefinitions variableDefinitions;
Expand Down Expand Up @@ -805,9 +804,22 @@ public BoundStatement setNowInSeconds(int newNowInSeconds) {
@Nullable
@Override
public RequestRoutingType getRequestRoutingType() {
return Objects.nonNull(requestRoutingType)
? requestRoutingType
: preparedStatement.getRequestRoutingType();
if (requestRoutingType != null) {
return requestRoutingType;
}
if (consistencyLevel != null && consistencyLevel.isSerial()) {
Comment thread
dkropachev marked this conversation as resolved.
return RequestRoutingType.LWT;
}
if (preparedStatement instanceof RequestRoutingTypeAccessor) {
return ((RequestRoutingTypeAccessor) preparedStatement).getConfiguredRequestRoutingType();
}
return preparedStatement.getRequestRoutingType();
}

@Nullable
@Override
public RequestRoutingType getConfiguredRequestRoutingType() {
return requestRoutingType;
}

@NonNull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
import org.slf4j.LoggerFactory;

@ThreadSafe
public class DefaultPreparedStatement implements PreparedStatement {
public class DefaultPreparedStatement implements PreparedStatement, RequestRoutingTypeAccessor {
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultPreparedStatement.class);
private static final Splitter SPACE_SPLITTER = Splitter.onPattern("\\s+");
private static final Splitter COMMA_SPLITTER = Splitter.onPattern(",");
Expand Down Expand Up @@ -196,6 +196,20 @@ public boolean isLWT() {
@Nullable
@Override
public RequestRoutingType getRequestRoutingType() {
if (requestRoutingType != null) {
return requestRoutingType;
}

if (consistencyLevelForBoundStatements != null
&& consistencyLevelForBoundStatements.isSerial()) {
Comment thread
dkropachev marked this conversation as resolved.
return RequestRoutingType.LWT;
}
return null;
}

@Nullable
@Override
public RequestRoutingType getConfiguredRequestRoutingType() {
return requestRoutingType;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
import net.jcip.annotations.Immutable;

@Immutable
public class DefaultSimpleStatement implements SimpleStatement {
public class DefaultSimpleStatement implements SimpleStatement, RequestRoutingTypeAccessor {

private final String query;
private final List<Object> positionalValues;
Expand Down Expand Up @@ -776,6 +776,18 @@ public SimpleStatement setNowInSeconds(int newNowInSeconds) {
@Nullable
@Override
public RequestRoutingType getRequestRoutingType() {
if (requestRoutingType != null) {
return requestRoutingType;
}
if (consistencyLevel != null && consistencyLevel.isSerial()) {
return RequestRoutingType.LWT;
}
return null;
Comment thread
dkropachev marked this conversation as resolved.
}

@Nullable
@Override
public RequestRoutingType getConfiguredRequestRoutingType() {
return requestRoutingType;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datastax.oss.driver.internal.core.cql;

import com.datastax.oss.driver.api.core.RequestRoutingType;
import edu.umd.cs.findbugs.annotations.Nullable;

/** Internal hook to distinguish stored routing type from consistency-inferred routing type. */
public interface RequestRoutingTypeAccessor {
@Nullable
RequestRoutingType getConfiguredRequestRoutingType();
}
Original file line number Diff line number Diff line change
Expand Up @@ -189,13 +189,37 @@ public RequestRoutingMethod getRequestRoutingMethod(@Nullable Request request) {
if (request == null) {
return RequestRoutingMethod.REGULAR;
}
if (request.getRequestRoutingType() == RequestRoutingType.LWT) {
RequestRoutingType requestRoutingType = request.getRequestRoutingType();
if (requestRoutingType == RequestRoutingType.LWT
|| (requestRoutingType == null && hasSerialConsistency(request))) {
return lwtRequestRoutingMethod;
} else {
return RequestRoutingMethod.REGULAR;
}
}

private boolean hasSerialConsistency(@NonNull Request request) {
if (!(request instanceof Statement)) {
return false;
}

return getEffectiveConsistency((Statement<?>) request).isSerial();
}

@NonNull
private Optional<DriverExecutionProfile> getRequestProfile(@NonNull Request request) {
DriverExecutionProfile requestProfile = request.getExecutionProfile();
if (requestProfile != null) {
return Optional.of(requestProfile);
}

String profileName = request.getExecutionProfileName();
if (profileName != null && !profileName.isEmpty()) {
return Optional.of(context.getConfig().getProfile(profileName));
}
return Optional.of(profile);
}

/**
* Returns the local datacenter name, if known; empty otherwise.
*
Expand Down Expand Up @@ -362,18 +386,31 @@ protected Queue<Node> newQueryPlanPreserveReplicas(
for (Object obj : getLiveNodes().dc(null).toArray()) {
allNodes.add((Node) obj);
}
replicas = filterNodesIn(replicas, new LinkedHashSet<>(allNodes));
queryPlan.addAll(replicas);
addRotatedNonReplicas(queryPlan, allNodes, replicas, request);
} else {
// With local DC: prioritize local, then remote
Map<String, List<Node>> nodesByDc = getAllNodesByDc();
boolean includeRemoteDcs = isDcFailoverAllowedForRequest(request);
Map<String, List<Node>> nodesByDc =
includeRemoteDcs
? getAllNodesByDc()
: Collections.singletonMap(localDc, dcNodeList(localDc));
Set<Node> liveNodesForPlan =
nodesByDc.values().stream()
.flatMap(List::stream)
.collect(Collectors.toCollection(LinkedHashSet::new));
replicas = filterNodesIn(replicas, liveNodesForPlan);
addReplicasByDc(queryPlan, replicas, localDc);
addNonReplicasByDc(queryPlan, nodesByDc, replicas, localDc, request);
}

return new SimpleQueryPlan(queryPlan.toArray());
}

private List<Node> filterNodesIn(List<Node> nodes, Set<Node> nodesToKeep) {
return nodes.stream().filter(nodesToKeep::contains).collect(Collectors.toList());
}

/** Collect all live nodes grouped by DC, with preferred remote DCs ordered first. */
private Map<String, List<Node>> getAllNodesByDc() {
Map<String, List<Node>> nodesByDc = new LinkedHashMap<>();
Expand Down Expand Up @@ -537,22 +574,38 @@ protected Queue<Node> maybeAddDcFailover(@Nullable Request request, @NonNull Que
if (maxNodesPerRemoteDc <= 0 || localDc == null) {
return local;
}
if (!allowDcFailoverForLocalCl && request instanceof Statement) {
Statement<?> statement = (Statement<?>) request;
ConsistencyLevel consistency = statement.getConsistencyLevel();
if (consistency == null) {
consistency = defaultConsistencyLevel;
}
if (consistency.isDcLocal()) {
return local;
}
if (!isDcFailoverAllowedForRequest(request)) {
return local;
}
if (preferredRemoteDcs.isEmpty()) {
return new CompositeQueryPlan(local, buildRemoteQueryPlanAll());
}
return new CompositeQueryPlan(local, buildRemoteQueryPlanPreferred());
}

private boolean isDcFailoverAllowedForRequest(@Nullable Request request) {
if (!allowDcFailoverForLocalCl && request instanceof Statement) {
return !getEffectiveConsistency((Statement<?>) request).isDcLocal();
}
return true;
}

@NonNull
private ConsistencyLevel getEffectiveConsistency(@NonNull Statement<?> statement) {
ConsistencyLevel consistency = statement.getConsistencyLevel();
if (consistency != null) {
return consistency;
}

return getRequestProfile(statement)
.map(
requestProfile ->
context
.getConsistencyLevelRegistry()
.nameToLevel(requestProfile.getString(DefaultDriverOption.REQUEST_CONSISTENCY)))
.orElse(defaultConsistencyLevel);
}

private QueryPlan buildRemoteQueryPlanAll() {

return new LazyQueryPlan() {
Expand Down
Loading
Loading