diff --git a/bigtable-dataflow-parent/bigtable-beam-import/run-snapshot-import.sh b/bigtable-dataflow-parent/bigtable-beam-import/run-snapshot-import.sh
new file mode 100755
index 0000000000..6e78f7768f
--- /dev/null
+++ b/bigtable-dataflow-parent/bigtable-beam-import/run-snapshot-import.sh
@@ -0,0 +1,139 @@
+#!/bin/bash
+
+# This script runs a range of Dataflow snapshot import jobs sequentially.
+# It should be executed from the 'bigtable-dataflow-parent/bigtable-beam-import' directory.
+#
+# Usage:   ./run-snapshot-import.sh <START_SHARD> <END_SHARD>
+# Or:      ./run-snapshot-import.sh --all
+# Example: ./run-snapshot-import.sh 0 3
+# Example: ./run-snapshot-import.sh --all
+#
+# You can override the default configuration by setting environment variables in your terminal.
+# Example: TABLE_NAME="my-table" SNAPSHOT_NAME="my-snap" ./run-snapshot-import.sh 0 3
+#
+# NOTE: If you are running on a newer JDK (like Java 21 or 26) and hit ByteBuddy errors,
+# you can add '-Dnet.bytebuddy.experimental=true' to the java command lines below.
+#
+# --- Manual Parallel Execution ---
+# To run shards in parallel groups of 4 (assuming 20 shards total), you can run 5 instances of this script.
+#
+# IMPORTANT: Shard 0 performs the restore step. You MUST run the first group (including shard 0)
+# first and let it complete the restore step before launching the other groups in parallel;
+# otherwise they will fail because the restored files won't exist yet!
+#
+# Example for manual parallel execution:
+#   ./run-snapshot-import.sh 0 3 &   # Run this first!
+#   # Wait for shard 0 to finish the restore, then run the rest:
+#   ./run-snapshot-import.sh 4 7 &
+#   ./run-snapshot-import.sh 8 11 &
+#   ./run-snapshot-import.sh 12 15 &
+#   ./run-snapshot-import.sh 16 19 &
+#
+# --- Automated Parallel Execution ---
+# Alternatively, use the --all flag to automatically handle the restore step and launch all groups:
+#   ./run-snapshot-import.sh --all
+
+if [ "$#" -ne 2 ] && [ "$1" != "--all" ]; then
+  echo "Usage: $0 <START_SHARD> <END_SHARD>"
+  echo "   Or: $0 --all"
+  exit 1
+fi
+
+START_SHARD=$1
+END_SHARD=$2
+
+# Configuration (uses environment variables if set, otherwise defaults)
+export PROJECT_ID="${PROJECT_ID:-google.com:cloud-bigtable-dev}"
+export INSTANCE_ID="${INSTANCE_ID:-tianlei-test-inst}"
+export BUCKET="${BUCKET:-tianlei-beam-test-bucket}"
+export REGION="${REGION:-us-central1}"
+
+export TABLE_NAME="${TABLE_NAME:-validation_test}"
+export SNAPSHOT_NAME="${SNAPSHOT_NAME:-validation_test_20200929}"
+export SERVICE_ACCOUNT="${SERVICE_ACCOUNT:-295490517436-compute@developer.gserviceaccount.com}"
+
+export NUM_SHARDS="${NUM_SHARDS:-20}"
+
+export NETWORK="${NETWORK:-tianlei-network}"
+export SUBNETWORK="${SUBNETWORK:-regions/us-central1/subnetworks/tianlei-network}"
+
+JAR_PATH="target/bigtable-beam-import-2.17.0-shaded.jar"
+
+# --- AUTO-PARALLEL MODE ---
+if [ "$1" == "--all" ]; then
+  echo "🚀 Starting fully automated snapshot import..."
+
+  # Step 1: Perform ONLY the restore step
+  echo "Step 1/2: Performing snapshot restore (blocking)..."
+  java -jar ${JAR_PATH} importsnapshot \
+    --runner=DataflowRunner \
+    --project=${PROJECT_ID} \
+    --bigtableInstanceId=${INSTANCE_ID} \
+    --bigtableTableId=${TABLE_NAME} \
+    --importConfigFilePath=import-config-test.json \
+    --stagingLocation=gs://${BUCKET}/dataflow/staging \
+    --tempLocation=gs://${BUCKET}/dataflow/temp \
+    --region=${REGION} \
+    --performOnlyRestoreStep=true \
+    --jobName="restore-job" \
+    --network=${NETWORK} \
+    --subnetwork=${SUBNETWORK}
+
+  echo "Restore completed. Proceeding to data import."
+
+  # Step 2: Launch parallel groups of 4
+  echo "Step 2/2: Launching parallel groups of 4 shards..."
+  SHARDS_PER_GROUP=4
+
+  for (( start=0; start<$NUM_SHARDS; start+=$SHARDS_PER_GROUP )); do
+    end=$((start + SHARDS_PER_GROUP - 1))
+    [ $end -ge $NUM_SHARDS ] && end=$((NUM_SHARDS - 1))
+
+    echo "Launching group: shards $start to $end in background"
+    # Call ourselves with the range!
+    $0 $start $end &
+  done
+
+  echo "All groups launched. Waiting for all background jobs to finish..."
+  wait
+  echo "🎉 All import jobs completed!"
+  exit 0
+fi
+# ----------------------------------------
+
+# Standard Range Mode
+for i in $(seq $START_SHARD $END_SHARD); do
+  echo "Submitting Dataflow job for shardIndex: $i"
+
+  # We skip restore for all shards if running via --all because Step 1 handled it.
+  # If running manually via ranges, shard 0 will perform restore.
+  SKIP_RESTORE="true"
+  if [ $i -eq 0 ]; then
+    SKIP_RESTORE="false"
+  fi
+
+  JOB="job-${i}"
+  java -jar ${JAR_PATH} importsnapshot \
+    --runner=DataflowRunner \
+    --project=${PROJECT_ID} \
+    --bigtableInstanceId=${INSTANCE_ID} \
+    --bigtableTableId=${TABLE_NAME} \
+    --importConfigFilePath=import-config-test.json \
+    --stagingLocation=gs://${BUCKET}/dataflow/staging \
+    --tempLocation=gs://${BUCKET}/dataflow/temp \
+    --workerMachineType=n1-highmem-4 \
+    --diskSizeGb=500 \
+    --maxNumWorkers=10 \
+    --region=${REGION} \
+    --serviceAccount=${SERVICE_ACCOUNT} \
+    --usePublicIps=false \
+    --enableSnappy=true \
+    --skipRestoreStep=${SKIP_RESTORE} \
+    --numShards=${NUM_SHARDS} \
+    --shardIndex=$i \
+    --jobName="${JOB}" \
+    --network=${NETWORK} \
+    --subnetwork=${SUBNETWORK}
+
+  # Sequential within this script instance
+done
diff --git a/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/ImportJobFromHbaseSnapshot.java b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/ImportJobFromHbaseSnapshot.java
index a01dd42baf..dbe401c2a4 100644
--- a/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/ImportJobFromHbaseSnapshot.java
+++ b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/ImportJobFromHbaseSnapshot.java
@@ -333,8 +333,6 @@ static Pipeline buildPipelineWithMultipleSnapshots(
       return pipeline;
     }
     // Read records from hbase region files and write to Bigtable
-    // PCollection hbaseRecords = restoredSnapshots
-    //     .apply("List Regions", new ListRegions());
     PCollection>> hbaseRecords =
         restoredSnapshots
             .apply("List Regions", new ListRegions())
diff --git a/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/SnapshotUtils.java b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/SnapshotUtils.java
index cf4fa08438..54f2207931 100644
--- a/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/SnapshotUtils.java
+++ b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/SnapshotUtils.java
@@ -264,7 +264,6 @@ public static Map getSnapshotsFromSnapshotPath(
     // Build GCS path from given string e.g:
     // gs://sym-bucket/snapshots/20220309230526/.hbase-snapshot
     GcsPath gcsPath = GcsPath.fromUri(importSnapshotpath);
-    // LOG.info("GCS Path:" + gcsPath + ";Object:" + gcsPath.getObject());
     Map snapshots = new HashMap<>();
     List objects =
diff --git a/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/conf/SnapshotConfig.java b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/conf/SnapshotConfig.java
index cef2557531..25f20a0677 100644
--- a/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/conf/SnapshotConfig.java
+++ b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/conf/SnapshotConfig.java
@@ -38,7 +38,6 @@ public static Builder builder() {
 
   public abstract String getSourceLocation();
 
-  // public abstract Path getSourcePath();
   @Memoized
   public Path getSourcePath() {
     return new Path(getSourceLocation());
diff --git a/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/transforms/ListRegions.java b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/transforms/ListRegions.java
index b0e1698866..4f7e0a6dcc 100644
--- a/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/transforms/ListRegions.java
+++ b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/transforms/ListRegions.java
@@ -67,7 +67,7 @@ private Map computeRegionSize(SnapshotManifest snapshotManifest) {
       regionsSize.put(regionManifest.getRegionInfo().getRegionId(), totalSize);
     }
 
-    return regionsSize; // (int)Math.ceil((totalSize * 1.0)/GIGA_BYTE);
+    return regionsSize;
   }
 
   /**
diff --git a/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/transforms/ReadRegions.java b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/transforms/ReadRegions.java
index 7c2474a748..2cb8706347 100644
--- a/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/transforms/ReadRegions.java
+++ b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/transforms/ReadRegions.java
@@ -15,8 +15,6 @@
  */
 package com.google.cloud.bigtable.beam.hbasesnapshots.transforms;
 
-// import com.google.cloud.bigtable.beam.hbasesnapshots.ImportSnapshots;
-
 import com.google.api.core.InternalApi;
 import com.google.cloud.bigtable.beam.hbasesnapshots.conf.RegionConfig;
 import com.google.cloud.bigtable.beam.hbasesnapshots.conf.SnapshotConfig;
@@ -189,17 +187,14 @@ public void processElement(
     boolean hasSplit = false;
     try (HBaseRegionScanner scanner = newScanner(regionConfig, tracker.currentRestriction())) {
       for (Result result = scanner.next(); result != null; result = scanner.next()) {
-        // if (flag==0 ) {
         if (tracker.tryClaim(ByteKey.copyFrom(result.getRow()))) {
           outputReceiver.output(KV.of(regionConfig.getSnapshotConfig(), result));
         } else {
           hasSplit = true;
           break;
         }
-        // }
       }
     }
-    // if (!hasSplit)
     tracker.tryClaim(ByteKey.EMPTY);
   }
 
@@ -357,13 +352,15 @@ public void processElement(
         throws IOException {
       if (element.getValue().listCells().isEmpty()) return;
 
       String targetTable = element.getKey().getTableName();
+      String snapshotName = element.getKey().getSnapshotName();
 
       // Limit the number of mutations per Put (server will reject >= 100k mutations per Put)
       byte[] rowKey = element.getValue().getRow();
       List<Mutation> mutations = new ArrayList<>();
       boolean logAndSkipIncompatibleRowMutations =
-          verifyRowMutationThresholds(rowKey, element.getValue().listCells(), mutations);
+          verifyRowMutationThresholds(
+              rowKey, element.getValue().listCells(), mutations, snapshotName);
 
       if (!logAndSkipIncompatibleRowMutations && mutations.size() > 0) {
         outputReceiver.output(KV.of(targetTable, mutations));
@@ -371,7 +368,8 @@ public void processElement(
     }
 
     private boolean verifyRowMutationThresholds(
-        byte[] rowKey, List<Cell> cells, List<Mutation> mutations) throws IOException {
+        byte[] rowKey, List<Cell> cells, List<Mutation> mutations, String snapshotName)
+        throws IOException {
       boolean logAndSkipIncompatibleRows = false;
 
       Put put = null;
@@ -384,9 +382,10 @@ private boolean verifyRowMutationThresholds(
 
         // handle large cells
         if (filterLargeCells && cell.getValueLength() > filterLargeCellThresholdBytes) {
-          // TODO add config name in log
           LOG.warn(
-              "Dropping mutation, cell value length, "
+              "For snapshot "
+                  + snapshotName
+                  + ": Dropping mutation, cell value length, "
                   + cell.getValueLength()
                   + ", exceeds filter length, "
                   + filterLargeCellThresholdBytes
@@ -407,11 +406,12 @@ private boolean verifyRowMutationThresholds(
         cellCount++;
       }
 
-      // TODO add config name in log
       if (filterLargeRows && totalByteSize > filterLargeRowThresholdBytes) {
         logAndSkipIncompatibleRows = true;
         LOG.warn(
-            "Dropping row, row length, "
+            "For snapshot "
+                + snapshotName
+                + ": Dropping row, row length, "
                 + totalByteSize
                 + ", exceeds filter length threshold, "
                 + filterLargeRowThresholdBytes
@@ -419,11 +419,12 @@ private boolean verifyRowMutationThresholds(
                 + Bytes.toStringBinary(rowKey));
       }
 
-      // TODO add config name in log
      if (filterLargeRowKeys && rowKey.length > filterLargeRowKeysThresholdBytes) {
        logAndSkipIncompatibleRows = true;
        LOG.warn(
-            "Dropping row, row key length, "
+            "For snapshot "
+                + snapshotName
+                + ": Dropping row, row key length, "
                + rowKey.length
                + ", exceeds filter length threshold, "
                + filterLargeRowKeysThresholdBytes
diff --git a/bigtable-dataflow-parent/bigtable-beam-import/src/test/java/com/google/cloud/bigtable/beam/hbasesnapshots/SnapshotUtilsTest.java b/bigtable-dataflow-parent/bigtable-beam-import/src/test/java/com/google/cloud/bigtable/beam/hbasesnapshots/SnapshotUtilsTest.java
index 1af5aef156..b68228de15 100644
--- a/bigtable-dataflow-parent/bigtable-beam-import/src/test/java/com/google/cloud/bigtable/beam/hbasesnapshots/SnapshotUtilsTest.java
+++ b/bigtable-dataflow-parent/bigtable-beam-import/src/test/java/com/google/cloud/bigtable/beam/hbasesnapshots/SnapshotUtilsTest.java
@@ -225,7 +225,6 @@ public void testgetSubSetSnapshotsFromSnapshotPath() throws IOException {
         getMatchingSnapshotsFromSnapshotPath(snapshotList, ".*attachments.*");
     List<String> expectedResult =
         snapshotList.stream().filter(e -> e.contains("attachments")).collect(Collectors.toList());
-    // LOG.info("Matched:{} and expected:{}", snapshots.size(), expectedResult.size());
     assertThat(snapshots.size(), is(equalTo(expectedResult.size())));
     assertThat(snapshots.keySet(), containsInAnyOrder(expectedResult.toArray(new String[0])));
   }
diff --git a/bigtable-dataflow-parent/bigtable-beam-import/src/test/java/com/google/cloud/bigtable/beam/validation/ComputeAndValidateHashFromBigtableDoFnTest.java b/bigtable-dataflow-parent/bigtable-beam-import/src/test/java/com/google/cloud/bigtable/beam/validation/ComputeAndValidateHashFromBigtableDoFnTest.java
new file mode 100644
index 0000000000..64e7c6959e
--- /dev/null
+++ b/bigtable-dataflow-parent/bigtable-beam-import/src/test/java/com/google/cloud/bigtable/beam/validation/ComputeAndValidateHashFromBigtableDoFnTest.java
@@ -0,0 +1,474 @@
+/*
+ * Copyright 2021 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.beam.validation;
+
+import static com.google.bigtable.repackaged.com.google.cloud.bigtable.admin.v2.models.GCRules.GCRULES;
+
+import com.google.bigtable.repackaged.com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient;
+import com.google.bigtable.repackaged.com.google.cloud.bigtable.admin.v2.BigtableTableAdminSettings;
+import com.google.bigtable.repackaged.com.google.cloud.bigtable.admin.v2.models.CreateTableRequest;
+import com.google.cloud.bigtable.beam.CloudBigtableTableConfiguration;
+import com.google.cloud.bigtable.beam.validation.HadoopHashTableSource.RangeHash;
+import com.google.cloud.bigtable.hbase.BigtableConfiguration;
+import com.google.cloud.bigtable.hbase.BigtableOptionsFactory;
+import com.google.cloud.bigtable.test.helper.BigtableEmulatorRule;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.MetricQueryResults;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.BigtableTableHashAccessor.BigtableResultHasher;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@RunWith(JUnit4.class)
+public class ComputeAndValidateHashFromBigtableDoFnTest {
+
+  private static final byte[] EMPTY_ROW_KEY = HConstants.EMPTY_BYTE_ARRAY;
+  protected final Logger LOG = LoggerFactory.getLogger(getClass());
+
+  public static final String FAKE_TABLE = "fake-table";
+  private static final String ROW_KEY_PREFIX = "row-";
+  private static final String VALUE_PREFIX = "value-";
+  private static final byte[] EXTRA_VALUE = "add".getBytes();
+  private static final byte[] CF = "cf".getBytes();
+  private static final byte[] CF2 = "cf".getBytes();
+  private static final byte[] COL = "col".getBytes();
+  private static final long TS = 1000L;
+  private static final int FIRST_ROW_INDEX = 20;
+  private static final int LAST_ROW_INDEX = 31;
+
+  @Rule public final BigtableEmulatorRule bigtableEmulator = new BigtableEmulatorRule();
+
+  @Rule public final transient TestPipeline p = TestPipeline.create();
+
+  private ComputeAndValidateHashFromBigtableDoFn doFn;
+
+  // Clients that will be connected to the emulator
+  private BigtableTableAdminClient tableAdminClient;
+
+  private Connection connection;
+  private Table table;
+  // Fake a TableHashWrapper.
+  private FakeTableHashWrapper fakeTableHashWrapper;
+
+  private List<RangeHash> hashes;
+
+  @Before
+  public void setUp() throws IOException {
+    hashes = new ArrayList<>();
+    // Initialize the clients to connect to the emulator
+    tableAdminClient =
+        BigtableTableAdminClient.create(
+            BigtableTableAdminSettings.newBuilderForEmulator(bigtableEmulator.getPort())
+                .setProjectId("fake-project")
+                .setInstanceId("fake-instance")
+                .build());
+
+    CloudBigtableTableConfiguration config =
+        new CloudBigtableTableConfiguration.Builder()
+            .withProjectId("fake-project")
+            .withInstanceId("fake-instance")
+            .withTableId(FAKE_TABLE)
+            .withConfiguration(
+                BigtableOptionsFactory.BIGTABLE_EMULATOR_HOST_KEY,
+                "localhost:" + bigtableEmulator.getPort())
+            .build();
+
+    connection = BigtableConfiguration.connect(config.toHBaseConfig());
+    table = connection.getTable(TableName.valueOf(FAKE_TABLE));
+    fakeTableHashWrapper = new FakeTableHashWrapper();
+    // Scan all the cells for the column; an HBase Scan fetches 1 cell/column by default.
+    fakeTableHashWrapper.scan = new Scan().setMaxVersions();
+
+    FakeTableHashWrapperFactory fakeFactory = new FakeTableHashWrapperFactory(fakeTableHashWrapper);
+
+    doFn =
+        new ComputeAndValidateHashFromBigtableDoFn(
+            config,
+            StaticValueProvider.of(FAKE_TABLE),
+            StaticValueProvider.of("proj"),
+            StaticValueProvider.of("hash"),
+            fakeFactory);
+
+    // Create a test table that can be used in tests
+    tableAdminClient.createTable(
+        CreateTableRequest.of(FAKE_TABLE)
+            .addFamily(new String(CF), GCRULES.maxVersions(100))
+            .addFamily(new String(CF2), GCRULES.maxVersions(100)));
+
+    p.getCoderRegistry().registerCoderForClass(RangeHash.class, new RangeHashCoder());
+
+    // Fill CBT table with data.
+    writeDataToTable();
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    doFn.cleanupConnection();
+    // TODO should we delete the table for each test?
+    tableAdminClient.deleteTable(FAKE_TABLE);
+    tableAdminClient.close();
+    connection.close();
+  }
+
+  private byte[] getRowKey(int i) {
+    return (ROW_KEY_PREFIX + i).getBytes();
+  }
+
+  private byte[] getValue(int rowIndex, int cellIndex) {
+    return (VALUE_PREFIX + rowIndex + "-" + cellIndex).getBytes();
+  }
+
+  private void writeDataToTable() throws IOException {
+    List<Put> puts = new ArrayList<>();
+    // Tests use rows 21-30. Set up some extra data to simulate the real-world scenario where
+    // other work items will be working in parallel on the table.
+    for (int i = 20; i < 32; i++) {
+      for (int j = 0; j < 2; j++) {
+        // Insert rows with 2 cells each
+        Put put = new Put(getRowKey(i));
+        put.addColumn(CF, COL, TS + j, getValue(i, j));
+        puts.add(put);
+      }
+    }
+    table.put(puts);
+  }
+
+  /** Deletes the row range [startIndex, stopIndex). */
+  private void deleteRange(int startIndex, int stopIndex) throws IOException {
+    for (int i = startIndex; i < stopIndex; i++) {
+      table.delete(new Delete(getRowKey(i)));
+    }
+  }
+
+  // Creates a RangeHash for the range [startRow, stopRow).
+  private RangeHash createHash(byte[] startRow, byte[] stopRow) throws IOException {
+    LOG.debug("Creating hash for rows " + startRow + " to " + stopRow);
+    BigtableResultHasher hasher = new BigtableResultHasher();
+    hasher.startBatch(new ImmutableBytesWritable(startRow));
+
+    // Scan all the cells for a column.
+    Scan scan = new Scan().setMaxVersions().withStartRow(startRow).withStopRow(stopRow, false);
+
+    // Read the rows from Bigtable and compute the expected hash.
+    for (Result result : table.getScanner(scan)) {
+      LOG.debug("Adding result to hash: " + result);
+      hasher.hashResult(result);
+    }
+    hasher.finishBatch();
+    return RangeHash.of(
+        new ImmutableBytesWritable(startRow),
+        new ImmutableBytesWritable(stopRow),
+        hasher.getBatchHash());
+  }
+
+  private void validateCounters(
+      PipelineResult result, Long expectedMatches, Long expectedMismatches) {
+    MetricQueryResults metrics = result.metrics().allMetrics();
+    Map<String, Long> counters =
+        StreamSupport.stream(metrics.getCounters().spliterator(), false)
+            .collect(Collectors.toMap((m) -> m.getName().getName(), (m) -> m.getAttempted()));
+    Assert.assertEquals(expectedMatches, counters.get("ranges_matched"));
+    Assert.assertEquals(expectedMismatches, counters.get("ranges_not_matched"));
+  }
+
+  ////////// Happy case tests for various setups //////////////////////
+  @Test
+  public void testHashMatchesForMultipleRange() throws Exception {
+    hashes.add(createHash(getRowKey(21), getRowKey(24)));
+    hashes.add(createHash(getRowKey(24), getRowKey(28)));
+
+    PCollection<KV<String, List<List<RangeHash>>>> input =
+        p.apply(Create.of(KV.of(new String(getRowKey(21)), Arrays.asList(hashes))));
+
+    PCollection<RangeHash> output = input.apply(ParDo.of(doFn));
+    PAssert.that(output).empty();
+    PipelineResult result = p.run();
+    validateCounters(result, 2L, 0L);
+  }
+
+  @Test
+  public void testHashMatchesForSingleRange() throws Exception {
+    hashes.add(createHash(getRowKey(21), getRowKey(24)));
+
+    PCollection<KV<String, List<List<RangeHash>>>> input =
+        p.apply(Create.of(KV.of(new String(getRowKey(21)), Arrays.asList(hashes))));
+
+    PCollection<RangeHash> output = input.apply(ParDo.of(doFn));
+    PAssert.that(output).containsInAnyOrder();
+    PipelineResult result = p.run();
+    validateCounters(result, 1L, 0L);
+  }
+
+  @Test
+  public void testHashMatchesForFullTableScanWithMultipleRange() throws Exception {
+    hashes.add(createHash(EMPTY_ROW_KEY, getRowKey(24)));
+    hashes.add(createHash(getRowKey(24), EMPTY_ROW_KEY));
+
+    PCollection<KV<String, List<List<RangeHash>>>> input =
+        p.apply(Create.of(KV.of(new String(EMPTY_ROW_KEY), Arrays.asList(hashes))));
+
+    PCollection<RangeHash> output = input.apply(ParDo.of(doFn));
+    PAssert.that(output).empty();
+    PipelineResult result = p.run();
+    validateCounters(result, 2L, 0L);
+  }
+
+  @Test
+  public void testHashMatchesForMultipleSingleRowRange() throws Exception {
+    hashes.add(createHash(getRowKey(22), getRowKey(23)));
+    hashes.add(createHash(getRowKey(23), getRowKey(24)));
+    hashes.add(createHash(getRowKey(24), getRowKey(25)));
+
+    PCollection<KV<String, List<List<RangeHash>>>> input =
+        p.apply(Create.of(KV.of(new String(getRowKey(22)), Arrays.asList(hashes))));
+
+    PCollection<RangeHash> output = input.apply(ParDo.of(doFn));
+    PAssert.that(output).empty();
+    PipelineResult result = p.run();
+    validateCounters(result, 3L, 0L);
+  }
+
+  ///////////////// Test mismatches when Bigtable has extra rows ////////////////////
+  @Test
+  public void testAdditionalCellInMiddle() throws Exception {
+    hashes.add(createHash(getRowKey(21), getRowKey(24)));
+    hashes.add(createHash(getRowKey(24), getRowKey(27)));
+    hashes.add(createHash(getRowKey(27), getRowKey(30)));
+
+    // Add an extra cell in the table
+    table.put(new Put(getRowKey(25)).addColumn(CF, COL, EXTRA_VALUE));
+
+    PCollection<KV<String, List<List<RangeHash>>>> input =
+        p.apply(Create.of(KV.of(new String(getRowKey(21)), Arrays.asList(hashes))));
+
+    PCollection<RangeHash> output = input.apply(ParDo.of(doFn));
+    PAssert.that(output).containsInAnyOrder(hashes.get(1));
+    PipelineResult result = p.run();
+    validateCounters(result, 2L, 1L);
+  }
+
+  @Test
+  public void testAdditionalRowsAtEnds() throws Exception {
+    hashes.add(createHash(EMPTY_ROW_KEY, getRowKey(24)));
+    hashes.add(createHash(getRowKey(24), getRowKey(27)));
+    hashes.add(createHash(getRowKey(27), EMPTY_ROW_KEY));
+
+    // Add an extra row at the beginning.
+    table.put(new Put(getRowKey(1)).addColumn(CF, COL, EXTRA_VALUE));
+
+    // Add an extra row at the end.
+    table.put(new Put(getRowKey(5)).addColumn(CF, COL, EXTRA_VALUE));
+
+    PCollection<KV<String, List<List<RangeHash>>>> input =
+        p.apply(Create.of(KV.of(new String(EMPTY_ROW_KEY), Arrays.asList(hashes))));
+
+    PCollection<RangeHash> output = input.apply(ParDo.of(doFn));
+    PAssert.that(output).containsInAnyOrder(hashes.get(0), hashes.get(2));
+    PipelineResult result = p.run();
+    validateCounters(result, 1L, 2L);
+  }
+
+  ///////////////////// Test different values ///////////////////////////
+  @Test
+  public void testDifferentValues() throws Exception {
+    hashes.add(createHash(EMPTY_ROW_KEY, getRowKey(21)));
+    hashes.add(createHash(getRowKey(21), getRowKey(23)));
+    hashes.add(createHash(getRowKey(23), getRowKey(25)));
+    hashes.add(createHash(getRowKey(25), getRowKey(27)));
+    hashes.add(createHash(getRowKey(27), EMPTY_ROW_KEY));
+
+    // Modify the CF
+    table.delete(new Delete(getRowKey(20)).addColumns(CF, COL, TS));
+    table.put(new Put(getRowKey(1)).addColumn(CF2, COL, TS, getValue(20, 0)));
+
+    // Modify the qualifier
+    table.delete(new Delete(getRowKey(22)).addColumns(CF, COL, TS));
+    table.put(new Put(getRowKey(22)).addColumn(CF, "random-col".getBytes(), TS, getValue(22, 0)));
+
+    // Modify the timestamp
+    table.delete(new Delete(getRowKey(24)).addColumns(CF, COL, TS));
+    table.put(new Put(getRowKey(24)).addColumn(CF, COL, 1, getValue(24, 0)));
+
+    // Modify the value
+    table.delete(new Delete(getRowKey(26)).addColumns(CF, COL, TS));
+    table.put(new Put(getRowKey(26)).addColumn(CF, COL, getValue(26, 0)));
+
+    PCollection<KV<String, List<List<RangeHash>>>> input =
+        p.apply(Create.of(KV.of(new String(EMPTY_ROW_KEY), Arrays.asList(hashes))));
+
+    PCollection<RangeHash> output = input.apply(ParDo.of(doFn));
+    PAssert.that(output)
+        .containsInAnyOrder(hashes.get(0), hashes.get(1), hashes.get(2), hashes.get(3));
+    PipelineResult result = p.run();
+    validateCounters(result, 1L, 4L);
+  }
+
+  ////////////////// Tests with CBT missing data //////////////////////////////
+  @Test
+  public void testMissingRows() throws Exception {
+    hashes.add(createHash(EMPTY_ROW_KEY, getRowKey(21)));
+    hashes.add(createHash(getRowKey(21), getRowKey(23)));
+    hashes.add(createHash(getRowKey(23), getRowKey(25)));
+    hashes.add(createHash(getRowKey(25), getRowKey(27)));
+    hashes.add(createHash(getRowKey(27), EMPTY_ROW_KEY));
+
+    // Delete a row at the beginning
+    table.delete(new Delete(getRowKey(FIRST_ROW_INDEX)));
+
+    // Delete a row in the middle
+    table.delete(new Delete(getRowKey(24)));
+
+    // Delete a row at the end
+    table.delete(new Delete(getRowKey(LAST_ROW_INDEX)));
+
+    PCollection<KV<String, List<List<RangeHash>>>> input =
+        p.apply(Create.of(KV.of(new String(EMPTY_ROW_KEY), Arrays.asList(hashes))));
+
+    PCollection<RangeHash> output = input.apply(ParDo.of(doFn));
+    PAssert.that(output).containsInAnyOrder(hashes.get(0), hashes.get(2), hashes.get(4));
+    PipelineResult result = p.run();
+    validateCounters(result, 2L, 3L);
+  }
+
+  @Test
+  public void testMissingRanges() throws Exception {
+    hashes.add(createHash(EMPTY_ROW_KEY, getRowKey(21)));
+    hashes.add(createHash(getRowKey(21), getRowKey(23)));
+    hashes.add(createHash(getRowKey(23), getRowKey(25)));
+    hashes.add(createHash(getRowKey(25), getRowKey(27)));
+    hashes.add(createHash(getRowKey(27), getRowKey(29)));
+    hashes.add(createHash(getRowKey(29), EMPTY_ROW_KEY));
+
+    // Delete a range at the beginning
+    deleteRange(FIRST_ROW_INDEX, 21);
+
+    // Delete a range in the middle
+    deleteRange(23, 25);
+
+    // Delete row ranges at the end; the Bigtable scanner will finish with multiple row ranges to
+    // process.
+    deleteRange(27, LAST_ROW_INDEX + 1);
+
+    PCollection<KV<String, List<List<RangeHash>>>> input =
+        p.apply(Create.of(KV.of(new String(EMPTY_ROW_KEY), Arrays.asList(hashes))));
+
+    PCollection<RangeHash> output = input.apply(ParDo.of(doFn));
+    PAssert.that(output)
+        .containsInAnyOrder(hashes.get(0), hashes.get(2), hashes.get(4), hashes.get(5));
+    PipelineResult result = p.run();
+    validateCounters(result, 2L, 4L);
+  }
+
+  @Test
+  public void testCbtEmpty() throws Exception {
+    hashes.add(createHash(EMPTY_ROW_KEY, getRowKey(25)));
+    hashes.add(createHash(getRowKey(25), getRowKey(29)));
+    hashes.add(createHash(getRowKey(29), EMPTY_ROW_KEY));
+
+    // Delete all data from Bigtable
+    deleteRange(FIRST_ROW_INDEX, LAST_ROW_INDEX);
+
+    PCollection<KV<String, List<List<RangeHash>>>> input =
+        p.apply(Create.of(KV.of(new String(EMPTY_ROW_KEY), Arrays.asList(hashes))));
+
+    PCollection<RangeHash> output = input.apply(ParDo.of(doFn));
+    PAssert.that(output).containsInAnyOrder(hashes);
+    PipelineResult result = p.run();
+    validateCounters(result, 0L, 3L);
+  }
+
+  ////////////////////// Test that the Scan is used from TableHash. ////////////////////////
+  @Test
+  public void testScanFromTableHash() throws Exception {
+    hashes.add(createHash(getRowKey(21), getRowKey(24)));
+    hashes.add(createHash(getRowKey(24), getRowKey(27)));
+    hashes.add(createHash(getRowKey(27), getRowKey(30)));
+
+    // Update the TableHashWrapper Scan to the default. The Scan from HashTable.TableHash
+    // determines the cells used to compute the hash; CBT has to use the same cells for
+    // validation.
+    fakeTableHashWrapper.scan = new Scan();
+
+    PCollection<KV<String, List<List<RangeHash>>>> input =
+        p.apply(Create.of(KV.of(new String(getRowKey(21)), Arrays.asList(hashes))));
+
+    PCollection<RangeHash> output = input.apply(ParDo.of(doFn));
+    PAssert.that(output).containsInAnyOrder(hashes);
+    PipelineResult result = p.run();
+    validateCounters(result, 0L, 3L);
+  }
+
+  ////////////////////// Combination of different cases //////////////////////////////////
+  @Test
+  public void testMismatchesComprehensive() throws Exception {
+    hashes.add(createHash(EMPTY_ROW_KEY, getRowKey(21)));
+    hashes.add(createHash(getRowKey(21), getRowKey(23)));
+    hashes.add(createHash(getRowKey(23), getRowKey(25)));
+    hashes.add(createHash(getRowKey(25), getRowKey(27)));
+    hashes.add(createHash(getRowKey(27), getRowKey(29)));
+    hashes.add(createHash(getRowKey(29), EMPTY_ROW_KEY));
+
+    // Delete a range at the beginning from CBT
+    deleteRange(FIRST_ROW_INDEX, 21);
+
+    // Delete a row in the middle from CBT
+    table.delete(new Delete(getRowKey(23)));
+
+    // Update a value in CBT
+    table.delete(new Delete(getRowKey(27)).addColumns(CF, COL, TS));
+    table.put(new Put(getRowKey(27)).addColumn(CF, COL, getValue(27, 0)));
+
+    // Add an extra row at the end.
+    table.put(new Put(getRowKey(5)).addColumn(CF, COL, EXTRA_VALUE));
+
+    PCollection<KV<String, List<List<RangeHash>>>> input =
+        p.apply(Create.of(KV.of(new String(EMPTY_ROW_KEY), Arrays.asList(hashes))));
+
+    PCollection<RangeHash> output = input.apply(ParDo.of(doFn));
+    PAssert.that(output)
+        .containsInAnyOrder(hashes.get(0), hashes.get(2), hashes.get(4), hashes.get(5));
+    PipelineResult result = p.run();
+    validateCounters(result, 2L, 4L);
+  }
+}