From e45851d513a4e9329811a98b9d780a0ebfce26c6 Mon Sep 17 00:00:00 2001 From: Seth Wiesman Date: Tue, 19 May 2026 14:50:01 -0400 Subject: [PATCH 1/3] dbt-materialize: support unmanaged clusters in deploy_init The deploy_init macro previously hard-failed when the production cluster was unmanaged, blocking blue/green deployments for any Materialize user not on a managed cluster. Add a dispatch path that, for unmanaged origins, queries mz_cluster_replicas and emits CREATE CLUSTER ... REPLICAS (...) cloning each user-facing replica's SIZE and AVAILABILITY ZONE. Internal replicas are filtered via mz_internal_cluster_replicas. Replicas without a SIZE (orchestrated replicas using COMPUTECTL/STORAGECTL ADDRESSES) cannot be faithfully cloned and now produce a compiler error that names the offending replica. The new create_unmanaged_cluster macro mirrors the structure of the existing create_cluster macro (pre-existence check, CI tag re-entry, DDL emission, CI tag set) so the managed path is untouched. Co-Authored-By: Claude Opus 4.7 --- misc/dbt-materialize/CHANGELOG.md | 7 + .../macros/ci/create_unmanaged_cluster.sql | 156 ++++++++++++++++++ .../materialize/macros/deploy/deploy_init.sql | 37 +++-- .../tests/adapter/test_deploy.py | 112 ++++++++++++- 4 files changed, 292 insertions(+), 20 deletions(-) create mode 100644 misc/dbt-materialize/dbt/include/materialize/macros/ci/create_unmanaged_cluster.sql diff --git a/misc/dbt-materialize/CHANGELOG.md b/misc/dbt-materialize/CHANGELOG.md index 1f7d1e8c39fd8..c23f7362f4bc3 100644 --- a/misc/dbt-materialize/CHANGELOG.md +++ b/misc/dbt-materialize/CHANGELOG.md @@ -2,6 +2,13 @@ ## Unreleased +* Support unmanaged origin clusters in `deploy_init`. The deployment + cluster is now created by cloning the production cluster's replicas + (including SIZE and AVAILABILITY ZONE attributes). Internal replicas + are skipped. `deploy_init` errors with a clear message if any + user-facing replica lacks a SIZE (orchestrated replicas using + COMPUTECTL/STORAGECTL ADDRESSES cannot be cloned). + ## 1.9.9 - 2026-05-18 * Add support for the `partition_by` configuration in materialized views, diff --git a/misc/dbt-materialize/dbt/include/materialize/macros/ci/create_unmanaged_cluster.sql b/misc/dbt-materialize/dbt/include/materialize/macros/ci/create_unmanaged_cluster.sql new file mode 100644 index 0000000000000..20880849083f1 --- /dev/null +++ b/misc/dbt-materialize/dbt/include/materialize/macros/ci/create_unmanaged_cluster.sql @@ -0,0 +1,156 @@ +-- Copyright Materialize, Inc. and contributors. All rights reserved. +-- +-- Licensed under the Apache License, Version 2.0 (the "License"); +-- you may not use this file except in compliance with the License. +-- You may obtain a copy of the License in the LICENSE file at the +-- root of this repository, or online at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. + +{# +This macro creates an unmanaged cluster by cloning the replicas of an +existing unmanaged origin cluster. SIZE and AVAILABILITY ZONE are copied +per replica. Internal replicas are skipped. Replicas without a SIZE +(orchestrated replicas using COMPUTECTL/STORAGECTL ADDRESSES) cannot be +cloned and produce a compiler error. + + ## Arguments + - cluster_name (str): The logical cluster name. The deploy suffix is applied + via `force_deploy_suffix`. + - origin_cluster (str): The name of the existing unmanaged cluster whose + replicas should be cloned. + - ignore_existing_objects (bool, optional): Whether to ignore existing + objects in the deploy cluster. Defaults to false. + - force_deploy_suffix (bool, optional): Whether to forcefully add a deploy + suffix to the cluster name. Defaults to false. +#} +{% macro create_unmanaged_cluster( + cluster_name, + origin_cluster, + ignore_existing_objects=false, + force_deploy_suffix=false +) %} + + {% if not cluster_name %} + {{ exceptions.raise_compiler_error("cluster_name must be provided") }} + {% endif %} + + {% if not origin_cluster %} + {{ exceptions.raise_compiler_error("origin_cluster must be provided") }} + {% endif %} + + {% set deploy_cluster = adapter.generate_final_cluster_name(cluster_name, force_deploy_suffix) %} + + {% if cluster_exists(deploy_cluster) %} + {{ log("Deployment cluster " ~ deploy_cluster ~ " already exists", info=True) }} + {% set cluster_empty %} + WITH dataflows AS ( + SELECT mz_indexes.id + FROM mz_indexes + JOIN mz_clusters ON mz_indexes.cluster_id = mz_clusters.id + WHERE mz_clusters.name = {{ dbt.string_literal(deploy_cluster) }} + + UNION ALL + + SELECT mz_materialized_views.id + FROM mz_materialized_views + JOIN mz_clusters ON mz_materialized_views.cluster_id = mz_clusters.id + WHERE mz_clusters.name = {{ dbt.string_literal(deploy_cluster) }} + + UNION ALL + + SELECT mz_sources.id + FROM mz_sources + JOIN mz_clusters ON mz_clusters.id = mz_sources.cluster_id + WHERE mz_clusters.name = {{ dbt.string_literal(deploy_cluster) }} + + UNION ALL + + SELECT mz_sinks.id + FROM mz_sinks + JOIN mz_clusters ON mz_clusters.id = mz_sinks.cluster_id + WHERE mz_clusters.name = {{ dbt.string_literal(deploy_cluster) }} + ) + + SELECT count(*) + FROM dataflows + WHERE id LIKE 'u%' + {% endset %} + + {% set cluster_object_count = run_query(cluster_empty) %} + {% if execute %} + {% if cluster_object_count and cluster_object_count.columns[0] and cluster_object_count.rows[0][0] > 0 %} + {% if check_cluster_ci_tag(deploy_cluster) %} + {{ log("Cluster " ~ deploy_cluster ~ " was already created for this pull request", info=True) }} + {% elif ignore_existing_objects %} + {{ log("[Warning] Deployment cluster " ~ deploy_cluster ~ " is not empty", info=True) }} + {{ log("[Warning] Confirm the objects it contains are expected before deployment", info=True) }} + {% else %} + {{ exceptions.raise_compiler_error(" + Deployment cluster " ~ deploy_cluster ~ " already exists and is not empty. + This is potentially dangerous as you may end up deploying objects to production you + do not intend. + + If you are certain the objects in this cluster are supposed to exist, you can ignore this + error by setting ignore_existing_objects to True. + + dbt run-operation create_unmanaged_cluster --args '{ignore_existing_objects: True}' + ") }} + {% endif %} + {% endif %} + {% endif %} + {% else %} + {{ log("Creating deployment cluster " ~ deploy_cluster ~ " from unmanaged origin " ~ origin_cluster, info=True) }} + + {% set replica_query %} + SELECT cr.name, cr.size, cr.availability_zone + FROM mz_cluster_replicas cr + JOIN mz_clusters c ON cr.cluster_id = c.id + LEFT JOIN mz_internal.mz_internal_cluster_replicas icr ON icr.id = cr.id + WHERE c.name = {{ dbt.string_literal(origin_cluster) }} + AND icr.id IS NULL + ORDER BY cr.name + {% endset %} + + {% set replica_results = run_query(replica_query) %} + + {% if execute %} + {% for row in replica_results.rows %} + {% if row[1] is none %} + {{ exceptions.raise_compiler_error( + "Cannot clone replica '" ~ row[0] ~ "' of unmanaged cluster '" + ~ origin_cluster ~ "': it has no SIZE. Orchestrated replicas " + ~ "(using COMPUTECTL/STORAGECTL ADDRESSES) cannot be cloned by " + ~ "blue/green deployments. Recreate the cluster as managed or " + ~ "replace this replica with a sized one." + ) }} + {% endif %} + {% endfor %} + + {% set replica_clauses = [] %} + {% for row in replica_results.rows %} + {% set parts = ["SIZE " ~ dbt.string_literal(row[1])] %} + {% if row[2] is not none %} + {% do parts.append("AVAILABILITY ZONE " ~ dbt.string_literal(row[2])) %} + {% endif %} + {% do replica_clauses.append(adapter.quote(row[0]) ~ " (" ~ parts | join(", ") ~ ")") %} + {% endfor %} + + {% set create_cluster_ddl %} + CREATE CLUSTER {{ adapter.quote(deploy_cluster) }} REPLICAS ( + {{ replica_clauses | join(",\n ") }} + ) + {% endset %} + {{ run_query(create_cluster_ddl) }} + {{ set_cluster_ci_tag(deploy_cluster) }} + {% endif %} + {% endif %} + + {{ return(deploy_cluster) }} +{% endmacro %} diff --git a/misc/dbt-materialize/dbt/include/materialize/macros/deploy/deploy_init.sql b/misc/dbt-materialize/dbt/include/materialize/macros/deploy/deploy_init.sql index 8b7d85399375e..90d426c7d5ff5 100644 --- a/misc/dbt-materialize/dbt/include/materialize/macros/deploy/deploy_init.sql +++ b/misc/dbt-materialize/dbt/include/materialize/macros/deploy/deploy_init.sql @@ -114,8 +114,6 @@ c.managed, c.size, c.replication_factor, - c.id AS cluster_id, - c.name AS cluster_name, cs.type AS schedule_type, cs.refresh_hydration_time_estimate FROM mz_clusters c @@ -131,23 +129,28 @@ {% set managed = results[0] %} {% set size = results[1] %} {% set replication_factor = results[2] %} - {% set schedule_type = results[5] %} - {% set refresh_hydration_time_estimate = results[6] %} - - {% if not managed %} - {{ exceptions.raise_compiler_error("Production cluster " ~ origin_cluster ~ " is not managed") }} + {% set schedule_type = results[3] %} + {% set refresh_hydration_time_estimate = results[4] %} + + {% if managed %} + {% set deploy_cluster = create_cluster( + cluster_name=cluster, + size=size, + replication_factor=replication_factor, + schedule_type=schedule_type, + refresh_hydration_time_estimate=refresh_hydration_time_estimate, + ignore_existing_objects=ignore_existing_objects, + force_deploy_suffix=True + ) %} + {% else %} + {% set deploy_cluster = create_unmanaged_cluster( + cluster_name=cluster, + origin_cluster=origin_cluster, + ignore_existing_objects=ignore_existing_objects, + force_deploy_suffix=True + ) %} {% endif %} - {% set deploy_cluster = create_cluster( - cluster_name=cluster, - size=size, - replication_factor=replication_factor, - schedule_type=schedule_type, - refresh_hydration_time_estimate=refresh_hydration_time_estimate, - ignore_existing_objects=ignore_existing_objects, - force_deploy_suffix=True - ) %} - {{ internal_copy_cluster_grants(cluster, deploy_cluster) }} {% endif %} {% endfor %} diff --git a/misc/dbt-materialize/tests/adapter/test_deploy.py b/misc/dbt-materialize/tests/adapter/test_deploy.py index f29a7b842fa79..ce70b92aaf4c3 100644 --- a/misc/dbt-materialize/tests/adapter/test_deploy.py +++ b/misc/dbt-materialize/tests/adapter/test_deploy.py @@ -581,13 +581,119 @@ def test_dbt_deploy_missing_deployment_schema(self, project): run_dbt(["run-operation", "deploy_promote"], expect_pass=False) - def test_fails_on_unmanaged_cluster(self, project): + def test_dbt_deploy_init_unmanaged_empty(self, project): project.run_sql("CREATE CLUSTER prod REPLICAS ()") project.run_sql("CREATE SCHEMA prod") project.run_sql("CREATE SCHEMA staging") - project.run_sql("CREATE SCHEMA staging_dbt_deploy") - run_dbt(["run-operation", "deploy_init"], expect_pass=False) + run_dbt(["run-operation", "deploy_init"]) + + managed = project.run_sql( + "SELECT managed FROM mz_clusters WHERE name = 'prod_dbt_deploy'", + fetch="one", + ) + assert managed == (False,) + + replica_count = project.run_sql( + """ + SELECT count(*) + FROM mz_cluster_replicas cr + JOIN mz_clusters c ON cr.cluster_id = c.id + WHERE c.name = 'prod_dbt_deploy' + """, + fetch="one", + ) + assert replica_count == (0,) + + def test_dbt_deploy_init_unmanaged_single_replica(self, project): + project.run_sql("CREATE CLUSTER prod REPLICAS (r1 (SIZE 'scale=1,workers=1'))") + project.run_sql("CREATE SCHEMA prod") + project.run_sql("CREATE SCHEMA staging") + + run_dbt(["run-operation", "deploy_init"]) + + managed = project.run_sql( + "SELECT managed FROM mz_clusters WHERE name = 'prod_dbt_deploy'", + fetch="one", + ) + assert managed == (False,) + + replicas = project.run_sql( + """ + SELECT cr.name, cr.size + FROM mz_cluster_replicas cr + JOIN mz_clusters c ON cr.cluster_id = c.id + WHERE c.name = 'prod_dbt_deploy' + ORDER BY cr.name + """, + fetch="all", + ) + assert replicas == [("r1", "scale=1,workers=1")] + + def test_dbt_deploy_init_unmanaged_multi_replica(self, project): + project.run_sql( + "CREATE CLUSTER prod REPLICAS (" + "r1 (SIZE 'scale=1,workers=1'), " + "r2 (SIZE 'scale=1,workers=1'))" + ) + project.run_sql("CREATE SCHEMA prod") + project.run_sql("CREATE SCHEMA staging") + + run_dbt(["run-operation", "deploy_init"]) + + replicas = project.run_sql( + """ + SELECT cr.name, cr.size + FROM mz_cluster_replicas cr + JOIN mz_clusters c ON cr.cluster_id = c.id + WHERE c.name = 'prod_dbt_deploy' + ORDER BY cr.name + """, + fetch="all", + ) + assert replicas == [ + ("r1", "scale=1,workers=1"), + ("r2", "scale=1,workers=1"), + ] + + def test_dbt_deploy_init_unmanaged_idempotent(self, project): + project.run_sql("CREATE CLUSTER prod REPLICAS (r1 (SIZE 'scale=1,workers=1'))") + project.run_sql("CREATE SCHEMA prod") + project.run_sql("CREATE SCHEMA staging") + + run_dbt(["run-operation", "deploy_init"]) + run_dbt(["run-operation", "deploy_init"]) + + replicas = project.run_sql( + """ + SELECT cr.name, cr.size + FROM mz_cluster_replicas cr + JOIN mz_clusters c ON cr.cluster_id = c.id + WHERE c.name = 'prod_dbt_deploy' + ORDER BY cr.name + """, + fetch="all", + ) + assert replicas == [("r1", "scale=1,workers=1")] + + def test_dbt_deploy_init_unmanaged_errors_on_unsized_replica(self, project): + try: + project.run_sql( + "CREATE CLUSTER prod REPLICAS " + "(r1 (STORAGECTL ADDRESSES ['s:1234'], " + "COMPUTECTL ADDRESSES ['c:1234'], " + "WORKERS 1))" + ) + except psycopg2.Error as e: + pytest.skip(f"local Materialize image rejects unsized replicas: {e}") + project.run_sql("CREATE SCHEMA prod") + project.run_sql("CREATE SCHEMA staging") + + _, log_output = run_dbt_and_capture( + ["run-operation", "deploy_init"], expect_pass=False + ) + assert "r1" in log_output + assert "no SIZE" in log_output def test_dbt_deploy_init_with_refresh_hydration_time(self, project): project.run_sql( From 7074c80896a645e25d6d9805977effe5bbc1a884 Mon Sep 17 00:00:00 2001 From: Seth Wiesman Date: Wed, 20 May 2026 09:37:46 -0500 Subject: [PATCH 2/3] address review feedback --- misc/dbt-materialize/CHANGELOG.md | 9 +- .../materialize/macros/ci/create_cluster.sql | 60 +----------- .../macros/ci/create_unmanaged_cluster.sql | 77 +++------------- ...internal_check_existing_deploy_cluster.sql | 91 +++++++++++++++++++ .../tests/adapter/test_deploy.py | 8 +- 5 files changed, 115 insertions(+), 130 deletions(-) create mode 100644 misc/dbt-materialize/dbt/include/materialize/macros/ci/internal_check_existing_deploy_cluster.sql diff --git a/misc/dbt-materialize/CHANGELOG.md b/misc/dbt-materialize/CHANGELOG.md index c23f7362f4bc3..bb18d58a03f5a 100644 --- a/misc/dbt-materialize/CHANGELOG.md +++ b/misc/dbt-materialize/CHANGELOG.md @@ -2,12 +2,9 @@ ## Unreleased -* Support unmanaged origin clusters in `deploy_init`. The deployment - cluster is now created by cloning the production cluster's replicas - (including SIZE and AVAILABILITY ZONE attributes). Internal replicas - are skipped. `deploy_init` errors with a clear message if any - user-facing replica lacks a SIZE (orchestrated replicas using - COMPUTECTL/STORAGECTL ADDRESSES cannot be cloned). +* Support unmanaged clusters in `deploy_init`. The deployment cluster + is now created by cloning the production cluster's replicas (including + `SIZE` and `AVAILABILITY ZONE`). ## 1.9.9 - 2026-05-18 diff --git a/misc/dbt-materialize/dbt/include/materialize/macros/ci/create_cluster.sql b/misc/dbt-materialize/dbt/include/materialize/macros/ci/create_cluster.sql index d3f66b832a755..2b9c2fd9cafd2 100644 --- a/misc/dbt-materialize/dbt/include/materialize/macros/ci/create_cluster.sql +++ b/misc/dbt-materialize/dbt/include/materialize/macros/ci/create_cluster.sql @@ -51,67 +51,11 @@ This macro creates a cluster with the specified properties. {% set deploy_cluster = adapter.generate_final_cluster_name(cluster_name, force_deploy_suffix) %} {% if cluster_exists(deploy_cluster) %} - {{ log("Deployment cluster " ~ deploy_cluster ~ " already exists", info=True) }} - {% set cluster_empty %} - WITH dataflows AS ( - SELECT mz_indexes.id - FROM mz_indexes - JOIN mz_clusters ON mz_indexes.cluster_id = mz_clusters.id - WHERE mz_clusters.name = {{ dbt.string_literal(deploy_cluster) }} - - UNION ALL - - SELECT mz_materialized_views.id - FROM mz_materialized_views - JOIN mz_clusters ON mz_materialized_views.cluster_id = mz_clusters.id - WHERE mz_clusters.name = {{ dbt.string_literal(deploy_cluster) }} - - UNION ALL - - SELECT mz_sources.id - FROM mz_sources - JOIN mz_clusters ON mz_clusters.id = mz_sources.cluster_id - WHERE mz_clusters.name = {{ dbt.string_literal(deploy_cluster) }} - - UNION ALL - - SELECT mz_sinks.id - FROM mz_sinks - JOIN mz_clusters ON mz_clusters.id = mz_sinks.cluster_id - WHERE mz_clusters.name = {{ dbt.string_literal(deploy_cluster) }} - ) - - SELECT count(*) - FROM dataflows - WHERE id LIKE 'u%' - {% endset %} - - {% set cluster_object_count = run_query(cluster_empty) %} - {% if execute %} - {% if cluster_object_count and cluster_object_count.columns[0] and cluster_object_count.rows[0][0] > 0 %} - {% if check_cluster_ci_tag(deploy_cluster) %} - {{ log("Cluster " ~ deploy_cluster ~ " was already created for this pull request", info=True) }} - {% elif ignore_existing_objects %} - {{ log("[Warning] Deployment cluster " ~ deploy_cluster ~ " is not empty", info=True) }} - {{ log("[Warning] Confirm the objects it contains are expected before deployment", info=True) }} - {% else %} - {{ exceptions.raise_compiler_error(" - Deployment cluster " ~ deploy_cluster ~ " already exists and is not empty. - This is potentially dangerous as you may end up deploying objects to production you - do not intend. - - If you are certain the objects in this cluster are supposed to exist, you can ignore this - error by setting ignore_existing_objects to True. - - dbt run-operation create_cluster --args '{ignore_existing_objects: True}' - ") }} - {% endif %} - {% endif %} - {% endif %} + {{ internal_check_existing_deploy_cluster(deploy_cluster, ignore_existing_objects) }} {% else %} {{ log("Creating deployment cluster " ~ deploy_cluster, info=True)}} {% set create_cluster_ddl %} - CREATE CLUSTER {{ deploy_cluster }} ( + CREATE CLUSTER {{ adapter.quote(deploy_cluster) }} ( SIZE = {{ dbt.string_literal(size) }} {% if replication_factor is not none and ( schedule_type == 'manual' or schedule_type is none ) %} , REPLICATION FACTOR = {{ replication_factor }} diff --git a/misc/dbt-materialize/dbt/include/materialize/macros/ci/create_unmanaged_cluster.sql b/misc/dbt-materialize/dbt/include/materialize/macros/ci/create_unmanaged_cluster.sql index 20880849083f1..243a667dd0613 100644 --- a/misc/dbt-materialize/dbt/include/materialize/macros/ci/create_unmanaged_cluster.sql +++ b/misc/dbt-materialize/dbt/include/materialize/macros/ci/create_unmanaged_cluster.sql @@ -48,63 +48,7 @@ cloned and produce a compiler error. {% set deploy_cluster = adapter.generate_final_cluster_name(cluster_name, force_deploy_suffix) %} {% if cluster_exists(deploy_cluster) %} - {{ log("Deployment cluster " ~ deploy_cluster ~ " already exists", info=True) }} - {% set cluster_empty %} - WITH dataflows AS ( - SELECT mz_indexes.id - FROM mz_indexes - JOIN mz_clusters ON mz_indexes.cluster_id = mz_clusters.id - WHERE mz_clusters.name = {{ dbt.string_literal(deploy_cluster) }} - - UNION ALL - - SELECT mz_materialized_views.id - FROM mz_materialized_views - JOIN mz_clusters ON mz_materialized_views.cluster_id = mz_clusters.id - WHERE mz_clusters.name = {{ dbt.string_literal(deploy_cluster) }} - - UNION ALL - - SELECT mz_sources.id - FROM mz_sources - JOIN mz_clusters ON mz_clusters.id = mz_sources.cluster_id - WHERE mz_clusters.name = {{ dbt.string_literal(deploy_cluster) }} - - UNION ALL - - SELECT mz_sinks.id - FROM mz_sinks - JOIN mz_clusters ON mz_clusters.id = mz_sinks.cluster_id - WHERE mz_clusters.name = {{ dbt.string_literal(deploy_cluster) }} - ) - - SELECT count(*) - FROM dataflows - WHERE id LIKE 'u%' - {% endset %} - - {% set cluster_object_count = run_query(cluster_empty) %} - {% if execute %} - {% if cluster_object_count and cluster_object_count.columns[0] and cluster_object_count.rows[0][0] > 0 %} - {% if check_cluster_ci_tag(deploy_cluster) %} - {{ log("Cluster " ~ deploy_cluster ~ " was already created for this pull request", info=True) }} - {% elif ignore_existing_objects %} - {{ log("[Warning] Deployment cluster " ~ deploy_cluster ~ " is not empty", info=True) }} - {{ log("[Warning] Confirm the objects it contains are expected before deployment", info=True) }} - {% else %} - {{ exceptions.raise_compiler_error(" - Deployment cluster " ~ deploy_cluster ~ " already exists and is not empty. - This is potentially dangerous as you may end up deploying objects to production you - do not intend. - - If you are certain the objects in this cluster are supposed to exist, you can ignore this - error by setting ignore_existing_objects to True. - - dbt run-operation create_unmanaged_cluster --args '{ignore_existing_objects: True}' - ") }} - {% endif %} - {% endif %} - {% endif %} + {{ internal_check_existing_deploy_cluster(deploy_cluster, ignore_existing_objects) }} {% else %} {{ log("Creating deployment cluster " ~ deploy_cluster ~ " from unmanaged origin " ~ origin_cluster, info=True) }} @@ -121,17 +65,22 @@ cloned and produce a compiler error. {% set replica_results = run_query(replica_query) %} {% if execute %} + {% set unsized_replicas = [] %} {% for row in replica_results.rows %} {% if row[1] is none %} - {{ exceptions.raise_compiler_error( - "Cannot clone replica '" ~ row[0] ~ "' of unmanaged cluster '" - ~ origin_cluster ~ "': it has no SIZE. Orchestrated replicas " - ~ "(using COMPUTECTL/STORAGECTL ADDRESSES) cannot be cloned by " - ~ "blue/green deployments. Recreate the cluster as managed or " - ~ "replace this replica with a sized one." - ) }} + {% do unsized_replicas.append(row[0]) %} {% endif %} {% endfor %} + {% if unsized_replicas %} + {{ exceptions.raise_compiler_error( + "Cannot clone the following replicas of unmanaged cluster '" + ~ origin_cluster ~ "' because they have no SIZE: " + ~ unsized_replicas | join(", ") ~ ". Orchestrated replicas " + ~ "(using COMPUTECTL/STORAGECTL ADDRESSES) cannot be cloned by " + ~ "blue/green deployments. Recreate the cluster as managed or " + ~ "replace these replicas with sized ones." + ) }} + {% endif %} {% set replica_clauses = [] %} {% for row in replica_results.rows %} diff --git a/misc/dbt-materialize/dbt/include/materialize/macros/ci/internal_check_existing_deploy_cluster.sql b/misc/dbt-materialize/dbt/include/materialize/macros/ci/internal_check_existing_deploy_cluster.sql new file mode 100644 index 0000000000000..5bdbde92b39e8 --- /dev/null +++ b/misc/dbt-materialize/dbt/include/materialize/macros/ci/internal_check_existing_deploy_cluster.sql @@ -0,0 +1,91 @@ +-- Copyright Materialize, Inc. and contributors. All rights reserved. +-- +-- Licensed under the Apache License, Version 2.0 (the "License"); +-- you may not use this file except in compliance with the License. +-- You may obtain a copy of the License in the LICENSE file at the +-- root of this repository, or online at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. + +{# +Shared pre-existence check for the cluster-creation macros invoked by +`deploy_init`. When the deployment cluster already exists, this macro +counts user objects in it and either: + + - logs and continues, if the cluster was created earlier in the same + pull request (matching CI tag) or `ignore_existing_objects` is true, + - raises a compiler error otherwise. + +Keeping this in one place ensures the managed (`create_cluster`) and +unmanaged (`create_unmanaged_cluster`) paths stay in lockstep. + + ## Arguments + - deploy_cluster (str): The fully-qualified deployment cluster name. + - ignore_existing_objects (bool): Whether to tolerate a non-empty + pre-existing deployment cluster. +#} +{% macro internal_check_existing_deploy_cluster(deploy_cluster, ignore_existing_objects) %} + {{ log("Deployment cluster " ~ deploy_cluster ~ " already exists", info=True) }} + {% set cluster_empty %} + WITH dataflows AS ( + SELECT mz_indexes.id + FROM mz_indexes + JOIN mz_clusters ON mz_indexes.cluster_id = mz_clusters.id + WHERE mz_clusters.name = {{ dbt.string_literal(deploy_cluster) }} + + UNION ALL + + SELECT mz_materialized_views.id + FROM mz_materialized_views + JOIN mz_clusters ON mz_materialized_views.cluster_id = mz_clusters.id + WHERE mz_clusters.name = {{ dbt.string_literal(deploy_cluster) }} + + UNION ALL + + SELECT mz_sources.id + FROM mz_sources + JOIN mz_clusters ON mz_clusters.id = mz_sources.cluster_id + WHERE mz_clusters.name = {{ dbt.string_literal(deploy_cluster) }} + + UNION ALL + + SELECT mz_sinks.id + FROM mz_sinks + JOIN mz_clusters ON mz_clusters.id = mz_sinks.cluster_id + WHERE mz_clusters.name = {{ dbt.string_literal(deploy_cluster) }} + ) + + SELECT count(*) + FROM dataflows + WHERE id LIKE 'u%' + {% endset %} + + {% set cluster_object_count = run_query(cluster_empty) %} + {% if execute %} + {% if cluster_object_count and cluster_object_count.columns[0] and cluster_object_count.rows[0][0] > 0 %} + {% if check_cluster_ci_tag(deploy_cluster) %} + {{ log("Cluster " ~ deploy_cluster ~ " was already created for this pull request", info=True) }} + {% elif ignore_existing_objects %} + {{ log("[Warning] Deployment cluster " ~ deploy_cluster ~ " is not empty", info=True) }} + {{ log("[Warning] Confirm the objects it contains are expected before deployment", info=True) }} + {% else %} + {{ exceptions.raise_compiler_error(" + Deployment cluster " ~ deploy_cluster ~ " already exists and is not empty. + This is potentially dangerous as you may end up deploying objects to production you + do not intend. + + If you are certain the objects in this cluster are supposed to exist, you can ignore this + error by re-running deploy_init with ignore_existing_objects: True: + + dbt run-operation deploy_init --args '{ignore_existing_objects: True}' + ") }} + {% endif %} + {% endif %} + {% endif %} +{% endmacro %} diff --git a/misc/dbt-materialize/tests/adapter/test_deploy.py b/misc/dbt-materialize/tests/adapter/test_deploy.py index ce70b92aaf4c3..6b874fbeb2169 100644 --- a/misc/dbt-materialize/tests/adapter/test_deploy.py +++ b/misc/dbt-materialize/tests/adapter/test_deploy.py @@ -679,9 +679,12 @@ def test_dbt_deploy_init_unmanaged_idempotent(self, project): def test_dbt_deploy_init_unmanaged_errors_on_unsized_replica(self, project): try: project.run_sql( - "CREATE CLUSTER prod REPLICAS " - "(r1 (STORAGECTL ADDRESSES ['s:1234'], " + "CREATE CLUSTER prod REPLICAS (" + "r1 (STORAGECTL ADDRESSES ['s:1234'], " "COMPUTECTL ADDRESSES ['c:1234'], " + "WORKERS 1), " + "r2 (STORAGECTL ADDRESSES ['s:1235'], " + "COMPUTECTL ADDRESSES ['c:1235'], " "WORKERS 1))" ) except psycopg2.Error as e: @@ -693,6 +696,7 @@ def test_dbt_deploy_init_unmanaged_errors_on_unsized_replica(self, project): ["run-operation", "deploy_init"], expect_pass=False ) assert "r1" in log_output + assert "r2" in log_output assert "no SIZE" in log_output def test_dbt_deploy_init_with_refresh_hydration_time(self, project): From 05a31d5479171d43ac25b307ccb76eb759d00d6a Mon Sep 17 00:00:00 2001 From: Seth Wiesman Date: Wed, 20 May 2026 11:03:38 -0500 Subject: [PATCH 3/3] fix whitespace --- misc/dbt-materialize/CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/misc/dbt-materialize/CHANGELOG.md b/misc/dbt-materialize/CHANGELOG.md index bb18d58a03f5a..7b5923be2696d 100644 --- a/misc/dbt-materialize/CHANGELOG.md +++ b/misc/dbt-materialize/CHANGELOG.md @@ -4,7 +4,7 @@ * Support unmanaged clusters in `deploy_init`. The deployment cluster is now created by cloning the production cluster's replicas (including - `SIZE` and `AVAILABILITY ZONE`). + `SIZE` and `AVAILABILITY ZONE`). ## 1.9.9 - 2026-05-18