diff --git a/misc/dbt-materialize/CHANGELOG.md b/misc/dbt-materialize/CHANGELOG.md index 1f7d1e8c39fd8..c23f7362f4bc3 100644 --- a/misc/dbt-materialize/CHANGELOG.md +++ b/misc/dbt-materialize/CHANGELOG.md @@ -2,6 +2,13 @@ ## Unreleased +* Support unmanaged origin clusters in `deploy_init`. The deployment + cluster is now created by cloning the production cluster's replicas + (including SIZE and AVAILABILITY ZONE attributes). Internal replicas + are skipped. `deploy_init` errors with a clear message if any + user-facing replica lacks a SIZE (unorchestrated replicas using + COMPUTECTL/STORAGECTL ADDRESSES cannot be cloned). + ## 1.9.9 - 2026-05-18 * Add support for the `partition_by` configuration in materialized views, diff --git a/misc/dbt-materialize/dbt/include/materialize/macros/ci/create_unmanaged_cluster.sql b/misc/dbt-materialize/dbt/include/materialize/macros/ci/create_unmanaged_cluster.sql new file mode 100644 index 0000000000000..20880849083f1 --- /dev/null +++ b/misc/dbt-materialize/dbt/include/materialize/macros/ci/create_unmanaged_cluster.sql @@ -0,0 +1,156 @@ +-- Copyright Materialize, Inc. and contributors. All rights reserved. +-- +-- Licensed under the Apache License, Version 2.0 (the "License"); +-- you may not use this file except in compliance with the License. +-- You may obtain a copy of the License in the LICENSE file at the +-- root of this repository, or online at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. + +{# +This macro creates an unmanaged cluster by cloning the replicas of an +existing unmanaged origin cluster. SIZE and AVAILABILITY ZONE are copied +per replica. Internal replicas are skipped. 
Replicas without a SIZE +(unorchestrated replicas using COMPUTECTL/STORAGECTL ADDRESSES) cannot be +cloned and produce a compiler error. Returns the final deploy cluster name. + + ## Arguments + - cluster_name (str): The logical cluster name. The deploy suffix is applied + via `force_deploy_suffix`. + - origin_cluster (str): The name of the existing unmanaged cluster whose + replicas should be cloned. + - ignore_existing_objects (bool, optional): Whether to ignore existing + objects in the deploy cluster. Defaults to false. + - force_deploy_suffix (bool, optional): Whether to forcefully add a deploy + suffix to the cluster name. Defaults to false. +#} +{% macro create_unmanaged_cluster( + cluster_name, + origin_cluster, + ignore_existing_objects=false, + force_deploy_suffix=false +) %} + + {% if not cluster_name %} + {{ exceptions.raise_compiler_error("cluster_name must be provided") }} + {% endif %} + + {% if not origin_cluster %} + {{ exceptions.raise_compiler_error("origin_cluster must be provided") }} + {% endif %} + + {% set deploy_cluster = adapter.generate_final_cluster_name(cluster_name, force_deploy_suffix) %} + + {% if cluster_exists(deploy_cluster) %} + {{ log("Deployment cluster " ~ deploy_cluster ~ " already exists", info=True) }} + {% set cluster_empty %} + WITH dataflows AS ( + SELECT mz_indexes.id + FROM mz_indexes + JOIN mz_clusters ON mz_indexes.cluster_id = mz_clusters.id + WHERE mz_clusters.name = {{ dbt.string_literal(deploy_cluster) }} + + UNION ALL + + SELECT mz_materialized_views.id + FROM mz_materialized_views + JOIN mz_clusters ON mz_materialized_views.cluster_id = mz_clusters.id + WHERE mz_clusters.name = {{ dbt.string_literal(deploy_cluster) }} + + UNION ALL + + SELECT mz_sources.id + FROM mz_sources + JOIN mz_clusters ON mz_clusters.id = mz_sources.cluster_id + WHERE mz_clusters.name = {{ dbt.string_literal(deploy_cluster) }} + + UNION ALL + + SELECT mz_sinks.id + FROM mz_sinks + JOIN mz_clusters ON mz_clusters.id = mz_sinks.cluster_id + WHERE mz_clusters.name = {{ 
dbt.string_literal(deploy_cluster) }} + ) + + SELECT count(*) + FROM dataflows + WHERE id LIKE 'u%' + {% endset %} + + {% set cluster_object_count = run_query(cluster_empty) %} + {% if execute %} + {% if cluster_object_count and cluster_object_count.columns[0] and cluster_object_count.rows[0][0] > 0 %} + {% if check_cluster_ci_tag(deploy_cluster) %} + {{ log("Cluster " ~ deploy_cluster ~ " was already created for this pull request", info=True) }} + {% elif ignore_existing_objects %} + {{ log("[Warning] Deployment cluster " ~ deploy_cluster ~ " is not empty", info=True) }} + {{ log("[Warning] Confirm the objects it contains are expected before deployment", info=True) }} + {% else %} + {{ exceptions.raise_compiler_error(" + Deployment cluster " ~ deploy_cluster ~ " already exists and is not empty. + This is potentially dangerous as you may end up deploying objects to production you + do not intend. + + If you are certain the objects in this cluster are supposed to exist, you can ignore this + error by setting ignore_existing_objects to True. + + dbt run-operation create_unmanaged_cluster --args '{cluster_name: <cluster>, origin_cluster: <origin>, ignore_existing_objects: True}' + ") }} + {% endif %} + {% endif %} + {% endif %} + {% else %} + {{ log("Creating deployment cluster " ~ deploy_cluster ~ " from unmanaged origin " ~ origin_cluster, info=True) }} + + {% set replica_query %} + SELECT cr.name, cr.size, cr.availability_zone + FROM mz_cluster_replicas cr + JOIN mz_clusters c ON cr.cluster_id = c.id + LEFT JOIN mz_internal.mz_internal_cluster_replicas icr ON icr.id = cr.id + WHERE c.name = {{ dbt.string_literal(origin_cluster) }} + AND icr.id IS NULL + ORDER BY cr.name + {% endset %} + + {% set replica_results = run_query(replica_query) %} + + {% if execute %} + {% for row in replica_results.rows %} + {% if row[1] is none %} + {{ exceptions.raise_compiler_error( + "Cannot clone replica '" ~ row[0] ~ "' of unmanaged cluster '" + ~ origin_cluster ~ "': it has no SIZE. 
Unorchestrated replicas " + ~ "(using COMPUTECTL/STORAGECTL ADDRESSES) cannot be cloned by " + ~ "blue/green deployments. Recreate the cluster as managed or " + ~ "replace this replica with a sized one." + ) }} + {% endif %} + {% endfor %} + + {% set replica_clauses = [] %} + {% for row in replica_results.rows %} + {% set parts = ["SIZE " ~ dbt.string_literal(row[1])] %} + {% if row[2] is not none %} + {% do parts.append("AVAILABILITY ZONE " ~ dbt.string_literal(row[2])) %} + {% endif %} + {% do replica_clauses.append(adapter.quote(row[0]) ~ " (" ~ parts | join(", ") ~ ")") %} + {% endfor %} + + {% set create_cluster_ddl %} + CREATE CLUSTER {{ adapter.quote(deploy_cluster) }} REPLICAS ( + {{ replica_clauses | join(",\n ") }} + ) + {% endset %} + {{ run_query(create_cluster_ddl) }} + {{ set_cluster_ci_tag(deploy_cluster) }} + {% endif %} + {% endif %} + + {{ return(deploy_cluster) }} +{% endmacro %} diff --git a/misc/dbt-materialize/dbt/include/materialize/macros/deploy/deploy_init.sql b/misc/dbt-materialize/dbt/include/materialize/macros/deploy/deploy_init.sql index 8b7d85399375e..90d426c7d5ff5 100644 --- a/misc/dbt-materialize/dbt/include/materialize/macros/deploy/deploy_init.sql +++ b/misc/dbt-materialize/dbt/include/materialize/macros/deploy/deploy_init.sql @@ -114,8 +114,6 @@ c.managed, c.size, c.replication_factor, - c.id AS cluster_id, - c.name AS cluster_name, cs.type AS schedule_type, cs.refresh_hydration_time_estimate FROM mz_clusters c @@ -131,23 +129,28 @@ {% set managed = results[0] %} {% set size = results[1] %} {% set replication_factor = results[2] %} - {% set schedule_type = results[5] %} - {% set refresh_hydration_time_estimate = results[6] %} - - {% if not managed %} - {{ exceptions.raise_compiler_error("Production cluster " ~ origin_cluster ~ " is not managed") }} + {% set schedule_type = results[3] %} + {% set refresh_hydration_time_estimate = results[4] %} + + {% if managed %} + {% set deploy_cluster = create_cluster( + cluster_name=cluster, 
size=size, + replication_factor=replication_factor, + schedule_type=schedule_type, + refresh_hydration_time_estimate=refresh_hydration_time_estimate, + ignore_existing_objects=ignore_existing_objects, + force_deploy_suffix=True + ) %} + {% else %} + {% set deploy_cluster = create_unmanaged_cluster( + cluster_name=cluster, + origin_cluster=origin_cluster, + ignore_existing_objects=ignore_existing_objects, + force_deploy_suffix=True + ) %} {% endif %} - {% set deploy_cluster = create_cluster( - cluster_name=cluster, - size=size, - replication_factor=replication_factor, - schedule_type=schedule_type, - refresh_hydration_time_estimate=refresh_hydration_time_estimate, - ignore_existing_objects=ignore_existing_objects, - force_deploy_suffix=True - ) %} - {{ internal_copy_cluster_grants(cluster, deploy_cluster) }} {% endif %} {% endfor %} diff --git a/misc/dbt-materialize/tests/adapter/test_deploy.py b/misc/dbt-materialize/tests/adapter/test_deploy.py index f29a7b842fa79..ce70b92aaf4c3 100644 --- a/misc/dbt-materialize/tests/adapter/test_deploy.py +++ b/misc/dbt-materialize/tests/adapter/test_deploy.py @@ -581,13 +581,119 @@ def test_dbt_deploy_missing_deployment_schema(self, project): run_dbt(["run-operation", "deploy_promote"], expect_pass=False) - def test_fails_on_unmanaged_cluster(self, project): + def test_dbt_deploy_init_unmanaged_empty(self, project): project.run_sql("CREATE CLUSTER prod REPLICAS ()") project.run_sql("CREATE SCHEMA prod") project.run_sql("CREATE SCHEMA staging") - project.run_sql("CREATE SCHEMA staging_dbt_deploy") - run_dbt(["run-operation", "deploy_init"], expect_pass=False) + run_dbt(["run-operation", "deploy_init"])  # no expect_pass=False: an unmanaged origin is no longer an error + + managed = project.run_sql( + "SELECT managed FROM mz_clusters WHERE name = 'prod_dbt_deploy'", + fetch="one", + ) + assert managed == (False,)  # the deploy cluster must itself be unmanaged + + replica_count = project.run_sql( + """ + SELECT count(*) + FROM mz_cluster_replicas cr + JOIN mz_clusters c ON cr.cluster_id = c.id + WHERE c.name = 'prod_dbt_deploy' + """, + 
fetch="one", + ) + assert replica_count == (0,)  # origin had no replicas, so none are cloned + + def test_dbt_deploy_init_unmanaged_single_replica(self, project): + project.run_sql("CREATE CLUSTER prod REPLICAS (r1 (SIZE 'scale=1,workers=1'))") + project.run_sql("CREATE SCHEMA prod") + project.run_sql("CREATE SCHEMA staging") + + run_dbt(["run-operation", "deploy_init"]) + + managed = project.run_sql( + "SELECT managed FROM mz_clusters WHERE name = 'prod_dbt_deploy'", + fetch="one", + ) + assert managed == (False,) + + replicas = project.run_sql( + """ + SELECT cr.name, cr.size + FROM mz_cluster_replicas cr + JOIN mz_clusters c ON cr.cluster_id = c.id + WHERE c.name = 'prod_dbt_deploy' + ORDER BY cr.name + """, + fetch="all", + ) + assert replicas == [("r1", "scale=1,workers=1")]  # replica name and SIZE match the origin + + def test_dbt_deploy_init_unmanaged_multi_replica(self, project): + project.run_sql( + "CREATE CLUSTER prod REPLICAS (" + "r1 (SIZE 'scale=1,workers=1'), " + "r2 (SIZE 'scale=1,workers=1'))" + ) + project.run_sql("CREATE SCHEMA prod") + project.run_sql("CREATE SCHEMA staging") + + run_dbt(["run-operation", "deploy_init"]) + + replicas = project.run_sql( + """ + SELECT cr.name, cr.size + FROM mz_cluster_replicas cr + JOIN mz_clusters c ON cr.cluster_id = c.id + WHERE c.name = 'prod_dbt_deploy' + ORDER BY cr.name + """, + fetch="all", + ) + assert replicas == [ + ("r1", "scale=1,workers=1"), + ("r2", "scale=1,workers=1"), + ] + + def test_dbt_deploy_init_unmanaged_idempotent(self, project): + project.run_sql("CREATE CLUSTER prod REPLICAS (r1 (SIZE 'scale=1,workers=1'))") + project.run_sql("CREATE SCHEMA prod") + project.run_sql("CREATE SCHEMA staging") + + run_dbt(["run-operation", "deploy_init"]) + run_dbt(["run-operation", "deploy_init"])  # second run: prod_dbt_deploy already exists at this point + + replicas = project.run_sql( + """ + SELECT cr.name, cr.size + FROM mz_cluster_replicas cr + JOIN mz_clusters c ON cr.cluster_id = c.id + WHERE c.name = 'prod_dbt_deploy' + ORDER BY cr.name + """, + fetch="all", + ) + assert replicas == [("r1", "scale=1,workers=1")]  # still exactly one replica after the rerun + + def 
test_dbt_deploy_init_unmanaged_errors_on_unsized_replica(self, project): + try: + project.run_sql( + "CREATE CLUSTER prod REPLICAS " + "(r1 (STORAGECTL ADDRESSES ['s:1234'], " + "COMPUTECTL ADDRESSES ['c:1234'], " + "WORKERS 1))" + ) + except psycopg2.Error as e:  # the fixture cluster could not be created; skip instead of failing + pytest.skip(f"local Materialize image rejects unsized replicas: {e}") + project.run_sql("CREATE SCHEMA prod") + project.run_sql("CREATE SCHEMA staging") + + _, log_output = run_dbt_and_capture( + ["run-operation", "deploy_init"], expect_pass=False + ) + assert "r1" in log_output  # the error must name the offending replica + assert "no SIZE" in log_output  # substring of the compiler error raised by create_unmanaged_cluster def test_dbt_deploy_init_with_refresh_hydration_time(self, project): project.run_sql(