Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ public class TableSnapshotReadsMapReduceIT extends BaseTest {
private static final String RECORDING_YEAR = "RECORDING_YEAR";
private static final String RECORDINGS_QUARTER = "RECORDINGS_QUARTER";
private static final String MAX_RECORDING = "MAX_RECORDING";
private final static String SNAPSHOT_NAME = "FOO";
private static final String FIELD1 = "FIELD1";
private static final String FIELD2 = "FIELD2";
private static final String FIELD3 = "FIELD3";
Expand All @@ -106,6 +105,7 @@ public class TableSnapshotReadsMapReduceIT extends BaseTest {
private static List<List<Object>> result;
private long timestamp;
private String tableName;
private String snapshotName;
private Job job;
private Path tmpDir;
private Configuration conf;
Expand Down Expand Up @@ -133,6 +133,7 @@ public void before() throws SQLException, IOException {
// create table
try (Connection conn = DriverManager.getConnection(getUrl())) {
tableName = generateUniqueName();
snapshotName = "SNAP_" + tableName;
conn.createStatement().execute(String.format(CREATE_TABLE, tableName));
conn.commit();
}
Expand All @@ -144,44 +145,44 @@ public void before() throws SQLException, IOException {

@Test
public void testMapReduceSnapshots() throws Exception {
PhoenixMapReduceUtil.setInput(job, PhoenixIndexDBWritable.class, SNAPSHOT_NAME, tableName,
PhoenixMapReduceUtil.setInput(job, PhoenixIndexDBWritable.class, snapshotName, tableName,
tmpDir, null, FIELD1, FIELD2, FIELD3);
configureJob(job, tableName, null, null, false);
}

@Test
public void testMapReduceSnapshotsMultiRegion() throws Exception {
String inputQuery = "SELECT * FROM " + tableName + " ORDER BY FIELD1 asc";
PhoenixMapReduceUtil.setInput(job, PhoenixIndexDBWritable.class, SNAPSHOT_NAME, tableName,
PhoenixMapReduceUtil.setInput(job, PhoenixIndexDBWritable.class, snapshotName, tableName,
tmpDir, inputQuery);
configureJob(job, tableName, null, null, true);
}

@Test
public void testMapReduceSnapshotsWithCondition() throws Exception {
PhoenixMapReduceUtil.setInput(job, PhoenixIndexDBWritable.class, SNAPSHOT_NAME, tableName,
PhoenixMapReduceUtil.setInput(job, PhoenixIndexDBWritable.class, snapshotName, tableName,
tmpDir, FIELD3 + " > 0001", FIELD1, FIELD2, FIELD3);
configureJob(job, tableName, null, "FIELD3 > 0001", false);
}

@Test
public void testMapReduceSnapshotWithLimit() throws Exception {
String inputQuery = "SELECT * FROM " + tableName + " ORDER BY FIELD2 LIMIT 1";
PhoenixMapReduceUtil.setInput(job, PhoenixIndexDBWritable.class, SNAPSHOT_NAME, tableName,
PhoenixMapReduceUtil.setInput(job, PhoenixIndexDBWritable.class, snapshotName, tableName,
tmpDir, inputQuery);
configureJob(job, tableName, inputQuery, null, false);
}

@Test
public void testSnapshotMapReduceJobNotImpactingTableMapReduceJob() throws Exception {
// Submitting and asserting successful Map Reduce Job over snapshots
PhoenixMapReduceUtil.setInput(job, PhoenixIndexDBWritable.class, SNAPSHOT_NAME, tableName,
PhoenixMapReduceUtil.setInput(job, PhoenixIndexDBWritable.class, snapshotName, tableName,
tmpDir, null, FIELD1, FIELD2, FIELD3);
configureJob(job, tableName, null, null, false);

// Asserting that snapshot name is set in configuration
Configuration config = job.getConfiguration();
Assert.assertEquals("Correct snapshot name not found in configuration", SNAPSHOT_NAME,
Assert.assertEquals("Correct snapshot name not found in configuration", snapshotName,
config.get(PhoenixConfigurationUtil.SNAPSHOT_NAME_KEY));

TestingMapReduceParallelScanGrouper.clearNumCallsToGetRegionBoundaries();
Expand Down Expand Up @@ -311,7 +312,7 @@ public int compare(List<Object> o1, List<Object> o2) {
rs.next());
assertRestoreDirCount(conf, tmpDir.toString(), 1);
} finally {
deleteSnapshotIfExists(SNAPSHOT_NAME);
deleteSnapshotIfExists(snapshotName);
}
}

Expand Down Expand Up @@ -374,6 +375,7 @@ private void upsertRecord(PreparedStatement stmt, String field1, String field2,

private void upsertAndSnapshot(String tableName, boolean shouldSplit, Configuration configuration)
throws Exception {
deleteSnapshotIfExists(snapshotName);
if (shouldSplit) {
// having very few rows in table doesn't really help much with splitting case.
// we should upsert large no of rows as a prerequisite to splitting
Expand All @@ -390,7 +392,7 @@ private void upsertAndSnapshot(String tableName, boolean shouldSplit, Configurat
splitTableSync(admin, hbaseTableName, Bytes.toBytes("CCCC"), 2);
}

snapshotCreateSync(hbaseTableName, admin, SNAPSHOT_NAME);
snapshotCreateSync(hbaseTableName, admin, snapshotName);

List<SnapshotDescription> snapshots = admin.listSnapshots();
Assert.assertEquals(tableName, snapshots.get(0).getTableNameAsString());
Expand All @@ -409,7 +411,7 @@ private void upsertAndSnapshot(String tableName, boolean shouldSplit, Configurat
FileSystem fs = rootDir.getFileSystem(configuration);
Path restoreDir = new Path(configuration.get(PhoenixConfigurationUtil.RESTORE_DIR_KEY));
RestoreSnapshotHelper.copySnapshotForScanner(configuration, fs, rootDir, restoreDir,
SNAPSHOT_NAME);
snapshotName);
PhoenixConfigurationUtil.setMRSnapshotManagedExternally(configuration, true);
}
}
Expand Down