From f9eee27e8083bb34c277493f5ee31efdad1c5af9 Mon Sep 17 00:00:00 2001 From: shuwenwei Date: Mon, 27 Oct 2025 17:06:59 +0800 Subject: [PATCH 01/61] add information schema table --- .../iotdb/db/qp/sql/IdentifierParser.g4 | 1 + .../apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 | 6 +- .../org/apache/iotdb/db/qp/sql/SqlLexer.g4 | 4 + .../common/header/DatasetHeaderFactory.java | 4 + .../execution/operator/OperatorContext.java | 2 +- .../source/ShowDiskUsageOperator.java | 151 +++++++++ ...formationSchemaContentSupplierFactory.java | 320 +++++++++++++++++- .../db/queryengine/plan/analyze/Analysis.java | 5 +- .../plan/analyze/AnalyzeVisitor.java | 29 ++ .../queryengine/plan/parser/ASTVisitor.java | 7 + .../plan/planner/LogicalPlanBuilder.java | 29 ++ .../plan/planner/LogicalPlanVisitor.java | 9 + .../plan/planner/OperatorTreeGenerator.java | 14 + .../plan/planner/TableOperatorGenerator.java | 20 +- .../SimpleFragmentParallelPlanner.java | 2 + .../plan/planner/plan/node/PlanNodeType.java | 4 + .../plan/planner/plan/node/PlanVisitor.java | 5 + .../plan/node/source/ShowDiskUsageNode.java | 137 ++++++++ .../DataNodeLocationSupplierFactory.java | 1 + .../PushPredicateIntoTableScan.java | 71 +++- .../security/TreeAccessCheckVisitor.java | 10 + .../plan/relational/sql/ast/AstVisitor.java | 4 + .../sql/ast/ShowDiskUsageOfTable.java | 40 +++ .../relational/sql/parser/AstBuilder.java | 33 ++ .../relational/sql/rewrite/ShowRewrite.java | 32 ++ .../plan/statement/StatementType.java | 1 + .../plan/statement/StatementVisitor.java | 5 + .../statement/sys/ShowDiskUsageStatement.java | 58 ++++ .../utils/DiskUsageStatisticUtil.java | 130 +++++++ .../StorageEngineTimePartitionIterator.java | 93 +++++ .../utils/TableDiskUsageStatisticUtil.java | 152 +++++++++ .../utils/TreeDiskUsageStatisticUtil.java | 120 +++++++ .../schema/column/ColumnHeaderConstant.java | 9 + .../schema/table/InformationSchema.java | 37 ++ .../relational/grammar/sql/RelationalSql.g4 | 7 +- pom.xml | 2 +- 36 files changed, 1541 insertions(+), 13 deletions(-) create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/ShowDiskUsageOperator.java create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/ShowDiskUsageNode.java create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/ShowDiskUsageOfTable.java create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/sys/ShowDiskUsageStatement.java create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/DiskUsageStatisticUtil.java create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/StorageEngineTimePartitionIterator.java create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/TableDiskUsageStatisticUtil.java create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/TreeDiskUsageStatisticUtil.java diff --git a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4 b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4 index 0cc5e6faf8b2c..0a003d6fb7178 100644 --- a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4 +++ b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4 @@ -98,6 +98,7 @@ keyWords | DEVICES | DISABLE | DISCARD + | DISK_USAGE | DROP | ELAPSEDTIME | ELSE diff --git a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 index 58f77da6c33e2..38b20a38d565b 100644 --- a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 +++ b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 @@ -89,7 +89,7 @@ dclStatement utilityStatement : flush | clearCache | setConfiguration | settle | startRepairData | stopRepairData | explain | setSystemStatus | showVersion | showFlushInfo | showLockInfo | showQueryResource - | showQueries | showCurrentTimestamp | killQuery | grantWatermarkEmbedding + | showQueries | showDiskUsage | showCurrentTimestamp | killQuery | grantWatermarkEmbedding | revokeWatermarkEmbedding | loadConfiguration | loadTimeseries | loadFile | removeFile | unloadFile | setSqlDialect | showCurrentSqlDialect | showCurrentUser ; @@ -1233,6 +1233,10 @@ showQueries rowPaginationClause? ; +showDiskUsage + : SHOW DISK_USAGE FROM prefixPath + ; + // Show Current Timestamp showCurrentTimestamp : SHOW CURRENT_TIMESTAMP diff --git a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 index 0efaf9fac3879..1001180e3e2af 100644 --- a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 +++ b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 @@ -302,6 +302,10 @@ DISCARD : D I S C A R D ; +DISK_USAGE + : D I S K '_' U S A G E + ; + DROP : D R O P ; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/DatasetHeaderFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/DatasetHeaderFactory.java index a9e737ac06e66..1ad792ba4d761 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/DatasetHeaderFactory.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/DatasetHeaderFactory.java @@ -199,6 +199,10 @@ public static DatasetHeader getShowQueriesHeader() { return new DatasetHeader(ColumnHeaderConstant.showQueriesColumnHeaders, false); } + public static DatasetHeader getShowDiskUsageHeader() { + return new DatasetHeader(ColumnHeaderConstant.showDiskUsageColumnHeaders, true); + } + public static DatasetHeader getShowSpaceQuotaHeader() { return new DatasetHeader(ColumnHeaderConstant.showSpaceQuotaColumnHeaders, true); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorContext.java index beb25030400fb..76e33339f0458 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorContext.java @@ -112,7 +112,7 @@ public FragmentInstanceContext getInstanceContext() { return driverContext.getFragmentInstanceContext(); } - public Duration getMaxRunTime() { + public static Duration getMaxRunTime() { return maxRunTime; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/ShowDiskUsageOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/ShowDiskUsageOperator.java new file mode 100644 index 0000000000000..c6a39caef0a41 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/ShowDiskUsageOperator.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.operator.source; + +import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.commons.utils.PathUtils; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.queryengine.common.header.DatasetHeaderFactory; +import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.storageengine.dataregion.DataRegion; +import org.apache.iotdb.db.storageengine.dataregion.utils.StorageEngineTimePartitionIterator; +import org.apache.iotdb.db.storageengine.dataregion.utils.TreeDiskUsageStatisticUtil; + +import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.read.common.block.TsBlockBuilder; + +import java.util.NoSuchElementException; +import java.util.Optional; +import java.util.concurrent.TimeUnit; + +public class ShowDiskUsageOperator implements SourceOperator { + + private final OperatorContext operatorContext; + private final PlanNodeId sourceId; + private final PartialPath pathPattern; + private final StorageEngineTimePartitionIterator timePartitionIterator; + private TreeDiskUsageStatisticUtil statisticUtil; + private boolean allConsumed = false; + private long result = 0; + + public ShowDiskUsageOperator( + OperatorContext operatorContext, PlanNodeId sourceId, PartialPath pathPattern) { + this.operatorContext = operatorContext; + this.sourceId = sourceId; + this.pathPattern = pathPattern; + this.timePartitionIterator = + new StorageEngineTimePartitionIterator( + Optional.of( + dataRegion -> { + String databaseName = dataRegion.getDatabaseName(); + return !PathUtils.isTableModelDatabase(databaseName) + && pathPattern.matchPrefixPath(new PartialPath(databaseName)); + }), + Optional.empty()); + } + + @Override + public PlanNodeId getSourceId() { + return sourceId; + } + + @Override + public OperatorContext getOperatorContext() { + return operatorContext; + } + + @Override + public TsBlock next() throws Exception { + if (!hasNext()) { + throw new NoSuchElementException(); + } + long maxRuntime = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS); + long start = System.nanoTime(); + do { + if (statisticUtil != null && statisticUtil.hasNextFile()) { + statisticUtil.calculateNextFile(); + continue; + } + if (statisticUtil != null) { + result += statisticUtil.getResult()[0]; + statisticUtil.close(); + } + if (timePartitionIterator.next()) { + DataRegion dataRegion = timePartitionIterator.currentDataRegion(); + long timePartition = timePartitionIterator.currentTimePartition(); + statisticUtil = + new TreeDiskUsageStatisticUtil( + dataRegion.getTsFileManager(), timePartition, pathPattern); + } else { + allConsumed = true; + } + } while (System.nanoTime() - start < maxRuntime && !allConsumed); + + if (!allConsumed) { + return null; + } + TsBlockBuilder tsBlockBuilder = + new TsBlockBuilder(1, DatasetHeaderFactory.getShowDiskUsageHeader().getRespDataTypes()); + tsBlockBuilder.getTimeColumnBuilder().writeLong(0); + tsBlockBuilder.getValueColumnBuilders()[0].writeInt( + IoTDBDescriptor.getInstance().getConfig().getDataNodeId()); + tsBlockBuilder.getValueColumnBuilders()[1].writeLong(result); + tsBlockBuilder.declarePosition(); + return tsBlockBuilder.build(); + } + + @Override + public boolean hasNext() throws Exception { + return !allConsumed; + } + + @Override + public void close() throws Exception { + if (statisticUtil != null) { + statisticUtil.close(); + } + } + + @Override + public boolean isFinished() throws Exception { + return allConsumed; + } + + @Override + public long calculateMaxPeekMemory() { + return 0; + } + + @Override + public long calculateMaxReturnSize() { + return 0; + } + + @Override + public long calculateRetainedSizeAfterCallingNext() { + return 0; + } + + @Override + public long ramBytesUsed() { + return 0; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java index db36048fcd054..270421900e1e7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java @@ -25,6 +25,7 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.commons.audit.UserEntity; +import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.exception.IoTDBRuntimeException; import org.apache.iotdb.commons.exception.auth.AccessDeniedException; @@ -41,6 +42,7 @@ import org.apache.iotdb.commons.udf.UDFInformation; import org.apache.iotdb.commons.udf.builtin.relational.TableBuiltinAggregationFunction; import org.apache.iotdb.commons.udf.builtin.relational.TableBuiltinScalarFunction; +import org.apache.iotdb.commons.utils.PathUtils; import org.apache.iotdb.confignode.rpc.thrift.TClusterParameters; import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeInfo4InformationSchema; import org.apache.iotdb.confignode.rpc.thrift.TDataNodeInfo4InformationSchema; @@ -62,11 +64,13 @@ import org.apache.iotdb.confignode.rpc.thrift.TShowTopicReq; import org.apache.iotdb.confignode.rpc.thrift.TTableInfo; import org.apache.iotdb.db.auth.AuthorityChecker; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics; import org.apache.iotdb.db.protocol.client.ConfigNodeClient; import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager; import org.apache.iotdb.db.protocol.client.ConfigNodeInfo; import org.apache.iotdb.db.protocol.session.IClientSession; +import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; import org.apache.iotdb.db.queryengine.plan.Coordinator; import org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution; import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.relational.ShowCreateViewTask; @@ -75,20 +79,28 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.util.ReservedIdentifiers; import org.apache.iotdb.db.relational.grammar.sql.RelationalSqlKeywords; import org.apache.iotdb.db.schemaengine.table.InformationSchemaUtils; +import org.apache.iotdb.db.storageengine.StorageEngine; +import org.apache.iotdb.db.storageengine.dataregion.DataRegion; +import org.apache.iotdb.db.storageengine.dataregion.utils.StorageEngineTimePartitionIterator; +import org.apache.iotdb.db.storageengine.dataregion.utils.TableDiskUsageStatisticUtil; import org.apache.iotdb.db.utils.MathUtils; import org.apache.iotdb.db.utils.TimestampPrecisionUtils; import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.thrift.TException; import org.apache.tsfile.block.column.ColumnBuilder; import org.apache.tsfile.common.conf.TSFileConfig; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.read.common.block.TsBlock; import org.apache.tsfile.read.common.block.TsBlockBuilder; import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn; +import org.apache.tsfile.read.filter.basic.Filter; import org.apache.tsfile.utils.Binary; import org.apache.tsfile.utils.BytesUtils; import org.apache.tsfile.utils.Pair; +import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.Comparator; @@ -97,6 +109,7 @@ import java.util.Map; import java.util.NoSuchElementException; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -124,7 +137,10 @@ public class InformationSchemaContentSupplierFactory { private InformationSchemaContentSupplierFactory() {} public static Iterator getSupplier( - final String tableName, final List dataTypes, final UserEntity userEntity) { + final String tableName, + final List dataTypes, + final UserEntity userEntity, + final Filter pushDownFilter) { try { switch (tableName) { case InformationSchema.QUERIES: @@ -161,6 +177,8 @@ public static Iterator getSupplier( return new ConfigNodesSupplier(dataTypes, userEntity); case InformationSchema.DATA_NODES: return new DataNodesSupplier(dataTypes, userEntity); + case InformationSchema.TABLE_DISK_USAGE: + return new TableDiskUsageSupplier2(dataTypes, userEntity, pushDownFilter); default: throw new UnsupportedOperationException("Unknown table: " + tableName); } @@ -1218,6 +1236,306 @@ public boolean hasNext() { } } + private static class TableDiskUsageSupplier implements Iterator { + private final List dataTypes; + private final Map> databaseTableInfoMap; + private final Filter pushDownFilter; + private final Iterator dataRegionIterator; + + private DataRegion currentDataRegion; + private Iterator timePartitionsIterator; + private long currentTimePartition; + private List currentTablesToScan; + private TableDiskUsageStatisticUtil statisticUtil; + + private TableDiskUsageSupplier( + final List dataTypes, final UserEntity userEntity, Filter pushDownFilter) + throws TException, ClientManagerException { + this.dataTypes = dataTypes; + this.pushDownFilter = pushDownFilter; + AuthorityChecker.getAccessControl().checkUserGlobalSysPrivilege(userEntity); + try (final ConfigNodeClient client = + ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + this.databaseTableInfoMap = client.showTables4InformationSchema().getDatabaseTableInfoMap(); + } + this.dataRegionIterator = StorageEngine.getInstance().getAllDataRegions().iterator(); + } + + @Override + public boolean hasNext() { + if (statisticUtil != null) { + return true; + } + try { + while (true) { + if (timePartitionsIterator != null && timePartitionsIterator.hasNext()) { + currentTimePartition = timePartitionsIterator.next(); + currentTablesToScan = getTablesToScan(currentTimePartition); + if (!currentTablesToScan.isEmpty()) { + statisticUtil = + new TableDiskUsageStatisticUtil( + currentDataRegion.getTsFileManager(), + currentTimePartition, + currentTablesToScan); + return true; + } + } else if (!nextDataRegion()) { + return false; + } // should not have else branch + } + } catch (Throwable t) { + closeStatisticUtil(); + throw t; + } + } + + private boolean nextDataRegion() { + while (dataRegionIterator.hasNext()) { + currentDataRegion = dataRegionIterator.next(); + if (currentDataRegion == null) { + continue; + } + + List tTableInfos = + databaseTableInfoMap.get(currentDataRegion.getDatabaseName()); + if (tTableInfos == null || tTableInfos.isEmpty()) { + continue; + } + + timePartitionsIterator = currentDataRegion.getTimePartitions().iterator(); + if (timePartitionsIterator.hasNext()) { + return true; + } + } + return false; + } + + private List getTablesToScan(long timePartition) { + String databaseName = currentDataRegion.getDatabaseName(); + List tTableInfos = databaseTableInfoMap.get(databaseName); + if (tTableInfos == null || tTableInfos.isEmpty()) { + return Collections.emptyList(); + } + + if (pushDownFilter == null) { + return tTableInfos.stream().map(TTableInfo::getTableName).collect(Collectors.toList()); + } + + List tablesToScan = new ArrayList<>(tTableInfos.size()); + for (TTableInfo tTableInfo : tTableInfos) { + Object[] row = new Object[5]; + row[0] = new Binary(currentDataRegion.getDatabaseName(), TSFileConfig.STRING_CHARSET); + row[1] = new Binary(tTableInfo.getTableName(), TSFileConfig.STRING_CHARSET); + row[2] = IoTDBDescriptor.getInstance().getConfig().getDataNodeId(); + row[3] = Integer.parseInt(currentDataRegion.getDataRegionId()); + row[4] = timePartition; + if (pushDownFilter.satisfyRow(0, row)) { + tablesToScan.add(tTableInfo.getTableName()); + } + } + return tablesToScan; + } + + @Override + public TsBlock next() { + try { + if (!hasNext()) { + throw new NoSuchElementException(); + } + + long maxRuntime = OperatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS); + long start = System.nanoTime(); + + if (statisticUtil.hasNextFile()) { + do { + statisticUtil.calculateNextFile(); + } while (System.nanoTime() - start < maxRuntime && statisticUtil.hasNextFile()); + if (statisticUtil.hasNextFile()) { + return null; + } + } + + TsBlockBuilder builder = new TsBlockBuilder(dataTypes); + long[] resultArr = statisticUtil.getResult(); + + for (int i = 0; i < currentTablesToScan.size(); i++) { + builder.getTimeColumnBuilder().writeLong(0); + ColumnBuilder[] columns = builder.getValueColumnBuilders(); + + columns[0].writeBinary( + new Binary(currentDataRegion.getDatabaseName(), TSFileConfig.STRING_CHARSET)); + columns[1].writeBinary( + new Binary(currentTablesToScan.get(i), TSFileConfig.STRING_CHARSET)); + columns[2].writeInt(IoTDBDescriptor.getInstance().getConfig().getDataNodeId()); + columns[3].writeInt(Integer.parseInt(currentDataRegion.getDataRegionId())); + columns[4].writeLong(currentTimePartition); + columns[5].writeLong(resultArr[i]); + builder.declarePosition(); + } + closeStatisticUtil(); + return builder.build(); + } catch (Throwable t) { + closeStatisticUtil(); + throw t; + } + } + + private void closeStatisticUtil() { + if (statisticUtil == null) { + return; + } + try { + statisticUtil.close(); + statisticUtil = null; + } catch (IOException ignored) { + } + } + } + + private static class TableDiskUsageSupplier2 implements Iterator { + private final List dataTypes; + private final Map> databaseTableInfoMap; + private final Filter pushDownFilter; + + private DataRegion currentDataRegion; + private long currentTimePartition; + private List currentTablesToScan; + private TableDiskUsageStatisticUtil statisticUtil; + + private final StorageEngineTimePartitionIterator timePartitionIterator; + + private TableDiskUsageSupplier2( + final List dataTypes, final UserEntity userEntity, Filter pushDownFilter) + throws TException, ClientManagerException { + this.dataTypes = dataTypes; + this.pushDownFilter = pushDownFilter; + AuthorityChecker.getAccessControl().checkUserGlobalSysPrivilege(userEntity); + try (final ConfigNodeClient client = + ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + this.databaseTableInfoMap = client.showTables4InformationSchema().getDatabaseTableInfoMap(); + } + this.timePartitionIterator = + new StorageEngineTimePartitionIterator( + Optional.of( + dataRegion -> { + List tTableInfos = + databaseTableInfoMap.get(dataRegion.getDatabaseName()); + if (tTableInfos == null || tTableInfos.isEmpty()) { + return false; + } + return PathUtils.isTableModelDatabase(dataRegion.getDatabaseName()); + }), + Optional.of( + (dataRegion, timePartition) -> { + currentTablesToScan = getTablesToScan(dataRegion, timePartition); + return !currentTablesToScan.isEmpty(); + })); + } + + @Override + public boolean hasNext() { + if (statisticUtil != null) { + return true; + } + + try { + if (timePartitionIterator.next()) { + currentDataRegion = timePartitionIterator.currentDataRegion(); + currentTimePartition = timePartitionIterator.currentTimePartition(); + statisticUtil = + new TableDiskUsageStatisticUtil( + currentDataRegion.getTsFileManager(), currentTimePartition, currentTablesToScan); + return true; + } + return false; + } catch (Exception e) { + closeStatisticUtil(); + throw new RuntimeException(e.getMessage(), e); + } + } + + private List getTablesToScan(DataRegion dataRegion, long timePartition) { + String databaseName = dataRegion.getDatabaseName(); + List tTableInfos = databaseTableInfoMap.get(databaseName); + if (tTableInfos == null || tTableInfos.isEmpty()) { + return Collections.emptyList(); + } + + if (pushDownFilter == null) { + return tTableInfos.stream().map(TTableInfo::getTableName).collect(Collectors.toList()); + } + + List tablesToScan = new ArrayList<>(tTableInfos.size()); + for (TTableInfo tTableInfo : tTableInfos) { + Object[] row = new Object[5]; + row[0] = new Binary(dataRegion.getDatabaseName(), TSFileConfig.STRING_CHARSET); + row[1] = new Binary(tTableInfo.getTableName(), TSFileConfig.STRING_CHARSET); + row[2] = IoTDBDescriptor.getInstance().getConfig().getDataNodeId(); + row[3] = Integer.parseInt(dataRegion.getDataRegionId()); + row[4] = timePartition; + if (pushDownFilter.satisfyRow(0, row)) { + tablesToScan.add(tTableInfo.getTableName()); + } + } + return tablesToScan; + } + + @Override + public TsBlock next() { + try { + if (!hasNext()) { + throw new NoSuchElementException(); + } + + long maxRuntime = OperatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS); + long start = System.nanoTime(); + + if (statisticUtil.hasNextFile()) { + do { + statisticUtil.calculateNextFile(); + } while (System.nanoTime() - start < maxRuntime && statisticUtil.hasNextFile()); + if (statisticUtil.hasNextFile()) { + return null; + } + } + + TsBlockBuilder builder = new TsBlockBuilder(dataTypes); + long[] resultArr = statisticUtil.getResult(); + + for (int i = 0; i < currentTablesToScan.size(); i++) { + builder.getTimeColumnBuilder().writeLong(0); + ColumnBuilder[] columns = builder.getValueColumnBuilders(); + + columns[0].writeBinary( + new Binary(currentDataRegion.getDatabaseName(), TSFileConfig.STRING_CHARSET)); + columns[1].writeBinary( + new Binary(currentTablesToScan.get(i), TSFileConfig.STRING_CHARSET)); + columns[2].writeInt(IoTDBDescriptor.getInstance().getConfig().getDataNodeId()); + columns[3].writeInt(Integer.parseInt(currentDataRegion.getDataRegionId())); + columns[4].writeLong(currentTimePartition); + columns[5].writeLong(resultArr[i]); + builder.declarePosition(); + } + closeStatisticUtil(); + return builder.build(); + } catch (Throwable t) { + closeStatisticUtil(); + throw t; + } + } + + private void closeStatisticUtil() { + if (statisticUtil == null) { + return; + } + try { + statisticUtil.close(); + statisticUtil = null; + } catch (IOException ignored) { + } + } + } + private abstract static class TsBlockSupplier implements Iterator { protected final TsBlockBuilder resultBuilder; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java index 9235bc350889a..1aef8ca39821e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java @@ -57,6 +57,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.component.SortItem; import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.ExplainAnalyzeStatement; +import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowDiskUsageStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowQueriesStatement; import org.apache.iotdb.db.schemaengine.template.Template; @@ -487,6 +488,7 @@ private boolean hasDataSource() { return (dataPartition != null && !dataPartition.isEmpty()) || (schemaPartition != null && !schemaPartition.isEmpty()) || statement instanceof ShowQueriesStatement + || statement instanceof ShowDiskUsageStatement || (statement instanceof QueryStatement && ((QueryStatement) statement).isAggregationQuery()); } @@ -509,7 +511,8 @@ public boolean isQuery() { public boolean needSetHighestPriority() { // if is this Statement is ShowQueryStatement, set its instances to the highest priority, so // that the sub-tasks of the ShowQueries instances could be executed first. - return StatementType.SHOW_QUERIES.equals(statement.getType()); + return StatementType.SHOW_QUERIES.equals(statement.getType()) + || StatementType.SHOW_DISK_USAGE.equals(statement.getType()); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java index 30e8426c707e3..4ec8fe588dc92 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java @@ -144,6 +144,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.ExplainAnalyzeStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.ExplainStatement; +import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowDiskUsageStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowQueriesStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowVersionStatement; import org.apache.iotdb.db.schemaengine.template.Template; @@ -3786,6 +3787,34 @@ public Analysis visitShowQueries( return analysis; } + @Override + public Analysis visitShowDiskUsage( + ShowDiskUsageStatement showDiskUsageStatement, MPPQueryContext context) { + Analysis analysis = new Analysis(); + analysis.setRealStatement(showDiskUsageStatement); + analysis.setRespDatasetHeader(DatasetHeaderFactory.getShowDiskUsageHeader()); + analysis.setVirtualSource(true); + + List allReadableDataNodeLocations = getReadableDataNodeLocations(); + if (allReadableDataNodeLocations.isEmpty()) { + throw new StatementAnalyzeException("no Running DataNodes"); + } + analysis.setReadableDataNodeLocations(allReadableDataNodeLocations); + + Set sourceExpressions = new HashSet<>(); + for (ColumnHeader columnHeader : analysis.getRespDatasetHeader().getColumnHeaders()) { + sourceExpressions.add( + TimeSeriesOperand.constructColumnHeaderExpression( + columnHeader.getColumnName(), columnHeader.getColumnType())); + } + analysis.setSourceExpressions(sourceExpressions); + sourceExpressions.forEach(expression -> analyzeExpressionType(analysis, expression)); + + analysis.setMergeOrderParameter(new OrderByParameter(showDiskUsageStatement.getSortItemList())); + + return analysis; + } + private void analyzeWhere(Analysis analysis, ShowQueriesStatement showQueriesStatement) { WhereCondition whereCondition = showQueriesStatement.getWhereCondition(); if (whereCondition == null) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java index 1695faa1ec030..0c02c015503a0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java @@ -240,6 +240,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.sys.SetSystemStatusStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowCurrentSqlDialectStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowCurrentUserStatement; +import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowDiskUsageStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowQueriesStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowVersionStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.StartRepairDataStatement; @@ -3683,6 +3684,12 @@ public Statement visitShowQueries(IoTDBSqlParser.ShowQueriesContext ctx) { return showQueriesStatement; } + @Override + public Statement visitShowDiskUsage(IoTDBSqlParser.ShowDiskUsageContext ctx) { + PartialPath pathPattern = parsePrefixPath(ctx.prefixPath()); + return new ShowDiskUsageStatement(pathPattern); + } + // show region @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java index af965ba60c396..ccec53ce54e0d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java @@ -85,6 +85,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.DeviceRegionScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesSourceNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.ShowDiskUsageNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.ShowQueriesNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.TimeseriesRegionScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationDescriptor; @@ -1283,6 +1284,34 @@ private LogicalPlanBuilder planSingleShowQueries( return this; } + public LogicalPlanBuilder planShowDiskUsage(Analysis analysis, PartialPath pathPattern) { + List dataNodeLocations = analysis.getReadableDataNodeLocations(); + if (dataNodeLocations.size() == 1) { + this.root = + new ShowDiskUsageNode( + context.getQueryId().genPlanNodeId(), dataNodeLocations.get(0), pathPattern); + } else { + MergeSortNode mergeSortNode = + new MergeSortNode( + context.getQueryId().genPlanNodeId(), + analysis.getMergeOrderParameter(), + ShowDiskUsageNode.SHOW_DISK_USAGE_HEADER_COLUMNS); + dataNodeLocations.forEach( + dataNodeLocation -> + mergeSortNode.addChild( + new ShowDiskUsageNode( + context.getQueryId().genPlanNodeId(), dataNodeLocation, pathPattern))); + this.root = mergeSortNode; + } + + ColumnHeaderConstant.showDiskUsageColumnHeaders.forEach( + columnHeader -> + context + .getTypeProvider() + .setTreeModelType(columnHeader.getColumnName(), columnHeader.getColumnType())); + return this; + } + public LogicalPlanBuilder planOrderBy(List sortItemList) { if (sortItemList.isEmpty()) { return this; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java index fa39b3ba7dcef..c6d607717a926 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java @@ -85,6 +85,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.metadata.view.ShowLogicalViewStatement; import org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.ExplainAnalyzeStatement; +import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowDiskUsageStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowQueriesStatement; import org.apache.iotdb.db.schemaengine.template.Template; @@ -955,6 +956,14 @@ public PlanNode visitShowQueries( return planBuilder.getRoot(); } + @Override + public PlanNode visitShowDiskUsage( + ShowDiskUsageStatement showDiskUsageStatement, MPPQueryContext context) { + LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(analysis, context); + planBuilder = planBuilder.planShowDiskUsage(analysis, showDiskUsageStatement.getPathPattern()); + return planBuilder.getRoot(); + } + @Override public PlanNode visitCreateLogicalView( CreateLogicalViewStatement createLogicalViewStatement, MPPQueryContext context) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java index ede82a7d1cb7a..ce092eb734cdc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java @@ -150,6 +150,7 @@ import org.apache.iotdb.db.queryengine.execution.operator.source.ExchangeOperator; import org.apache.iotdb.db.queryengine.execution.operator.source.SeriesAggregationScanOperator; import org.apache.iotdb.db.queryengine.execution.operator.source.SeriesScanOperator; +import org.apache.iotdb.db.queryengine.execution.operator.source.ShowDiskUsageOperator; import org.apache.iotdb.db.queryengine.execution.operator.source.ShowQueriesOperator; import org.apache.iotdb.db.queryengine.execution.operator.window.ConditionWindowParameter; import org.apache.iotdb.db.queryengine.execution.operator.window.CountWindowParameter; @@ -236,6 +237,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesAggregationScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesAggregationSourceNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.ShowDiskUsageNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.ShowQueriesNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.TimeseriesRegionScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationDescriptor; @@ -2593,6 +2595,18 @@ public Operator visitShowQueries(ShowQueriesNode node, LocalExecutionPlanContext node.getAllowedUsername()); } + @Override + public Operator visitShowDiskUsage(ShowDiskUsageNode node, LocalExecutionPlanContext context) { + OperatorContext operatorContext = + context + .getDriverContext() + .addOperatorContext( + context.getNextOperatorId(), + node.getPlanNodeId(), + ShowDiskUsageOperator.class.getSimpleName()); + return new ShowDiskUsageOperator(operatorContext, node.getPlanNodeId(), node.getPathPattern()); + } + private List generateOutputColumnsFromChildren(MultiChildProcessNode node) { // TODO we should also sort the InputLocation for each column if they are not overlapped return makeLayout(node).entrySet().stream() diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java index 9dfc1e0c48f21..fb94885bc8a99 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java @@ -1281,6 +1281,23 @@ public Operator visitInformationSchemaTableScan( node.getPlanNodeId(), InformationSchemaTableScanOperator.class.getSimpleName()); + Filter pushDownFilter = null; + if (node.getPushDownPredicate() != null) { + Map measurementColumnsIndexMap = + new HashMap<>(node.getOutputColumnNames().size()); + for (int i = 0; i < node.getOutputColumnNames().size(); i++) { + measurementColumnsIndexMap.put(node.getOutputColumnNames().get(i), i); + } + pushDownFilter = + convertPredicateToFilter( + node.getPushDownPredicate(), + measurementColumnsIndexMap, + node.getAssignments(), + null, + context.getZoneId(), + TimestampPrecisionUtils.currPrecision); + } + final List dataTypes = node.getOutputSymbols().stream() .map(symbol -> getTSDataType(context.getTypeProvider().getTableModelType(symbol))) @@ -1296,7 +1313,8 @@ public Operator visitInformationSchemaTableScan( .getDriverContext() .getFragmentInstanceContext() .getSessionInfo() - .getUserEntity())); + .getUserEntity(), + pushDownFilter)); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java index 334d1973f5345..e1ef9921cf96d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java @@ -37,6 +37,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowTimeSeriesStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.ExplainAnalyzeStatement; +import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowDiskUsageStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowQueriesStatement; import org.apache.tsfile.read.common.Path; @@ -156,6 +157,7 @@ private void produceFragmentInstance(PlanFragment fragment) { if (analysis.getTreeStatement() instanceof QueryStatement || analysis.getTreeStatement() instanceof ExplainAnalyzeStatement || analysis.getTreeStatement() instanceof ShowQueriesStatement + || analysis.getTreeStatement() instanceof ShowDiskUsageStatement || (analysis.getTreeStatement() instanceof ShowTimeSeriesStatement && ((ShowTimeSeriesStatement) analysis.getTreeStatement()).isOrderByHeat())) { fragmentInstance.getFragment().generateTypeProvider(queryContext.getTypeProvider()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java index a834f7e076d2d..40bff077a0cb6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java @@ -102,6 +102,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.LastQueryScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesAggregationScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.ShowDiskUsageNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.ShowQueriesNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.TimeseriesRegionScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.ContinuousSameSearchIndexSeparatorNode; @@ -261,6 +262,7 @@ public enum PlanNodeType { CONTINUOUS_SAME_SEARCH_INDEX_SEPARATOR((short) 97), LAST_QUERY_SCAN((short) 98), + SHOW_DISK_USAGE((short) 99), CREATE_OR_UPDATE_TABLE_DEVICE((short) 902), TABLE_DEVICE_QUERY_SCAN((short) 903), @@ -587,6 +589,8 @@ public static PlanNode deserialize(ByteBuffer buffer, short nodeType) { "You should never see ContinuousSameSearchIndexSeparatorNode in this function, because ContinuousSameSearchIndexSeparatorNode should never be used in network transmission."); case 98: return LastQueryScanNode.deserialize(buffer); + case 99: + return ShowDiskUsageNode.deserialize(buffer); case 902: return CreateOrUpdateTableDeviceNode.deserialize(buffer); case 903: diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java index b884ac01935b7..917b9654ce741 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java @@ -106,6 +106,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesAggregationSourceNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanSourceNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.ShowDiskUsageNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.ShowQueriesNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SourceNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.TimeseriesRegionScanNode; @@ -372,6 +373,10 @@ public R visitShowQueries(ShowQueriesNode node, C context) { return visitPlan(node, context); } + public R visitShowDiskUsage(ShowDiskUsageNode node, C context) { + return visitPlan(node, context); + } + public R visitIdentitySink(IdentitySinkNode node, C context) { return visitPlan(node, context); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/ShowDiskUsageNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/ShowDiskUsageNode.java new file mode 100644 index 0000000000000..843c4a8d7d9ef --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/ShowDiskUsageNode.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.planner.plan.node.source; + +import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; +import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.commons.path.PathDeserializeUtil; +import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; + +import com.google.common.collect.ImmutableList; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +public class ShowDiskUsageNode extends VirtualSourceNode { + + public static final List SHOW_DISK_USAGE_HEADER_COLUMNS = + ImmutableList.of(ColumnHeaderConstant.DATA_NODE_ID, ColumnHeaderConstant.SIZE_IN_BYTES); + + private final PartialPath pathPattern; + + public ShowDiskUsageNode( + PlanNodeId id, TDataNodeLocation dataNodeLocation, PartialPath pathPattern) { + super(id, dataNodeLocation); + this.pathPattern = pathPattern; + } + + public PartialPath getPathPattern() { + return pathPattern; + } + + @Override + public List getChildren() { + return Collections.emptyList(); + } + + @Override + public void addChild(PlanNode child) { + throw new UnsupportedOperationException("no child is allowed for ShowDiskUsageNode"); + } + + @Override + public PlanNodeType getType() { + return PlanNodeType.SHOW_DISK_USAGE; + } + + @Override + public PlanNode clone() { + return new ShowDiskUsageNode(getPlanNodeId(), getDataNodeLocation(), pathPattern); + } + + @Override + public int allowedChildCount() { + return NO_CHILD_ALLOWED; + } + + @Override + public List getOutputColumnNames() { + return SHOW_DISK_USAGE_HEADER_COLUMNS; + } + + @Override + public R accept(PlanVisitor visitor, C context) { + return visitor.visitShowDiskUsage(this, context); + } + + // We only use DataNodeLocation when do distributionPlan, so DataNodeLocation is no need to + // serialize + @Override + protected void serializeAttributes(ByteBuffer byteBuffer) { + PlanNodeType.SHOW_DISK_USAGE.serialize(byteBuffer); + pathPattern.serialize(byteBuffer); + } + + @Override + protected void serializeAttributes(DataOutputStream stream) throws IOException { + PlanNodeType.SHOW_DISK_USAGE.serialize(stream); + pathPattern.serialize(stream); + } + + public static ShowDiskUsageNode deserialize(ByteBuffer byteBuffer) { + PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer); + PartialPath pathPattern = (PartialPath) PathDeserializeUtil.deserialize(byteBuffer); + return new ShowDiskUsageNode(planNodeId, null, pathPattern); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + ShowDiskUsageNode that = (ShowDiskUsageNode) o; + return Objects.equals(this.pathPattern, that.pathPattern); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), pathPattern); + } + + @Override + public String toString() { + return String.format( + "ShowDiskUsageNode-%s: [pathPattern: %s]", this.getPlanNodeId(), pathPattern); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/DataNodeLocationSupplierFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/DataNodeLocationSupplierFactory.java index b2e384fbd4487..2044852fa27ff 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/DataNodeLocationSupplierFactory.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/DataNodeLocationSupplierFactory.java @@ -85,6 +85,7 @@ private static InformationSchemaTableDataNodeLocationSupplier getInstance() { public List getDataNodeLocations(final String tableName) { switch (tableName) { case InformationSchema.QUERIES: + case InformationSchema.TABLE_DISK_USAGE: return getReadableDataNodeLocations(); case InformationSchema.DATABASES: case InformationSchema.TABLES: diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java index fab09a2e3f520..39449a5a1915f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java @@ -22,6 +22,7 @@ import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; import org.apache.iotdb.commons.partition.DataPartition; import org.apache.iotdb.commons.partition.DataPartitionQueryParam; +import org.apache.iotdb.commons.schema.table.InformationSchema; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.sql.SemanticException; @@ -56,10 +57,12 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AssignUniqueId; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.InformationSchemaTableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.JoinNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SemiJoinNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TreeDeviceViewScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.UnionNode; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ComparisonExpression; @@ -440,6 +443,21 @@ && extractUnique(conjunct).contains(node.getGroupIdSymbol().get())) { return output; } + @Override + public PlanNode visitInformationSchemaTableScan( + InformationSchemaTableScanNode node, RewriteContext context) { + if (TRUE_LITERAL.equals(context.inheritedPredicate)) { + return node; + } + switch (node.getQualifiedObjectName().getObjectName()) { + // information tables that supports pushdown predicate + case InformationSchema.TABLE_DISK_USAGE: + return combineFilterAndScan(node, context.inheritedPredicate); + default: + return node; + } + } + @Override public PlanNode visitDeviceTableScan( DeviceTableScanNode tableScanNode, RewriteContext context) { @@ -454,8 +472,11 @@ public PlanNode visitDeviceTableScan( return combineFilterAndScan(tableScanNode, context.inheritedPredicate); } - public PlanNode combineFilterAndScan(DeviceTableScanNode tableScanNode, Expression predicate) { - SplitExpression splitExpression = splitPredicate(tableScanNode, predicate); + public PlanNode combineFilterAndScan(TableScanNode tableScanNode, Expression predicate) { + SplitExpression splitExpression = + tableScanNode instanceof InformationSchemaTableScanNode + ? splitPredicate((InformationSchemaTableScanNode) tableScanNode, predicate) + : splitPredicate((DeviceTableScanNode) tableScanNode, predicate); // exist expressions can push down to scan operator if (!splitExpression.getExpressionsCanPushDown().isEmpty()) { @@ -468,10 +489,11 @@ public PlanNode combineFilterAndScan(DeviceTableScanNode tableScanNode, Expressi // extract global time filter and set it to DeviceTableScanNode Pair resultPair = extractGlobalTimeFilter(pushDownPredicate, splitExpression.getTimeColumnName()); - if (resultPair.left != null) { - tableScanNode.setTimePredicate(resultPair.left); + Boolean hasValueFilter = resultPair.getRight(); + if (tableScanNode instanceof DeviceTableScanNode && resultPair.left != null) { + ((DeviceTableScanNode) tableScanNode).setTimePredicate(resultPair.left); } - if (Boolean.TRUE.equals(resultPair.right)) { + if (Boolean.TRUE.equals(hasValueFilter)) { if (pushDownPredicate instanceof LogicalExpression && ((LogicalExpression) pushDownPredicate).getTerms().size() == 1) { tableScanNode.setPushDownPredicate( @@ -485,7 +507,10 @@ public PlanNode combineFilterAndScan(DeviceTableScanNode tableScanNode, Expressi } // do index scan after expressionCanPushDown is processed - getDeviceEntriesWithDataPartitions(tableScanNode, splitExpression.getMetadataExpressions()); + if (tableScanNode instanceof DeviceTableScanNode) { + getDeviceEntriesWithDataPartitions( + (DeviceTableScanNode) tableScanNode, splitExpression.getMetadataExpressions()); + } // exist expressions can not push down to scan operator if (!splitExpression.getExpressionsCannotPushDown().isEmpty()) { @@ -501,6 +526,40 @@ public PlanNode combineFilterAndScan(DeviceTableScanNode tableScanNode, Expressi return tableScanNode; } + private SplitExpression splitPredicate( + InformationSchemaTableScanNode node, Expression predicate) { + Set columnsThatSupportPushDownPredicate = + InformationSchema.getColumnsSupportPushDownPredicate( + node.getQualifiedObjectName().getObjectName()); + List expressionsCanPushDown = new ArrayList<>(); + List expressionsCannotPushDown = new ArrayList<>(); + if (predicate instanceof LogicalExpression + && ((LogicalExpression) predicate).getOperator() == LogicalExpression.Operator.AND) { + + for (Expression expression : ((LogicalExpression) predicate).getTerms()) { + if (PredicateCombineIntoTableScanChecker.check( + columnsThatSupportPushDownPredicate, expression)) { + expressionsCanPushDown.add(expression); + } else { + expressionsCannotPushDown.add(expression); + } + } + + return new SplitExpression( + Collections.emptyList(), expressionsCanPushDown, expressionsCannotPushDown, null); + } + + if (PredicateCombineIntoTableScanChecker.check( + columnsThatSupportPushDownPredicate, predicate)) { + expressionsCanPushDown.add(predicate); + } else { + expressionsCannotPushDown.add(predicate); + } + + return new SplitExpression( + Collections.emptyList(), expressionsCanPushDown, expressionsCannotPushDown, null); + } + private SplitExpression splitPredicate(DeviceTableScanNode node, Expression predicate) { Set idOrAttributeColumnNames = new HashSet<>(node.getAssignments().size()); Set timeOrMeasurementColumnNames = new HashSet<>(node.getAssignments().size()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/security/TreeAccessCheckVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/security/TreeAccessCheckVisitor.java index d939a85d7b457..1f32b82ee2ed3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/security/TreeAccessCheckVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/security/TreeAccessCheckVisitor.java @@ -141,6 +141,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.sys.SetSystemStatusStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowCurrentSqlDialectStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowCurrentUserStatement; +import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowDiskUsageStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowQueriesStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowVersionStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.StartRepairDataStatement; @@ -1638,6 +1639,15 @@ public TSStatus visitShowQueries(ShowQueriesStatement statement, TreeAccessCheck return SUCCEED; } + @Override + public TSStatus visitShowDiskUsage( + ShowDiskUsageStatement showDiskUsageStatement, TreeAccessCheckContext context) { + return checkGlobalAuth( + context.setAuditLogOperation(AuditLogOperation.QUERY), + PrivilegeType.SYSTEM, + () -> showDiskUsageStatement.getPathPattern().toString()); + } + @Override public TSStatus visitShowRegion(ShowRegionStatement statement, TreeAccessCheckContext context) { return checkGlobalAuth(context, PrivilegeType.MAINTAIN, () -> ""); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java index 5b43b467a3620..4ee56d48e18f0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java @@ -425,6 +425,10 @@ protected R visitDescribeTable(DescribeTable node, C context) { return visitStatement(node, context); } + protected R visitShowDiskUsageOfTable(ShowDiskUsageOfTable node, C context) { + return visitStatement(node, context); + } + protected R visitSetProperties(SetProperties node, C context) { return visitStatement(node, context); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/ShowDiskUsageOfTable.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/ShowDiskUsageOfTable.java new file mode 100644 index 0000000000000..83acd147d0e40 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/ShowDiskUsageOfTable.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.relational.sql.ast; + +import java.util.Optional; + +public class ShowDiskUsageOfTable extends ShowStatement { + + public ShowDiskUsageOfTable( + NodeLocation location, + String tableName, + Optional where, + Optional orderBy, + Optional offset, + Optional limit) { + super(location, tableName, where, orderBy, offset, limit); + } + + @Override + public R accept(AstVisitor visitor, C context) { + return visitor.visitShowDiskUsageOfTable(this, context); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java index 76107bfa30e7b..d29a23dba055c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java @@ -25,6 +25,7 @@ import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.schema.cache.CacheClearOptions; +import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant; import org.apache.iotdb.commons.schema.table.InformationSchema; import org.apache.iotdb.commons.schema.table.TsTable; import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory; @@ -187,6 +188,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowDB; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowDataNodes; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowDevice; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowDiskUsageOfTable; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowFunctions; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowIndex; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowLoadedModels; @@ -1605,6 +1607,37 @@ public Node visitShowQueriesStatement(RelationalSqlParser.ShowQueriesStatementCo limit); } + @Override + public Node visitShowDiskUsageStatement(RelationalSqlParser.ShowDiskUsageStatementContext ctx) { + QualifiedName qualifiedName = getQualifiedName(ctx.tableName); + + if (!qualifiedName.getPrefix().isPresent()) { + throw new SemanticException("database is not specified"); + } + String database = qualifiedName.getPrefix().get().toString(); + String table = qualifiedName.getSuffix(); + Optional where = + Optional.of( + LogicalExpression.and( + new ComparisonExpression( + getLocation(ctx), + ComparisonExpression.Operator.EQUAL, + new Identifier(ColumnHeaderConstant.DATABASE.toLowerCase()), + new StringLiteral(database)), + new ComparisonExpression( + getLocation(ctx), + ComparisonExpression.Operator.EQUAL, + new Identifier(ColumnHeaderConstant.TABLE_NAME_TABLE_MODEL), + new StringLiteral(table)))); + return new ShowDiskUsageOfTable( + getLocation(ctx), + InformationSchema.TABLE_DISK_USAGE, + where, + Optional.empty(), + Optional.empty(), + Optional.empty()); + } + @Override public Node visitKillQueryStatement(RelationalSqlParser.KillQueryStatementContext ctx) { if (ctx.queryId == null) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/rewrite/ShowRewrite.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/rewrite/ShowRewrite.java index 12fd67d5017a4..3ab705341f6c1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/rewrite/ShowRewrite.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/rewrite/ShowRewrite.java @@ -19,6 +19,8 @@ package org.apache.iotdb.db.queryengine.plan.relational.sql.rewrite; +import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant; +import org.apache.iotdb.commons.udf.builtin.relational.TableBuiltinAggregationFunction; import org.apache.iotdb.db.queryengine.common.SessionInfo; import org.apache.iotdb.db.queryengine.execution.warnings.WarningCollector; import org.apache.iotdb.db.queryengine.plan.relational.analyzer.NodeRef; @@ -28,14 +30,17 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CountStatement; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.FunctionCall; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.GroupBy; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Identifier; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Node; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Parameter; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.QualifiedName; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Relation; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Select; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowDiskUsageOfTable; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowQueriesStatement; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowStatement; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SimpleGroupBy; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SingleColumn; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement; @@ -70,6 +75,33 @@ protected Node visitShowQueriesStatement(ShowQueriesStatement node, Void context return visitShowStatement(node, context); } + @Override + protected Node visitShowDiskUsageOfTable(ShowDiskUsageOfTable node, Void context) { + return simpleQuery( + selectList( + new SingleColumn(new Identifier(ColumnHeaderConstant.NODE_ID_TABLE_MODEL)), + new SingleColumn( + new FunctionCall( + QualifiedName.of(TableBuiltinAggregationFunction.SUM.getFunctionName()), + Collections.singletonList( + new Identifier(ColumnHeaderConstant.SIZE_IN_BYTES_TABLE_MODEL))), + new Identifier(ColumnHeaderConstant.SIZE_IN_BYTES_TABLE_MODEL))), + from(INFORMATION_DATABASE, node.getTableName()), + node.getWhere(), + Optional.of( + new GroupBy( + false, + Collections.singletonList( + new SimpleGroupBy( + Collections.singletonList( + new Identifier(ColumnHeaderConstant.NODE_ID_TABLE_MODEL)))))), + Optional.empty(), + Optional.empty(), + node.getOrderBy(), + node.getOffset(), + node.getLimit()); + } + @Override protected Node visitShowStatement(final ShowStatement showStatement, final Void context) { return simpleQuery( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java index 4284a484d573a..31f62cc6f1746 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java @@ -191,4 +191,5 @@ public enum StatementType { FAST_LAST_QUERY, SHOW_CONFIGURATION, + SHOW_DISK_USAGE, } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java index 3bafdf8bfe0f0..82539ce13ed19 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java @@ -136,6 +136,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowConfigurationStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowCurrentSqlDialectStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowCurrentUserStatement; +import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowDiskUsageStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowQueriesStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowVersionStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.StartRepairDataStatement; @@ -492,6 +493,10 @@ public R visitShowQueries(ShowQueriesStatement showQueriesStatement, C context) return visitStatement(showQueriesStatement, context); } + public R visitShowDiskUsage(ShowDiskUsageStatement showDiskUsageStatement, C context) { + return visitStatement(showDiskUsageStatement, context); + } + public R visitShowRegion(ShowRegionStatement showRegionStatement, C context) { return visitStatement(showRegionStatement, context); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/sys/ShowDiskUsageStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/sys/ShowDiskUsageStatement.java new file mode 100644 index 0000000000000..e03da031c197a --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/sys/ShowDiskUsageStatement.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.statement.sys; + +import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.db.queryengine.plan.statement.StatementType; +import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor; +import org.apache.iotdb.db.queryengine.plan.statement.component.OrderByKey; +import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; +import org.apache.iotdb.db.queryengine.plan.statement.component.SortItem; +import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowStatement; + +import java.util.Collections; +import java.util.List; + +public class ShowDiskUsageStatement extends ShowStatement { + private PartialPath pathPattern; + + public ShowDiskUsageStatement(PartialPath pathPattern) { + this.statementType = StatementType.SHOW_DISK_USAGE; + this.pathPattern = pathPattern; + } + + public PartialPath getPathPattern() { + return pathPattern; + } + + public List getSortItemList() { + return Collections.singletonList(new SortItem(OrderByKey.DATANODEID, Ordering.ASC)); + } + + @Override + public boolean isQuery() { + return true; + } + + @Override + public R accept(StatementVisitor visitor, C context) { + return visitor.visitShowDiskUsage(this, context); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/DiskUsageStatisticUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/DiskUsageStatisticUtil.java new file mode 100644 index 0000000000000..42e5c1a1820bd --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/DiskUsageStatisticUtil.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.dataregion.utils; + +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; + +import org.apache.tsfile.file.header.ChunkGroupHeader; +import org.apache.tsfile.file.metadata.IChunkMetadata; +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.file.metadata.MetadataIndexNode; +import org.apache.tsfile.file.metadata.TimeseriesMetadata; +import org.apache.tsfile.read.TsFileSequenceReader; +import org.apache.tsfile.utils.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public abstract class DiskUsageStatisticUtil implements Closeable { + + protected static final Logger logger = LoggerFactory.getLogger(DiskUsageStatisticUtil.class); + protected List resourcesWithReadLock; + protected final Iterator iterator; + + public DiskUsageStatisticUtil(TsFileManager tsFileManager, long timePartition) { + List seqResources = tsFileManager.getTsFileListSnapshot(timePartition, true); + List unseqResources = tsFileManager.getTsFileListSnapshot(timePartition, false); + List resources = + Stream.concat(seqResources.stream(), unseqResources.stream()).collect(Collectors.toList()); + acquireReadLocks(resources); + iterator = resourcesWithReadLock.iterator(); + } + + public boolean hasNextFile() { + return iterator.hasNext(); + } + + public abstract long[] getResult(); + + protected void acquireReadLocks(List resources) { + this.resourcesWithReadLock = new ArrayList<>(resources.size()); + try { + for (TsFileResource resource : resources) { + resource.readLock(); + if (resource.isDeleted() || !resource.isClosed()) { + resource.readUnlock(); + continue; + } + resourcesWithReadLock.add(resource); + } + } catch (Exception e) { + releaseReadLocks(); + throw e; + } + } + + protected void releaseReadLocks() { + if (resourcesWithReadLock == null) { + return; + } + for (TsFileResource resource : resourcesWithReadLock) { + resource.readUnlock(); + } + resourcesWithReadLock = null; + } + + public abstract void calculateNextFile(); + + protected long calculateStartOffsetOfChunkGroup( + TsFileSequenceReader reader, + MetadataIndexNode firstMeasurementNodeOfCurrentDevice, + Pair deviceIsAlignedPair) + throws IOException { + int chunkGroupHeaderSize = + new ChunkGroupHeader(deviceIsAlignedPair.getLeft()).getSerializedSize(); + if (deviceIsAlignedPair.getRight()) { + List timeColumnTimeseriesMetadata = new ArrayList<>(1); + reader.readITimeseriesMetadata( + timeColumnTimeseriesMetadata, firstMeasurementNodeOfCurrentDevice, ""); + IChunkMetadata iChunkMetadata = + timeColumnTimeseriesMetadata.get(0).getChunkMetadataList().get(0); + return iChunkMetadata.getOffsetOfChunkHeader() - chunkGroupHeaderSize; + } else { + List timeseriesMetadataList = new ArrayList<>(); + reader.getDeviceTimeseriesMetadata( + timeseriesMetadataList, + firstMeasurementNodeOfCurrentDevice, + Collections.emptySet(), + true); + long minOffset = Long.MAX_VALUE; + for (TimeseriesMetadata timeseriesMetadata : timeseriesMetadataList) { + for (IChunkMetadata chunkMetadata : timeseriesMetadata.getChunkMetadataList()) { + minOffset = Math.min(minOffset, chunkMetadata.getOffsetOfChunkHeader()); + break; + } + } + return minOffset - chunkGroupHeaderSize; + } + } + + @Override + public void close() throws IOException { + releaseReadLocks(); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/StorageEngineTimePartitionIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/StorageEngineTimePartitionIterator.java new file mode 100644 index 0000000000000..1546f6f1abed2 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/StorageEngineTimePartitionIterator.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.dataregion.utils; + +import org.apache.iotdb.db.storageengine.StorageEngine; +import org.apache.iotdb.db.storageengine.dataregion.DataRegion; + +import java.util.Iterator; +import java.util.Optional; + +public class StorageEngineTimePartitionIterator { + private final Iterator dataRegionIterator; + private final Optional dataRegionFilter; + private final Optional timePartitionFilter; + private DataRegion currentDataRegion; + private long currentTimePartition; + private Iterator timePartitionIterator; + + public StorageEngineTimePartitionIterator( + Optional dataRegionFilter, + Optional timePartitionFilter) { + this.dataRegionIterator = StorageEngine.getInstance().getAllDataRegions().iterator(); + this.dataRegionFilter = dataRegionFilter; + this.timePartitionFilter = timePartitionFilter; + } + + public boolean next() throws Exception { + while (true) { + if (timePartitionIterator != null && timePartitionIterator.hasNext()) { + currentTimePartition = timePartitionIterator.next(); + if (timePartitionFilter.isPresent() + && !timePartitionFilter.get().apply(currentDataRegion, currentTimePartition)) { + continue; + } + return true; + } else if (!nextDataRegion()) { + return false; + } // should not have else branch + } + } + + private boolean nextDataRegion() throws Exception { + while (dataRegionIterator.hasNext()) { + currentDataRegion = dataRegionIterator.next(); + if (currentDataRegion == null || currentDataRegion.isDeleted()) { + continue; + } + if (dataRegionFilter.isPresent() && !dataRegionFilter.get().apply(currentDataRegion)) { + continue; + } + timePartitionIterator = currentDataRegion.getTimePartitions().iterator(); + if (timePartitionIterator.hasNext()) { + return true; + } + } + return false; + } + + public DataRegion currentDataRegion() { + return currentDataRegion; + } + + public long currentTimePartition() { + return currentTimePartition; + } + + @FunctionalInterface + public interface DataRegionFilterFunc { + boolean apply(DataRegion currentDataRegion) throws Exception; + } + + @FunctionalInterface + public interface TimePartitionFilterFunc { + boolean apply(DataRegion currentDataRegion, long currentTimePartition) throws Exception; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/TableDiskUsageStatisticUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/TableDiskUsageStatisticUtil.java new file mode 100644 index 0000000000000..c50be4caa9860 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/TableDiskUsageStatisticUtil.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.dataregion.utils; + +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; + +import org.apache.tsfile.common.conf.TSFileConfig; +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.file.metadata.MetadataIndexNode; +import org.apache.tsfile.file.metadata.TsFileMetadata; +import org.apache.tsfile.read.TsFileDeviceIterator; +import org.apache.tsfile.read.TsFileSequenceReader; +import org.apache.tsfile.utils.Pair; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +public class TableDiskUsageStatisticUtil extends DiskUsageStatisticUtil { + private final Map tableIndexMap; + private final long[] resultArr; + + public TableDiskUsageStatisticUtil( + TsFileManager tsFileManager, long timePartition, List tableNames) { + super(tsFileManager, timePartition); + this.tableIndexMap = new HashMap<>(); + for (int i = 0; i < tableNames.size(); i++) { + tableIndexMap.put(tableNames.get(i), i); + } + this.resultArr = new long[tableNames.size()]; + } + + @Override + public long[] getResult() { + return resultArr; + } + + @Override + public void calculateNextFile() { + TsFileResource tsFileResource = iterator.next(); + if (tsFileResource.isDeleted()) { + return; + } + try (TsFileSequenceReader reader = new TsFileSequenceReader(tsFileResource.getTsFilePath())) { + TsFileMetadata tsFileMetadata = reader.readFileMetadata(); + if (!hasSatisfiedData(tsFileMetadata)) { + return; + } + int allSatisfiedTableIndex = getAllSatisfiedTableIndex(tsFileMetadata); + if (allSatisfiedTableIndex > 0) { + // size of tsfile - size of (tsfile magic string + version number + all metadata + metadata + // marker) + resultArr[allSatisfiedTableIndex] += + (tsFileResource.getTsFileSize() + - reader.getAllMetadataSize() + - 1 + - TSFileConfig.MAGIC_STRING.getBytes().length + - 1); + return; + } + calculateDiskUsageInBytesByOffset(reader); + } catch (Exception e) { + logger.error("Failed to scan file {}", tsFileResource.getTsFile().getAbsolutePath(), e); + } + } + + private boolean hasSatisfiedData(TsFileMetadata tsFileMetadata) { + Map tableMetadataIndexNodeMap = + tsFileMetadata.getTableMetadataIndexNodeMap(); + return tableIndexMap.keySet().stream().anyMatch(tableMetadataIndexNodeMap::containsKey); + } + + private int getAllSatisfiedTableIndex(TsFileMetadata tsFileMetadata) { + if (tsFileMetadata.getTableMetadataIndexNodeMap().size() != 1) { + return -1; + } + String satisfiedTableName = + tsFileMetadata.getTableMetadataIndexNodeMap().keySet().iterator().next(); + return tableIndexMap.get(satisfiedTableName); + } + + private void calculateDiskUsageInBytesByOffset(TsFileSequenceReader reader) throws IOException { + TsFileMetadata tsFileMetadata = reader.readFileMetadata(); + Map tableMetadataIndexNodeMap = + tsFileMetadata.getTableMetadataIndexNodeMap(); + String nextTable = null; + Iterator iterator = tableMetadataIndexNodeMap.keySet().iterator(); + Map tableOffsetMap = new HashMap<>(); + while (iterator.hasNext()) { + String currentTable = iterator.next(); + while (currentTable != null && tableIndexMap.containsKey(currentTable)) { + nextTable = iterator.hasNext() ? iterator.next() : null; + long tableSize = + calculateTableSize(tableOffsetMap, tsFileMetadata, reader, currentTable, nextTable); + resultArr[tableIndexMap.get(currentTable)] += tableSize; + currentTable = nextTable; + } + } + } + + private long calculateTableSize( + Map tableOffsetMap, + TsFileMetadata tsFileMetadata, + TsFileSequenceReader reader, + String tableName, + String nextTable) { + long startOffset, endOffset; + if (nextTable == null) { + endOffset = tsFileMetadata.getMetaOffset(); + } else { + endOffset = getTableOffset(tableOffsetMap, reader, nextTable); + } + startOffset = getTableOffset(tableOffsetMap, reader, tableName); + return endOffset - startOffset; + } + + private long getTableOffset( + Map tableOffsetMap, TsFileSequenceReader reader, String tableName) { + return tableOffsetMap.computeIfAbsent( + tableName, + k -> { + try { + TsFileDeviceIterator deviceIterator = reader.getTableDevicesIteratorWithIsAligned(k); + Pair pair = deviceIterator.next(); + return calculateStartOffsetOfChunkGroup( + reader, deviceIterator.getFirstMeasurementNodeOfCurrentDevice(), pair); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/TreeDiskUsageStatisticUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/TreeDiskUsageStatisticUtil.java new file mode 100644 index 0000000000000..0aa4da37e6728 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/TreeDiskUsageStatisticUtil.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.dataregion.utils; + +import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionPathUtils; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; + +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.file.metadata.MetadataIndexNode; +import org.apache.tsfile.read.TsFileDeviceIterator; +import org.apache.tsfile.read.TsFileSequenceReader; +import org.apache.tsfile.utils.Pair; + +import java.io.IOException; + +public class TreeDiskUsageStatisticUtil extends DiskUsageStatisticUtil { + + private final PartialPath pathPattern; + private final boolean isPrefixPathPattern; + private long result; + + public TreeDiskUsageStatisticUtil( + TsFileManager tsFileManager, long timePartition, PartialPath pathPattern) { + super(tsFileManager, timePartition); + this.pathPattern = pathPattern; + this.isPrefixPathPattern = pathPattern.isPrefixPath(); + this.result = 0; + } + + @Override + public long[] getResult() { + return new long[] {result}; + } + + @Override + public void calculateNextFile() { + TsFileResource tsFileResource = iterator.next(); + if (tsFileResource.isDeleted()) { + return; + } + + try (TsFileSequenceReader reader = new TsFileSequenceReader(tsFileResource.getTsFilePath())) { + TsFileDeviceIterator deviceIterator = reader.getAllDevicesIteratorWithIsAligned(); + while (deviceIterator.hasNext()) { + Pair deviceIsAlignedPair = deviceIterator.next(); + if (!matchPathPattern(deviceIsAlignedPair.getLeft())) { + continue; + } + MetadataIndexNode nodeOfFirstMatchedDevice = + deviceIterator.getFirstMeasurementNodeOfCurrentDevice(); + Pair nextNotMatchedDevice = null; + MetadataIndexNode nodeOfNextNotMatchedDevice = null; + while (deviceIterator.hasNext()) { + Pair currentDevice = deviceIterator.next(); + if (!matchPathPattern(currentDevice.getLeft())) { + nextNotMatchedDevice = currentDevice; + nodeOfNextNotMatchedDevice = deviceIterator.getFirstMeasurementNodeOfCurrentDevice(); + break; + } + } + result += + calculatePathPatternSize( + reader, + deviceIsAlignedPair, + nodeOfFirstMatchedDevice, + nextNotMatchedDevice, + nodeOfNextNotMatchedDevice); + if (isPrefixPathPattern) { + break; + } + } + } catch (Exception e) { + logger.error("Failed to scan file {}", tsFileResource.getTsFile().getAbsolutePath(), e); + } + } + + private long calculatePathPatternSize( + TsFileSequenceReader reader, + Pair firstMatchedDevice, + MetadataIndexNode nodeOfFirstMatchedDevice, + Pair nextNotMatchedDevice, + MetadataIndexNode nodeOfNextNotMatchedDevice) + throws IOException { + long startOffset, endOffset; + if (nextNotMatchedDevice == null) { + endOffset = reader.readFileMetadata().getMetaOffset(); + } else { + endOffset = + calculateStartOffsetOfChunkGroup( + reader, nodeOfNextNotMatchedDevice, nextNotMatchedDevice); + } + startOffset = + calculateStartOffsetOfChunkGroup(reader, nodeOfFirstMatchedDevice, firstMatchedDevice); + return endOffset - startOffset; + } + + private boolean matchPathPattern(IDeviceID deviceID) throws IllegalPathException { + return pathPattern.matchFullPath(CompactionPathUtils.getPath(deviceID)); + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/column/ColumnHeaderConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/column/ColumnHeaderConstant.java index 30997db31e1e2..25c114f5e8fc3 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/column/ColumnHeaderConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/column/ColumnHeaderConstant.java @@ -325,6 +325,10 @@ private ColumnHeaderConstant() { public static final String SHOW_CONFIGURATIONS_DEFAULT_VALUE = "default_value"; public static final String SHOW_CONFIGURATIONS_DESCRIPTION = "description"; + public static final String SIZE_IN_BYTES = "SizeInBytes"; + public static final String SIZE_IN_BYTES_TABLE_MODEL = "size_in_bytes"; + public static final String TIME_PARTITION_TABLE_MODEL = "time_partition"; + public static final List lastQueryColumnHeaders = ImmutableList.of( new ColumnHeader(TIMESERIES, TSDataType.TEXT), @@ -611,6 +615,11 @@ private ColumnHeaderConstant() { new ColumnHeader(ELAPSED_TIME, TSDataType.FLOAT), new ColumnHeader(STATEMENT, TSDataType.TEXT)); + public static final List showDiskUsageColumnHeaders = + ImmutableList.of( + new ColumnHeader(DATA_NODE_ID, TSDataType.INT32), + new ColumnHeader(SIZE_IN_BYTES, TSDataType.INT64)); + public static final List showSpaceQuotaColumnHeaders = ImmutableList.of( new ColumnHeader(DATABASE, TSDataType.TEXT), diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/InformationSchema.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/InformationSchema.java index c58ad14989c43..8ececbae9ad22 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/InformationSchema.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/InformationSchema.java @@ -21,13 +21,18 @@ import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant; import org.apache.iotdb.commons.schema.table.column.AttributeColumnSchema; +import org.apache.iotdb.commons.schema.table.column.FieldColumnSchema; import org.apache.iotdb.commons.schema.table.column.TagColumnSchema; import org.apache.tsfile.enums.TSDataType; +import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Locale; import java.util.Map; +import java.util.Set; public class InformationSchema { public static final String INFORMATION_DATABASE = "information_schema"; @@ -50,6 +55,7 @@ public class InformationSchema { public static final String NODES = "nodes"; public static final String CONFIG_NODES = "config_nodes"; public static final String DATA_NODES = "data_nodes"; + public static final String TABLE_DISK_USAGE = "table_disk_usage"; static { final TsTable queriesTable = new TsTable(QUERIES); @@ -362,12 +368,43 @@ public class InformationSchema { ColumnHeaderConstant.SCHEMA_CONSENSUS_PORT_TABLE_MODEL, TSDataType.INT32)); dataNodesTable.removeColumnSchema(TsTable.TIME_COLUMN_NAME); schemaTables.put(DATA_NODES, dataNodesTable); + + final TsTable tableDiskUsageTable = new TsTable(TABLE_DISK_USAGE); + tableDiskUsageTable.addColumnSchema( + new FieldColumnSchema(ColumnHeaderConstant.DATABASE, TSDataType.STRING)); + tableDiskUsageTable.addColumnSchema( + new FieldColumnSchema(ColumnHeaderConstant.TABLE_NAME_TABLE_MODEL, TSDataType.STRING)); + tableDiskUsageTable.addColumnSchema( + new FieldColumnSchema(ColumnHeaderConstant.NODE_ID_TABLE_MODEL, TSDataType.INT32)); + tableDiskUsageTable.addColumnSchema( + new FieldColumnSchema(ColumnHeaderConstant.REGION_ID_TABLE_MODEL, TSDataType.INT32)); + tableDiskUsageTable.addColumnSchema( + new FieldColumnSchema(ColumnHeaderConstant.TIME_PARTITION_TABLE_MODEL, TSDataType.INT64)); + tableDiskUsageTable.addColumnSchema( + new FieldColumnSchema(ColumnHeaderConstant.SIZE_IN_BYTES_TABLE_MODEL, TSDataType.INT64)); + tableDiskUsageTable.removeColumnSchema(TsTable.TIME_COLUMN_NAME); + schemaTables.put(TABLE_DISK_USAGE, tableDiskUsageTable); } public static Map getSchemaTables() { return schemaTables; } + public static Set getColumnsSupportPushDownPredicate(String tableName) { + switch (tableName) { + case TABLE_DISK_USAGE: + return new HashSet<>( + Arrays.asList( + ColumnHeaderConstant.DATABASE, + ColumnHeaderConstant.TABLE_NAME_TABLE_MODEL, + ColumnHeaderConstant.NODE_ID_TABLE_MODEL, + ColumnHeaderConstant.REGION_ID_TABLE_MODEL, + ColumnHeaderConstant.TIME_PARTITION_TABLE_MODEL)); + default: + return Collections.emptySet(); + } + } + private InformationSchema() { // Utils } diff --git a/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4 b/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4 index 59231b6de2bcc..8be427cab938c 100644 --- a/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4 +++ b/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4 @@ -137,6 +137,7 @@ statement | setSystemStatusStatement | showVersionStatement | showQueriesStatement + | showDiskUsageStatement | killQueryStatement | loadConfigurationStatement | setConfigurationStatement @@ -639,6 +640,9 @@ showQueriesStatement limitOffsetClause ; +showDiskUsageStatement + : SHOW DISK_USAGE FROM (tableName=qualifiedName) + ; killQueryStatement : KILL (QUERY queryId=string | ALL QUERIES) @@ -1396,7 +1400,7 @@ nonReserved : ABSENT | ADD | ADMIN | AFTER | ALL | ANALYZE | ANY | ARRAY | ASC | AT | ATTRIBUTE | AUDIT | AUTHORIZATION | BEGIN | BERNOULLI | BOTH | CACHE | CALL | CALLED | CASCADE | CATALOG | CATALOGS | CHAR | CHARACTER | CHARSET | CLEAR | CLUSTER | CLUSTERID | COLUMN | COLUMNS | COMMENT | COMMIT | COMMITTED | CONDITION | CONDITIONAL | CONFIGNODES | CONFIGNODE | CONFIGURATION | CONNECTOR | CONSTANT | COPARTITION | COUNT | CURRENT - | DATA | DATABASE | DATABASES | DATANODE | DATANODES | DATASET | DATE | DAY | DECLARE | DEFAULT | DEFINE | DEFINER | DENY | DESC | DESCRIPTOR | DETAILS| DETERMINISTIC | DEVICES | DISTRIBUTED | DO | DOUBLE + | DATA | DATABASE | DATABASES | DATANODE | DATANODES | DATASET | DATE | DAY | DECLARE | DEFAULT | DEFINE | DEFINER | DENY | DESC | DESCRIPTOR | DETAILS| DETERMINISTIC | DEVICES | DISK_USAGE | DISTRIBUTED | DO | DOUBLE | ELSEIF | EMPTY | ENCODING | ERROR | EXCLUDING | EXPLAIN | EXTRACTOR | FETCH | FIELD | FILTER | FINAL | FIRST | FLUSH | FOLLOWING | FORMAT | FUNCTION | FUNCTIONS | GRACE | GRANT | GRANTED | GRANTS | GRAPHVIZ | GROUPS @@ -1513,6 +1517,7 @@ DESCRIPTOR: 'DESCRIPTOR'; DETAILS: 'DETAILS'; DETERMINISTIC: 'DETERMINISTIC'; DEVICES: 'DEVICES'; +DISK_USAGE: 'DISK_USAGE'; DISTINCT: 'DISTINCT'; DISTRIBUTED: 'DISTRIBUTED'; DO: 'DO'; diff --git a/pom.xml b/pom.xml index c86df5f482d93..b09d6db166fbe 100644 --- a/pom.xml +++ b/pom.xml @@ -177,7 +177,7 @@ 0.14.1 1.9 1.5.6-3 - 2.2.0-251010-SNAPSHOT + 2.2.0-251027-SNAPSHOT