Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions misc/dbt-materialize/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,13 @@

## Unreleased

* Support unmanaged origin clusters in `deploy_init`. When the production
cluster is unmanaged, the deployment cluster is now created by cloning its
replicas, copying each replica's SIZE and AVAILABILITY ZONE. Internal
replicas are skipped. `deploy_init` fails with a clear error if any
user-facing replica has no SIZE, because orchestrated replicas (those using
COMPUTECTL/STORAGECTL ADDRESSES) cannot be cloned.

## 1.9.9 - 2026-05-18

* Add support for the `partition_by` configuration in materialized views,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
-- Copyright Materialize, Inc. and contributors. All rights reserved.
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License in the LICENSE file at the
-- root of this repository, or online at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.

{#
  Creates an unmanaged deployment cluster by cloning the replicas of an
  existing unmanaged origin cluster. Every cloned replica keeps the origin
  replica's name and SIZE, plus its AVAILABILITY ZONE when one is set.
  Internal replicas (those listed in
  mz_internal.mz_internal_cluster_replicas) are skipped. Replicas without a
  SIZE (orchestrated replicas using COMPUTECTL/STORAGECTL ADDRESSES) cannot
  be cloned and produce a compiler error.

  If the deployment cluster already exists, nothing is created. When it
  already holds indexes, materialized views, sources or sinks whose id
  starts with `u`, the macro:
    - logs and continues if `check_cluster_ci_tag` reports true,
    - logs warnings and continues if `ignore_existing_objects` is true,
    - raises a compiler error otherwise.

  ## Arguments
  - cluster_name (str): The logical cluster name. The deploy suffix is
    applied via `force_deploy_suffix`.
  - origin_cluster (str): The name of the existing unmanaged cluster whose
    replicas should be cloned.
  - ignore_existing_objects (bool, optional): Whether to ignore existing
    objects in the deploy cluster. Defaults to false.
  - force_deploy_suffix (bool, optional): Whether to forcefully add a deploy
    suffix to the cluster name. Defaults to false.

  ## Returns
  The deployment cluster name produced by
  `adapter.generate_final_cluster_name`.

  ## Errors
  Raises a compiler error when `cluster_name` or `origin_cluster` is
  missing, when the origin cluster does not exist, when an existing
  deployment cluster is unexpectedly non-empty, or when an origin replica
  has no SIZE.
#}
{% macro create_unmanaged_cluster(
    cluster_name,
    origin_cluster,
    ignore_existing_objects=false,
    force_deploy_suffix=false
) %}

{% if not cluster_name %}
    {{ exceptions.raise_compiler_error("cluster_name must be provided") }}
{% endif %}

{% if not origin_cluster %}
    {{ exceptions.raise_compiler_error("origin_cluster must be provided") }}
{% endif %}

{% set deploy_cluster = adapter.generate_final_cluster_name(cluster_name, force_deploy_suffix) %}

{% if cluster_exists(deploy_cluster) %}
    {{ log("Deployment cluster " ~ deploy_cluster ~ " already exists", info=True) }}

    {# Count the objects already installed on the existing deployment cluster. #}
    {% set cluster_empty %}
        WITH dataflows AS (
            SELECT mz_indexes.id
            FROM mz_indexes
            JOIN mz_clusters ON mz_indexes.cluster_id = mz_clusters.id
            WHERE mz_clusters.name = {{ dbt.string_literal(deploy_cluster) }}

            UNION ALL

            SELECT mz_materialized_views.id
            FROM mz_materialized_views
            JOIN mz_clusters ON mz_materialized_views.cluster_id = mz_clusters.id
            WHERE mz_clusters.name = {{ dbt.string_literal(deploy_cluster) }}

            UNION ALL

            SELECT mz_sources.id
            FROM mz_sources
            JOIN mz_clusters ON mz_clusters.id = mz_sources.cluster_id
            WHERE mz_clusters.name = {{ dbt.string_literal(deploy_cluster) }}

            UNION ALL

            SELECT mz_sinks.id
            FROM mz_sinks
            JOIN mz_clusters ON mz_clusters.id = mz_sinks.cluster_id
            WHERE mz_clusters.name = {{ dbt.string_literal(deploy_cluster) }}
        )

        SELECT count(*)
        FROM dataflows
        WHERE id LIKE 'u%'
    {% endset %}

    {% set cluster_object_count = run_query(cluster_empty) %}
    {% if execute %}
        {% if cluster_object_count and cluster_object_count.columns[0] and cluster_object_count.rows[0][0] > 0 %}
            {% if check_cluster_ci_tag(deploy_cluster) %}
                {{ log("Cluster " ~ deploy_cluster ~ " was already created for this pull request", info=True) }}
            {% elif ignore_existing_objects %}
                {{ log("[Warning] Deployment cluster " ~ deploy_cluster ~ " is not empty", info=True) }}
                {{ log("[Warning] Confirm the objects it contains are expected before deployment", info=True) }}
            {% else %}
                {#
                  The suggested command must carry the required arguments,
                  otherwise re-running it verbatim stops at the
                  "cluster_name must be provided" guard above.
                #}
                {% set rerun_args = "cluster_name: " ~ cluster_name
                    ~ ", origin_cluster: " ~ origin_cluster
                    ~ ", ignore_existing_objects: True"
                    ~ (", force_deploy_suffix: True" if force_deploy_suffix else "") %}
                {{ exceptions.raise_compiler_error("
                    Deployment cluster " ~ deploy_cluster ~ " already exists and is not empty.
                    This is potentially dangerous as you may end up deploying objects to production you
                    do not intend.

                    If you are certain the objects in this cluster are supposed to exist, you can ignore this
                    error by setting ignore_existing_objects to True.

                    dbt run-operation create_unmanaged_cluster --args '{" ~ rerun_args ~ "}'
                ") }}
            {% endif %}
        {% endif %}
    {% endif %}
{% else %}
    {#
      A missing origin cluster would yield zero replica rows and silently
      create an empty deployment cluster, so fail loudly instead. The check
      is guarded by `execute` so it only runs when queries return results.
    #}
    {% if execute and not cluster_exists(origin_cluster) %}
        {{ exceptions.raise_compiler_error(
            "Cannot clone unmanaged cluster '" ~ origin_cluster
            ~ "': the cluster does not exist."
        ) }}
    {% endif %}

    {{ log("Creating deployment cluster " ~ deploy_cluster ~ " from unmanaged origin " ~ origin_cluster, info=True) }}

    {# Anti-join against the internal replica list to keep user-facing replicas only. #}
    {% set replica_query %}
        SELECT cr.name, cr.size, cr.availability_zone
        FROM mz_cluster_replicas cr
        JOIN mz_clusters c ON cr.cluster_id = c.id
        LEFT JOIN mz_internal.mz_internal_cluster_replicas icr ON icr.id = cr.id
        WHERE c.name = {{ dbt.string_literal(origin_cluster) }}
          AND icr.id IS NULL
        ORDER BY cr.name
    {% endset %}

    {% set replica_results = run_query(replica_query) %}

    {% if execute %}
        {#
          Validate and translate each replica in a single pass. The error
          is raised before any DDL is issued, so a failure never leaves a
          partially cloned cluster behind.
        #}
        {% set replica_clauses = [] %}
        {% for row in replica_results.rows %}
            {% set replica_name = row[0] %}
            {% set replica_size = row[1] %}
            {% set replica_zone = row[2] %}

            {% if replica_size is none %}
                {{ exceptions.raise_compiler_error(
                    "Cannot clone replica '" ~ replica_name ~ "' of unmanaged cluster '"
                    ~ origin_cluster ~ "': it has no SIZE. Orchestrated replicas "
                    ~ "(using COMPUTECTL/STORAGECTL ADDRESSES) cannot be cloned by "
                    ~ "blue/green deployments. Recreate the cluster as managed or "
                    ~ "replace this replica with a sized one."
                ) }}
            {% endif %}

            {% set parts = ["SIZE " ~ dbt.string_literal(replica_size)] %}
            {% if replica_zone is not none %}
                {% do parts.append("AVAILABILITY ZONE " ~ dbt.string_literal(replica_zone)) %}
            {% endif %}
            {# Parenthesised so the filter visibly applies to `parts` alone. #}
            {% do replica_clauses.append(adapter.quote(replica_name) ~ " (" ~ (parts | join(", ")) ~ ")") %}
        {% endfor %}

        {# An origin with no replicas renders as `REPLICAS ()`, i.e. an empty unmanaged cluster. #}
        {% set create_cluster_ddl %}
            CREATE CLUSTER {{ adapter.quote(deploy_cluster) }} REPLICAS (
                {{ replica_clauses | join(",\n                ") }}
            )
        {% endset %}
        {{ run_query(create_cluster_ddl) }}
        {{ set_cluster_ci_tag(deploy_cluster) }}
    {% endif %}
{% endif %}

{{ return(deploy_cluster) }}
{% endmacro %}
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,6 @@
c.managed,
c.size,
c.replication_factor,
c.id AS cluster_id,
c.name AS cluster_name,
cs.type AS schedule_type,
cs.refresh_hydration_time_estimate
FROM mz_clusters c
Expand All @@ -131,23 +129,28 @@
{% set managed = results[0] %}
{% set size = results[1] %}
{% set replication_factor = results[2] %}
{% set schedule_type = results[5] %}
{% set refresh_hydration_time_estimate = results[6] %}

{% if not managed %}
{{ exceptions.raise_compiler_error("Production cluster " ~ origin_cluster ~ " is not managed") }}
{% set schedule_type = results[3] %}
{% set refresh_hydration_time_estimate = results[4] %}

{% if managed %}
{% set deploy_cluster = create_cluster(
cluster_name=cluster,
size=size,
replication_factor=replication_factor,
schedule_type=schedule_type,
refresh_hydration_time_estimate=refresh_hydration_time_estimate,
ignore_existing_objects=ignore_existing_objects,
force_deploy_suffix=True
) %}
{% else %}
{% set deploy_cluster = create_unmanaged_cluster(
cluster_name=cluster,
origin_cluster=origin_cluster,
ignore_existing_objects=ignore_existing_objects,
force_deploy_suffix=True
) %}
{% endif %}

{% set deploy_cluster = create_cluster(
cluster_name=cluster,
size=size,
replication_factor=replication_factor,
schedule_type=schedule_type,
refresh_hydration_time_estimate=refresh_hydration_time_estimate,
ignore_existing_objects=ignore_existing_objects,
force_deploy_suffix=True
) %}

{{ internal_copy_cluster_grants(cluster, deploy_cluster) }}
{% endif %}
{% endfor %}
Expand Down
112 changes: 109 additions & 3 deletions misc/dbt-materialize/tests/adapter/test_deploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -581,13 +581,119 @@ def test_dbt_deploy_missing_deployment_schema(self, project):

run_dbt(["run-operation", "deploy_promote"], expect_pass=False)

def test_fails_on_unmanaged_cluster(self, project):
def test_dbt_deploy_init_unmanaged_empty(self, project):
project.run_sql("CREATE CLUSTER prod REPLICAS ()")
project.run_sql("CREATE SCHEMA prod")
project.run_sql("CREATE SCHEMA staging")
project.run_sql("CREATE SCHEMA staging_dbt_deploy")

run_dbt(["run-operation", "deploy_init"], expect_pass=False)
run_dbt(["run-operation", "deploy_init"])

managed = project.run_sql(
"SELECT managed FROM mz_clusters WHERE name = 'prod_dbt_deploy'",
fetch="one",
)
assert managed == (False,)

replica_count = project.run_sql(
"""
SELECT count(*)
FROM mz_cluster_replicas cr
JOIN mz_clusters c ON cr.cluster_id = c.id
WHERE c.name = 'prod_dbt_deploy'
""",
fetch="one",
)
assert replica_count == (0,)

def test_dbt_deploy_init_unmanaged_single_replica(self, project):
    """deploy_init clones a one-replica unmanaged `prod` cluster.

    After the run, `prod_dbt_deploy` must be unmanaged and contain exactly
    the replica `r1` with the origin's size.
    """
    setup_statements = (
        "CREATE CLUSTER prod REPLICAS (r1 (SIZE 'scale=1,workers=1'))",
        "CREATE SCHEMA prod",
        "CREATE SCHEMA staging",
    )
    for statement in setup_statements:
        project.run_sql(statement)

    run_dbt(["run-operation", "deploy_init"])

    # The clone of an unmanaged cluster must itself be unmanaged.
    managed_row = project.run_sql(
        "SELECT managed FROM mz_clusters WHERE name = 'prod_dbt_deploy'",
        fetch="one",
    )
    assert managed_row == (False,)

    cloned_replicas = project.run_sql(
        """
        SELECT cr.name, cr.size
        FROM mz_cluster_replicas cr
        JOIN mz_clusters c ON cr.cluster_id = c.id
        WHERE c.name = 'prod_dbt_deploy'
        ORDER BY cr.name
        """,
        fetch="all",
    )
    assert cloned_replicas == [("r1", "scale=1,workers=1")]

def test_dbt_deploy_init_unmanaged_multi_replica(self, project):
    """deploy_init clones every replica of a two-replica unmanaged cluster.

    `prod_dbt_deploy` must end up with replicas `r1` and `r2`, each with
    the origin's size.
    """
    create_prod_cluster = (
        "CREATE CLUSTER prod REPLICAS ("
        "r1 (SIZE 'scale=1,workers=1'), "
        "r2 (SIZE 'scale=1,workers=1'))"
    )
    for statement in (
        create_prod_cluster,
        "CREATE SCHEMA prod",
        "CREATE SCHEMA staging",
    ):
        project.run_sql(statement)

    run_dbt(["run-operation", "deploy_init"])

    cloned_replicas = project.run_sql(
        """
        SELECT cr.name, cr.size
        FROM mz_cluster_replicas cr
        JOIN mz_clusters c ON cr.cluster_id = c.id
        WHERE c.name = 'prod_dbt_deploy'
        ORDER BY cr.name
        """,
        fetch="all",
    )
    expected_replicas = [
        ("r1", "scale=1,workers=1"),
        ("r2", "scale=1,workers=1"),
    ]
    assert cloned_replicas == expected_replicas

def test_dbt_deploy_init_unmanaged_idempotent(self, project):
    """Running deploy_init twice leaves the cloned cluster unchanged.

    Both runs must pass, and `prod_dbt_deploy` must still hold only the
    single replica `r1` afterwards.
    """
    for statement in (
        "CREATE CLUSTER prod REPLICAS (r1 (SIZE 'scale=1,workers=1'))",
        "CREATE SCHEMA prod",
        "CREATE SCHEMA staging",
    ):
        project.run_sql(statement)

    # First run creates the deployment cluster; the second finds it present.
    deploy_init_command = ["run-operation", "deploy_init"]
    run_dbt(deploy_init_command)
    run_dbt(deploy_init_command)

    cloned_replicas = project.run_sql(
        """
        SELECT cr.name, cr.size
        FROM mz_cluster_replicas cr
        JOIN mz_clusters c ON cr.cluster_id = c.id
        WHERE c.name = 'prod_dbt_deploy'
        ORDER BY cr.name
        """,
        fetch="all",
    )
    assert cloned_replicas == [("r1", "scale=1,workers=1")]

def test_dbt_deploy_init_unmanaged_errors_on_unsized_replica(self, project):
    """deploy_init must fail when an origin replica has no SIZE.

    The failure output has to name the replica (`r1`) and say it has
    "no SIZE". The test is skipped when creating such a replica raises
    psycopg2.Error.
    """
    create_unsized_cluster = (
        "CREATE CLUSTER prod REPLICAS "
        "(r1 (STORAGECTL ADDRESSES ['s:1234'], "
        "COMPUTECTL ADDRESSES ['c:1234'], "
        "WORKERS 1))"
    )
    try:
        project.run_sql(create_unsized_cluster)
    except psycopg2.Error as exc:
        pytest.skip(f"local Materialize image rejects unsized replicas: {exc}")

    for statement in ("CREATE SCHEMA prod", "CREATE SCHEMA staging"):
        project.run_sql(statement)

    _, captured_logs = run_dbt_and_capture(
        ["run-operation", "deploy_init"], expect_pass=False
    )
    for expected_fragment in ("r1", "no SIZE"):
        assert expected_fragment in captured_logs

def test_dbt_deploy_init_with_refresh_hydration_time(self, project):
project.run_sql(
Expand Down
Loading