diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CheckpointWriteOrder.java b/modules/core/src/main/java/org/apache/ignite/configuration/CheckpointWriteOrder.java index 950064d168302..8fac3b5605f1d 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/CheckpointWriteOrder.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/CheckpointWriteOrder.java @@ -31,7 +31,14 @@ public enum CheckpointWriteOrder { * All checkpoint pages are collected into single list and sorted by page index. * Provides almost sequential disk writes, which can be much faster on some SSD models. */ - SEQUENTIAL; + SEQUENTIAL, + + /** + * All checkpoint pages are sorted by page index, except that within each partition the page with the + * maximum page id goes first, followed by the remaining pages in ascending order. As a result, the first page for + * a given partition preallocates all necessary disk space for the file on the current checkpoint. + */ + SEQUENTIAL_WITH_PREALLOCATION; /** * Enumerated values. 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointWorkflow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointWorkflow.java index 47480dace142a..78b9f31d78a8d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointWorkflow.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointWorkflow.java @@ -48,6 +48,7 @@ import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.pagemem.FullPageId; +import org.apache.ignite.internal.pagemem.PageIdUtils; import org.apache.ignite.internal.pagemem.store.PageStore; import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; import org.apache.ignite.internal.pagemem.wal.record.CacheState; @@ -491,7 +492,7 @@ private GridConcurrentMultiPairQueue splitAndSortCpPag cpPagesPerRegion.add(new T2<>(regPages.getKey(), pages)); } - if (checkpointWriteOrder == CheckpointWriteOrder.SEQUENTIAL) { + if (checkpointWriteOrder != CheckpointWriteOrder.RANDOM) { Comparator cmp = Comparator.comparingInt(FullPageId::groupId) .thenComparingLong(FullPageId::effectivePageId); @@ -506,11 +507,62 @@ private GridConcurrentMultiPairQueue splitAndSortCpPag if (pool != null) pool.shutdown(); + + if (checkpointWriteOrder == CheckpointWriteOrder.SEQUENTIAL_WITH_PREALLOCATION) { + for (T2 pagesPerReg : cpPagesPerRegion) + arrangeForPreallocation(pagesPerReg.getValue()); + } } return new GridConcurrentMultiPairQueue<>(cpPagesPerRegion); } + /** + * Rearranges a sorted array of page ids in place: within every run of pages sharing a group id and a partition + * id, the last (maximum) page is moved to the front and the others are shifted one slot to the right. 
As a + * result, the first page of a partition preallocates all disk space its file needs on the current checkpoint. + * + * @param pageIds Sorted (by group id and page id) array of page ids; sort order is checked by assertions only. + */ + public static void arrangeForPreallocation(FullPageId[] pageIds) { + if (pageIds.length <= 1) + return; + + int partStartIdx = 0; + FullPageId prevFullPageId = pageIds[0]; + int prevGrpId = prevFullPageId.groupId(); + int prevPartId = PageIdUtils.partId(prevFullPageId.pageId()); + + for (int i = 1; i < pageIds.length; i++) { + /* Check against prevFullPageId: the slot pageIds[i - 1] may already hold a shifted page. */ + assert pageIds[i].groupId() >= prevFullPageId.groupId() : "Unsorted page IDs array"; + assert pageIds[i].groupId() != prevFullPageId.groupId() + || pageIds[i].effectivePageId() >= prevFullPageId.effectivePageId() : "Unsorted page IDs array"; + + int curGrpId = pageIds[i].groupId(); + int curPartId = PageIdUtils.partId(pageIds[i].pageId()); + + if (curGrpId == prevGrpId && curPartId == prevPartId) { + /* Same partition: put the carried page here and carry the current one further. */ + FullPageId tmp = pageIds[i]; + pageIds[i] = prevFullPageId; + prevFullPageId = tmp; + continue; + } + + /* Partition changed: the carried page is the maximum of the finished run, so it goes first. */ + pageIds[partStartIdx] = prevFullPageId; + + prevGrpId = curGrpId; + prevPartId = curPartId; + prevFullPageId = pageIds[i]; + partStartIdx = i; + } + + /* Finish the last run. */ + pageIds[partStartIdx] = prevFullPageId; + } + /** * Performs parallel sort in isolated fork join pool. 
* diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistenceSequentialCheckpointTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistenceSequentialCheckpointTest.java deleted file mode 100644 index 9a23d60489595..0000000000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistenceSequentialCheckpointTest.java +++ /dev/null @@ -1,41 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one or more -* contributor license agreements. 
See the NOTICE file distributed with -* this work for additional information regarding copyright ownership. -* The ASF licenses this file to You under the Apache License, Version 2.0 -* (the "License"); you may not use this file except in compliance with -* the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ -package org.apache.ignite.internal.processors.cache.persistence; - -import org.apache.ignite.configuration.CheckpointWriteOrder; -import org.apache.ignite.configuration.DataStorageConfiguration; -import org.apache.ignite.configuration.IgniteConfiguration; - -/** - * - */ -public class IgnitePersistenceSequentialCheckpointTest extends IgnitePersistentStoreCacheGroupsTest { - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - DataStorageConfiguration dsCfg = cfg.getDataStorageConfiguration(); - dsCfg.setCheckpointThreads(4).setCheckpointWriteOrder(CheckpointWriteOrder.SEQUENTIAL); - - return cfg; - } - - /** {@inheritDoc} */ - @Override protected int entriesCount() { - return 100; - } -} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistentStoreCacheGroupsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistentStoreCacheGroupsTest.java index 1f5d907c7b134..6079b61d380d1 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistentStoreCacheGroupsTest.java +++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistentStoreCacheGroupsTest.java @@ -36,6 +36,7 @@ import org.apache.ignite.cluster.ClusterState; import org.apache.ignite.configuration.BinaryConfiguration; import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.CheckpointWriteOrder; import org.apache.ignite.configuration.DataRegionConfiguration; import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; @@ -47,16 +48,27 @@ import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; import static org.apache.ignite.cache.CacheMode.PARTITIONED; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; -/** - * - */ +/** */ +@RunWith(Parameterized.class) public class IgnitePersistentStoreCacheGroupsTest extends GridCommonAbstractTest { + /** Checkpoint write order. 
*/ + @Parameterized.Parameter + public CheckpointWriteOrder cpWriteOrder; + + /** */ + @Parameterized.Parameters(name = "cpWriteOrder={0}") + public static Object[] parameters() { + return CheckpointWriteOrder.values(); + } + /** */ private static final String GROUP1 = "grp1"; @@ -88,6 +100,7 @@ public class IgnitePersistentStoreCacheGroupsTest extends GridCommonAbstractTest .setDefaultDataRegionConfiguration( new DataRegionConfiguration().setMaxSize(100L * 1024 * 1024).setPersistenceEnabled(true)) .setPageSize(1024) + .setCheckpointWriteOrder(cpWriteOrder) .setWalMode(WALMode.LOG_ONLY); cfg.setDataStorageConfiguration(memCfg); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/PageIdsArrangeForPreallocationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/PageIdsArrangeForPreallocationTest.java new file mode 100644 index 0000000000000..06166564f199b --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/PageIdsArrangeForPreallocationTest.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + package org.apache.ignite.internal.processors.cache.persistence.db.checkpoint; + +import org.apache.ignite.internal.pagemem.FullPageId; +import org.apache.ignite.internal.pagemem.PageIdAllocator; +import org.apache.ignite.internal.pagemem.PageIdUtils; +import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointWorkflow; +import org.junit.Assert; +import org.junit.Test; + +/** + * Tests for {@link CheckpointWorkflow#arrangeForPreallocation(FullPageId[])}. + */ +public class PageIdsArrangeForPreallocationTest { + /** A single-element array must be left unchanged. */ + @Test + public void testSinglePageId() { + FullPageId[] pageIds = new FullPageId[] { + new FullPageId(PageIdUtils.pageId(1, PageIdAllocator.FLAG_DATA, 1), 1), + }; + + CheckpointWorkflow.arrangeForPreallocation(pageIds); + + FullPageId[] exp = new FullPageId[] { + new FullPageId(PageIdUtils.pageId(1, PageIdAllocator.FLAG_DATA, 1), 1), + }; + + Assert.assertArrayEquals(exp, pageIds); + } + + /** Within one partition the page with the maximum id must move to the front, the rest staying ascending. */ + @Test + public void testSinglePartition() { + FullPageId[] pageIds = new FullPageId[] { + new FullPageId(PageIdUtils.pageId(1, PageIdAllocator.FLAG_DATA, 1), 1), + new FullPageId(PageIdUtils.pageId(1, PageIdAllocator.FLAG_DATA, 2), 1), + new FullPageId(PageIdUtils.pageId(1, PageIdAllocator.FLAG_DATA, 3), 1), + }; + + CheckpointWorkflow.arrangeForPreallocation(pageIds); + + FullPageId[] exp = new FullPageId[] { + new FullPageId(PageIdUtils.pageId(1, PageIdAllocator.FLAG_DATA, 3), 1), + new FullPageId(PageIdUtils.pageId(1, PageIdAllocator.FLAG_DATA, 1), 1), + new FullPageId(PageIdUtils.pageId(1, PageIdAllocator.FLAG_DATA, 2), 1), + }; + + Assert.assertArrayEquals(exp, pageIds); + } + + /** Pages of the same partition id but different group ids are separate runs, so the order must not change. 
*/ + @Test + public void testSamePartitionDifferentGroups() { + FullPageId[] pageIds = new FullPageId[] { + new FullPageId(PageIdUtils.pageId(1, PageIdAllocator.FLAG_DATA, 1), 1), + new FullPageId(PageIdUtils.pageId(1, PageIdAllocator.FLAG_DATA, 1), 2), + new FullPageId(PageIdUtils.pageId(1, PageIdAllocator.FLAG_DATA, 2), 3), + }; + + 
CheckpointWorkflow.arrangeForPreallocation(pageIds); + + FullPageId[] exp = new FullPageId[] { + new FullPageId(PageIdUtils.pageId(1, PageIdAllocator.FLAG_DATA, 1), 1), + new FullPageId(PageIdUtils.pageId(1, PageIdAllocator.FLAG_DATA, 1), 2), + new FullPageId(PageIdUtils.pageId(1, PageIdAllocator.FLAG_DATA, 2), 3), + }; + + Assert.assertArrayEquals(exp, pageIds); + } + + /** Several groups and partitions mixing single-page and multi-page runs. */ + @Test + public void testMixed() { + FullPageId[] pageIds = new FullPageId[] { + new FullPageId(PageIdUtils.pageId(1, PageIdAllocator.FLAG_DATA, 1), 1), + new FullPageId(PageIdUtils.pageId(1, PageIdAllocator.FLAG_DATA, 2), 1), + new FullPageId(PageIdUtils.pageId(1, PageIdAllocator.FLAG_DATA, 5), 1), + new FullPageId(PageIdUtils.pageId(2, PageIdAllocator.FLAG_DATA, 4), 1), + new FullPageId(PageIdUtils.pageId(1, PageIdAllocator.FLAG_DATA, 1), 2), + new FullPageId(PageIdUtils.pageId(1, PageIdAllocator.FLAG_DATA, 3), 2), + new FullPageId(PageIdUtils.pageId(3, PageIdAllocator.FLAG_DATA, 1), 2), + new FullPageId(PageIdUtils.pageId(3, PageIdAllocator.FLAG_DATA, 2), 2), + new FullPageId(PageIdUtils.pageId(3, PageIdAllocator.FLAG_DATA, 3), 2), + new FullPageId(PageIdUtils.pageId(3, PageIdAllocator.FLAG_DATA, 4), 2), + new FullPageId(PageIdUtils.pageId(3, PageIdAllocator.FLAG_DATA, 5), 2), + }; + + CheckpointWorkflow.arrangeForPreallocation(pageIds); + + FullPageId[] exp = new FullPageId[] { + new FullPageId(PageIdUtils.pageId(1, PageIdAllocator.FLAG_DATA, 5), 1), + new FullPageId(PageIdUtils.pageId(1, PageIdAllocator.FLAG_DATA, 1), 1), + new FullPageId(PageIdUtils.pageId(1, PageIdAllocator.FLAG_DATA, 2), 1), + new FullPageId(PageIdUtils.pageId(2, PageIdAllocator.FLAG_DATA, 4), 1), + new FullPageId(PageIdUtils.pageId(1, PageIdAllocator.FLAG_DATA, 3), 2), + new FullPageId(PageIdUtils.pageId(1, PageIdAllocator.FLAG_DATA, 1), 2), + new FullPageId(PageIdUtils.pageId(3, PageIdAllocator.FLAG_DATA, 5), 2), + new 
FullPageId(PageIdUtils.pageId(3, PageIdAllocator.FLAG_DATA, 1), 2), + new 
FullPageId(PageIdUtils.pageId(3, PageIdAllocator.FLAG_DATA, 2), 2), + new FullPageId(PageIdUtils.pageId(3, PageIdAllocator.FLAG_DATA, 3), 2), + new FullPageId(PageIdUtils.pageId(3, PageIdAllocator.FLAG_DATA, 4), 2), + }; + + Assert.assertArrayEquals(exp, pageIds); + } +} diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java index f26bf4d305088..1ea2e6a46515b 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java @@ -25,7 +25,6 @@ import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsMarshallerMappingRestoreOnNodeStartTest; import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsTxCacheRebalancingTest; import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsTxHistoricalRebalancingTest; -import org.apache.ignite.internal.processors.cache.persistence.IgnitePersistenceSequentialCheckpointTest; import org.apache.ignite.internal.processors.cache.persistence.IgnitePersistentStoreCacheGroupsTest; import org.apache.ignite.internal.processors.cache.persistence.PersistenceDirectoryWarningLoggingTest; import org.apache.ignite.internal.processors.cache.persistence.RestorePartitionStateDuringCheckpointTest; @@ -36,6 +35,7 @@ import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsMultiNodePutGetRestartTest; import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsPageEvictionTest; import org.apache.ignite.internal.processors.cache.persistence.db.IgniteSequentialNodeCrashRecoveryTest; +import org.apache.ignite.internal.processors.cache.persistence.db.checkpoint.PageIdsArrangeForPreallocationTest; import 
org.apache.ignite.internal.processors.cache.persistence.db.file.IgnitePdsCacheDestroyDuringCheckpointTest; import org.apache.ignite.internal.processors.cache.persistence.db.file.IgnitePdsCacheIntegrationTest; import org.apache.ignite.internal.processors.cache.persistence.db.file.IgnitePdsDiskErrorsRecoveringTest; @@ -64,7 +64,7 @@ IgnitePdsPageEvictionTest.class, IgnitePdsMultiNodePutGetRestartTest.class, IgnitePersistentStoreCacheGroupsTest.class, - IgnitePersistenceSequentialCheckpointTest.class, + PageIdsArrangeForPreallocationTest.class, PersistenceDirectoryWarningLoggingTest.class, WalPathsTest.class, WalRecoveryTxLogicalRecordsTest.class,