diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/LocalTablet.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/LocalTablet.java index 14d9171f3ff0e4..46fc749493c80e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/LocalTablet.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/LocalTablet.java @@ -26,6 +26,7 @@ import org.apache.doris.system.Backend; import org.apache.doris.system.SystemInfoService; +import com.google.common.collect.Lists; import com.google.gson.annotations.SerializedName; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -254,7 +255,7 @@ public void addReplica(Replica replica, boolean isRestore) { @Override public List getReplicas() { - return this.replicas; + return Lists.newArrayList(this.replicas); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/MaterializedIndex.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/MaterializedIndex.java index 2e98354acef886..d18c670786934b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/MaterializedIndex.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/MaterializedIndex.java @@ -94,7 +94,7 @@ public MaterializedIndex(long id, IndexState state) { } public List getTablets() { - return tablets; + return Lists.newArrayList(tablets); } public List getTabletIdsInOrder() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java index bcf74528da6fd9..5c5d2138c461cc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java @@ -382,7 +382,14 @@ private void updateTabletStat(Long beId, TTabletStatResult result) { if (result.isSetTabletStatList()) { for (TTabletStat stat : result.getTabletStatList()) { if (invertedIndex.getTabletMeta(stat.getTabletId()) != null) { - Replica replica = invertedIndex.getReplica(stat.getTabletId(), beId); + Replica replica; + try { + replica = invertedIndex.getReplica(stat.getTabletId(), beId); + } catch (IllegalStateException e) { + LOG.debug("skip stale tablet stat update for tablet {} on backend {}", + stat.getTabletId(), beId, e); + continue; + } if (replica != null) { replica.setDataSize(stat.getDataSize()); replica.setRemoteDataSize(stat.getRemoteDataSize()); @@ -411,7 +418,14 @@ private void updateTabletStat(Long beId, TTabletStatResult result) { // the replica is obsolete, ignore it. continue; } - Replica replica = invertedIndex.getReplica(entry.getKey(), beId); + Replica replica; + try { + replica = invertedIndex.getReplica(entry.getKey(), beId); + } catch (IllegalStateException e) { + LOG.debug("skip stale tablet stat update for tablet {} on backend {}", + entry.getKey(), beId, e); + continue; + } if (replica == null) { // replica may be deleted from catalog, ignore it. continue; diff --git a/regression-test/suites/nonConcurrent/tablet_stat_mgr_p0/test_tablet_stat_mgr_concurrent_partition_churn.groovy b/regression-test/suites/nonConcurrent/tablet_stat_mgr_p0/test_tablet_stat_mgr_concurrent_partition_churn.groovy new file mode 100644 index 00000000000000..6a4d683b7157cc --- /dev/null +++ b/regression-test/suites/nonConcurrent/tablet_stat_mgr_p0/test_tablet_stat_mgr_concurrent_partition_churn.groovy @@ -0,0 +1,170 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import java.io.RandomAccessFile +import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.AtomicReference + +suite("test_tablet_stat_mgr_concurrent_partition_churn", "nonConcurrent") { + String dbName = context.config.getDbNameByFile(context.file) + sql "select 1" + + String tableName = "test_tablet_stat_mgr_churn" + String oldInterval = null + AtomicBoolean stopped = new AtomicBoolean(false) + AtomicReference firstError = new AtomicReference<>() + + String dorisHome = System.getProperty("DORIS_HOME") + if (dorisHome == null || dorisHome.isEmpty()) { + dorisHome = context.config.suitePath.replace("/regression-test/suites", "") + } + + File feLog = new File("${dorisHome}/fe/log/fe.log") + File feWarnLog = new File("${dorisHome}/fe/log/fe.warn.log") + long feLogOffset = feLog.exists() ? feLog.length() : 0L + long feWarnLogOffset = feWarnLog.exists() ? feWarnLog.length() : 0L + + def readAppendedLog = { File file, long offset -> + if (!file.exists()) { + logger.warn("skip checking appended log because {} does not exist", file.getAbsolutePath()) + return "" + } + RandomAccessFile raf = new RandomAccessFile(file, "r") + try { + long safeOffset = Math.min(offset, raf.length()) + raf.seek(safeOffset) + int size = (int) (raf.length() - safeOffset) + if (size <= 0) { + return "" + } + byte[] bytes = new byte[size] + raf.readFully(bytes) + return new String(bytes, "UTF-8") + } finally { + raf.close() + } + } + + def failIfNeeded = { + if (firstError.get() != null) { + throw firstError.get() + } + } + + def startWorker = { Closure body -> + Thread.startDaemon { + try { + connect(context.config.jdbcUser, context.config.jdbcPassword, context.config.jdbcUrl) { + sql "use ${dbName}" + body() + } + } catch (Throwable t) { + logger.warn("tablet stat mgr concurrency worker failed", t) + firstError.compareAndSet(null, t) + stopped.set(true) + } + } + } + + try { + def configRow = sql """ ADMIN SHOW FRONTEND CONFIG LIKE 'tablet_stat_update_interval_second' """ + oldInterval = configRow[0][1] + sql """ ADMIN SET FRONTEND CONFIG ("tablet_stat_update_interval_second" = "1") """ + + sql """ DROP TABLE IF EXISTS ${tableName} FORCE """ + sql """ + CREATE TABLE ${tableName} ( + `k1` INT NOT NULL, + `k2` INT NOT NULL + ) + DUPLICATE KEY(`k1`) + PARTITION BY RANGE(`k1`) ( + PARTITION p0 VALUES LESS THAN ("100"), + PARTITION p1 VALUES LESS THAN ("200") + ) + DISTRIBUTED BY HASH(`k1`) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1" + ) + """ + + sql """ INSERT INTO ${tableName} VALUES (1, 1), (101, 1) """ + + def insertWorker = startWorker { + int i = 0 + while (!stopped.get()) { + int left = i % 90 + int right = 100 + (i % 90) + sql """ INSERT INTO ${tableName} VALUES (${left}, ${i}), (${right}, ${i}) """ + i++ + } + } + + def partitionWorker = startWorker { + int i = 0 + while (!stopped.get()) { + String partitionName = "p_dyn_${i}" + int upperBound = 300 + i * 100 + sql """ ALTER TABLE ${tableName} ADD PARTITION ${partitionName} VALUES LESS THAN ("${upperBound}") """ + sql """ ALTER TABLE ${tableName} DROP PARTITION IF EXISTS ${partitionName} FORCE """ + i++ + } + } + + def tableWorker = startWorker { + int i = 0 + while (!stopped.get()) { + String tempTable = "tmp_tablet_stat_mgr_churn_${i}" + sql """ + CREATE TABLE ${tempTable} ( + `k1` INT NOT NULL, + `k2` INT NOT NULL + ) + DUPLICATE KEY(`k1`) + DISTRIBUTED BY HASH(`k1`) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1" + ) + """ + sql """ DROP TABLE IF EXISTS ${tempTable} FORCE """ + i++ + } + } + + sleep(12000) + stopped.set(true) + [insertWorker, partitionWorker, tableWorker].each { Thread thread -> + thread.join(10000) + } + + failIfNeeded() + sql "sync" + def rowCount = sql """ SELECT count(*) FROM ${tableName} """ + assertTrue(rowCount[0][0].toLong() > 0L) + + sleep(3000) + String appendedLogs = readAppendedLog(feLog, feLogOffset) + readAppendedLog(feWarnLog, feWarnLogOffset) + assertFalse(appendedLogs.contains("ConcurrentModificationException")) + assertFalse(appendedLogs.contains("daemon thread got exception. name: tablet stat mgr")) + } finally { + stopped.set(true) + if (oldInterval != null) { + sql """ ADMIN SET FRONTEND CONFIG ("tablet_stat_update_interval_second" = "${oldInterval}") """ + } + sql """ DROP TABLE IF EXISTS ${tableName} FORCE """ + } +}