diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java index 849e87175e346..0927dd3e79b3e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java @@ -773,6 +773,16 @@ private IgniteInternalFuture initLocalSnapshotStartSt "Another snapshot operation in progress [req=" + req + ", curr=" + curSnpOp + ']')); } + // Let's keep the metrics on any node. + if (clusterSnpFut == null) { + clusterSnpFut = new ClusterSnapshotFuture(req.reqId, req.snpName, req.incremental() ? req.incrementIndex() : null); + + if (req.incremental()) + lastSeenIncSnpFut = clusterSnpFut; + else + lastSeenSnpFut = clusterSnpFut; + } + SnapshotOperation snpOp = new SnapshotOperation(req, new SnapshotFileTree(cctx.kernalContext(), req.snapshotName(), req.snapshotPath())); @@ -1116,7 +1126,9 @@ private void processLocalSnapshotStartStageResult(UUID id, Map snpName = mreg0.findMetric("LastSnapshotName"); - ObjectGauge errMsg = mreg0.findMetric("LastSnapshotErrorMessage"); - ObjectGauge> snpList = mreg0.findMetric("LocalSnapshotNames"); - // Snapshot process will be blocked when delta partition files processing starts. snp(ignite).localSnapshotSenderFactory( blockingLocalSnapshotSender(ignite, deltaApply, deltaBlock)); - assertEquals("Snapshot start time must be undefined prior to snapshot operation started.", - 0, startTime.value()); - assertEquals("Snapshot end time must be undefined to snapshot operation started.", - 0, endTime.value()); - assertTrue("Snapshot name must not exist prior to snapshot operation started.", snpName.value().isEmpty()); - assertTrue("Snapshot error message must null prior to snapshot operation started.", errMsg.value().isEmpty()); - assertTrue("Snapshots on local node must not exist", snpList.value().isEmpty()); + for (Ignite g : G.allGrids()) { + MetricRegistry mreg = ((IgniteEx)g).context().metric().registry(SNAPSHOT_METRICS); + + LongMetric startTime = mreg.findMetric("LastSnapshotStartTime"); + LongMetric endTime = mreg.findMetric("LastSnapshotEndTime"); + ObjectGauge snpName = mreg.findMetric("LastSnapshotName"); + ObjectGauge errMsg = mreg.findMetric("LastSnapshotErrorMessage"); + ObjectGauge> snpList = mreg.findMetric("LocalSnapshotNames"); + + assertEquals("Snapshot start time must be undefined prior to snapshot operation started.", + 0, startTime.value()); + assertEquals("Snapshot end time must be undefined to snapshot operation started.", + 0, endTime.value()); + assertTrue("Snapshot name must not exist prior to snapshot operation started.", snpName.value().isEmpty()); + assertTrue("Snapshot error message must null prior to snapshot operation started.", errMsg.value().isEmpty()); + assertTrue("Snapshots on local node must not exist", snpList.value().isEmpty()); + } long cutoffStartTime = U.currentTimeMillis(); - IgniteFuture fut0 = snp(ignite).createSnapshot(SNAPSHOT_NAME, null, false, onlyPrimary); + IgniteFuture fut = snp(ignite).createSnapshot(SNAPSHOT_NAME, null, false, onlyPrimary); U.await(deltaApply); - assertTrue("Snapshot start time must be set prior to snapshot operation started " + - "[startTime=" + startTime.value() + ", cutoffTime=" + cutoffStartTime + ']', - startTime.value() >= cutoffStartTime); - assertEquals("Snapshot end time must be zero prior to snapshot operation started.", - 0, endTime.value()); - assertEquals("Snapshot name must be set prior to snapshot operation started.", - SNAPSHOT_NAME, snpName.value()); - assertTrue("Snapshot error message must null prior to snapshot operation started.", - errMsg.value().isEmpty()); - - IgniteFuture fut1 = snp(grid(1)).createSnapshot(newSnapshotName, null, false, onlyPrimary); - - assertThrowsWithCause((Callable)fut1::get, IgniteException.class); + for (Ignite g : G.allGrids()) { + MetricRegistry mreg = ((IgniteEx)g).context().metric().registry(SNAPSHOT_METRICS); + + LongMetric startTime = mreg.findMetric("LastSnapshotStartTime"); + LongMetric endTime = mreg.findMetric("LastSnapshotEndTime"); + ObjectGauge snpName = mreg.findMetric("LastSnapshotName"); + ObjectGauge errMsg = mreg.findMetric("LastSnapshotErrorMessage"); + + assertTrue("Snapshot start time must be set prior to snapshot operation started " + + "[startTime=" + startTime.value() + ", cutoffTime=" + cutoffStartTime + ']', + startTime.value() >= cutoffStartTime); + assertEquals("Snapshot end time must be zero prior to snapshot operation started.", + 0, endTime.value()); + assertEquals("Snapshot name must be set prior to snapshot operation started.", + SNAPSHOT_NAME, snpName.value()); + assertTrue("Snapshot error message must null prior to snapshot operation started.", + errMsg.value().isEmpty()); + } - MetricRegistry mreg1 = grid(1).context().metric().registry(SNAPSHOT_METRICS); + deltaBlock.countDown(); - LongMetric startTime1 = mreg1.findMetric("LastSnapshotStartTime"); - LongMetric endTime1 = mreg1.findMetric("LastSnapshotEndTime"); - ObjectGauge snpName1 = mreg1.findMetric("LastSnapshotName"); - ObjectGauge errMsg1 = mreg1.findMetric("LastSnapshotErrorMessage"); + fut.get(); - assertTrue("Snapshot start time must be greater than zero for finished snapshot.", - startTime1.value() > 0); - assertEquals("Snapshot end time must zero for failed on start snapshots.", - 0, endTime1.value()); - assertEquals("Snapshot name must be set when snapshot operation already finished.", - newSnapshotName, snpName1.value()); - assertNotNull("Concurrent snapshot operation must failed.", - errMsg1.value()); + for (Ignite g : G.allGrids()) { + MetricRegistry mreg = ((IgniteEx)g).context().metric().registry(SNAPSHOT_METRICS); - deltaBlock.countDown(); + LongMetric startTime = mreg.findMetric("LastSnapshotStartTime"); + LongMetric endTime = mreg.findMetric("LastSnapshotEndTime"); - fut0.get(); - - assertTrue("Snapshot start time must be greater than zero for finished snapshot.", - startTime.value() > 0); - assertTrue("Snapshot end time must be greater than zero for finished snapshot.", - endTime.value() > 0); - assertEquals("Snapshot name must be set when snapshot operation already finished.", - SNAPSHOT_NAME, snpName.value()); - assertTrue("Concurrent snapshot operation must finished successfully.", - errMsg.value().isEmpty()); - assertEquals("Only the first snapshot must be created and stored on disk.", - Collections.singletonList(SNAPSHOT_NAME), snpList.value()); + waitForCondition(() -> endTime.value() != 0L && startTime.value() != 0 && endTime.value() > startTime.value(), + getTestTimeout()); + } } /** @throws Exception If fails. */ diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/incremental/IncrementalSnapshotWarnAtomicCachesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/incremental/IncrementalSnapshotWarnAtomicCachesTest.java index 1806cee36a7c2..68911b39dbcb0 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/incremental/IncrementalSnapshotWarnAtomicCachesTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/incremental/IncrementalSnapshotWarnAtomicCachesTest.java @@ -19,6 +19,7 @@ import java.util.ArrayList; import java.util.Collection; +import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -112,8 +113,8 @@ private CacheConfiguration[] prepareCacheConfs(String grp0, St /** */ public void checkCachesSnapshotCreationAndRestore(CacheConfiguration... ccfgs) throws Exception { - List allWarnCaches = new ArrayList<>(); - Map> warnCachesByGrps = new HashMap<>(); + List allWarnCaches = new ArrayList<>(); + Map> warnCachesByGrps = new HashMap<>(); for (CacheConfiguration ccfg: ccfgs) { if (ccfg.getAtomicityMode() == CacheAtomicityMode.ATOMIC && ccfg.getBackups() > 0) { @@ -122,37 +123,25 @@ public void checkCachesSnapshotCreationAndRestore(CacheConfiguration { caches = caches == null ? new ArrayList<>() : caches; - int cacheNum = Integer.parseInt(ccfg.getName().replace("cache", "")); + caches.add(ccfg.getName()); - caches.add(cacheNum); - - allWarnCaches.add(cacheNum); + allWarnCaches.add(ccfg.getName()); return caches; }); } } - checkWarnMessageOnCreateSnapshot(cachesPattern(allWarnCaches), ccfgs); - checkWarnMessageOnRestoreSnapshot(cachesPattern(allWarnCaches), null); + checkWarnMessageOnCreateSnapshot(allWarnCaches, ccfgs); + checkWarnMessageOnRestoreSnapshot(allWarnCaches, null); for (String grp: warnCachesByGrps.keySet()) - checkWarnMessageOnRestoreSnapshot(cachesPattern(warnCachesByGrps.get(grp)), F.asList(grp)); - } - - /** Transforms cache numbers to cache pattern. For example, [0, 1] -> cache[0,1], cache[0,1]. */ - private @Nullable String cachesPattern(List cacheNums) { - if (cacheNums.isEmpty()) - return null; - - return cacheNums.stream() - .map(c -> "cache" + cacheNums) - .collect(Collectors.joining(", ")); + checkWarnMessageOnRestoreSnapshot(warnCachesByGrps.get(grp), F.asList(grp)); } /** */ private void checkWarnMessageOnCreateSnapshot( - @Nullable String warnAtomicCaches, + Collection warnAtomicCaches, CacheConfiguration... ccfgs ) throws Exception { this.ccfgs = ccfgs; @@ -172,25 +161,25 @@ private void checkWarnMessageOnCreateSnapshot( g.snapshot().createSnapshot(SNP).get(getTestTimeout()); - assertTrue(warnAtomicCaches, lsnr.check()); + assertTrue(lsnr.check()); for (CacheConfiguration c: ccfgs) { for (int i = 1_000; i < 2_000; i++) g.cache(c.getName()).put(i, i); } - lsnr = warnLogListener(warnAtomicCaches, warnAtomicCaches == null ? 0 : 1); + lsnr = warnLogListener(warnAtomicCaches, warnAtomicCaches.isEmpty() ? 0 : 3); lsnLogger.registerListener(lsnr); g.snapshot().createIncrementalSnapshot(SNP).get(getTestTimeout()); - assertTrue(warnAtomicCaches, lsnr.check()); + assertTrue(lsnr.check(getTestTimeout())); } /** */ private void checkWarnMessageOnRestoreSnapshot( - @Nullable String warnAtomicCaches, + Collection warnAtomicCaches, @Nullable Collection restoreCacheGrps ) throws Exception { stopAllGrids(); @@ -211,26 +200,36 @@ private void checkWarnMessageOnRestoreSnapshot( g.snapshot().restoreSnapshot(SNP, restoreCacheGrps).get(getTestTimeout()); - assertTrue(warnAtomicCaches + " " + restoreCacheGrps, lsnr.check()); + assertTrue(lsnr.check()); g.destroyCaches(g.cacheNames()); awaitPartitionMapExchange(); - lsnr = warnLogListener(warnAtomicCaches, warnAtomicCaches == null ? 0 : 1); + lsnr = warnLogListener(warnAtomicCaches, warnAtomicCaches.isEmpty() ? 0 : 1); lsnLogger.registerListener(lsnr); g.snapshot().restoreSnapshot(SNP, restoreCacheGrps, 1).get(getTestTimeout()); - assertTrue(warnAtomicCaches + " " + restoreCacheGrps, lsnr.check()); + assertTrue(lsnr.check()); } /** */ - private LogListener warnLogListener(@Nullable String atomicCaches, int times) { + private LogListener warnLogListener(Collection atomicCaches, int times) { + String cachesStr = null; + + if (atomicCaches.size() == 1) + cachesStr = F.first(atomicCaches); + else if (atomicCaches.size() > 1) { + cachesStr = "((" + String.join(", ", atomicCaches) + ')'; + + cachesStr += "|(" + atomicCaches.stream().sorted(Comparator.reverseOrder()).collect(Collectors.joining(", ")) + "))"; + } + Pattern p = Pattern.compile( "Incremental snapshot \\[snpName=" + SNP + ", incIdx=1] contains ATOMIC caches with backups:" - + (atomicCaches == null ? "" : " \\[" + atomicCaches) + ']'); + + (cachesStr == null ? "" : " \\[" + cachesStr) + "]"); return LogListener.matches(p).times(times).build(); }