Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@
import org.apache.accumulo.manager.tableOps.ManagerRepo;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.zookeeper.TransactionWatcher.ZooArbitrator;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Bulk import makes requests of tablet servers, and those requests can take a long time. Our
Expand All @@ -61,6 +64,7 @@ class BulkImportMove extends ManagerRepo {

private static final long serialVersionUID = 1L;

private static final Logger log = LoggerFactory.getLogger(BulkImportMove.class);
private final BulkInfo bulkInfo;

public BulkImportMove(BulkInfo bulkInfo) {
Expand All @@ -74,6 +78,12 @@ public Repo<Manager> call(long tid, Manager manager) throws Exception {

VolumeManager fs = manager.getVolumeManager();

if (log.isTraceEnabled()) {
FileStatus[] files = fs.listStatus(sourceDir);
log.trace("{} bulk import move starting. source:{} dest:{} files to move:{}",
FateTxId.formatTid(tid), sourceDir, bulkDir, files == null ? 0 : files.length);
}

if (bulkInfo.tableState == TableState.ONLINE) {
ZooArbitrator.start(manager.getContext(), Constants.BULK_ARBITRATOR_TYPE, tid);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW;

import java.util.ArrayList;
import java.util.List;

import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException;
import org.apache.accumulo.core.clientImpl.TableOperationsImpl;
Expand Down Expand Up @@ -109,6 +112,7 @@ public long isReady(long tid, Manager manager) throws Exception {

int tabletsToWaitFor = 0;
int tabletCount = 0;
List<String> waitingDetail = log.isTraceEnabled() ? new ArrayList<>() : null;

TabletsMetadata tablets = TabletsMetadata.builder(manager.getContext()).forTable(tableId)
.overlapping(startRow, endRow).fetch(LOCATION, PREV_ROW, COMPACT_ID).build();
Expand All @@ -119,6 +123,11 @@ public long isReady(long tid, Manager manager) throws Exception {
if (tablet.hasCurrent()) {
serversToFlush.increment(tablet.getLocation().getServerInstance(), 1);
}
if (waitingDetail != null && waitingDetail.size() < 5) {
String loc = tablet.hasCurrent() ? tablet.getLocation().getServerInstance().getHostPort()
: "no location";
waitingDetail.add(tablet.getExtent() + "@" + loc);
}
}

tabletCount++;
Expand Down Expand Up @@ -179,6 +188,15 @@ public long isReady(long tid, Manager manager) throws Exception {
"{} tablets compacted:{}/{} servers contacted:{} expected id:{} compaction extent:{} sleepTime:{}",
FateTxId.formatTid(tid), tabletCount - tabletsToWaitFor, tabletCount,
serversToFlush.size(), compactId, extent, sleepTime);

StringBuilder sb = new StringBuilder();
waitingDetail.forEach(d -> sb.append("\n waiting on ").append(d));
if (tabletsToWaitFor > waitingDetail.size()) {
sb.append(String.format("\n ... and %d more", tabletsToWaitFor - waitingDetail.size()));
}
if (sb.length() > 0) {
log.trace("{}", sb);
}
}

return sleepTime;
Expand Down