1919 SourceQuery ,
2020 InsertOverwriteStrategy ,
2121)
22-
2322if t .TYPE_CHECKING :
2423 from sqlmesh .core ._typing import SchemaName , TableName
2524 from sqlmesh .core .engine_adapter ._typing import QueryOrDF
@@ -104,8 +103,13 @@ def _get_data_objects(
104103 """
105104 schema_name = to_schema (schema_name )
106105 schema = schema_name .db
107-
108- info_schema_tables = exp .table_ ("tables" , db = "information_schema" , catalog = schema_name .catalog , alias = "t" )
106+ catalog = schema_name .catalog
107+
108+ # In Athena, information_schema queries spanning catalogs often fail with CATALOG_NOT_FOUND.
109+ # We need to temporarily set the default catalog to the target catalog to execute this query successfully
110+ # or use system views depending on exact driver support. By omitting the catalog from the table explicitly
111+ # and setting it via connection, we ensure it maps to the correct AWS/S3 integration natively.
112+ info_schema_tables = exp .table_ ("tables" , db = "information_schema" , alias = "t" )
109113
110114 query = (
111115 exp .select (
@@ -126,7 +130,25 @@ def _get_data_objects(
126130 if object_names :
127131 query = query .where (exp .column ("table_name" , table = "t" ).isin (* object_names ))
128132
129- df = self .fetchdf (query )
133+ current_catalog = self .get_current_catalog ()
134+
135+ if catalog and catalog != self ._default_catalog :
136+ if current_catalog != catalog :
137+ self .set_current_catalog (catalog )
138+
139+ try :
140+ df = self .fetchdf (query )
141+
142+ # For queries that don't return the catalog in the result (some drivers/engines),
143+ # fill it in if it's missing or empty and we explicitly queried for a specific catalog
144+ if catalog and df is not None and not df .empty and "catalog" in df .columns :
145+ df ["catalog" ] = df ["catalog" ].fillna (catalog )
146+ # Replace empty strings with the catalog as well
147+ df ["catalog" ] = df ["catalog" ].replace ("" , catalog )
148+
149+ finally :
150+ if catalog and catalog != self ._default_catalog and current_catalog is not None and current_catalog != catalog :
151+ self .set_current_catalog (current_catalog )
130152
131153 return [
132154 DataObject (
@@ -164,7 +186,11 @@ def columns(
164186 table = exp .to_table (table_name )
165187 # note: the data_type column contains the full parameterized type, eg 'varchar(10)'
166188
167- info_schema_columns = exp .table_ ("columns" , db = "information_schema" , catalog = table .catalog )
189+ catalog = table .catalog
190+
191+ # Fetching column info across catalogs often fails in Athena (CATALOG_NOT_FOUND)
192+ # So we strip the catalog and set the current catalog dynamically
193+ info_schema_columns = exp .table_ ("columns" , db = "information_schema" )
168194
169195 query = (
170196 exp .select ("column_name" , "data_type" )
@@ -173,6 +199,12 @@ def columns(
173199 .order_by ("ordinal_position" )
174200 )
175201
202+ current_catalog = self .get_current_catalog ()
203+
204+ if catalog and catalog != self ._default_catalog :
205+ if current_catalog != catalog :
206+ self .set_current_catalog (catalog )
207+
176208 try :
177209 result = self .fetchdf (query , quote_identifiers = True )
178210 return {
@@ -185,13 +217,8 @@ def columns(
185217 # and rely on the set_current_catalog mechanism (applied at the EngineAdapter method level)
186218 # to set the catalog in the execution context.
187219 describe_table = table .copy ()
188- catalog = describe_table .catalog
189- current_catalog = self .get_current_catalog ()
190-
191220 if catalog and catalog != self ._default_catalog :
192221 describe_table .set ("catalog" , None )
193- if catalog != current_catalog :
194- self .set_current_catalog (catalog )
195222
196223 try :
197224 self .execute (exp .Describe (this = describe_table , kind = "TABLE" ))
@@ -209,9 +236,10 @@ def columns(
209236 if column_name and column_name .strip () and column_type and column_type .strip ()
210237 }
211238 finally :
212- if catalog and catalog != self ._default_catalog and current_catalog != catalog :
213- if current_catalog is not None :
214- self .set_current_catalog (current_catalog )
239+ pass # context reset is handled in outer finally block
240+ finally :
241+ if catalog and catalog != self ._default_catalog and current_catalog is not None and current_catalog != catalog :
242+ self .set_current_catalog (current_catalog )
215243
216244 def _drop_object (
217245 self ,
@@ -263,7 +291,7 @@ def _create_schema(
263291 if not any (p for p in properties if isinstance (p , exp .LocationProperty )):
264292 properties .append (location )
265293
266- if schema .catalog and schema . catalog != self . _default_catalog :
294+ if schema .catalog :
267295 target_schema = schema .copy ()
268296 catalog = target_schema .catalog
269297 target_schema .set ("catalog" , None )
@@ -430,7 +458,7 @@ def _build_create_table_exp(
430458 # But we also need to strip it from the generated CREATE TABLE statement.
431459 # Note: We must strip the catalog from the table in the schema if table_name_or_schema is a schema.
432460 target_table = create_table .this if isinstance (create_table , exp .Schema ) else create_table
433- if not expression and target_table .catalog and target_table . catalog != self . _default_catalog :
461+ if not expression and target_table .catalog :
434462 target_table .set ("catalog" , None )
435463
436464 return exp .Create (
@@ -515,7 +543,7 @@ def _build_table_properties_exp(
515543 exp .PartitionedByProperty (this = exp .Schema (expressions = schema_expressions ))
516544 )
517545 else :
518- if is_s3_table :
546+ if is_s3_table and expression :
519547 array_exprs = []
520548 for e in schema_expressions :
521549 e_copy = e .copy ()
@@ -640,28 +668,28 @@ def _query_table_type_or_raise(self, table: exp.Table) -> TableType:
640668 # Note: SHOW TBLPROPERTIES gets parsed by SQLGlot as an exp.Command anyway so we just use a string here
641669 # This also means we need to use dialect="hive" instead of dialect="athena" so that the identifiers get the correct quoting (backticks)
642670 target_table = table .copy ()
643- if target_table .catalog and target_table .catalog != self ._default_catalog :
644- catalog = target_table .catalog
671+ catalog = target_table .catalog
672+
673+ current_catalog = self .get_current_catalog ()
674+ if catalog and catalog != self ._default_catalog :
645675 target_table .set ("catalog" , None )
646-
647- current_catalog = self .get_current_catalog ()
648676 if current_catalog != catalog :
649677 self .set_current_catalog (catalog )
650-
651- try :
652- for row in self .fetchall (f"SHOW TBLPROPERTIES { target_table .sql (dialect = 'hive' , identify = True )} " ):
653- row_lower = row [0 ].lower ()
654- if "external" in row_lower and "true" in row_lower :
655- return "hive"
656- finally :
657- if current_catalog is not None and current_catalog != catalog :
658- self .set_current_catalog (current_catalog )
659- else :
678+
679+ try :
660680 for row in self .fetchall (f"SHOW TBLPROPERTIES { target_table .sql (dialect = 'hive' , identify = True )} " ):
661681 # This query returns a single column with values like 'EXTERNAL\tTRUE'
662682 row_lower = row [0 ].lower ()
663683 if "external" in row_lower and "true" in row_lower :
664684 return "hive"
685+ except Exception :
686+ # If SHOW TBLPROPERTIES fails (e.g. S3 Tables might not support it), assume iceberg
687+ # S3 tables are always iceberg anyway
688+ pass
689+ finally :
690+ if catalog and catalog != self ._default_catalog and current_catalog is not None and current_catalog != catalog :
691+ self .set_current_catalog (current_catalog )
692+
665693 return "iceberg"
666694
667695 def _is_hive_partitioned_table (self , table : exp .Table ) -> bool :
0 commit comments