Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,7 @@ REAL: 'REAL';
REBALANCE: 'REBALANCE';
RECENT: 'RECENT';
RECOVER: 'RECOVER';
RECURSIVE: 'RECURSIVE';
RECYCLE: 'RECYCLE';
REFRESH: 'REFRESH';
REFERENCES: 'REFERENCES';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1272,7 +1272,7 @@ querySpecification
;

cte
: WITH aliasQuery (COMMA aliasQuery)*
: WITH RECURSIVE? aliasQuery (COMMA aliasQuery)*
;

aliasQuery
Expand Down Expand Up @@ -2256,6 +2256,7 @@ nonReserved
| RANDOM
| RECENT
| RECOVER
| RECURSIVE
| RECYCLE
| REFRESH
| REPEATABLE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

package org.apache.doris.analysis;

import org.apache.doris.catalog.RecursiveCteTempTable;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.IdGenerator;
import org.apache.doris.thrift.TDescriptorTable;
Expand Down Expand Up @@ -100,6 +101,10 @@ public TDescriptorTable toThrift() {
}

for (TableIf tbl : referencedTbls.values()) {
if (tbl instanceof RecursiveCteTempTable) {
// skip recursive cte temp table
continue;
}
result.addToTableDescriptors(tbl.toThrift());
}
thriftDescTable = result;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.catalog;

import org.apache.doris.common.SystemIdGenerator;

import com.google.common.collect.ImmutableList;

import java.util.List;

public class RecursiveCteTempTable extends Table {
public RecursiveCteTempTable(String tableName, List<Column> fullSchema) {
super(SystemIdGenerator.getNextId(), tableName, TableType.RECURSIVE_CTE_TEMP_TABLE, fullSchema);
}

@Override
public List<String> getFullQualifiers() {
return ImmutableList.of(name);
}
}
19 changes: 12 additions & 7 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
Original file line number Diff line number Diff line change
Expand Up @@ -320,13 +320,13 @@ default PrimaryKeyConstraint tryGetPrimaryKeyForForeignKeyUnsafe(
default void addForeignConstraint(String name, ImmutableList<String> columns,
TableIf referencedTable, ImmutableList<String> referencedColumns, boolean replay) {
Map<String, Constraint> constraintMap = getConstraintsMapUnsafe();
ForeignKeyConstraint foreignKeyConstraint =
new ForeignKeyConstraint(name, columns, referencedTable, referencedColumns);
ForeignKeyConstraint foreignKeyConstraint = new ForeignKeyConstraint(name, columns, referencedTable,
referencedColumns);
checkConstraintNotExistenceUnsafe(name, foreignKeyConstraint, constraintMap);
PrimaryKeyConstraint requirePrimaryKeyName = new PrimaryKeyConstraint(name,
foreignKeyConstraint.getReferencedColumnNames());
PrimaryKeyConstraint primaryKeyConstraint =
tryGetPrimaryKeyForForeignKeyUnsafe(requirePrimaryKeyName, referencedTable);
PrimaryKeyConstraint primaryKeyConstraint = tryGetPrimaryKeyForForeignKeyUnsafe(requirePrimaryKeyName,
referencedTable);
primaryKeyConstraint.addForeignTable(this);
constraintMap.put(name, foreignKeyConstraint);
if (!replay) {
Expand Down Expand Up @@ -446,10 +446,13 @@ default boolean needReadLockWhenPlan() {
*/
enum TableType {
MYSQL, ODBC, OLAP, SCHEMA, INLINE_VIEW, VIEW, BROKER, ELASTICSEARCH, HIVE,
@Deprecated ICEBERG, @Deprecated HUDI, JDBC,
@Deprecated
ICEBERG, @Deprecated
HUDI, JDBC,
TABLE_VALUED_FUNCTION, HMS_EXTERNAL_TABLE, ES_EXTERNAL_TABLE, MATERIALIZED_VIEW, JDBC_EXTERNAL_TABLE,
ICEBERG_EXTERNAL_TABLE, TEST_EXTERNAL_TABLE, PAIMON_EXTERNAL_TABLE, MAX_COMPUTE_EXTERNAL_TABLE,
HUDI_EXTERNAL_TABLE, TRINO_CONNECTOR_EXTERNAL_TABLE, LAKESOUl_EXTERNAL_TABLE, DICTIONARY, DORIS_EXTERNAL_TABLE;
HUDI_EXTERNAL_TABLE, TRINO_CONNECTOR_EXTERNAL_TABLE, LAKESOUl_EXTERNAL_TABLE, DICTIONARY, DORIS_EXTERNAL_TABLE,
RECURSIVE_CTE_TEMP_TABLE;

public String toEngineName() {
switch (this) {
Expand Down Expand Up @@ -492,6 +495,8 @@ public String toEngineName() {
return "dictionary";
case DORIS_EXTERNAL_TABLE:
return "External_Doris";
case RECURSIVE_CTE_TEMP_TABLE:
return "RecursiveCteTempTable";
default:
return null;
}
Expand Down Expand Up @@ -531,6 +536,7 @@ public String toMysqlType() {
case MATERIALIZED_VIEW:
case TRINO_CONNECTOR_EXTERNAL_TABLE:
case DORIS_EXTERNAL_TABLE:
case RECURSIVE_CTE_TEMP_TABLE:
return "BASE TABLE";
default:
return null;
Expand Down Expand Up @@ -664,4 +670,3 @@ default Optional<TableValuedFunctionRefInfo> getSysTableFunctionRef(
return Optional.empty();
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ public class CascadesContext implements ScheduleContext {
private final boolean isEnableExprTrace;

private int groupExpressionCount = 0;
private Optional<String> currentRecursiveCteName;
private List<Slot> recursiveCteOutputs;

/**
* Constructor of OptimizerContext.
Expand All @@ -142,7 +144,8 @@ public class CascadesContext implements ScheduleContext {
*/
private CascadesContext(Optional<CascadesContext> parent, Optional<CTEId> currentTree,
StatementContext statementContext, Plan plan, Memo memo,
CTEContext cteContext, PhysicalProperties requireProperties, boolean isLeadingDisableJoinReorder) {
CTEContext cteContext, PhysicalProperties requireProperties, boolean isLeadingDisableJoinReorder,
Optional<String> currentRecursiveCteName, List<Slot> recursiveCteOutputs) {
this.parent = Objects.requireNonNull(parent, "parent should not null");
this.currentTree = Objects.requireNonNull(currentTree, "currentTree should not null");
this.statementContext = Objects.requireNonNull(statementContext, "statementContext should not null");
Expand All @@ -167,6 +170,8 @@ private CascadesContext(Optional<CascadesContext> parent, Optional<CTEId> curren
this.isEnableExprTrace = false;
}
this.isLeadingDisableJoinReorder = isLeadingDisableJoinReorder;
this.currentRecursiveCteName = currentRecursiveCteName;
this.recursiveCteOutputs = recursiveCteOutputs;
}

/** init a temporary context to rewrite expression */
Expand All @@ -181,7 +186,7 @@ public static CascadesContext initTempContext() {
}
return newContext(Optional.empty(), Optional.empty(),
statementContext, DUMMY_PLAN,
new CTEContext(), PhysicalProperties.ANY, false);
new CTEContext(), PhysicalProperties.ANY, false, Optional.empty(), ImmutableList.of());
}

/**
Expand All @@ -190,24 +195,25 @@ public static CascadesContext initTempContext() {
public static CascadesContext initContext(StatementContext statementContext,
Plan initPlan, PhysicalProperties requireProperties) {
return newContext(Optional.empty(), Optional.empty(), statementContext,
initPlan, new CTEContext(), requireProperties, false);
initPlan, new CTEContext(), requireProperties, false, Optional.empty(), ImmutableList.of());
}

/**
* use for analyze cte. we must pass CteContext from outer since we need to get right scope of cte
*/
public static CascadesContext newContextWithCteContext(CascadesContext cascadesContext,
Plan initPlan, CTEContext cteContext) {
Plan initPlan, CTEContext cteContext, Optional<String> currentRecursiveCteName,
List<Slot> recursiveCteOutputs) {
return newContext(Optional.of(cascadesContext), Optional.empty(),
cascadesContext.getStatementContext(), initPlan, cteContext, PhysicalProperties.ANY,
cascadesContext.isLeadingDisableJoinReorder
);
cascadesContext.isLeadingDisableJoinReorder, currentRecursiveCteName, recursiveCteOutputs);
}

public static CascadesContext newCurrentTreeContext(CascadesContext context) {
return CascadesContext.newContext(context.getParent(), context.getCurrentTree(), context.getStatementContext(),
context.getRewritePlan(), context.getCteContext(),
context.getCurrentJobContext().getRequiredProperties(), context.isLeadingDisableJoinReorder);
context.getCurrentJobContext().getRequiredProperties(), context.isLeadingDisableJoinReorder,
Optional.empty(), ImmutableList.of());
}

/**
Expand All @@ -216,14 +222,17 @@ public static CascadesContext newCurrentTreeContext(CascadesContext context) {
public static CascadesContext newSubtreeContext(Optional<CTEId> subtree, CascadesContext context,
Plan plan, PhysicalProperties requireProperties) {
return CascadesContext.newContext(Optional.of(context), subtree, context.getStatementContext(),
plan, context.getCteContext(), requireProperties, context.isLeadingDisableJoinReorder);
plan, context.getCteContext(), requireProperties, context.isLeadingDisableJoinReorder, Optional.empty(),
ImmutableList.of());
}

private static CascadesContext newContext(Optional<CascadesContext> parent, Optional<CTEId> subtree,
StatementContext statementContext, Plan initPlan, CTEContext cteContext,
PhysicalProperties requireProperties, boolean isLeadingDisableJoinReorder) {
PhysicalProperties requireProperties, boolean isLeadingDisableJoinReorder,
Optional<String> currentRecursiveCteName, List<Slot> recursiveCteOutputs) {
return new CascadesContext(parent, subtree, statementContext, initPlan, null,
cteContext, requireProperties, isLeadingDisableJoinReorder);
cteContext, requireProperties, isLeadingDisableJoinReorder, currentRecursiveCteName,
recursiveCteOutputs);
}

public CascadesContext getRoot() {
Expand All @@ -250,6 +259,18 @@ public synchronized boolean isTimeout() {
return isTimeout;
}

public Optional<String> getCurrentRecursiveCteName() {
return currentRecursiveCteName;
}

public List<Slot> getRecursiveCteOutputs() {
return recursiveCteOutputs;
}

public boolean isAnalyzingRecursiveCteAnchorChild() {
return currentRecursiveCteName.isPresent() && recursiveCteOutputs.isEmpty();
}

/**
* Init memo with plan
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,9 @@
import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
import org.apache.doris.nereids.trees.plans.physical.PhysicalQuickSort;
import org.apache.doris.nereids.trees.plans.physical.PhysicalRecursiveCte;
import org.apache.doris.nereids.trees.plans.physical.PhysicalRecursiveCteRecursiveChild;
import org.apache.doris.nereids.trees.plans.physical.PhysicalRecursiveCteScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalRepeat;
import org.apache.doris.nereids.trees.plans.physical.PhysicalResultSink;
Expand Down Expand Up @@ -218,6 +221,8 @@
import org.apache.doris.planner.PartitionSortNode;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.planner.PlanNode;
import org.apache.doris.planner.RecursiveCteNode;
import org.apache.doris.planner.RecursiveCteScanNode;
import org.apache.doris.planner.RepeatNode;
import org.apache.doris.planner.ResultFileSink;
import org.apache.doris.planner.ResultSink;
Expand Down Expand Up @@ -1170,6 +1175,28 @@ public PlanFragment visitPhysicalSchemaScan(PhysicalSchemaScan schemaScan, PlanT
return planFragment;
}

@Override
public PlanFragment visitPhysicalRecursiveCteScan(PhysicalRecursiveCteScan recursiveCteScan,
PlanTranslatorContext context) {
TableIf table = recursiveCteScan.getTable();
List<Slot> slots = ImmutableList.copyOf(recursiveCteScan.getOutput());
TupleDescriptor tupleDescriptor = generateTupleDesc(slots, null, context);

RecursiveCteScanNode scanNode = new RecursiveCteScanNode(table != null ? table.getName() : "",
context.nextPlanNodeId(), tupleDescriptor, context.getScanContext());
scanNode.setNereidsId(recursiveCteScan.getId());
context.getNereidsIdToPlanNodeIdMap().put(recursiveCteScan.getId(), scanNode.getId());
Utils.execWithUncheckedException(scanNode::initScanRangeLocations);

translateRuntimeFilter(recursiveCteScan, scanNode, context);

context.addScanNode(scanNode, recursiveCteScan);
PlanFragment planFragment = createPlanFragment(scanNode, DataPartition.RANDOM, recursiveCteScan);
context.addPlanFragment(planFragment);
updateLegacyPlanIdToPhysicalPlan(planFragment.getPlanRoot(), recursiveCteScan);
return planFragment;
}

private List<Expr> translateToExprs(List<Expression> expressions, PlanTranslatorContext context) {
List<Expr> exprs = Lists.newArrayListWithCapacity(expressions.size());
for (Expression expression : expressions) {
Expand Down Expand Up @@ -2378,8 +2405,10 @@ public PlanFragment visitPhysicalProject(PhysicalProject<? extends Plan> project
if (inputPlanNode instanceof OlapScanNode) {
((OlapScanNode) inputPlanNode).updateRequiredSlots(context, requiredByProjectSlotIdSet);
}
updateScanSlotsMaterialization((ScanNode) inputPlanNode, requiredSlotIdSet,
requiredByProjectSlotIdSet, context);
if (!(inputPlanNode instanceof RecursiveCteScanNode)) {
updateScanSlotsMaterialization((ScanNode) inputPlanNode, requiredSlotIdSet,
requiredByProjectSlotIdSet, context);
}
} else {
if (project.child() instanceof PhysicalDeferMaterializeTopN) {
inputFragment.setOutputExprs(allProjectionExprs);
Expand All @@ -2392,6 +2421,78 @@ public PlanFragment visitPhysicalProject(PhysicalProject<? extends Plan> project
return inputFragment;
}

@Override
public PlanFragment visitPhysicalRecursiveCte(PhysicalRecursiveCte recursiveCte, PlanTranslatorContext context) {
List<PlanFragment> childrenFragments = new ArrayList<>();
for (Plan plan : recursiveCte.children()) {
childrenFragments.add(plan.accept(this, context));
}

TupleDescriptor setTuple = generateTupleDesc(recursiveCte.getOutput(), null, context);
List<SlotDescriptor> outputSlotDescs = new ArrayList<>(setTuple.getSlots());

RecursiveCteNode recursiveCteNode = new RecursiveCteNode(context.nextPlanNodeId(), setTuple.getId(),
recursiveCte.getCteName(), recursiveCte.isUnionAll());
List<List<Expr>> distributeExprLists = getDistributeExprs(recursiveCte.children().toArray(new Plan[0]));
recursiveCteNode.setChildrenDistributeExprLists(distributeExprLists);
recursiveCteNode.setNereidsId(recursiveCte.getId());
List<List<Expression>> resultExpressionLists = Lists.newArrayList();
context.getNereidsIdToPlanNodeIdMap().put(recursiveCte.getId(), recursiveCteNode.getId());
for (List<SlotReference> regularChildrenOutput : recursiveCte.getRegularChildrenOutputs()) {
resultExpressionLists.add(new ArrayList<>(regularChildrenOutput));
}

for (PlanFragment childFragment : childrenFragments) {
recursiveCteNode.addChild(childFragment.getPlanRoot());
}

List<List<Expr>> materializedResultExprLists = Lists.newArrayList();
for (int i = 0; i < resultExpressionLists.size(); ++i) {
List<Expression> resultExpressionList = resultExpressionLists.get(i);
List<Expr> exprList = Lists.newArrayList();
Preconditions.checkState(resultExpressionList.size() == outputSlotDescs.size());
for (int j = 0; j < resultExpressionList.size(); ++j) {
exprList.add(ExpressionTranslator.translate(resultExpressionList.get(j), context));
// TODO: reconsider this, we may change nullable info in previous nereids rules not here.
outputSlotDescs.get(j)
.setIsNullable(outputSlotDescs.get(j).getIsNullable() || exprList.get(j).isNullable());
}
materializedResultExprLists.add(exprList);
}
recursiveCteNode.setMaterializedResultExprLists(materializedResultExprLists);
Preconditions.checkState(recursiveCteNode.getMaterializedResultExprLists().size()
== recursiveCteNode.getChildren().size());

PlanFragment recursiveCteFragment;
if (childrenFragments.isEmpty()) {
recursiveCteFragment = createPlanFragment(recursiveCteNode,
DataPartition.UNPARTITIONED, recursiveCte);
context.addPlanFragment(recursiveCteFragment);
} else {
int childrenSize = childrenFragments.size();
recursiveCteFragment = childrenFragments.get(childrenSize - 1);
for (int i = childrenSize - 2; i >= 0; i--) {
context.mergePlanFragment(childrenFragments.get(i), recursiveCteFragment);
for (PlanFragment child : childrenFragments.get(i).getChildren()) {
recursiveCteFragment.addChild(child);
}
}
setPlanRoot(recursiveCteFragment, recursiveCteNode, recursiveCte);
}

recursiveCteFragment.updateDataPartition(DataPartition.UNPARTITIONED);
recursiveCteFragment.setOutputPartition(DataPartition.UNPARTITIONED);

return recursiveCteFragment;
}

@Override
public PlanFragment visitPhysicalRecursiveCteRecursiveChild(
PhysicalRecursiveCteRecursiveChild<? extends Plan> recursiveChild,
PlanTranslatorContext context) {
return recursiveChild.child().accept(this, context);
}

/**
* Returns a new fragment with a UnionNode as its root. The data partition of the
* returned fragment and how the data of the child fragments is consumed depends on the
Expand Down
Loading
Loading