Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 23 additions & 2 deletions hoptimator-k8s/src/test/resources/k8s-ddl.id
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ spec:
- CREATE TABLE IF NOT EXISTS `ADS`.`AD_CLICKS_source` (`CAMPAIGN_URN` VARCHAR, `MEMBER_URN` VARCHAR) WITH ('connector'='datagen', 'number-of-rows'='10')
- CREATE DATABASE IF NOT EXISTS `ADS` WITH ()
- CREATE TABLE IF NOT EXISTS `ADS`.`AD_CLICKS_sink` (`CAMPAIGN_URN` VARCHAR, `MEMBER_URN` VARCHAR) WITH ('connector'='blackhole')
- INSERT INTO `ADS`.`AD_CLICKS_sink` (`CAMPAIGN_URN`, `MEMBER_URN`) SELECT * FROM `ADS`.`AD_CLICKS_source`
- INSERT INTO `ADS`.`AD_CLICKS_sink` (`CAMPAIGN_URN`, `MEMBER_URN`) SELECT `CAMPAIGN_URN`, `MEMBER_URN` FROM `ADS`.`AD_CLICKS_source`
jarURI: file:///opt/hoptimator-flink-runner.jar
parallelism: 1
upgradeMode: stateless
Expand All @@ -218,7 +218,7 @@ spec:
- CREATE TABLE IF NOT EXISTS `ADS`.`AD_CLICKS` (`CAMPAIGN_URN` VARCHAR, `MEMBER_URN` VARCHAR) WITH ('connector'='datagen', 'number-of-rows'='10')
- CREATE DATABASE IF NOT EXISTS `ADS` WITH ()
- CREATE TABLE IF NOT EXISTS `ADS`.`PAGE_VIEWS` (`PAGE_URN` VARCHAR, `MEMBER_URN` VARCHAR) WITH ('connector'='blackhole')
- INSERT INTO `ADS`.`PAGE_VIEWS` (`PAGE_URN`, `MEMBER_URN`) SELECT * FROM `ADS`.`AD_CLICKS`
- INSERT INTO `ADS`.`PAGE_VIEWS` (`PAGE_URN`, `MEMBER_URN`) SELECT `CAMPAIGN_URN` AS `PAGE_URN`, `MEMBER_URN` FROM `ADS`.`AD_CLICKS`
jarURI: file:///opt/hoptimator-flink-runner.jar
parallelism: 1
upgradeMode: stateless
Expand All @@ -245,3 +245,24 @@ spec:
upgradeMode: stateless
state: running
!specify PAGE_VIEWS

create or replace materialized view PROFILE."MEMBERS$test" AS SELECT "PAGE_URN" AS "COMPANY_URN", "MEMBER_URN" FROM ADS.PAGE_VIEWS;
apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
name: profile-database-members-test
spec:
deploymentName: basic-session-deployment
job:
entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner
args:
- CREATE DATABASE IF NOT EXISTS `ADS` WITH ()
- CREATE TABLE IF NOT EXISTS `ADS`.`PAGE_VIEWS` (`PAGE_URN` VARCHAR, `MEMBER_URN` VARCHAR) WITH ('connector'='datagen', 'number-of-rows'='10')
- CREATE DATABASE IF NOT EXISTS `PROFILE` WITH ()
- CREATE TABLE IF NOT EXISTS `PROFILE`.`MEMBERS` (`FIRST_NAME` VARCHAR, `LAST_NAME` VARCHAR, `MEMBER_URN` VARCHAR, `COMPANY_URN` VARCHAR) WITH ('connector'='blackhole')
- INSERT INTO `PROFILE`.`MEMBERS` (`COMPANY_URN`, `MEMBER_URN`) SELECT `PAGE_URN` AS `COMPANY_URN`, `MEMBER_URN` FROM `ADS`.`PAGE_VIEWS`
jarURI: file:///opt/hoptimator-flink-runner.jar
parallelism: 1
upgradeMode: stateless
state: running
!specify MEMBERS
2 changes: 1 addition & 1 deletion hoptimator-kafka/src/test/resources/kafka-ddl.id
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ spec:
- CREATE TABLE IF NOT EXISTS `KAFKA`.`existing-topic-2` (`KEY` VARCHAR, `VALUE` BINARY) WITH ('connector'='kafka', 'k1'='v1', 'key.fields'='KEY', 'key.format'='raw', 'properties.bootstrap.servers'='one-kafka-bootstrap.kafka.svc.cluster.local:9094', 'scan.startup.mode'='earliest-offset', 'topic'='existing-topic-2', 'value.fields-include'='EXCEPT_KEY', 'value.format'='json')
- CREATE DATABASE IF NOT EXISTS `KAFKA` WITH ()
- CREATE TABLE IF NOT EXISTS `KAFKA`.`existing-topic-1` (`KEY` VARCHAR, `VALUE` BINARY) WITH ('connector'='kafka', 'k2'='v2', 'key.fields'='KEY', 'key.format'='raw', 'properties.bootstrap.servers'='one-kafka-bootstrap.kafka.svc.cluster.local:9094', 'scan.startup.mode'='earliest-offset', 'topic'='existing-topic-1', 'value.fields-include'='EXCEPT_KEY', 'value.format'='json')
- INSERT INTO `KAFKA`.`existing-topic-1` (`KEY`, `VALUE`) SELECT * FROM `KAFKA`.`existing-topic-2`
- INSERT INTO `KAFKA`.`existing-topic-1` (`KEY`, `VALUE`) SELECT `KEY`, `VALUE` FROM `KAFKA`.`existing-topic-2`
jarURI: file:///opt/hoptimator-flink-runner.jar
parallelism: 2
upgradeMode: stateless
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.sql.util.SqlShuttle;
import org.apache.calcite.tools.RelBuilder;
import org.apache.calcite.util.Util;


/**
Expand Down Expand Up @@ -198,6 +197,20 @@ public void implement(SqlWriter w) {
if (node instanceof SqlSelect && ((SqlSelect) node).getSelectList() != null) {
SqlSelect select = (SqlSelect) node;
select.setSelectList((SqlNodeList) Objects.requireNonNull(select.getSelectList().accept(REMOVE_ROW_CONSTRUCTOR)));
SqlNodeList selectList = select.getSelectList();

// Check if this is a SELECT * and replace with explicit columns
if (selectList.size() == 1 && selectList.get(0) instanceof SqlIdentifier) {
SqlIdentifier id = (SqlIdentifier) selectList.get(0);
if (id.isStar()) {
// Replace SELECT * with explicit column list
List<SqlNode> explicitColumns = new ArrayList<>();
for (RelDataTypeField field : relNode.getRowType().getFieldList()) {
explicitColumns.add(new SqlIdentifier(field.getName(), SqlParserPos.ZERO));
}
select.setSelectList(new SqlNodeList(explicitColumns, SqlParserPos.ZERO));
}
}
}
// Apply table name replacements if any
if (!tableNameReplacements.isEmpty()) {
Expand Down Expand Up @@ -366,6 +379,7 @@ public void implement(SqlWriter w) {
// Drops non-target columns and passes explicit output names for the kept columns to the forced projection,
// for use case: INSERT INTO (col1, col2) SELECT * FROM ...
private static RelNode dropFields(RelNode relNode, ImmutablePairList<Integer, String> targetFields) {
List<Integer> cols = new ArrayList<>();
List<String> aliases = new ArrayList<>();
List<String> targetFieldNames = targetFields.rightList();
List<RelDataTypeField> sourceFields = relNode.getRowType().getFieldList();

Expand All @@ -377,10 +391,11 @@ private static RelNode dropFields(RelNode relNode, ImmutablePairList<Integer, St
if (targetFieldNames.contains(field.getName())
&& !field.getType().getSqlTypeName().equals(SqlTypeName.NULL)) {
cols.add(i);
aliases.add(field.getName());
}
}

return createForceProject(relNode, cols);
return createForceProject(relNode, cols, aliases);
}

// Otherwise (e.g., TableScan), the projection was optimized away.
Expand All @@ -391,16 +406,17 @@ private static RelNode dropFields(RelNode relNode, ImmutablePairList<Integer, St
RelDataTypeField field = sourceFields.get(fieldIndex);
if (!field.getType().getSqlTypeName().equals(SqlTypeName.NULL)) {
cols.add(fieldIndex);
aliases.add(targetFields.rightList().get(i));
}
}
}

return createForceProject(relNode, cols);
return createForceProject(relNode, cols, aliases);
}
}

static RelNode createForceProject(final RelNode child, final List<Integer> posList) {
return createForceProject(RelFactories.DEFAULT_PROJECT_FACTORY, child, posList);
static RelNode createForceProject(final RelNode child, final List<Integer> posList, final List<String> aliases) {
return createForceProject(RelFactories.DEFAULT_PROJECT_FACTORY, child, posList, aliases);
}

// By default, "projectNamed" will try to provide an optimization by not creating a new project if the
Expand All @@ -417,9 +433,7 @@ static RelNode createForceProject(final RelNode child, final List<Integer> posLi
// This implementation is largely a duplicate of RelOptUtil.createProject(relNode, cols); which does not allow
// overriding the "force" argument of "projectNamed".
static RelNode createForceProject(final RelFactories.ProjectFactory factory,
final RelNode child, final List<Integer> posList) {
RelDataType rowType = child.getRowType();
final List<String> fieldNames = rowType.getFieldNames();
final RelNode child, final List<Integer> posList, final List<String> aliases) {
final RelBuilder relBuilder =
RelBuilder.proto(factory).create(child.getCluster(), null);
final List<RexNode> exprs = new AbstractList<>() {
Expand All @@ -434,10 +448,9 @@ public RexNode get(int index) {
return relBuilder.getRexBuilder().makeInputRef(child, pos);
}
};
final List<String> names = Util.select(fieldNames, posList);
return relBuilder
.push(child)
.projectNamed(exprs, names, true)
.projectNamed(exprs, aliases, true)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
* Tests for ScriptImplementor suffix functionality to handle source/sink table name collisions.
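* Tests for ScriptImplementor: suffix handling for source/sink table name collisions and explicit column
* enumeration in generated INSERT statements.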
*/
public class ScriptImplementorSuffixTest {
public class ScriptImplementorTest {
@Test
public void testConnectorWithSuffix() {
RelDataTypeFactory typeFactory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
Expand Down Expand Up @@ -171,4 +173,60 @@ public void testFullPipelineWithCollision() {
assertTrue(sql.contains("INSERT INTO `ADS`.`AD_CLICKS_sink`"),
"Should insert into sink table. Got: " + sql);
}

@Test
public void testExplicitColumnEnumeration() {
  // Guards against the Flink 1.20 regression noted for this change: an INSERT whose query is
  // rendered as SELECT * fails when the sink has more columns than the source. The generated
  // script must therefore list every selected column explicitly.
  RelDataTypeFactory typeFactory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);

  // Nested struct used as the second source column: an INTEGER plus an array of INTEGER.
  RelDataType nestedStruct = typeFactory.builder()
      .add("innerInt_source", typeFactory.createSqlType(SqlTypeName.INTEGER))
      .add("innerArray_source",
          typeFactory.createArrayType(typeFactory.createSqlType(SqlTypeName.INTEGER), -1))
      .build();

  // Source row type with two columns: a VARCHAR key and the nested struct.
  RelDataType sourceRowType = typeFactory.builder()
      .add("KEY_source", typeFactory.createSqlType(SqlTypeName.VARCHAR))
      .add("nestedValue_source", nestedStruct)
      .build();

  // Register the source table as `source`.`table` under a fresh root schema.
  SchemaPlus rootSchema = Frameworks.createRootSchema(true);
  SchemaPlus sourceSchema = rootSchema.add("source", new AbstractSchema());
  sourceSchema.add("table", new AbstractTable() {
    @Override
    public RelDataType getRowType(RelDataTypeFactory ignoredFactory) {
      return sourceRowType;
    }
  });

  RelBuilder relBuilder = RelBuilder.create(
      Frameworks.newConfigBuilder()
          .defaultSchema(rootSchema)
          .build());

  // Scan the source table and project both of its columns by their source names. No aliases
  // are applied here; the expected script below shows them aliased to the sink column names.
  relBuilder.scan("source", "table");
  relBuilder.project(
      relBuilder.field("KEY_source"),
      relBuilder.field("nestedValue_source"));
  RelNode query = relBuilder.build();

  // Target fields for the INSERT: (index, sink column name) pairs for the two inserted columns.
  ImmutablePairList<Integer, String> targetFields = ImmutablePairList.copyOf(Arrays.asList(
      new AbstractMap.SimpleEntry<>(0, "KEY_sink"),
      new AbstractMap.SimpleEntry<>(1, "nestedValue_sink")
  ));

  String expected =
      "INSERT INTO `sink`.`mypipeline` (`KEY_sink`, `nestedValue_sink`) "
          + "SELECT `KEY_source` AS `KEY_sink`, `nestedValue_source` AS `nestedValue_sink` "
          + "FROM `source`.`table`;";

  String sql = ScriptImplementor.empty()
      .insert(null, "sink", "mypipeline", null, query, targetFields)
      .sql();

  assertEquals(expected, sql);
}
}