Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 30 additions & 7 deletions modules/core/src/main/java/org/apache/ignite/dump/DumpReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -129,19 +129,20 @@ public DumpReader(DumpReaderConfiguration cfg, IgniteLogger log) {

for (Map.Entry<Integer, List<String>> e : grpsCfgs.grpToNodes.entrySet()) {
int grp = e.getKey();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should rename grp -> grpId - both here and in the log messages?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

grp must be used for group name in logs.

String grpName = grpsCfgs.grpIdToName.get(grp);

for (String node : e.getValue()) {
for (int part : dump.partitions(node, grp)) {
if (grps != null && !grps.get(grp).add(part)) {
log.info("Skip copy partition [node=" + node + ", grp=" + grp + ", part=" + part + ']');
log.info("Skip copy partition [node=" + node + ", grp=" + grpName + ", part=" + part + ']');

continue;
}

Runnable consumePart = () -> {
if (skip.get()) {
if (log.isDebugEnabled()) {
log.debug("Skip partition due to previous error [node=" + node + ", grp=" + grp +
log.debug("Skip partition due to previous error [node=" + node + ", grp=" + grpName +
", part=" + part + ']');
}

Expand All @@ -150,7 +151,7 @@ public DumpReader(DumpReaderConfiguration cfg, IgniteLogger log) {

try (DumpedPartitionIterator iter = dump.iterator(node, grp, part, grpsCfgs.cacheIds)) {
if (log.isDebugEnabled()) {
log.debug("Consuming partition [node=" + node + ", grp=" + grp +
log.debug("Consuming partition [node=" + node + ", grp=" + grpName +
", part=" + part + ']');
}

Expand All @@ -159,7 +160,7 @@ public DumpReader(DumpReaderConfiguration cfg, IgniteLogger log) {
catch (Exception ex) {
skip.set(cfg.failFast());

log.error("Error consuming partition [node=" + node + ", grp=" + grp +
log.error("Error consuming partition [node=" + node + ", grp=" + grpName +
", part=" + part + ']', ex);

throw new IgniteException(ex);
Expand Down Expand Up @@ -365,9 +366,19 @@ private static GridKernalContext standaloneKernalContext(SnapshotFileTree sft, I
private GroupsConfigs groupsConfigs(Dump dump) {
Map<Integer, List<String>> grpsToNodes = new HashMap<>();
List<StoredCacheData> ccfgs = new ArrayList<>();
Map<Integer, String> grpIdToName = new HashMap<>();

Set<Integer> grpIds = cfg.groupNames() != null
? Arrays.stream(cfg.groupNames()).map(CU::cacheId).collect(Collectors.toSet())
? Arrays.stream(cfg.groupNames())
.map(grpName -> {
int grpId = CU.cacheId(grpName);

if (!grpIdToName.containsKey(grpId))
grpIdToName.put(grpId, grpName);

return grpId;
})
.collect(Collectors.toSet())
: null;

Set<Integer> cacheIds = cfg.cacheNames() != null
Expand All @@ -392,11 +403,14 @@ private GroupsConfigs groupsConfigs(Dump dump) {
}

grpsToNodes.get(grp).add(meta.folderName());

if (!grpIdToName.containsKey(grp))
grpIdToName.put(grp, grpCaches.get(0).configuration().getGroupName());
}
}

// Optimize - skip whole cache if only one in group!
return new GroupsConfigs(grpsToNodes, ccfgs, cacheIds);
return new GroupsConfigs(grpsToNodes, ccfgs, cacheIds, grpIdToName);
}

/** */
Expand All @@ -410,11 +424,20 @@ private static class GroupsConfigs {
/** Cache ids. */
public final Set<Integer> cacheIds;

/** Mapping from group id to group name. */
public final Map<Integer, String> grpIdToName;

/** */
public GroupsConfigs(Map<Integer, List<String>> grpToNodes, Collection<StoredCacheData> cacheCfgs, Set<Integer> cacheIds) {
public GroupsConfigs(
Map<Integer, List<String>> grpToNodes,
Collection<StoredCacheData> cacheCfgs,
Set<Integer> cacheIds,
Map<Integer, String> grpIdToName
) {
this.grpToNodes = grpToNodes;
this.cacheCfgs = cacheCfgs;
this.cacheIds = cacheIds;
this.grpIdToName = grpIdToName;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@
import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.SNAPSHOT_TRANSFER_RATE_DMS_KEY;
import static org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.AbstractCacheDumpTest.CACHE_0;
import static org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.AbstractCacheDumpTest.DMP_NAME;
import static org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.AbstractCacheDumpTest.GRP;
import static org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.AbstractCacheDumpTest.KEYS_CNT;
import static org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.AbstractCacheDumpTest.USER_FACTORY;
import static org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.AbstractCacheDumpTest.dump;
Expand Down Expand Up @@ -1296,6 +1297,185 @@ public void testConfigOnlySnapshotThrows() throws Exception {
}
}

/** */
@Test
public void testDumpReaderDebugLogsGroupName() throws Exception {
checkDumpReaderDebugLogsGroupName(new String[]{GRP});
}

/** */
@Test
public void testDumpReaderDebugLogsNullableGroupName() throws Exception {
checkDumpReaderDebugLogsGroupName(null);
}

/** */
@Test
public void testDumpReaderSkipCopiesLogsGroupName() throws Exception {
int parts = 4;
String dumpName0 = "dump0";
String dumpName1 = "dump1";

File snapshotPath0 = Files.createTempDirectory("snapshots0").toFile();
File snapshotPath1 = Files.createTempDirectory("snapshots1").toFile();
File combinedDumpDir = Files.createTempDirectory("combined_dump").toFile();

ListeningTestLogger testLog = new ListeningTestLogger(log);

LogListener skipLsnr = LogListener.matches("Skip copy partition")
.times(parts)
.andMatches("grp=" + GRP)
.build();

testLog.registerListener(skipLsnr);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add empty line before.


try {
IgniteEx node0 = startGrid(getConfiguration("node0")
.setConsistentId("node0")
.setSnapshotPath(snapshotPath0.getAbsolutePath())
.setGridLogger(testLog));

IgniteEx node1 = startGrid(getConfiguration("node1")
.setConsistentId("node1")
.setSnapshotPath(snapshotPath1.getAbsolutePath())
.setGridLogger(testLog));

node0.cluster().state(ClusterState.ACTIVE);

CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<Integer, Integer>()
.setName(DEFAULT_CACHE_NAME)
.setGroupName(GRP)
.setBackups(1)
.setAffinity(new RendezvousAffinityFunction().setPartitions(parts));

IgniteCache<Integer, Integer> cache = node0.createCache(ccfg);

IntStream.range(0, KEYS_CNT).forEach(i -> cache.put(i, i));

node0.snapshot().createDump(dumpName0, null).get(getTestTimeout());

U.sleep(100);

node1.snapshot().createDump(dumpName1, null).get(getTestTimeout());

File dumpDir0 = new File(snapshotPath0, dumpName0);
File dumpDir1 = new File(snapshotPath1, dumpName1);

U.copy(dumpDir0, combinedDumpDir, true);
U.copy(dumpDir1, combinedDumpDir, true);

DumpConsumer dummyConsumer = new DumpConsumer() {
@Override public void start() {
// No-op.
}

@Override public void onMappings(Iterator<TypeMapping> mappings) {
// No-op.
}

@Override public void onTypes(Iterator<BinaryType> types) {
// No-op.
}

@Override public void onCacheConfigs(Iterator<StoredCacheData> caches) {
// No-op.
}

@Override public void onPartition(int grpId, int partId, Iterator<DumpEntry> data) {
// No-op.
}

@Override public void stop() {
// No-op.
}
};

new DumpReader(
new DumpReaderConfiguration(
null,
combinedDumpDir.getAbsolutePath(),
null,
dummyConsumer,
DFLT_THREAD_CNT,
DFLT_TIMEOUT,
false,
true,
true,
new String[]{GRP},
null,
true,
null
),
testLog
).run();

assertTrue(skipLsnr.check());
}
finally {
U.delete(snapshotPath0);
U.delete(snapshotPath1);
U.delete(combinedDumpDir);
}
}

/** */
private void checkDumpReaderDebugLogsGroupName(String[] grpNames) throws Exception {
String id = "test";

setLoggerDebugLevel();

ListeningTestLogger testLog = new ListeningTestLogger(log);

LogListener errLsnr = LogListener.matches("Error consuming partition").andMatches("grp=" + GRP).build();
LogListener cnsmLsnr = LogListener.matches("Consuming partition").andMatches("grp=" + GRP).build();

testLog.registerListener(errLsnr);
testLog.registerListener(cnsmLsnr);

IgniteEx ign = startGrid(getConfiguration(id).setConsistentId(id).setGridLogger(testLog));

ign.cluster().state(ClusterState.ACTIVE);

IgniteCache<Integer, Integer> cache = ign.createCache(new CacheConfiguration<Integer, Integer>()
.setName(CACHE_0)
.setGroupName(GRP)
.setBackups(1)
.setAffinity(new RendezvousAffinityFunction().setPartitions(3))
);

IntStream.range(0, KEYS_CNT).forEach(i -> cache.put(i, i));

ign.snapshot().createDump(DMP_NAME, null).get(getTestTimeout());

TestDumpConsumer cnsmr = new TestDumpConsumer() {
@Override public void onPartition(int grp, int part, Iterator<DumpEntry> data) {
throw new RuntimeException("trigger error log");
}
};

assertThrows(null, () -> new DumpReader(
new DumpReaderConfiguration(
DMP_NAME,
null,
ign.configuration(),
cnsmr,
DFLT_THREAD_CNT,
DFLT_TIMEOUT,
true,
true,
false,
grpNames,
null,
false,
null
),
testLog
).run(), RuntimeException.class, "trigger error log");

assertTrue("Log with group name not found", errLsnr.check());
assertTrue("Consuming with group name not found", cnsmLsnr.check());
}

/** */
public class TestCacheConflictResolutionManager<K, V> extends GridCacheManagerAdapter<K, V>
implements CacheConflictResolutionManager<K, V> {
Expand Down