Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 139 additions & 0 deletions bigtable-dataflow-parent/bigtable-beam-import/run-snapshot-import.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
#!/bin/bash

# This script runs a range of Dataflow snapshot import jobs sequentially.
# It should be executed from the 'bigtable-dataflow-parent/bigtable-beam-import' directory.
#
# Usage: ./run-snapshot-import.sh <start_shard> <end_shard>
# Or: ./run-snapshot-import.sh --all
# Example: ./run-snapshot-import.sh 0 3
# Example: ./run-snapshot-import.sh --all
#
# You can override default configurations by setting environment variables in your terminal.
# Example: TABLE_NAME="my-table" SNAPSHOT_NAME="my-snap" ./run-snapshot-import.sh 0 3
#
# NOTE: If you are running on a newer JDK (like Java 21 or 26) and hit ByteBuddy errors,
# you can add '-Dnet.bytebuddy.experimental=true' to the java command lines below.
#
# --- Manual Parallel Execution ---
# To run shards in parallel groups of 4 (assuming 20 shards total), you can run 5 instances of this script.
#
# IMPORTANT: Shard 0 performs the restore step. You MUST run the first group (including shard 0)
# first and let it complete the restore step before launching other groups in parallel,
# otherwise they will fail because the restored files won't exist yet!
#
# Example for manual parallel execution:
# ./run-snapshot-import.sh 0 3 & # Run this first!
# # Wait for shard 0 to finish restore, then run the rest:
# ./run-snapshot-import.sh 4 7 &
# ./run-snapshot-import.sh 8 11 &
# ./run-snapshot-import.sh 12 15 &
# ./run-snapshot-import.sh 16 19 &
#
# --- Automated Parallel Execution ---
# Alternatively, use the --all flag to automatically handle the restore step and launch all groups:
# ./run-snapshot-import.sh --all

set -u  # error on unset variables; we deliberately avoid -e so one failed shard
        # does not abort the remaining sequential shards (failures are reported).

# --- Argument validation ---
if [ "${1:-}" != "--all" ]; then
  if [ "$#" -ne 2 ]; then
    echo "Usage: $0 <start_shard> <end_shard>" >&2
    echo "   Or: $0 --all" >&2
    exit 1
  fi
  # Shard bounds must be non-negative integers, otherwise the loop below is meaningless.
  if ! [[ "$1" =~ ^[0-9]+$ && "$2" =~ ^[0-9]+$ ]]; then
    echo "Error: <start_shard> and <end_shard> must be non-negative integers." >&2
    exit 1
  fi
  if [ "$1" -gt "$2" ]; then
    echo "Error: <start_shard> ($1) must not be greater than <end_shard> ($2)." >&2
    exit 1
  fi
fi

START_SHARD="${1:-}"
END_SHARD="${2:-}"

# Configurations (Uses environment variables if set, otherwise defaults)
export PROJECT_ID="${PROJECT_ID:-google.com:cloud-bigtable-dev}"
export INSTANCE_ID="${INSTANCE_ID:-tianlei-test-inst}"
export BUCKET="${BUCKET:-tianlei-beam-test-bucket}"
export REGION="${REGION:-us-central1}"

export TABLE_NAME="${TABLE_NAME:-validation_test}"
export SNAPSHOT_NAME="${SNAPSHOT_NAME:-validation_test_20200929}"
export SERVICE_ACCOUNT="${SERVICE_ACCOUNT:-295490517436-compute@developer.gserviceaccount.com}"

export NUM_SHARDS="${NUM_SHARDS:-20}"

export NETWORK="${NETWORK:-tianlei-network}"
export SUBNETWORK="${SUBNETWORK:-regions/us-central1/subnetworks/tianlei-network}"

readonly JAR_PATH="target/bigtable-beam-import-2.17.0-shaded.jar"

if [ ! -f "${JAR_PATH}" ]; then
  echo "Error: ${JAR_PATH} not found. Run 'mvn package' from bigtable-beam-import first." >&2
  exit 1
fi

# --- AUTO-PARALLEL MODE ---
if [ "${1:-}" == "--all" ]; then
  echo "🚀 Starting fully automated snapshot import..."

  # Step 1: Perform ONLY the restore step. This MUST complete before any shard
  # runs, because the import jobs read the restored files.
  echo "Step 1/2: Performing snapshot restore (blocking)..."
  if ! java -jar "${JAR_PATH}" importsnapshot \
      --runner=DataflowRunner \
      --project="${PROJECT_ID}" \
      --bigtableInstanceId="${INSTANCE_ID}" \
      --bigtableTableId="${TABLE_NAME}" \
      --importConfigFilePath=import-config-test.json \
      --stagingLocation="gs://${BUCKET}/dataflow/staging" \
      --tempLocation="gs://${BUCKET}/dataflow/temp" \
      --region="${REGION}" \
      --performOnlyRestoreStep=true \
      --jobName="restore-job" \
      --network="${NETWORK}" \
      --subnetwork="${SUBNETWORK}"; then
    echo "Error: restore step failed; aborting before launching import shards." >&2
    exit 1
  fi

  echo "Restore completed. Proceeding to data import."

  # Step 2: Launch parallel groups of 4 shards by re-invoking this script in
  # range mode. RESTORE_DONE=true tells the children that Step 1 already ran
  # the restore, so shard 0 must NOT restore again.
  echo "Step 2/2: Launching parallel groups of 4 shards..."
  SHARDS_PER_GROUP=4

  pids=()
  for (( start=0; start<NUM_SHARDS; start+=SHARDS_PER_GROUP )); do
    end=$(( start + SHARDS_PER_GROUP - 1 ))
    if [ "${end}" -ge "${NUM_SHARDS}" ]; then
      end=$(( NUM_SHARDS - 1 ))
    fi

    echo "Launching group: shards ${start} to ${end} in background"
    RESTORE_DONE=true "$0" "${start}" "${end}" &
    pids+=("$!")
  done

  echo "All groups launched. Waiting for all background jobs to finish..."
  # Wait on each PID individually so a failing group is not silently ignored.
  rc=0
  for pid in "${pids[@]}"; do
    wait "${pid}" || rc=1
  done
  if [ "${rc}" -eq 0 ]; then
    echo "🎉 All import jobs completed!"
  else
    echo "Error: one or more import groups failed; check the logs above." >&2
  fi
  exit "${rc}"
fi
# ----------------------------------------

# Standard Range Mode
failures=0
for (( i=START_SHARD; i<=END_SHARD; i++ )); do
  echo "Submitting Dataflow job for shardIndex: $i"

  # Shard 0 performs the restore — unless the parent (--all mode) already did
  # it, which it signals via RESTORE_DONE=true. This is what allows --all to
  # run the restore exactly once.
  SKIP_RESTORE="true"
  if [ "$i" -eq 0 ] && [ "${RESTORE_DONE:-false}" != "true" ]; then
    SKIP_RESTORE="false"
  fi

  JOB="job-${i}"
  if ! java -jar "${JAR_PATH}" importsnapshot \
      --runner=DataflowRunner \
      --project="${PROJECT_ID}" \
      --bigtableInstanceId="${INSTANCE_ID}" \
      --bigtableTableId="${TABLE_NAME}" \
      --importConfigFilePath=import-config-test.json \
      --stagingLocation="gs://${BUCKET}/dataflow/staging" \
      --tempLocation="gs://${BUCKET}/dataflow/temp" \
      --workerMachineType=n1-highmem-4 \
      --diskSizeGb=500 \
      --maxNumWorkers=10 \
      --region="${REGION}" \
      --serviceAccount="${SERVICE_ACCOUNT}" \
      --usePublicIps=false \
      --enableSnappy=true \
      --skipRestoreStep="${SKIP_RESTORE}" \
      --numShards="${NUM_SHARDS}" \
      --shardIndex="$i" \
      --jobName="${JOB}" \
      --network="${NETWORK}" \
      --subnetwork="${SUBNETWORK}"; then
    echo "Error: shard $i failed." >&2
    failures=$(( failures + 1 ))
  fi

  # Sequential within this script instance
done

# Non-zero exit if any shard in this range failed, so --all mode (and CI) can
# detect partial failures.
[ "${failures}" -eq 0 ]
Original file line number Diff line number Diff line change
Expand Up @@ -333,8 +333,6 @@ static Pipeline buildPipelineWithMultipleSnapshots(
return pipeline;
}
// Read records from hbase region files and write to Bigtable
// PCollection<RegionConfig> hbaseRecords = restoredSnapshots
// .apply("List Regions", new ListRegions());
PCollection<KV<String, Iterable<Mutation>>> hbaseRecords =
restoredSnapshots
.apply("List Regions", new ListRegions())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,6 @@ public static Map<String, String> getSnapshotsFromSnapshotPath(
// Build GCS path from given string e.g:
// gs://sym-bucket/snapshots/20220309230526/.hbase-snapshot
GcsPath gcsPath = GcsPath.fromUri(importSnapshotpath);
// LOG.info("GCS Path:" + gcsPath + ";Object:" + gcsPath.getObject());
Map<String, String> snapshots = new HashMap<>();

List<StorageObject> objects =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ public static Builder builder() {

public abstract String getSourceLocation();

// public abstract Path getSourcePath();
@Memoized
public Path getSourcePath() {
return new Path(getSourceLocation());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ private Map<Long, Long> computeRegionSize(SnapshotManifest snapshotManifest) {
regionsSize.put(regionManifest.getRegionInfo().getRegionId(), totalSize);
}

return regionsSize; // (int)Math.ceil((totalSize * 1.0)/GIGA_BYTE);
return regionsSize;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
*/
package com.google.cloud.bigtable.beam.hbasesnapshots.transforms;

// import com.google.cloud.bigtable.beam.hbasesnapshots.ImportSnapshots;

import com.google.api.core.InternalApi;
import com.google.cloud.bigtable.beam.hbasesnapshots.conf.RegionConfig;
import com.google.cloud.bigtable.beam.hbasesnapshots.conf.SnapshotConfig;
Expand Down Expand Up @@ -189,17 +187,14 @@ public void processElement(
boolean hasSplit = false;
try (HBaseRegionScanner scanner = newScanner(regionConfig, tracker.currentRestriction())) {
for (Result result = scanner.next(); result != null; result = scanner.next()) {
// if (flag==0 ) {
if (tracker.tryClaim(ByteKey.copyFrom(result.getRow()))) {
outputReceiver.output(KV.of(regionConfig.getSnapshotConfig(), result));
} else {
hasSplit = true;
break;
}
// }
}
}
// if (!hasSplit)
tracker.tryClaim(ByteKey.EMPTY);
}

Expand Down Expand Up @@ -357,21 +352,24 @@ public void processElement(
throws IOException {
if (element.getValue().listCells().isEmpty()) return;
String targetTable = element.getKey().getTableName();
String snapshotName = element.getKey().getSnapshotName();

// Limit the number of mutations per Put (server will reject >= 100k mutations per Put)
byte[] rowKey = element.getValue().getRow();
List<Mutation> mutations = new ArrayList<>();

boolean logAndSkipIncompatibleRowMutations =
verifyRowMutationThresholds(rowKey, element.getValue().listCells(), mutations);
verifyRowMutationThresholds(
rowKey, element.getValue().listCells(), mutations, snapshotName);

if (!logAndSkipIncompatibleRowMutations && mutations.size() > 0) {
outputReceiver.output(KV.of(targetTable, mutations));
}
}

private boolean verifyRowMutationThresholds(
byte[] rowKey, List<Cell> cells, List<Mutation> mutations) throws IOException {
byte[] rowKey, List<Cell> cells, List<Mutation> mutations, String snapshotName)
throws IOException {
boolean logAndSkipIncompatibleRows = false;

Put put = null;
Expand All @@ -384,9 +382,10 @@ private boolean verifyRowMutationThresholds(

// handle large cells
if (filterLargeCells && cell.getValueLength() > filterLargeCellThresholdBytes) {
// TODO add config name in log
LOG.warn(
"Dropping mutation, cell value length, "
"For snapshot "
+ snapshotName
+ ": Dropping mutation, cell value length, "
+ cell.getValueLength()
+ ", exceeds filter length, "
+ filterLargeCellThresholdBytes
Expand All @@ -407,23 +406,25 @@ private boolean verifyRowMutationThresholds(
cellCount++;
}

// TODO add config name in log
if (filterLargeRows && totalByteSize > filterLargeRowThresholdBytes) {
logAndSkipIncompatibleRows = true;
LOG.warn(
"Dropping row, row length, "
"For snapshot "
+ snapshotName
+ ": Dropping row, row length, "
+ totalByteSize
+ ", exceeds filter length threshold, "
+ filterLargeRowThresholdBytes
+ ", row key: "
+ Bytes.toStringBinary(rowKey));
}

// TODO add config name in log
if (filterLargeRowKeys && rowKey.length > filterLargeRowKeysThresholdBytes) {
logAndSkipIncompatibleRows = true;
LOG.warn(
"Dropping row, row key length, "
"For snapshot "
+ snapshotName
+ ": Dropping row, row key length, "
+ rowKey.length
+ ", exceeds filter length threshold, "
+ filterLargeRowKeysThresholdBytes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,6 @@ public void testgetSubSetSnapshotsFromSnapshotPath() throws IOException {
getMatchingSnapshotsFromSnapshotPath(snapshotList, ".*attachments.*");
List<String> expectedResult =
snapshotList.stream().filter(e -> e.contains("attachments")).collect(Collectors.toList());
// LOG.info("Matched:{} and expected:{}", snapshots.size(), expectedResult.size());
assertThat(snapshots.size(), is(equalTo(expectedResult.size())));
assertThat(snapshots.keySet(), containsInAnyOrder(expectedResult.toArray(new String[0])));
}
Expand Down
Loading