From 1c42dc7dfee608bb89362d56785228ac03f4db3f Mon Sep 17 00:00:00 2001 From: Joseph Grogan Date: Fri, 6 Feb 2026 10:16:35 -0500 Subject: [PATCH 1/2] Force ScriptImplementor to explicitly name columns in SELECT --- hoptimator-k8s/src/test/resources/k8s-ddl.id | 25 +++++++- .../util/planner/ScriptImplementor.java | 33 +++++++--- ...ixTest.java => ScriptImplementorTest.java} | 62 ++++++++++++++++++- 3 files changed, 106 insertions(+), 14 deletions(-) rename hoptimator-util/src/test/java/com/linkedin/hoptimator/util/planner/{ScriptImplementorSuffixTest.java => ScriptImplementorTest.java} (73%) diff --git a/hoptimator-k8s/src/test/resources/k8s-ddl.id b/hoptimator-k8s/src/test/resources/k8s-ddl.id index 7f8b0201..251a8bb5 100644 --- a/hoptimator-k8s/src/test/resources/k8s-ddl.id +++ b/hoptimator-k8s/src/test/resources/k8s-ddl.id @@ -197,7 +197,7 @@ spec: - CREATE TABLE IF NOT EXISTS `ADS`.`AD_CLICKS_source` (`CAMPAIGN_URN` VARCHAR, `MEMBER_URN` VARCHAR) WITH ('connector'='datagen', 'number-of-rows'='10') - CREATE DATABASE IF NOT EXISTS `ADS` WITH () - CREATE TABLE IF NOT EXISTS `ADS`.`AD_CLICKS_sink` (`CAMPAIGN_URN` VARCHAR, `MEMBER_URN` VARCHAR) WITH ('connector'='blackhole') - - INSERT INTO `ADS`.`AD_CLICKS_sink` (`CAMPAIGN_URN`, `MEMBER_URN`) SELECT * FROM `ADS`.`AD_CLICKS_source` + - INSERT INTO `ADS`.`AD_CLICKS_sink` (`CAMPAIGN_URN`, `MEMBER_URN`) SELECT `CAMPAIGN_URN`, `MEMBER_URN` FROM `ADS`.`AD_CLICKS_source` jarURI: file:///opt/hoptimator-flink-runner.jar parallelism: 1 upgradeMode: stateless @@ -218,7 +218,7 @@ spec: - CREATE TABLE IF NOT EXISTS `ADS`.`AD_CLICKS` (`CAMPAIGN_URN` VARCHAR, `MEMBER_URN` VARCHAR) WITH ('connector'='datagen', 'number-of-rows'='10') - CREATE DATABASE IF NOT EXISTS `ADS` WITH () - CREATE TABLE IF NOT EXISTS `ADS`.`PAGE_VIEWS` (`PAGE_URN` VARCHAR, `MEMBER_URN` VARCHAR) WITH ('connector'='blackhole') - - INSERT INTO `ADS`.`PAGE_VIEWS` (`PAGE_URN`, `MEMBER_URN`) SELECT * FROM `ADS`.`AD_CLICKS` + - INSERT INTO `ADS`.`PAGE_VIEWS` (`PAGE_URN`, `MEMBER_URN`) SELECT `CAMPAIGN_URN` AS `PAGE_URN`, `MEMBER_URN` FROM `ADS`.`AD_CLICKS` jarURI: file:///opt/hoptimator-flink-runner.jar parallelism: 1 upgradeMode: stateless @@ -245,3 +245,24 @@ spec: upgradeMode: stateless state: running !specify PAGE_VIEWS + +create or replace materialized view PROFILE."MEMBERS$test" AS SELECT "PAGE_URN" AS "COMPANY_URN", "MEMBER_URN" FROM ADS.PAGE_VIEWS; +apiVersion: flink.apache.org/v1beta1 +kind: FlinkSessionJob +metadata: + name: profile-database-members-test +spec: + deploymentName: basic-session-deployment + job: + entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner + args: + - CREATE DATABASE IF NOT EXISTS `ADS` WITH () + - CREATE TABLE IF NOT EXISTS `ADS`.`PAGE_VIEWS` (`PAGE_URN` VARCHAR, `MEMBER_URN` VARCHAR) WITH ('connector'='datagen', 'number-of-rows'='10') + - CREATE DATABASE IF NOT EXISTS `PROFILE` WITH () + - CREATE TABLE IF NOT EXISTS `PROFILE`.`MEMBERS` (`FIRST_NAME` VARCHAR, `LAST_NAME` VARCHAR, `MEMBER_URN` VARCHAR, `COMPANY_URN` VARCHAR) WITH ('connector'='blackhole') + - INSERT INTO `PROFILE`.`MEMBERS` (`COMPANY_URN`, `MEMBER_URN`) SELECT `PAGE_URN` AS `COMPANY_URN`, `MEMBER_URN` FROM `ADS`.`PAGE_VIEWS` + jarURI: file:///opt/hoptimator-flink-runner.jar + parallelism: 1 + upgradeMode: stateless + state: running +!specify MEMBERS diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/ScriptImplementor.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/ScriptImplementor.java index a800566a..33fab61b 100644 --- a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/ScriptImplementor.java +++ b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/ScriptImplementor.java @@ -42,7 +42,6 @@ import org.apache.calcite.sql.type.SqlTypeName; import org.apache.calcite.sql.util.SqlShuttle; import org.apache.calcite.tools.RelBuilder; -import org.apache.calcite.util.Util; /** @@ -198,6 +197,20 @@ public void implement(SqlWriter w) { if (node instanceof SqlSelect && ((SqlSelect) node).getSelectList() != null) { SqlSelect select = (SqlSelect) node; select.setSelectList((SqlNodeList) Objects.requireNonNull(select.getSelectList().accept(REMOVE_ROW_CONSTRUCTOR))); + SqlNodeList selectList = select.getSelectList(); + + // Check if this is a SELECT * and replace with explicit columns + if (selectList.size() == 1 && selectList.get(0) instanceof SqlIdentifier) { + SqlIdentifier id = (SqlIdentifier) selectList.get(0); + if (id.isStar()) { + // Replace SELECT * with explicit column list + List explicitColumns = new ArrayList<>(); + for (RelDataTypeField field : relNode.getRowType().getFieldList()) { + explicitColumns.add(new SqlIdentifier(field.getName(), SqlParserPos.ZERO)); + } + select.setSelectList(new SqlNodeList(explicitColumns, SqlParserPos.ZERO)); + } + } } // Apply table name replacements if any if (!tableNameReplacements.isEmpty()) { @@ -366,6 +379,7 @@ public void implement(SqlWriter w) { // Drops non-target columns, for use case: INSERT INTO (col1, col2) SELECT * FROM ... private static RelNode dropFields(RelNode relNode, ImmutablePairList targetFields) { List cols = new ArrayList<>(); + List aliases = new ArrayList<>(); List targetFieldNames = targetFields.rightList(); List sourceFields = relNode.getRowType().getFieldList(); @@ -377,10 +391,11 @@ private static RelNode dropFields(RelNode relNode, ImmutablePairList posList) { - return createForceProject(RelFactories.DEFAULT_PROJECT_FACTORY, child, posList); + static RelNode createForceProject(final RelNode child, final List posList, final List aliases) { + return createForceProject(RelFactories.DEFAULT_PROJECT_FACTORY, child, posList, aliases); } // By default, "projectNamed" will try to provide an optimization by not creating a new project if the @@ -417,9 +433,7 @@ static RelNode createForceProject(final RelNode child, final List posLi // This implementation is largely a duplicate of RelOptUtil.createProject(relNode, cols); which does not allow // overriding the "force" argument of "projectNamed". static RelNode createForceProject(final RelFactories.ProjectFactory factory, - final RelNode child, final List posList) { - RelDataType rowType = child.getRowType(); - final List fieldNames = rowType.getFieldNames(); + final RelNode child, final List posList, final List aliases) { final RelBuilder relBuilder = RelBuilder.proto(factory).create(child.getCluster(), null); final List exprs = new AbstractList<>() { @@ -434,10 +448,9 @@ public RexNode get(int index) { return relBuilder.getRexBuilder().makeInputRef(child, pos); } }; - final List names = Util.select(fieldNames, posList); return relBuilder .push(child) - .projectNamed(exprs, names, true) + .projectNamed(exprs, aliases, true) .build(); } diff --git a/hoptimator-util/src/test/java/com/linkedin/hoptimator/util/planner/ScriptImplementorSuffixTest.java b/hoptimator-util/src/test/java/com/linkedin/hoptimator/util/planner/ScriptImplementorTest.java similarity index 73% rename from hoptimator-util/src/test/java/com/linkedin/hoptimator/util/planner/ScriptImplementorSuffixTest.java rename to hoptimator-util/src/test/java/com/linkedin/hoptimator/util/planner/ScriptImplementorTest.java index 3ce50987..9d7e1f5b 100644 --- a/hoptimator-util/src/test/java/com/linkedin/hoptimator/util/planner/ScriptImplementorSuffixTest.java +++ b/hoptimator-util/src/test/java/com/linkedin/hoptimator/util/planner/ScriptImplementorTest.java @@ -18,12 +18,14 @@ import java.util.Arrays; import java.util.HashMap; import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; /** - * Tests for ScriptImplementor suffix functionality to handle source/sink table name collisions. + * Tests for ScriptImplementor */ -public class ScriptImplementorSuffixTest { +public class ScriptImplementorTest { @Test public void testConnectorWithSuffix() { RelDataTypeFactory typeFactory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT); @@ -171,4 +173,60 @@ public void testFullPipelineWithCollision() { assertTrue(sql.contains("INSERT INTO `ADS`.`AD_CLICKS_sink`"), "Should insert into sink table. Got: " + sql); } + + @Test + public void testExplicitColumnEnumeration() { + // Test for Flink 1.20 regression where INSERT with SELECT * fails + // when sink has more columns than source + RelDataTypeFactory typeFactory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT); + + // Source table: 2 columns + RelDataType sourceType = typeFactory.builder() + .add("KEY_source", typeFactory.createSqlType(SqlTypeName.VARCHAR)) + .add("nestedValue_source", typeFactory.builder() + .add("innerInt_source", typeFactory.createSqlType(SqlTypeName.INTEGER)) + .add("innerArray_source", typeFactory.createArrayType( + typeFactory.createSqlType(SqlTypeName.INTEGER), -1)) + .build()) + .build(); + + // Create schema with source table + SchemaPlus rootSchema = Frameworks.createRootSchema(true); + SchemaPlus sourceSchema = rootSchema.add("source", new AbstractSchema()); + sourceSchema.add("table", new AbstractTable() { + @Override + public RelDataType getRowType(RelDataTypeFactory typeFactory) { + return sourceType; + } + }); + + RelBuilder builder = RelBuilder.create( + Frameworks.newConfigBuilder() + .defaultSchema(rootSchema) + .build()); + + // Create a query: SELECT KEY_source as KEY_sink, nestedValue_source as nestedValue_sink FROM source + // This simulates the materialized view query + RelNode query = builder + .scan("source", "table") + .project( + builder.field("KEY_source"), + builder.field("nestedValue_source")) + .build(); + + // Target fields for INSERT - only the 2 columns we're actually inserting + ImmutablePairList targetFields = ImmutablePairList.copyOf(Arrays.asList( + new AbstractMap.SimpleEntry<>(0, "KEY_sink"), + new AbstractMap.SimpleEntry<>(1, "nestedValue_sink") + )); + + String sql = ScriptImplementor.empty() + .insert(null, "sink", "mypipeline", null, query, targetFields) + .sql(); + + assertEquals( + "INSERT INTO `sink`.`mypipeline` (`KEY_sink`, `nestedValue_sink`) " + + "SELECT `KEY_source` AS `KEY_sink`, `nestedValue_source` AS `nestedValue_sink` " + + "FROM `source`.`table`;", sql); + } } From 2e9644fd03f8b0edf76fe3a59ecd27fe342940fb Mon Sep 17 00:00:00 2001 From: Joseph Grogan Date: Fri, 6 Feb 2026 11:33:56 -0500 Subject: [PATCH 2/2] Fix missed test --- hoptimator-kafka/src/test/resources/kafka-ddl.id | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hoptimator-kafka/src/test/resources/kafka-ddl.id b/hoptimator-kafka/src/test/resources/kafka-ddl.id index 7b39a2e8..f3c559df 100644 --- a/hoptimator-kafka/src/test/resources/kafka-ddl.id +++ b/hoptimator-kafka/src/test/resources/kafka-ddl.id @@ -35,7 +35,7 @@ spec: - CREATE TABLE IF NOT EXISTS `KAFKA`.`existing-topic-2` (`KEY` VARCHAR, `VALUE` BINARY) WITH ('connector'='kafka', 'k1'='v1', 'key.fields'='KEY', 'key.format'='raw', 'properties.bootstrap.servers'='one-kafka-bootstrap.kafka.svc.cluster.local:9094', 'scan.startup.mode'='earliest-offset', 'topic'='existing-topic-2', 'value.fields-include'='EXCEPT_KEY', 'value.format'='json') - CREATE DATABASE IF NOT EXISTS `KAFKA` WITH () - CREATE TABLE IF NOT EXISTS `KAFKA`.`existing-topic-1` (`KEY` VARCHAR, `VALUE` BINARY) WITH ('connector'='kafka', 'k2'='v2', 'key.fields'='KEY', 'key.format'='raw', 'properties.bootstrap.servers'='one-kafka-bootstrap.kafka.svc.cluster.local:9094', 'scan.startup.mode'='earliest-offset', 'topic'='existing-topic-1', 'value.fields-include'='EXCEPT_KEY', 'value.format'='json') - - INSERT INTO `KAFKA`.`existing-topic-1` (`KEY`, `VALUE`) SELECT * FROM `KAFKA`.`existing-topic-2` + - INSERT INTO `KAFKA`.`existing-topic-1` (`KEY`, `VALUE`) SELECT `KEY`, `VALUE` FROM `KAFKA`.`existing-topic-2` jarURI: file:///opt/hoptimator-flink-runner.jar parallelism: 2 upgradeMode: stateless