Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
Expand All @@ -37,10 +38,14 @@
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.SnapshotDescription;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
import org.apache.phoenix.jdbc.PhoenixConnection;
Expand Down Expand Up @@ -68,6 +73,79 @@ public abstract class LogicalTableNameBaseIT extends BaseTest {
public static final String NEW_TABLE_PREFIX = "NEW_TBL_";
private Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);

/**
* Take a snapshot, retrying if the master rejects the request because another snapshot for the
* same table is already in flight. The "already running another snapshot on the same table"
* rejection is most commonly produced by RPC-level retries: the original Admin.snapshot() RPC
* has been accepted by the master and the snapshot procedure is in flight, but the client
* retried the call and the master rejects the duplicate. In that case we poll listSnapshots()
* until the snapshot we wanted appears (or until we time out and try again from scratch). This
* keeps these snapshot+clone tests deterministic on slower or busy hardware.
*/
protected static void snapshotWithRetry(Admin admin, String snapshotName, TableName tableName)
throws IOException {
final long alreadyRunningWaitMs = TimeUnit.MINUTES.toMillis(2L);
final int maxAttempts = 5;
final long backoffMs = 1000L;
IOException lastError = null;
for (int attempt = 1; attempt <= maxAttempts; attempt++) {
try {
admin.snapshot(snapshotName, tableName);
return;
} catch (SnapshotCreationException e) {
if (!isAlreadyRunningSnapshot(e)) {
throw e;
}
if (waitForSnapshotToAppear(admin, snapshotName, alreadyRunningWaitMs)) {
return;
}
lastError = e;
}
try {
Thread.sleep(backoffMs);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
break;
}
}
if (lastError != null) {
throw lastError;
}
}

private static boolean isAlreadyRunningSnapshot(Throwable t) {
for (Throwable cur = t; cur != null; cur = cur.getCause()) {
String msg = cur.getMessage();
if (msg != null && msg.contains("already running another snapshot on the same table")) {
return true;
}
}
return false;
}

private static boolean waitForSnapshotToAppear(Admin admin, String snapshotName, long timeoutMs) {
long deadline = System.currentTimeMillis() + timeoutMs;
Pattern pattern = Pattern.compile(Pattern.quote(snapshotName));
while (System.currentTimeMillis() < deadline) {
try {
for (SnapshotDescription d : admin.listSnapshots(pattern)) {
if (snapshotName.equals(d.getName())) {
return true;
}
}
} catch (IOException ioe) {
// transient; keep polling
}
try {
Thread.sleep(500L);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
return false;
}
}
return false;
}

static void initCluster(boolean isNamespaceMapped) throws Exception {
Map<String, String> props = Maps.newConcurrentMap();
props.put(QueryServices.DROP_METADATA_ATTRIB, Boolean.TRUE.toString());
Expand Down Expand Up @@ -105,7 +183,7 @@ public static void createAndPointToNewPhysicalTable(Connection conn, String full

try (Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin()) {

admin.snapshot(snapshotName, TableName.valueOf(fullTableHName));
snapshotWithRetry(admin, snapshotName, TableName.valueOf(fullTableHName));
admin.cloneSnapshot(snapshotName, TableName.valueOf(fullNewTableHName));
admin.deleteSnapshot(snapshotName);
LogicalTableNameIT.renameAndDropPhysicalTable(conn, null, schemaName, tableName, newTableName,
Expand All @@ -130,7 +208,7 @@ protected HashMap<String, ArrayList<String>> testBaseTableWithIndex_BaseTableCha
String fullNewTableName = SchemaUtil.getTableName(schemaName, newTableName);
try (Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin()) {
String snapshotName = new StringBuilder(fullTableName).append("-Snapshot").toString();
admin.snapshot(snapshotName, TableName.valueOf(fullTableName));
snapshotWithRetry(admin, snapshotName, TableName.valueOf(fullTableName));
admin.cloneSnapshot(snapshotName, TableName.valueOf(fullNewTableName));
admin.deleteSnapshot(snapshotName);
try (Table htable = conn.unwrap(PhoenixConnection.class).getQueryServices()
Expand Down Expand Up @@ -185,7 +263,7 @@ protected HashMap<String, ArrayList<String>> test_IndexTableChange(Connection co
}
try (Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin()) {
String snapshotName = new StringBuilder(indexName).append("-Snapshot").toString();
admin.snapshot(snapshotName, TableName.valueOf(fullIndexTableHbaseName));
snapshotWithRetry(admin, snapshotName, TableName.valueOf(fullIndexTableHbaseName));
admin.cloneSnapshot(snapshotName, TableName.valueOf(fullNewTableName));
admin.deleteSnapshot(snapshotName);
try (Table htable = conn.unwrap(PhoenixConnection.class).getQueryServices()
Expand Down Expand Up @@ -245,7 +323,7 @@ protected HashMap<String, ArrayList<String>> testWithViewsAndIndex_BaseTableChan
}
try (Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin()) {
String snapshotName = new StringBuilder(fullTableName).append("-Snapshot").toString();
admin.snapshot(snapshotName, TableName.valueOf(fullTableHbaseName));
snapshotWithRetry(admin, snapshotName, TableName.valueOf(fullTableHbaseName));
admin.cloneSnapshot(snapshotName, TableName.valueOf(fullNewTableName));
admin.deleteSnapshot(snapshotName);
try (Table htable = conn.unwrap(PhoenixConnection.class).getQueryServices()
Expand Down