Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ deploy-demo: deploy
kubectl apply -f ./deploy/samples/tabletriggers.yaml
kubectl apply -f ./deploy/samples/crontrigger.yaml
kubectl apply -f ./deploy/samples/user-jobs.yaml
kubectl apply -f ./deploy/samples/table-provisioning-job-template.yaml

undeploy-demo: undeploy
kubectl delete -f ./deploy/samples/demodb.yaml || echo "skipping"
Expand Down
26 changes: 26 additions & 0 deletions deploy/samples/table-provisioning-job-template.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Demo Database backed by the ICEBERG catalog.
apiVersion: hoptimator.linkedin.com/v1alpha1
kind: Database
metadata:
  name: iceberg
spec:
  catalog: ICEBERG
  # NOTE(review): uses the demodb JDBC driver — confirm this is intended for the Iceberg sample
  url: jdbc:demodb://names=ads
  dialect: Calcite

---

# JobTemplate that emits a TableProvisionJob for tables in the listed
# databases. The {{name}}, {{table}}, and {{database}} placeholders are
# substituted by the template engine at deploy time.
apiVersion: hoptimator.linkedin.com/v1alpha1
kind: JobTemplate
metadata:
  name: table-provisioning-job-template
spec:
  databases:
  - iceberg
  yaml: |
    apiVersion: hoptimator.linkedin.com/v1alpha1
    kind: TableProvisionJob
    metadata:
      name: {{name}}-provision-job
    spec:
      table: {{table}}
      database: {{database}}
1 change: 1 addition & 0 deletions generate-models.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ docker run \
-u "$(pwd)/hoptimator-k8s/src/main/resources/pipelines.crd.yaml" \
-u "$(pwd)/hoptimator-k8s/src/main/resources/sqljobs.crd.yaml" \
-u "$(pwd)/hoptimator-k8s/src/main/resources/subscriptions.crd.yaml" \
-u "$(pwd)/hoptimator-k8s/src/main/resources/tableprovisionjobs.crd.yaml" \
-u "$(pwd)/hoptimator-k8s/src/main/resources/tabletemplates.crd.yaml" \
-u "$(pwd)/hoptimator-k8s/src/main/resources/tabletriggers.crd.yaml" \
-u "$(pwd)/hoptimator-k8s/src/main/resources/views.crd.yaml" \
Expand Down
2 changes: 1 addition & 1 deletion hoptimator-k8s/generate-models.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@ docker run \
--network host \
ghcr.io/kubernetes-client/java/crd-model-gen:v1.0.6 \
/generate.sh -o "$(pwd)" -n "" -p "com.linkedin.hoptimator.k8s" \
-u "$(pwd)/src/main/resources/tabletriggers.crd.yaml" \
-u "$(pwd)/src/main/resources/tableprovisionjobs.crd.yaml" \
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we actually use this file? I think just the top level generate-models.sh is used. We can likely delete this file.

&& echo "done."
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import com.linkedin.hoptimator.k8s.models.V1alpha1JobTemplateList;
import com.linkedin.hoptimator.k8s.models.V1alpha1Pipeline;
import com.linkedin.hoptimator.k8s.models.V1alpha1PipelineList;
import com.linkedin.hoptimator.k8s.models.V1alpha1TableProvisionJob;
import com.linkedin.hoptimator.k8s.models.V1alpha1TableProvisionJobList;
import com.linkedin.hoptimator.k8s.models.V1alpha1TableTemplate;
import com.linkedin.hoptimator.k8s.models.V1alpha1TableTemplateList;
import com.linkedin.hoptimator.k8s.models.V1alpha1TableTrigger;
Expand Down Expand Up @@ -58,6 +60,9 @@ public final class K8sApiEndpoints {
public static final K8sApiEndpoint<V1alpha1JobTemplate, V1alpha1JobTemplateList> JOB_TEMPLATES =
new K8sApiEndpoint<>("JobTemplate", "hoptimator.linkedin.com", "v1alpha1", "jobtemplates", false,
V1alpha1JobTemplate.class, V1alpha1JobTemplateList.class);
public static final K8sApiEndpoint<V1alpha1TableProvisionJob, V1alpha1TableProvisionJobList> TABLE_PROVISION_JOBS =
new K8sApiEndpoint<>("TableProvisionJob", "hoptimator.linkedin.com", "v1alpha1",
"tableprovisionjobs", false, V1alpha1TableProvisionJob.class, V1alpha1TableProvisionJobList.class);
public static final K8sApiEndpoint<V1alpha1TableTrigger, V1alpha1TableTriggerList> TABLE_TRIGGERS =
new K8sApiEndpoint<>("TableTrigger", "hoptimator.linkedin.com", "v1alpha1", "tabletriggers", false,
V1alpha1TableTrigger.class, V1alpha1TableTriggerList.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,20 @@
import java.sql.Connection;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.linkedin.hoptimator.Deployable;
import com.linkedin.hoptimator.Deployer;
import com.linkedin.hoptimator.DeployerProvider;
import com.linkedin.hoptimator.Job;
import com.linkedin.hoptimator.MaterializedView;
import com.linkedin.hoptimator.Sink;
import com.linkedin.hoptimator.Source;
import com.linkedin.hoptimator.SqlDialect;
import com.linkedin.hoptimator.ThrowingFunction;
import com.linkedin.hoptimator.Trigger;
import com.linkedin.hoptimator.View;

Expand All @@ -29,13 +35,27 @@ public <T extends Deployable> Collection<Deployer> deployers(T obj, Connection c
list.add(new K8sJobDeployer((Job) obj, context));
} else if (obj instanceof Source) {
list.add(new K8sSourceDeployer((Source) obj, context));
if (!(obj instanceof Sink)) {
// Sets up a table provisioning job for the source.
// The job would be a no-op if the source is already provisioned.
list.add(new K8sJobDeployer(jobFromSource((Source) obj), context));
}
Comment on lines +38 to +42
Copy link
Collaborator

@jogrogan jogrogan Mar 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this have unintended side effects of deploying flink jobs now if you issue a CREATE TABLE statement on something like kafka topic which already has a job template defined, but is really meant to only be used in a CREATE MATERIALIZED VIEW pipeline flow?

I don't think I'm fully understanding why we need this change, it seems like a general template type such that we don't need custom crds per DB like how we have kafkatopics.hoptimator.linkedin.com & acls.hoptimator.linkedin.com & others internally?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think there would be any unintended side effects here. When a Job is created via jobFromSource, the SQL-related lazy evals (sql, query, fieldMap) all return null. So for the Flink JobTemplate references like {{flinksql}}, the template engine resolves it to null and skips the entire template and hence existing Flink JobTemplates won't accidentally fire for CREATE TABLE statements.

And for the TableProvisionJob CRD, yes, it's intentionally kept generic. It follows the same pattern as SqlJob, which is a generic CRD reconciled by the CPO adapter. Similarly, TableProvisionJob is a generic "provision this table's backing infrastructure" CRD. Different operators can watch it and act based on the database field — for example, we can have multiple reconcilers for different databases, such as an Iceberg Provisioner or an OpenHouse Provisioner. This way we won't need a separate custom CRD for each database type.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahh I see that the flink-beam-sqljob-template are matching as a side effect here. Need to fix that 🤔

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea these jobtemplates are already a little flimsy for this reason.

But still I'm not convinced we need a new job template here, first this doesn't feel like a "job", there is no ongoing processing here, it's a one-off creation. One-off creations should happen in the hot path via the Deployer pattern if possible to allow us to fail fast back to users. If absolutely not possible due to reason outside our control we should either have a custom crd for that database type and a reconciler (or several listening if needed), or maybe we can consider having a new generic crd type and have many different reconcilers that look and differentiate which ones to act on by some "database" type field.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahh I see that the flink-beam-sqljob-template are matching as a side effect here. Need to fix that 🤔

It was a silly issue: for the flink-beam-sqljob-template, when fieldMap.apply() returns null, Java string concatenation produces "'null'" instead of null, so the template engine renders it instead of skipping it. Fixed it.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But still I'm not convinced we need a new job template here, first this doesn't feel like a "job", there is no ongoing processing here, it's a one-off creation. One-off creations should happen in the hot path via the Deployer pattern if possible to allow us to fail fast back to users. If absolutely not possible due to reason outside our control we should either have a custom crd for that database type and a reconciler (or several listening if needed), or maybe we can consider having a new generic crd type and have many different reconcilers that look and differentiate which ones to act on by some "database" type field.

I thought of using a Job CRD and the reconciler pattern for cases where table provisioning involves asynchronous external systems that can’t complete within the CREATE TABLE request path. For example, provisioning an Iceberg table might require registering a Kafka topic in nearline and coordinating with an external ETL engine that ingests data into the Iceberg table. The ETL engine may acknowledge the request quickly, but the actual provisioning can take longer and hence, I chose the reconciler pattern to watch the status until the table is fully onboarded for consumption.

Copy link
Collaborator

@jogrogan jogrogan Mar 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, would we always want a CREATE TABLE on an iceberg table to do that though?

I could totally see a world where we have CREATE TABLE, then maybe some trigger that ingests data from somewhere periodically or a spark job that runs periodically, etc. and that would be specified using the "CREATE TRIGGER" syntax on the table or "CREATE MATERIALIZED VIEW my_iceberg_table$spark_backfill AS ... FROM ...", etc. depending on the user's intent.

My point being, a separate syntax makes sense to me beyond CREATE TABLE if a user wanted to specify "create this iceberg table and backfill it from some kafka topic" (CREATE MATERIALIZED VIEW I think should already cover this). I do understand that would likely need some new crd in this case that is different from the "sqljobs" template we have today but it wouldn't be driven by CREATE TABLE.

"CREATE TABLE AS ..." may also be relevant. This syntax is supported but we don't use it yet, to me this could semantically mean CREATE TABLE + CREATE TRIGGER (as a one-off)

} else if (obj instanceof Trigger) {
list.add(new K8sTriggerDeployer((Trigger) obj, context));
}

return list;
}

/**
 * Builds a no-op provisioning Job for the given source.
 *
 * <p>The sink mirrors the source (same database, path, and options), and all
 * SQL-related lazy evaluations resolve to {@code null} so that SQL-driven job
 * templates (e.g. ones referencing {{flinksql}}) skip this job entirely.
 */
private static Job jobFromSource(Source source) {
  Map<String, ThrowingFunction<SqlDialect, String>> nullSqlEvals = new HashMap<>();
  for (String key : new String[] {"sql", "query", "fieldMap"}) {
    nullSqlEvals.put(key, dialect -> null);
  }
  // Target the same table the source describes.
  Sink target = new Sink(source.database(), source.path(), source.options());
  return new Job(source.table(), Collections.emptySet(), target, nullSqlEvals);
}

@Override
public int priority() {
return 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ public List<String> specify() throws SQLException {
.with("flinksql", () -> sql.apply(SqlDialect.FLINK))
.with("flinkconfigs", properties)
.with("fieldMap", () -> "'" + fieldMap.apply(SqlDialect.ANSI) + "'")
.with("fieldMap", () -> {
String raw = fieldMap.apply(SqlDialect.ANSI);
return raw != null ? "'" + raw + "'" : null;
})
.with(properties);
List<String> templates = jobTemplateApi.list()
.stream()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
/*
 * Kubernetes
 * No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator)
 *
 * The version of the OpenAPI document: v1.21.1
 *
 *
 * NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech).
 * https://openapi-generator.tech
 * Do not edit the class manually.
 */


package com.linkedin.hoptimator.k8s.models;

import java.util.Objects;
import java.util.Arrays;
import com.google.gson.TypeAdapter;
import com.google.gson.annotations.JsonAdapter;
import com.google.gson.annotations.SerializedName;
import com.google.gson.stream.JsonReader;
import com.google.gson.stream.JsonWriter;
import com.linkedin.hoptimator.k8s.models.V1alpha1TableProvisionJobSpec;
import com.linkedin.hoptimator.k8s.models.V1alpha1TableProvisionJobStatus;
import io.kubernetes.client.openapi.models.V1ObjectMeta;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import java.io.IOException;

/**
 * Job to provision backing infrastructure for a table.
 *
 * <p>Generated model for the {@code TableProvisionJob} custom resource
 * (presumably from {@code tableprovisionjobs.crd.yaml} — regenerate via
 * {@code generate-models.sh} rather than editing by hand; TODO confirm).
 */
@ApiModel(description = "Job to provision backing infrastructure for a table.")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2026-03-13T23:58:00.641Z[Etc/UTC]")
public class V1alpha1TableProvisionJob implements io.kubernetes.client.common.KubernetesObject {
  public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
  @SerializedName(SERIALIZED_NAME_API_VERSION)
  private String apiVersion;

  public static final String SERIALIZED_NAME_KIND = "kind";
  @SerializedName(SERIALIZED_NAME_KIND)
  private String kind;

  public static final String SERIALIZED_NAME_METADATA = "metadata";
  @SerializedName(SERIALIZED_NAME_METADATA)
  private V1ObjectMeta metadata = null;

  public static final String SERIALIZED_NAME_SPEC = "spec";
  @SerializedName(SERIALIZED_NAME_SPEC)
  private V1alpha1TableProvisionJobSpec spec;

  public static final String SERIALIZED_NAME_STATUS = "status";
  @SerializedName(SERIALIZED_NAME_STATUS)
  private V1alpha1TableProvisionJobStatus status;


  // Fluent setter; returns this for chaining.
  public V1alpha1TableProvisionJob apiVersion(String apiVersion) {

    this.apiVersion = apiVersion;
    return this;
  }

  /**
   * APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources
   * @return apiVersion
   **/
  @javax.annotation.Nullable
  @ApiModelProperty(value = "APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources")

  public String getApiVersion() {
    return apiVersion;
  }


  public void setApiVersion(String apiVersion) {
    this.apiVersion = apiVersion;
  }


  // Fluent setter; returns this for chaining.
  public V1alpha1TableProvisionJob kind(String kind) {

    this.kind = kind;
    return this;
  }

  /**
   * Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds
   * @return kind
   **/
  @javax.annotation.Nullable
  @ApiModelProperty(value = "Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds")

  public String getKind() {
    return kind;
  }


  public void setKind(String kind) {
    this.kind = kind;
  }


  // Fluent setter; returns this for chaining.
  public V1alpha1TableProvisionJob metadata(V1ObjectMeta metadata) {

    this.metadata = metadata;
    return this;
  }

  /**
   * Get metadata
   * @return metadata
   **/
  @javax.annotation.Nullable
  @ApiModelProperty(value = "")

  public V1ObjectMeta getMetadata() {
    return metadata;
  }


  public void setMetadata(V1ObjectMeta metadata) {
    this.metadata = metadata;
  }


  // Fluent setter; returns this for chaining.
  public V1alpha1TableProvisionJob spec(V1alpha1TableProvisionJobSpec spec) {

    this.spec = spec;
    return this;
  }

  /**
   * Get spec
   * @return spec
   **/
  @javax.annotation.Nullable
  @ApiModelProperty(value = "")

  public V1alpha1TableProvisionJobSpec getSpec() {
    return spec;
  }


  public void setSpec(V1alpha1TableProvisionJobSpec spec) {
    this.spec = spec;
  }


  // Fluent setter; returns this for chaining.
  public V1alpha1TableProvisionJob status(V1alpha1TableProvisionJobStatus status) {

    this.status = status;
    return this;
  }

  /**
   * Get status
   * @return status
   **/
  @javax.annotation.Nullable
  @ApiModelProperty(value = "")

  public V1alpha1TableProvisionJobStatus getStatus() {
    return status;
  }


  public void setStatus(V1alpha1TableProvisionJobStatus status) {
    this.status = status;
  }


  // Value equality over all serialized fields.
  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    V1alpha1TableProvisionJob v1alpha1TableProvisionJob = (V1alpha1TableProvisionJob) o;
    return Objects.equals(this.apiVersion, v1alpha1TableProvisionJob.apiVersion) &&
        Objects.equals(this.kind, v1alpha1TableProvisionJob.kind) &&
        Objects.equals(this.metadata, v1alpha1TableProvisionJob.metadata) &&
        Objects.equals(this.spec, v1alpha1TableProvisionJob.spec) &&
        Objects.equals(this.status, v1alpha1TableProvisionJob.status);
  }

  @Override
  public int hashCode() {
    return Objects.hash(apiVersion, kind, metadata, spec, status);
  }


  @Override
  public String toString() {
    StringBuilder sb = new StringBuilder();
    sb.append("class V1alpha1TableProvisionJob {\n");
    sb.append("    apiVersion: ").append(toIndentedString(apiVersion)).append("\n");
    sb.append("    kind: ").append(toIndentedString(kind)).append("\n");
    sb.append("    metadata: ").append(toIndentedString(metadata)).append("\n");
    sb.append("    spec: ").append(toIndentedString(spec)).append("\n");
    sb.append("    status: ").append(toIndentedString(status)).append("\n");
    sb.append("}");
    return sb.toString();
  }

  /**
   * Convert the given object to string with each line indented by 4 spaces
   * (except the first line).
   */
  private String toIndentedString(Object o) {
    if (o == null) {
      return "null";
    }
    return o.toString().replace("\n", "\n    ");
  }

}

Loading
Loading