Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions fe/fe-common/src/main/java/org/apache/doris/common/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -2767,6 +2767,23 @@ public class Config extends ConfigBase {
@ConfField
public static int auto_analyze_simultaneously_running_task_num = 1;

@ConfField(mutable = true, masterOnly = true, description = {
"统计信息收集时 string 列允许的最大字节长度。若列中存在长度超过该值的行,"
+ "该列的统计信息将被跳过收集(task 仍标记为 FINISHED,在 SHOW ANALYZE 中显示跳过原因)。"
+ "≤ 0 表示关闭此保护。默认 1024 (1KB)。"
+ "注意:此保护只覆盖 FULL / LINEAR / DUJ1 统计收集路径(即 analyze 全表和 sample 的主 SQL)。"
+ "当 enable_partition_analyze=true 时的 per-partition 路径(PARTITION_ANALYZE_TEMPLATE)"
+ "出于正确性考虑不启用该保护,详见 BaseAnalysisTask 中的 NOTE。",
"Max byte length allowed for a string column when collecting statistics. "
+ "If any row in a string column is longer than this value, the column's stats "
+ "collection is skipped (the task is still marked FINISHED, with the skip reason "
+ "shown in SHOW ANALYZE). A value <= 0 disables this protection. Default: 1024 (1KB). "
+ "Note: this protection applies to the FULL / LINEAR / DUJ1 collection paths "
+ "(i.e. the main SQL used by full-table and sample analyze). The per-partition path "
+ "(PARTITION_ANALYZE_TEMPLATE, used when enable_partition_analyze=true) is intentionally "
+ "not guarded for correctness reasons; see the NOTE in BaseAnalysisTask."})
public static long statistics_max_string_column_length = 1024;

@Deprecated
@ConfField
public static final int period_analyze_simultaneously_running_task_num = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
import org.apache.doris.nereids.trees.expressions.functions.scalar.Ascii;
import org.apache.doris.nereids.trees.expressions.functions.scalar.Asin;
import org.apache.doris.nereids.trees.expressions.functions.scalar.Asinh;
import org.apache.doris.nereids.trees.expressions.functions.scalar.AssertTrue;
import org.apache.doris.nereids.trees.expressions.functions.scalar.Atan;
import org.apache.doris.nereids.trees.expressions.functions.scalar.Atan2;
import org.apache.doris.nereids.trees.expressions.functions.scalar.Atanh;
Expand Down Expand Up @@ -656,6 +657,7 @@ public class BuiltinScalarFunctions implements FunctionHelper {
scalar(Ascii.class, "ascii"),
scalar(Asin.class, "asin"),
scalar(Asinh.class, "asinh"),
scalar(AssertTrue.class, "assert_true"),
scalar(Atan.class, "atan"),
scalar(Atanh.class, "atanh"),
scalar(Atan2.class, "atan2"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,20 @@ public void updateTaskState(AnalysisState state, String msg) {
killed = true;
case FINISHED:
for (BaseAnalysisTask task : queryFinished) {
analysisManager.updateTaskStatus(task.info, state, msg, time);
// When flushBuffer passes an empty msg, fall back to the
// task's own info.message. This propagates a skip reason
// previously stashed by BaseAnalysisTask.handleSkip into
// the single FINISHED update for this task, so job.message
// accumulation in AnalysisManager sees it and SHOW ANALYZE
// surfaces the skip reason at the job level.
String taskMsg = msg;
if ((taskMsg == null || taskMsg.isEmpty()) && task.info != null) {
taskMsg = task.info.message;
}
if (taskMsg == null) {
taskMsg = "";
}
analysisManager.updateTaskStatus(task.info, state, taskMsg, time);
}
default:
// DO NOTHING
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,18 @@ protected AnalysisInfo buildAndAssignJob(AnalyzeTableCommand command) throws Ddl
syncExecute(analysisTaskInfos.values());
jobInfo.state = AnalysisState.FINISHED;
updateTableStats(jobInfo);
// Sync analyze never populates analysisJobIdToTaskMap, so updateTaskStatus
// skip-message accumulation does not fire for it. Surface any per-task skip
// reasons (e.g. long-string column skip) as an OK-packet info message so
// the user still sees why a column was dropped from collection.
List<String> skipMessages = analysisTaskInfos.values().stream()
.map(t -> t.info == null ? null : t.info.message)
.filter(m -> m != null && !m.isEmpty())
.collect(Collectors.toList());
if (!skipMessages.isEmpty() && ConnectContext.get() != null) {
ConnectContext.get().getState().setOk(0, skipMessages.size(),
String.join(" ", skipMessages));
}
return null;
}
recordAnalysisJob(jobInfo);
Expand Down Expand Up @@ -471,7 +483,12 @@ public void updateTaskStatus(AnalysisInfo info, AnalysisState taskState, String
return;
}
info.state = taskState;
info.message = message;
// Preserve the existing info.message when flushBuffer calls updateTaskState(FINISHED, "")
// for already-finished tasks, so that a previously-set skip message (from
// BaseAnalysisTask.handleSkip) is not wiped by the subsequent batch FINISHED update.
if (!(taskState.equals(AnalysisState.FINISHED) && StringUtils.isEmpty(message))) {
info.message = message;
}
// Update the task cost time when task finished or failed. And only log the final state.
if (taskState.equals(AnalysisState.FINISHED) || taskState.equals(AnalysisState.FAILED)) {
info.timeCostInMs = time - info.lastExecTimeInMs;
Expand All @@ -496,6 +513,14 @@ public void updateTaskStatus(AnalysisInfo info, AnalysisState taskState, String
String errMessage = String.format("%s:[%s] ", info.colName, message);
job.message = job.message == null ? errMessage : job.message + errMessage;
}
// Accumulate a non-empty FINISHED message (e.g. long-string skip reason) into
// job.message so it is visible in SHOW ANALYZE at job level. Guard on the
// incoming message being non-empty to avoid double-counting when flushBuffer
// later calls updateTaskState(FINISHED, "") for the same already-skipped task.
if (taskState.equals(AnalysisState.FINISHED) && !StringUtils.isEmpty(message)) {
String skipMessage = String.format("%s:[%s] ", info.colName, message);
job.message = job.message == null ? skipMessage : job.message + skipMessage;
}
// Set the job state to RUNNING when its first task becomes RUNNING.
if (info.state.equals(AnalysisState.RUNNING) && job.state.equals(AnalysisState.PENDING)) {
job.state = AnalysisState.RUNNING;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.statistics;

/**
 * Unchecked signal used for control flow, not error reporting: an analysis
 * task throws it when it decides to skip statistics collection for one
 * column — for example, a string column containing at least one row whose
 * byte length exceeds {@code Config.statistics_max_string_column_length}.
 *
 * Catchers must treat it as a benign outcome: mark the task FINISHED
 * (never FAILED) and expose the skip reason through {@code info.message}
 * so it shows up in {@code SHOW ANALYZE}.
 */
public class AnalyzeSkipException extends RuntimeException {

    public AnalyzeSkipException(String message, Throwable cause) {
        super(message, cause);
    }

    public AnalyzeSkipException(String message) {
        super(message);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,14 @@ public abstract class BaseAnalysisTask {
public static final long LIMIT_SIZE = 1024 * 1024 * 1024; // 1GB
public static final double LIMIT_FACTOR = 1.2;

/**
* Marker string embedded in {@code assert_true} inside statistics collection SQL.
* When any row's string column length exceeds the configured limit, BE throws an
* error whose message contains this marker; FE detects it and converts the task
* result to a skip signal ({@link AnalyzeSkipException}) rather than a failure.
*/
public static final String ANALYZE_SKIP_LONG_STRING_COLUMN_MARKER = "ANALYZE_SKIP_LONG_STRING_COLUMN";

protected static final String FULL_ANALYZE_TEMPLATE =
"SELECT CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS `id`, "
+ "${catalogId} AS `catalog_id`, "
Expand All @@ -81,10 +89,11 @@ public abstract class BaseAnalysisTask {
+ "${dataSizeFunction} AS `data_size`, "
+ "NOW() AS `update_time`, "
+ "null as `hot_value` "
+ "FROM `${catalogName}`.`${dbName}`.`${tblName}` ${index}";
+ "FROM (SELECT `${colName}`${lengthAssert} "
+ "FROM `${catalogName}`.`${dbName}`.`${tblName}` ${index}) __lc_t";

protected static final String LINEAR_ANALYZE_TEMPLATE = "WITH cte1 AS ("
+ "SELECT `${colName}` "
+ "SELECT `${colName}`${lengthAssert} "
+ "FROM `${catalogName}`.`${dbName}`.`${tblName}` ${index} ${sampleHints} ${limit} ${preAggHint}), "
+ "cte2 AS ("
+ "SELECT CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS `id`, "
Expand Down Expand Up @@ -120,7 +129,7 @@ public abstract class BaseAnalysisTask {
+ "(SELECT "
+ "${subStringColName} AS `hash_value`, "
+ "`${colName}` AS `col_value`, "
+ "LENGTH(`${colName}`) as `len` "
+ "LENGTH(`${colName}`) as `len`${lengthAssert} "
+ "FROM `${catalogName}`.`${dbName}`.`${tblName}` ${index} ${sampleHints} ${limit}) as `t0` "
+ "${preAggHint} GROUP BY `t0`.`hash_value`), "
+ "cte2 AS ( "
Expand Down Expand Up @@ -166,6 +175,14 @@ public abstract class BaseAnalysisTask {
+ "${data_size} AS `data_size`, "
+ "NOW() ";

// NOTE: PARTITION_ANALYZE_TEMPLATE intentionally does NOT apply the long-string
// skip guard (statistics_max_string_column_length). Partition-granularity analyze
// commits per-batch INSERTs into __partition_stats incrementally. Aborting mid-loop
// via assert_true would leave a mix of fresh and stale rows in __partition_stats
// that is non-trivial to roll back. Since partition-level statistics are seldom
// relied upon today, we accept that long-string columns are NOT protected on this
// path. Only the full / sample OLAP paths and the external-table path enforce the
// per-row byte-length ceiling.
protected static final String PARTITION_ANALYZE_TEMPLATE = " SELECT "
+ "${catalogId} AS `catalog_id`, "
+ "${dbId} AS `db_id`, "
Expand All @@ -181,7 +198,7 @@ public abstract class BaseAnalysisTask {
+ "SUBSTRING(CAST(MAX(`${colName}`) AS STRING), 1, 1024) AS `max`, "
+ "${dataSizeFunction} AS `data_size`, "
+ "NOW() AS `update_time` "
+ " FROM `${catalogName}`.`${dbName}`.`${tblName}` ${index} ${partitionInfo}";
+ "FROM `${catalogName}`.`${dbName}`.`${tblName}` ${index} ${partitionInfo}";

protected static final String MERGE_PARTITION_TEMPLATE =
"SELECT CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS `id`, "
Expand Down Expand Up @@ -254,8 +271,66 @@ protected void init(AnalysisInfo info) {

/**
 * Run this analysis task. A column-skip decision (long-string guard) is not
 * an error: it is converted into a FINISHED state with a skip reason via
 * {@link #handleSkip}, whether it arrives as an explicit
 * {@code AnalyzeSkipException} or as a wrapped BE error carrying the
 * skip marker somewhere in its cause chain.
 *
 * @throws Exception any failure that is not a skip signal
 */
public void execute() throws Exception {
    prepareExecution();
    try {
        doExecute();
    } catch (AnalyzeSkipException skip) {
        // Direct skip signal (e.g. raised by runQuery) — finish with skip reason.
        handleSkip(skip);
    } catch (Exception e) {
        // Some paths wrap the BE error instead of throwing AnalyzeSkipException,
        // so scan the whole cause chain for the long-string skip marker.
        if (!containsSkipMarker(e)) {
            throw e;
        }
        handleSkip(new AnalyzeSkipException(buildSkipMessage(), e));
    }
}

/**
 * Report whether {@code e} or any exception in its cause chain carries the
 * long-string skip marker in its message. Inspecting every level (not only
 * the root cause) is deliberate: some execution paths rewrap the BE error in
 * an outer exception that reformats the message without preserving the
 * original cause.
 *
 * @param e the thrown exception to inspect (may have a chain of causes)
 * @return true if any message in the chain contains the skip marker
 */
protected static boolean containsSkipMarker(Throwable e) {
    for (Throwable t = e; t != null; t = t.getCause()) {
        String message = t.getMessage();
        if (message != null && message.contains(ANALYZE_SKIP_LONG_STRING_COLUMN_MARKER)) {
            return true;
        }
    }
    return false;
}

/**
 * Finish this task as skipped (state FINISHED, never FAILED), recording the
 * skip reason so it surfaces in SHOW ANALYZE. Invoked when the task decides
 * a column should not be collected (e.g. long string column).
 */
private void handleSkip(AnalyzeSkipException e) {
    String reason = e.getMessage();
    LOG.info("Analyze task skip column [{}] in table [{}]. Reason: {}",
            info.colName, tbl == null ? "?" : tbl.getName(), reason);
    // Stash the reason on info.message rather than calling updateTaskStatus
    // directly. The flushBuffer path (AnalysisJob.updateTaskState ->
    // AnalysisManager.updateTaskStatus) performs the single FINISHED
    // transition and falls back to info.message when its own msg is empty.
    // Issuing the FINISHED update twice used to overwrite
    // AnalysisInfo.timeCostInMs with the near-zero delta between the calls.
    info.message = reason;
    // taskDoneWithoutData adds this task to queryFinished and triggers
    // flushBuffer, which issues updateTaskState(FINISHED, "") exactly once
    // per task; the empty msg is then substituted with info.message above,
    // so the skip reason reaches job.message for SHOW ANALYZE visibility.
    job.taskDoneWithoutData(this);
}

/**
 * Build the human-readable reason used when a string column is skipped
 * because some row exceeds the configured byte-length ceiling.
 */
private String buildSkipMessage() {
    String columnName = info == null ? "?" : String.valueOf(info.colName);
    long configuredLimit = org.apache.doris.common.Config.statistics_max_string_column_length;
    return String.format(
            "Column [%s] has row(s) whose byte length exceeds %d (Config.statistics_max_string_column_length), "
            + "skip collecting statistics for this column.",
            columnName, configuredLimit);
}

protected void prepareExecution() {
Expand All @@ -266,8 +341,6 @@ protected void prepareExecution() {

protected abstract void doSample() throws Exception;

protected void afterExecution() {}

protected void setTaskStateToRunning() {
Env.getCurrentEnv().getAnalysisManager()
.updateTaskStatus(info, AnalysisState.RUNNING, "", System.currentTimeMillis());
Expand Down Expand Up @@ -498,6 +571,31 @@ protected Map<String, String> buildSqlParams() {
return Maps.newHashMap();
}

/**
 * Fill in the {@code ${lengthAssert}} placeholder for statistics SQL.
 * When the column is a string type and the configured limit is positive,
 * the placeholder expands to a per-row
 * {@code , assert_true(col IS NULL OR LENGTH(col) <= N, 'marker') AS __lc}
 * clause injected into the inner-most SELECT list; otherwise it expands to
 * the empty string and the SQL is unchanged.
 *
 * The {@code IS NULL OR} guard is mandatory: Doris's {@code assert_true}
 * BE function throws on NULL inputs.
 *
 * @param params SQL template parameter map to populate (mutated in place)
 */
protected void addLengthAssertParam(Map<String, String> params) {
    long maxLen = org.apache.doris.common.Config.statistics_max_string_column_length;
    if (col == null || !col.getType().isStringType() || maxLen <= 0) {
        params.put("lengthAssert", "");
        return;
    }
    // Callers' StringSubstitutor already resolved ${colName}, so the escaped
    // column name is inlined directly instead of using another placeholder.
    String escaped = StatisticsUtil.escapeColumnName(String.valueOf(info.colName));
    params.put("lengthAssert",
            ", assert_true(`" + escaped + "` IS NULL OR LENGTH(`" + escaped + "`) <= "
            + maxLen + ", '" + ANALYZE_SKIP_LONG_STRING_COLUMN_MARKER + "') AS `__lc`");
}

protected String castToNumeric(String colName) {
Type type = col.getType();
if (type.isNumericType()) {
Expand Down Expand Up @@ -535,6 +633,9 @@ protected void runQuery(String sql) {
job.appendBuf(this, Collections.singletonList(colStatsData));
} catch (Exception e) {
LOG.warn("Failed to execute sql {}", sql);
if (containsSkipMarker(e)) {
throw new AnalyzeSkipException(buildSkipMessage(), e);
}
throw e;
} finally {
if (LOG.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ protected Map<String, String> buildSqlParams() {
params.put("type", col.getType().toString());
}
params.put("lastAnalyzeTimeInMs", String.valueOf(System.currentTimeMillis()));
addLengthAssertParam(params);
return params;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.qe.AutoCloseConnectContext;
import org.apache.doris.qe.SessionVariable;
Expand Down Expand Up @@ -398,6 +399,7 @@ protected Map<String, String> buildSqlParams() {
params.put("tblName", String.valueOf(tbl.getName()));
params.put("index", getIndex());
params.put("preAggHint", "");
addLengthAssertParam(params);
return params;
}

Expand Down Expand Up @@ -524,6 +526,9 @@ protected long getSampleRows() {
* @return True for single unique key column and single distribution column.
*/
protected boolean useLinearAnalyzeTemplate() {
if (DebugPointUtil.isEnable("OlapAnalysisTask.useDUJ1Template")) {
return false;
}
if (partitionColumnSampleTooManyRows || scanFullTable) {
return true;
}
Expand Down
Loading
Loading