diff --git a/docs/layouts/shortcodes/generated/optimizer_config_configuration.html b/docs/layouts/shortcodes/generated/optimizer_config_configuration.html index b8e5d606f21a6..45f9777d06384 100644 --- a/docs/layouts/shortcodes/generated/optimizer_config_configuration.html +++ b/docs/layouts/shortcodes/generated/optimizer_config_configuration.html @@ -77,6 +77,12 @@ Boolean Enables a multi-way join operator for a chain of streaming joins. This operator processes multiple inputs at once, reducing the state size considerably by avoiding intermediate results. It supports regular INNER and LEFT joins.

Note: This is an experimental feature and not recommended for production just yet. The operator's internal implementation and state layout is subject to changes due to ongoing relevant optimizations. These might break savepoint compatibility across Flink versions and the goal is to have a stable version in the next release. + +
table.optimizer.multi-join.use-for-binary-join

Streaming + false + Boolean + Allows binary multi join (multi join with 2 inputs). +
table.optimizer.multiple-input-enabled

Batch true diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java index a02915f100293..1ffcfdf13dda9 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java @@ -368,6 +368,16 @@ public class OptimizerConfigOptions { + "These might break savepoint compatibility across Flink versions and the goal is to have a stable version in the next release.") .build()); + @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING) + public static final ConfigOption TABLE_OPTIMIZER_USE_MULTI_JOIN_FOR_BINARY_JOIN = + key("table.optimizer.multi-join.use-for-binary-join") + .booleanType() + .defaultValue(false) + .withDescription( + Description.builder() + .text("Allows binary multi join (multi join with 2 inputs).") + .build()); + @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING) public static final ConfigOption TABLE_OPTIMIZER_INCREMENTAL_AGG_ENABLED = key("table.optimizer.incremental-agg-enabled") diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/BinaryMultiJoinToJoinRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/BinaryMultiJoinToJoinRule.java new file mode 100644 index 0000000000000..18503df741389 --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/BinaryMultiJoinToJoinRule.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.rules.logical; + +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.config.OptimizerConfigOptions; +import org.apache.flink.table.planner.utils.ShortcutUtils; +import org.apache.flink.util.Preconditions; + +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelRule; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Join; +import org.apache.calcite.rel.logical.LogicalJoin; +import org.apache.calcite.rel.rules.MultiJoin; +import org.apache.calcite.rel.rules.TransformationRule; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.tools.RelBuilderFactory; +import org.immutables.value.Value; + +/** Rule that transforms a {@link MultiJoin} with 2 inputs back to a {@link Join}. */ +@Value.Enclosing +public class BinaryMultiJoinToJoinRule extends RelRule<BinaryMultiJoinToJoinRule.Config> + implements TransformationRule { + + public static final BinaryMultiJoinToJoinRule INSTANCE = + BinaryMultiJoinToJoinRule.Config.DEFAULT.toRule(); + + /** Creates a BinaryMultiJoinToJoinRule. 
*/ + public BinaryMultiJoinToJoinRule(BinaryMultiJoinToJoinRule.Config config) { + super(config); + } + + @Deprecated // to be removed before 2.0 + public BinaryMultiJoinToJoinRule(Class<? extends MultiJoin> clazz) { + this(BinaryMultiJoinToJoinRule.Config.DEFAULT.withOperandFor(clazz)); + } + + @Deprecated // to be removed before 2.0 + public BinaryMultiJoinToJoinRule( + Class<? extends MultiJoin> joinClass, RelBuilderFactory relBuilderFactory) { + this( + BinaryMultiJoinToJoinRule.Config.DEFAULT + .withRelBuilderFactory(relBuilderFactory) + .as(BinaryMultiJoinToJoinRule.Config.class) + .withOperandFor(joinClass)); + } + + /** This rule matches binary multi joins. */ + @Override + public boolean matches(RelOptRuleCall call) { + MultiJoin multiJoin = call.rel(0); + return isEnabledViaConfig(multiJoin) && multiJoin.getInputs().size() == 2; + } + + /** This rule transforms binary multi joins to regular joins. */ + @Override + public void onMatch(RelOptRuleCall call) { + MultiJoin multiJoin = call.rel(0); + Preconditions.checkArgument( + multiJoin.getInputs().size() == 2, + "Only binary multi-join can be transformed into regular join."); + + RexNode condition = multiJoin.getOuterJoinConditions().get(1); + Join join = + LogicalJoin.create( + multiJoin.getInputs().get(0), + multiJoin.getInputs().get(1), + multiJoin.getHints(), + Preconditions.checkNotNull(condition), + multiJoin.getVariablesSet(), + multiJoin.getJoinTypes().get(1)); + call.transformTo(join); + } + + /** + * Checks whether multi-join optimization is enabled and binary multi joins are disabled via + * configuration. 
+ * + * @param multiJoin the multi join node + * @return true if multi-join is enabled and multi-join for binary joins is disabled + */ + private boolean isEnabledViaConfig(MultiJoin multiJoin) { + final TableConfig tableConfig = ShortcutUtils.unwrapTableConfig(multiJoin); + return tableConfig.get(OptimizerConfigOptions.TABLE_OPTIMIZER_MULTI_JOIN_ENABLED) + && !tableConfig.get( + OptimizerConfigOptions.TABLE_OPTIMIZER_USE_MULTI_JOIN_FOR_BINARY_JOIN); + } + + /** Rule configuration. */ + @Value.Immutable(singleton = false) + public interface Config extends RelRule.Config { + BinaryMultiJoinToJoinRule.Config DEFAULT = + ImmutableBinaryMultiJoinToJoinRule.Config.builder() + .build() + .as(BinaryMultiJoinToJoinRule.Config.class) + .withOperandFor(MultiJoin.class); + + @Override + default BinaryMultiJoinToJoinRule toRule() { + return new BinaryMultiJoinToJoinRule(this); + } + + /** Defines an operand tree for the given classes. */ + default BinaryMultiJoinToJoinRule.Config withOperandFor( + Class<? extends MultiJoin> joinClass) { + return withOperandSupplier( + b0 -> + b0.operand(joinClass) + .inputs( + b1 -> b1.operand(RelNode.class).anyInputs(), + b2 -> b2.operand(RelNode.class).anyInputs())) + .as(BinaryMultiJoinToJoinRule.Config.class); + } + } +} diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkStreamProgram.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkStreamProgram.scala index 8ec32c242fa48..433a0bbd65b97 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkStreamProgram.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkStreamProgram.scala @@ -36,6 +36,7 @@ object FlinkStreamProgram { val PREDICATE_PUSHDOWN = "predicate_pushdown" val JOIN_REORDER = "join_reorder" val MULTI_JOIN = "multi_join" + val BINARY_MULTI_JOIN = "binary_multi_join" 
val PROJECT_REWRITE = "project_rewrite" val LOGICAL = "logical" val LOGICAL_REWRITE = "logical_rewrite" @@ -248,6 +249,21 @@ object FlinkStreamProgram { .build() ) + chainedProgram.addLast( + BINARY_MULTI_JOIN, + FlinkGroupProgramBuilder + .newBuilder[StreamOptimizeContext] + .addProgram( + FlinkHepRuleSetProgramBuilder.newBuilder + .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE) + .setHepMatchOrder(HepMatchOrder.BOTTOM_UP) + .add(FlinkStreamRuleSets.BINARY_MULTI_JOIN_RULES) + .build(), + "transform binary multi joins back into regular join" + ) + .build() + ) + // project rewrite chainedProgram.addLast( PROJECT_REWRITE, diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala index 05de12a6950a9..c9429e7c173a6 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala @@ -248,6 +248,11 @@ object FlinkStreamRuleSets { JoinToMultiJoinRule.INSTANCE ) + val BINARY_MULTI_JOIN_RULES: RuleSet = RuleSets.ofList( + // transform binary MultiJoin back into regular join + BinaryMultiJoinToJoinRule.INSTANCE + ) + /** RuleSet to do logical optimize. This RuleSet is a sub-set of [[LOGICAL_OPT_RULES]]. 
*/ private val LOGICAL_RULES: RuleSet = RuleSets.ofList( // scan optimization diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.java index fc387fddeb01f..b4339d85a49d9 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.java @@ -209,6 +209,28 @@ void testThreeWayInnerJoinRelPlanNoCommonJoinKey() { + " ON u.cash = p.price"); } + @Test + @Tag("no-common-join-key") + void testThreeWayInnerJoinRelPlanNoCommonJoinKeyAllowedBinaryMultiJoin() { + util.getTableEnv() + .getConfig() + .set(OptimizerConfigOptions.TABLE_OPTIMIZER_USE_MULTI_JOIN_FOR_BINARY_JOIN, true); + util.verifyRelPlan( + "\nSELECT\n" + + " u.user_id,\n" + + " u.name,\n" + + " o.order_id,\n" + + " p.payment_id\n" + + "FROM Users u\n" + + "INNER JOIN Orders o\n" + + " ON u.user_id = o.user_id\n" + + "INNER JOIN Payments p\n" + + " ON u.cash = p.price"); + util.getTableEnv() + .getConfig() + .set(OptimizerConfigOptions.TABLE_OPTIMIZER_USE_MULTI_JOIN_FOR_BINARY_JOIN, false); + } + @Test void testThreeWayInnerJoinExecPlan() { util.verifyExecPlan( diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.xml index 3a5e888ca25c4..92900ccb9a8b1 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.xml @@ -2097,6 +2097,48 @@ Calc(select=[user_id, name, order_id, payment_id]) : +- 
TableSourceScan(table=[[default_catalog, default_database, Orders, project=[order_id, user_id], metadata=[]]], fields=[order_id, user_id]) +- Exchange(distribution=[hash[price]]) +- TableSourceScan(table=[[default_catalog, default_database, Payments, project=[payment_id, price], metadata=[]]], fields=[payment_id, price]) +]]> + + + + + + + + + + +