@@ -86,7 +86,7 @@ def _create_cadence_dag_for_snapshot(self, snapshot: Snapshot) -> DAG:
8686
8787 with DAG (
8888 dag_id = dag_id ,
89- schedule_interval = snapshot .model .cron ,
89+ schedule_interval = snapshot .node .cron ,
9090 start_date = pendulum .instance (to_datetime (snapshot .unpaused_ts )),
9191 max_active_runs = 1 ,
9292 catchup = True ,
@@ -98,7 +98,7 @@ def _create_cadence_dag_for_snapshot(self, snapshot: Snapshot) -> DAG:
9898 ],
9999 default_args = {
100100 ** DAG_DEFAULT_ARGS ,
101- "email" : snapshot .model .owner ,
101+ "email" : snapshot .node .owner ,
102102 "email_on_failure" : True ,
103103 },
104104 ) as dag :
@@ -227,7 +227,7 @@ def _create_creation_tasks(
227227 self , new_snapshots : t .List [Snapshot ], ddl_concurrent_tasks : int
228228 ) -> t .Tuple [BaseOperator , BaseOperator ]:
229229 start_task = EmptyOperator (task_id = "snapshot_creation_start" )
230- end_task = EmptyOperator (task_id = "snapshot_creation_end" )
230+ end_task = EmptyOperator (task_id = "snapshot_creation_end" , trigger_rule = "none_failed" )
231231
232232 if not new_snapshots :
233233 start_task >> end_task
@@ -312,6 +312,7 @@ def _create_promotion_demotion_tasks(
312312 "environment" : environment ,
313313 "unpaused_dt" : request .unpaused_dt ,
314314 },
315+ trigger_rule = "none_failed" ,
315316 )
316317
317318 update_state_task >> migrate_tables_task
0 commit comments