Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>org.tikv</groupId>
<artifactId>tikv-client-java</artifactId>
<version>3.3.4-SNAPSHOT</version>
<version>3.4.0</version>
<packaging>jar</packaging>
<name>TiKV Java Client</name>
<description>A Java Client for TiKV</description>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ TiRegion loadCurrentRegionToCache() throws GrpcException {
try (RegionStoreClient client = builder.build(startKey)) {
client.setTimeout(conf.getScanTimeout());
BackOffer backOffer = ConcreteBackOffer.newScannerNextMaxBackOff();
currentCache = client.scan(backOffer, startKey, version);
currentCache = client.scan(backOffer, startKey, rangeEndKey, version, keyOnly);
// If we get region before scan, we will use region from cache which
// may have wrong end key. This may miss some regions that split from old region.
// Client will get the newest region during scan. So we need to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public abstract class ScanIterator implements Iterator<Kvrpcpb.KvPair> {
protected final RegionStoreClientBuilder builder;
protected List<Kvrpcpb.KvPair> currentCache;
protected ByteString startKey;
protected ByteString rangeEndKey;
protected int index = -1;
protected int limit;
protected boolean keyOnly;
Expand All @@ -52,7 +53,8 @@ public abstract class ScanIterator implements Iterator<Kvrpcpb.KvPair> {
int limit,
boolean keyOnly) {
this.startKey = requireNonNull(startKey, "start key is null");
this.endKey = Key.toRawKey(requireNonNull(endKey, "end key is null"));
this.rangeEndKey = requireNonNull(endKey, "end key is null");
this.endKey = Key.toRawKey(this.rangeEndKey);
this.hasEndKey = !endKey.isEmpty();
this.limit = limit;
this.keyOnly = keyOnly;
Expand Down
40 changes: 29 additions & 11 deletions src/main/java/org/tikv/common/region/RegionStoreClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -337,19 +337,35 @@ private List<KvPair> handleBatchGetResponse(

public List<KvPair> scan(
BackOffer backOffer, ByteString startKey, long version, boolean keyOnly) {
return scan(backOffer, startKey, ByteString.EMPTY, version, keyOnly);
}

public List<KvPair> scan(
BackOffer backOffer, ByteString startKey, ByteString endKey, long version, boolean keyOnly) {
boolean forWrite = false;
while (true) {
Supplier<ScanRequest> request =
() ->
ScanRequest.newBuilder()
.setContext(
makeContext(
getResolvedLocks(version), this.storeType, backOffer.getSlowLog()))
.setStartKey(codec.encodeKey(startKey))
.setVersion(version)
.setKeyOnly(keyOnly)
.setLimit(getConf().getScanBatchSize())
.build();
() -> {
ScanRequest.Builder b =
ScanRequest.newBuilder()
.setContext(
makeContext(
getResolvedLocks(version), this.storeType, backOffer.getSlowLog()))
.setVersion(version)
.setKeyOnly(keyOnly)
.setLimit(getConf().getScanBatchSize());

// API version matters here: v2 transactional keys are keyspace-prefixed; encodeRange
// applies that encoding to both bounds. v1 passes raw keys and only sets start on the
// RPC in this client path
if (getConf().getApiVersion().isV2()) {
Pair<ByteString, ByteString> range = codec.encodeRange(startKey, endKey);
b.setStartKey(range.first).setEndKey(range.second);
} else {
b.setStartKey(codec.encodeKey(startKey));
}
return b.build();
};

KVErrorHandler<ScanResponse> handler =
new KVErrorHandler<>(
Expand All @@ -367,7 +383,9 @@ public List<KvPair> scan(
region = regionManager.getRegionByKey(startKey, backOffer);

if (handleScanResponse(backOffer, resp, version, forWrite)) {
return resp.getPairsList();
// Return logical user-space keys (same as batchGet) so scan iterators, prefix bounds, and
// get() stay consistent in API v2; wire format includes the keyspace prefix.
return codec.decodeKvPairs(resp.getPairsList());
}
}
}
Expand Down
9 changes: 7 additions & 2 deletions src/test/java/org/tikv/common/KVMockServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.TreeMap;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.key.Key;
Expand Down Expand Up @@ -362,9 +363,13 @@ public void kvScan(
}
} else {
SortedMap<Key, ByteString> kvs = dataMap.tailMap(key);
Stream<Map.Entry<Key, ByteString>> entryStream = kvs.entrySet().stream();
if (!request.getEndKey().isEmpty()) {
Key rangeEnd = toRawKey(request.getEndKey());
entryStream = entryStream.filter(entry -> entry.getKey().compareTo(rangeEnd) < 0);
}
builder.addAllPairs(
kvs.entrySet()
.stream()
entryStream
.map(
kv -> {
Kvrpcpb.KvPair.Builder kvBuilder =
Expand Down
Loading