From f1096c0f4f0ac29599e0db0b87bb22ab4740cc8d Mon Sep 17 00:00:00 2001 From: Joseph Grogan Date: Thu, 12 Feb 2026 13:12:15 -0500 Subject: [PATCH 1/2] Migrate Subscription model to use new ScriptImplementor --- .../linkedin/hoptimator/catalog/DataType.java | 1 + .../linkedin/hoptimator/catalog/HopTable.java | 3 +- .../hoptimator/catalog/ScriptImplementor.java | 413 ------------------ .../hoptimator/catalog/TableFactory.java | 3 +- .../hoptimator/planner/PipelineRel.java | 8 +- .../util/planner/ScriptImplementor.java | 31 +- 6 files changed, 35 insertions(+), 424 deletions(-) delete mode 100644 hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/ScriptImplementor.java diff --git a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/DataType.java b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/DataType.java index b86ea582..4ea85b3f 100644 --- a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/DataType.java +++ b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/DataType.java @@ -3,6 +3,7 @@ import java.util.Collections; import java.util.stream.Collectors; +import com.linkedin.hoptimator.util.planner.ScriptImplementor; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.rel.type.RelDataTypeField; diff --git a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/HopTable.java b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/HopTable.java index 7126fb89..ab2d2cb3 100644 --- a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/HopTable.java +++ b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/HopTable.java @@ -4,6 +4,7 @@ import java.util.Collections; import java.util.Map; +import com.linkedin.hoptimator.util.planner.ScriptImplementor; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelOptTable; import org.apache.calcite.rel.RelNode; @@ -46,7 +47,7 @@ public HopTable(String database, String name, RelDataType rowType, Collection readResources, Collection writeResources, Map connectorConfig) { this(database, name, rowType, readResources, writeResources, - new ScriptImplementor.ConnectorImplementor(database, name, rowType, connectorConfig)); + new ScriptImplementor.ConnectorImplementor(null, database, name, null, rowType, connectorConfig)); } /** Convenience constructor for HopTables that only need a connector config. */ diff --git a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/ScriptImplementor.java b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/ScriptImplementor.java deleted file mode 100644 index 2b83a718..00000000 --- a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/ScriptImplementor.java +++ /dev/null @@ -1,413 +0,0 @@ -package com.linkedin.hoptimator.catalog; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.stream.Collectors; - -import org.apache.calcite.plan.RelOptUtil; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.rel2sql.RelToSqlConverter; -import org.apache.calcite.rel.rel2sql.SqlImplementor; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rel.type.RelDataTypeField; -import org.apache.calcite.sql.SqlBasicTypeNameSpec; -import org.apache.calcite.sql.SqlCall; -import org.apache.calcite.sql.SqlCollectionTypeNameSpec; -import org.apache.calcite.sql.SqlDataTypeSpec; -import org.apache.calcite.sql.SqlDialect; -import org.apache.calcite.sql.SqlIdentifier; -import org.apache.calcite.sql.SqlKind; -import org.apache.calcite.sql.SqlNode; -import org.apache.calcite.sql.SqlNodeList; -import org.apache.calcite.sql.SqlRowTypeNameSpec; -import org.apache.calcite.sql.SqlSelect; -import org.apache.calcite.sql.SqlWriter; -import org.apache.calcite.sql.SqlWriterConfig; -import org.apache.calcite.sql.dialect.AnsiSqlDialect; -import org.apache.calcite.sql.fun.SqlRowOperator; -import org.apache.calcite.sql.parser.SqlParserPos; -import org.apache.calcite.sql.pretty.SqlPrettyWriter; -import org.apache.calcite.sql.type.SqlTypeName; -import org.apache.calcite.sql.util.SqlShuttle; - - -/** - * An abstract way to write SQL scripts. - *

- * This enables Adapters to implement themselves without being tied to a - * specific compute engine or SQL dialect. - *

- * To generate a specific statement, implement this interface, or use one - * of the provided implementations, e.g. `QueryImplementor`. - *

- * To generate a script (more than one statement), start with `empty()` - * and append subsequent ScriptImplementors with `with(...)` etc. - *

- * e.g. - *

- * ScriptImplementor.empty() - * .database(db) - * .connector(db, name, rowType, configs) - *

- * ... would produce something like - *

- * CREATE DATABASE IF NOT EXIST `FOO`; - * CREATE TABLE `BAR` (NAME VARCHAR) WITH ('key1' = 'value1'); - */ -public interface ScriptImplementor { - - /** Writes arbitrary DDL/SQL */ - void implement(SqlWriter writer); - - /** Construct an empty ScriptImplementor */ - static ScriptImplementor empty() { - return w -> { - }; - } - - /** Append a subsequent ScriptImplementor */ - default ScriptImplementor with(ScriptImplementor next) { - return w -> { - implement(w); - next.implement(w); - }; - } - - /** Append a query */ - default ScriptImplementor query(RelNode relNode) { - return with(new QueryImplementor(relNode)); - } - - /** Append a connector definition, e.g. `CREATE TABLE ... WITH (...)` */ - default ScriptImplementor connector(String database, String name, RelDataType rowType, - Map connectorConfig) { - return with(new ConnectorImplementor(database, name, rowType, connectorConfig)); - } - - /** Append a database definition, e.g. `CREATE DATABASE ...` */ - default ScriptImplementor database(String database) { - return with(new DatabaseImplementor(database)); - } - - /** Append an insert statement, e.g. `INSERT INTO ... SELECT ...` */ - default ScriptImplementor insert(String database, String name, RelNode relNode) { - return with(new InsertImplementor(database, name, relNode)); - } - - /** Render the script as DDL/SQL in the default dialect */ - default String sql() { - return sql(AnsiSqlDialect.DEFAULT); - } - - /** Render the script as DDL/SQL in the given dialect */ - default String sql(SqlDialect dialect) { - SqlWriter w = new SqlPrettyWriter(SqlWriterConfig.of().withDialect(dialect)); - implement(w); - return w.toSqlString().getSql().replaceAll("\\n", " ").replaceAll(";", ";\n").trim(); - } - - /** Implements an arbitrary RelNode as a statement */ - class StatementImplementor implements ScriptImplementor { - private final RelNode relNode; - - public StatementImplementor(RelNode relNode) { - this.relNode = relNode; - } - - @Override - public void implement(SqlWriter w) { - RelToSqlConverter converter = new RelToSqlConverter(w.getDialect()); - w.literal(converter.visitRoot(relNode).asStatement().toSqlString(w.getDialect()).getSql()); - w.literal(";"); - } - } - - /** Implements an arbitrary RelNode as a query */ - class QueryImplementor implements ScriptImplementor { - private final RelNode relNode; - - // A `ROW(...)` operator which will unparse as just `(...)`. - private static final SqlRowOperator IMPLIED_ROW_OPERATOR = new SqlRowOperator(""); // empty string name - - // a shuttle that replaces `Row(...)` with just `(...)` - private static final SqlShuttle REMOVE_ROW_CONSTRUCTOR = new SqlShuttle() { - @Override - public SqlNode visit(SqlCall call) { - List operands = call.getOperandList().stream().map(x -> x.accept(this)).collect(Collectors.toList()); - if ((call.getKind() == SqlKind.ROW || call.getKind() == SqlKind.COLUMN_LIST - || call.getOperator() instanceof SqlRowOperator) && operands.size() > 1) { - return IMPLIED_ROW_OPERATOR.createCall(call.getParserPosition(), operands); - } else { - return call.getOperator().createCall(call.getParserPosition(), operands); - } - } - }; - - public QueryImplementor(RelNode relNode) { - this.relNode = relNode; - } - - @Override - public void implement(SqlWriter w) { - RelToSqlConverter converter = new RelToSqlConverter(w.getDialect()); - SqlImplementor.Result result = converter.visitRoot(relNode); - SqlSelect select = result.asSelect(); - if (select.getSelectList() != null) { - select.setSelectList((SqlNodeList) Objects.requireNonNull(select.getSelectList().accept(REMOVE_ROW_CONSTRUCTOR))); - } - w.literal(select.toSqlString(w.getDialect()).getSql()); - } - } - - /** - * Implements a CREATE TABLE...WITH... DDL statement. - *

- * N.B. the following magic: - * - field 'PRIMARY_KEY' is treated as a PRIMARY KEY - * - NULL fields are promoted to BYTES - */ - class ConnectorImplementor implements ScriptImplementor { - private final String database; - private final String name; - private final RelDataType rowType; - private final Map connectorConfig; - - public ConnectorImplementor(String database, String name, RelDataType rowType, - Map connectorConfig) { - this.database = database; - this.name = name; - this.rowType = rowType; - this.connectorConfig = connectorConfig; - } - - @Override - public void implement(SqlWriter w) { - w.keyword("CREATE TABLE IF NOT EXISTS"); - (new CompoundIdentifierImplementor(database, name)).implement(w); - SqlWriter.Frame frame1 = w.startList("(", ")"); - (new RowTypeSpecImplementor(rowType)).implement(w); - if (rowType.getField("PRIMARY_KEY", true, false) != null) { - w.sep(","); - w.literal("PRIMARY KEY (PRIMARY_KEY) NOT ENFORCED"); - } - w.endList(frame1); - // TODO support PARTITIONED BY for Tables that support it - w.keyword("WITH"); - SqlWriter.Frame frame2 = w.startList("(", ")"); - (new ConfigSpecImplementor(connectorConfig)).implement(w); - w.endList(frame2); - w.literal(";"); - } - } - - /** Implements a CREATE TEMPORARY VIEW DDL statement */ - class ViewImplementor implements ScriptImplementor { - private final String database; - private final String name; - private final RelNode relNode; - - public ViewImplementor(String database, String name, RelNode relNode) { - this.database = database; - this.name = name; - this.relNode = relNode; - } - - @Override - public void implement(SqlWriter w) { - w.keyword("CREATE TEMPORARY VIEW"); - (new CompoundIdentifierImplementor(database, name)).implement(w); - w.keyword("AS"); - (new QueryImplementor(relNode)).implement(w); - w.literal(";"); - } - } - - /** Implements an INSERT INTO statement. - *

- * N.B. the following magic: - * - NULL columns (e.g. `NULL AS KEY`) are elided from the pipeline - *

- * */ - class InsertImplementor implements ScriptImplementor { - private final String database; - private final String name; - private final RelNode relNode; - - public InsertImplementor(String database, String name, RelNode relNode) { - this.database = database; - this.name = name; - this.relNode = relNode; - } - - @Override - public void implement(SqlWriter w) { - w.keyword("INSERT INTO"); - (new CompoundIdentifierImplementor(database, name)).implement(w); - SqlWriter.Frame frame1 = w.startList("(", ")"); - RelNode project = dropNullFields(relNode); - (new ColumnListImplementor(project.getRowType())).implement(w); - w.endList(frame1); - (new QueryImplementor(project)).implement(w); - w.literal(";"); - } - - private static RelNode dropNullFields(RelNode relNode) { - List cols = new ArrayList<>(); - int i = 0; - for (RelDataTypeField field : relNode.getRowType().getFieldList()) { - if (!field.getType().getSqlTypeName().equals(SqlTypeName.NULL)) { - cols.add(i); - } - i++; - } - return RelOptUtil.createProject(relNode, cols); - } - } - - /** Implements a CREATE DATABASE IF NOT EXISTS statement */ - class DatabaseImplementor implements ScriptImplementor { - private final String database; - - public DatabaseImplementor(String database) { - this.database = database; - } - - @Override - public void implement(SqlWriter w) { - w.keyword("CREATE DATABASE IF NOT EXISTS"); - w.identifier(database, true); - w.keyword("WITH"); - SqlWriter.Frame parens = w.startList("(", ")"); - w.endList(parens); - w.literal(";"); - } - } - - /** Implements an identifier like TRACKING.'PageViewEvent' */ - class CompoundIdentifierImplementor implements ScriptImplementor { - private final String database; - private final String name; - - public CompoundIdentifierImplementor(String database, String name) { - this.database = database; - this.name = name; - } - - @Override - public void implement(SqlWriter w) { - SqlIdentifier identifier = new SqlIdentifier(Arrays.asList(database, name), SqlParserPos.ZERO); - identifier.unparse(w, 0, 0); - } - } - - /** Implements row type specs, e.g. `NAME VARCHAR(20), AGE INTEGER`. - *

- * N.B. the following magic: - * - NULL fields are promoted to BYTES - */ - class RowTypeSpecImplementor implements ScriptImplementor { - private final RelDataType dataType; - - public RowTypeSpecImplementor(RelDataType dataType) { - this.dataType = dataType; - } - - @Override - public void implement(SqlWriter w) { - List fieldNames = dataType.getFieldList() - .stream() - .map(RelDataTypeField::getName) - .map(x -> new SqlIdentifier(x, SqlParserPos.ZERO)) - .collect(Collectors.toList()); - List fieldTypes = - dataType.getFieldList().stream().map(RelDataTypeField::getType).map(RowTypeSpecImplementor::toSpec).collect(Collectors.toList()); - for (int i = 0; i < fieldNames.size(); i++) { - w.sep(","); - fieldNames.get(i).unparse(w, 0, 0); - if (fieldTypes.get(i).getTypeName().getSimple().equals("NULL")) { - w.literal("BYTES"); // promote NULL fields to BYTES - } else { - fieldTypes.get(i).unparse(w, 0, 0); - } - } - } - - private static SqlDataTypeSpec toSpec(RelDataType dataType) { - if (dataType.isStruct()) { - List fieldNames = dataType.getFieldList() - .stream() - .map(RelDataTypeField::getName) - .map(x -> new SqlIdentifier(x, SqlParserPos.ZERO)) - .collect(Collectors.toList()); - List fieldTypes = - dataType.getFieldList().stream().map(RelDataTypeField::getType).map(RowTypeSpecImplementor::toSpec).collect(Collectors.toList()); - return maybeNullable(dataType, - new SqlDataTypeSpec(new SqlRowTypeNameSpec(SqlParserPos.ZERO, fieldNames, fieldTypes), SqlParserPos.ZERO)); - } - RelDataType componentType = dataType.getComponentType(); - if (componentType != null) { - return maybeNullable(dataType, new SqlDataTypeSpec(new SqlCollectionTypeNameSpec( - new SqlBasicTypeNameSpec(componentType.getSqlTypeName(), SqlParserPos.ZERO), - dataType.getSqlTypeName(), SqlParserPos.ZERO), SqlParserPos.ZERO)); - } else { - return maybeNullable(dataType, - new SqlDataTypeSpec(new SqlBasicTypeNameSpec(dataType.getSqlTypeName(), SqlParserPos.ZERO), - SqlParserPos.ZERO)); - } - } - - private static SqlDataTypeSpec maybeNullable(RelDataType dataType, SqlDataTypeSpec spec) { - if (!dataType.isNullable()) { - return spec.withNullable(false); - } else { - // we don't want "VARCHAR NULL", only "VARCHAR NOT NULL" - return spec; - } - } - } - - /** Implements column lists, e.g. `NAME, AGE` */ - class ColumnListImplementor implements ScriptImplementor { - private final List fields; - - public ColumnListImplementor(RelDataType dataType) { - this(dataType.getFieldList()); - } - - public ColumnListImplementor(List fields) { - this.fields = fields; - } - - @Override - public void implement(SqlWriter w) { - List fieldNames = fields.stream() - .map(RelDataTypeField::getName) - .map(x -> new SqlIdentifier(x, SqlParserPos.ZERO)) - .collect(Collectors.toList()); - for (SqlIdentifier fieldName : fieldNames) { - w.sep(","); - fieldName.unparse(w, 0, 0); - } - } - } - - /** Implements Flink's `('key'='value', ...)` clause */ - class ConfigSpecImplementor implements ScriptImplementor { - private final Map config; - - public ConfigSpecImplementor(Map config) { - this.config = config; - } - - @Override - public void implement(SqlWriter w) { - config.forEach((k, v) -> { - w.sep(","); - w.literal("'" + k + "'='" + v + "'"); - }); - } - } -} diff --git a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/TableFactory.java b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/TableFactory.java index b1f7d13d..2656189e 100644 --- a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/TableFactory.java +++ b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/TableFactory.java @@ -2,6 +2,7 @@ import java.util.Collections; +import com.linkedin.hoptimator.util.planner.ScriptImplementor; import org.apache.calcite.rel.type.RelDataType; @@ -37,7 +38,7 @@ public ConnectorFactory(ConfigProvider configProvider) { public HopTable table(String database, String name, RelDataType rowType) { return new HopTable(database, name, rowType, resourceProvider.readResources(name), resourceProvider.writeResources(name), - ScriptImplementor.empty().connector(database, name, rowType, configProvider.config(name))); + ScriptImplementor.empty().connector(null, database, name, rowType, configProvider.config(name))); } } } diff --git a/hoptimator-planner/src/main/java/com/linkedin/hoptimator/planner/PipelineRel.java b/hoptimator-planner/src/main/java/com/linkedin/hoptimator/planner/PipelineRel.java index 27de5b6a..d7c0727b 100644 --- a/hoptimator-planner/src/main/java/com/linkedin/hoptimator/planner/PipelineRel.java +++ b/hoptimator-planner/src/main/java/com/linkedin/hoptimator/planner/PipelineRel.java @@ -15,7 +15,7 @@ import com.linkedin.hoptimator.catalog.HopTable; import com.linkedin.hoptimator.catalog.Resource; -import com.linkedin.hoptimator.catalog.ScriptImplementor; +import com.linkedin.hoptimator.util.planner.ScriptImplementor; /** @@ -45,7 +45,7 @@ public interface PipelineRel extends RelNode { class Implementor { private final RelNode relNode; private final List resources = new ArrayList<>(); - private ScriptImplementor script = ScriptImplementor.empty().database("PIPELINE"); + private ScriptImplementor script = ScriptImplementor.empty().database(null, "PIPELINE"); public Implementor(RelNode relNode) { this.relNode = relNode; @@ -79,12 +79,12 @@ public ScriptImplementor query() { public ScriptImplementor insertInto(HopTable sink) { RelOptUtil.eq(sink.name(), sink.rowType(), "subscription", rowType(), Litmus.THROW); RelNode castRel = RelOptUtil.createCastRel(relNode, sink.rowType(), true); - return script.database(sink.database()).with(sink).insert(sink.database(), sink.name(), castRel); + return script.database(null, sink.database()).with(sink).insert(null, sink.database(), sink.name(), castRel); } /** Add any resources: SQL, DDL, etc. required to access the table. */ public void implement(HopTable table) { - script = script.database(table.database()).with(table); + script = script.database(null, table.database()).with(table); table.readResources().forEach(this::resource); } diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/ScriptImplementor.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/ScriptImplementor.java index 33fab61b..45c73881 100644 --- a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/ScriptImplementor.java +++ b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/ScriptImplementor.java @@ -11,6 +11,8 @@ import java.util.stream.Collectors; import javax.annotation.Nullable; + +import org.apache.calcite.plan.RelOptUtil; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.Project; import org.apache.calcite.rel.core.RelFactories; @@ -115,13 +117,19 @@ default ScriptImplementor database(@Nullable String catalog, String database) { } /** Append an insert statement, e.g. `INSERT INTO ... SELECT ...` */ - default ScriptImplementor insert(@Nullable String catalog, String schema, String table, RelNode relNode, ImmutablePairList targetFields) { + default ScriptImplementor insert(@Nullable String catalog, String schema, String table, RelNode relNode) { + return insert(catalog, schema, table, null, relNode, null, Collections.emptyMap()); + } + + /** Append an insert statement, e.g. `INSERT INTO ... SELECT ...` */ + default ScriptImplementor insert(@Nullable String catalog, String schema, String table, RelNode relNode, + @Nullable ImmutablePairList targetFields) { return insert(catalog, schema, table, null, relNode, targetFields, Collections.emptyMap()); } /** Append an insert statement with an optional suffix for the target table, e.g. `INSERT INTO ... SELECT ...` */ default ScriptImplementor insert(@Nullable String catalog, String schema, String table, @Nullable String suffix, RelNode relNode, - ImmutablePairList targetFields) { + @Nullable ImmutablePairList targetFields) { return insert(catalog, schema, table, suffix, relNode, targetFields, Collections.emptyMap()); } @@ -345,7 +353,7 @@ class InsertImplementor implements ScriptImplementor { private final Map tableNameReplacements; public InsertImplementor(@Nullable String catalog, String schema, String table, @Nullable String suffix, RelNode relNode, - ImmutablePairList targetFields, Map tableNameReplacements) { + @Nullable ImmutablePairList targetFields, Map tableNameReplacements) { this.catalog = catalog; this.schema = schema; this.table = table; @@ -360,12 +368,12 @@ public void implement(SqlWriter w) { w.keyword("INSERT INTO"); String effectiveTable = suffix != null ? table + suffix : table; (new CompoundIdentifierImplementor(catalog, schema, effectiveTable)).implement(w); - RelNode project = dropFields(relNode, targetFields); + RelNode project = targetFields == null ? dropNullFields(relNode) : dropFields(relNode, targetFields); // If the relNode is a Project (or subclass), the field names should already match the sink. // Otherwise, like in SELECT * situations, the relNode fields will match the source field names, so // we need to directly use targetFields to map correctly. - if (relNode instanceof Project) { + if (relNode instanceof Project || targetFields == null) { (new ColumnListImplementor(project.getRowType().getFieldNames())).implement(w); } else { (new ColumnListImplementor(targetFields.rightList())).implement(w); @@ -375,6 +383,19 @@ public void implement(SqlWriter w) { w.literal(";"); } + // Drops NULL fields + private static RelNode dropNullFields(RelNode relNode) { + List cols = new ArrayList<>(); + int i = 0; + for (RelDataTypeField field : relNode.getRowType().getFieldList()) { + if (!field.getType().getSqlTypeName().equals(SqlTypeName.NULL)) { + cols.add(i); + } + i++; + } + return RelOptUtil.createProject(relNode, cols); + } + // Drops NULL fields // Drops non-target columns, for use case: INSERT INTO (col1, col2) SELECT * FROM ... private static RelNode dropFields(RelNode relNode, ImmutablePairList targetFields) { From 4a23e6ab134c992324fbc75a7dff5ffd5fe2f92c Mon Sep 17 00:00:00 2001 From: Joseph Grogan Date: Thu, 12 Feb 2026 13:40:30 -0500 Subject: [PATCH 2/2] Fix build --- Makefile | 2 +- hoptimator-flink-runner/Dockerfile-flink-operator | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Makefile b/Makefile index 6a32a072..771dbf89 100644 --- a/Makefile +++ b/Makefile @@ -52,7 +52,7 @@ undeploy-demo: undeploy deploy-flink: deploy kubectl create namespace flink || echo "skipping" kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml || echo "skipping" - helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.11.0/ + helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.13.0/ helm upgrade --install --atomic --set webhook.create=false,image.pullPolicy=Never,image.repository=docker.io/library/hoptimator-flink-operator,image.tag=latest --set-json='watchNamespaces=["default","flink"]' flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator kubectl apply -f ./deploy/dev/flink-session-cluster.yaml kubectl apply -f ./deploy/dev/flink-sql-gateway.yaml diff --git a/hoptimator-flink-runner/Dockerfile-flink-operator b/hoptimator-flink-runner/Dockerfile-flink-operator index 0af88674..312ac446 100644 --- a/hoptimator-flink-runner/Dockerfile-flink-operator +++ b/hoptimator-flink-runner/Dockerfile-flink-operator @@ -1,2 +1,2 @@ -FROM apache/flink-kubernetes-operator:1.11.0 +FROM apache/flink-kubernetes-operator:1.13.0 COPY ./build/libs/hoptimator-flink-runner-all.jar /opt/hoptimator-flink-runner.jar