|
17 | 17 | import concurrent.futures |
18 | 18 | import math |
19 | 19 | import threading |
| 20 | +import uuid |
| 21 | +import warnings |
20 | 22 | from typing import Literal, Mapping, Optional, Sequence, Tuple |
21 | 23 |
|
22 | 24 | import google.api_core.exceptions |
23 | 25 | import google.cloud.bigquery.job as bq_job |
24 | 26 | import google.cloud.bigquery.table as bq_table |
25 | 27 | import google.cloud.bigquery_storage_v1 |
| 28 | +import google.cloud.exceptions |
26 | 29 | from google.cloud import bigquery |
27 | 30 |
|
28 | 31 | import bigframes |
@@ -124,9 +127,7 @@ def to_sql( |
124 | 127 | else array_value.node |
125 | 128 | ) |
126 | 129 | node = self._substitute_large_local_sources(node) |
127 | | - compiled = compile.compiler().compile_sql( |
128 | | - compile.CompileRequest(node, sort_rows=ordered) |
129 | | - ) |
| 130 | + compiled = self._compile(node, ordered=ordered) |
130 | 131 | return compiled.sql |
131 | 132 |
|
132 | 133 | def execute( |
@@ -242,46 +243,55 @@ def _export_gbq( |
242 | 243 | # validate destination table |
243 | 244 | existing_table = self._maybe_find_existing_table(spec) |
244 | 245 |
|
245 | | - compiled = compile.compiler().compile_sql( |
246 | | - compile.CompileRequest(plan, sort_rows=False) |
247 | | - ) |
248 | | - sql = compiled.sql |
| 246 | + def run_with_compiler(compiler_name, compiler_id=None): |
| 247 | + compiled = self._compile(plan, ordered=False, compiler_name=compiler_name) |
| 248 | + sql = compiled.sql |
249 | 249 |
|
250 | | - if (existing_table is not None) and _is_schema_match( |
251 | | - existing_table.schema, array_value.schema |
252 | | - ): |
253 | | - # b/409086472: Uses DML for table appends and replacements to avoid |
254 | | - # BigQuery `RATE_LIMIT_EXCEEDED` errors, as per quota limits: |
255 | | - # https://cloud.google.com/bigquery/quotas#standard_tables |
256 | | - job_config = bigquery.QueryJobConfig() |
| 250 | + if (existing_table is not None) and _is_schema_match( |
| 251 | + existing_table.schema, array_value.schema |
| 252 | + ): |
| 253 | + # b/409086472: Uses DML for table appends and replacements to avoid |
| 254 | + # BigQuery `RATE_LIMIT_EXCEEDED` errors, as per quota limits: |
| 255 | + # https://cloud.google.com/bigquery/quotas#standard_tables |
| 256 | + job_config = bigquery.QueryJobConfig() |
| 257 | + |
| 258 | + ir = sqlglot_ir.SQLGlotIR.from_unparsed_query(sql) |
| 259 | + if spec.if_exists == "append": |
| 260 | + sql = sg_sql.to_sql( |
| 261 | + sg_sql.insert(ir.expr.as_select_all(), spec.table) |
| 262 | + ) |
| 263 | + else: # for "replace" |
| 264 | + assert spec.if_exists == "replace" |
| 265 | + sql = sg_sql.to_sql( |
| 266 | + sg_sql.replace(ir.expr.as_select_all(), spec.table) |
| 267 | + ) |
| 268 | + else: |
| 269 | + dispositions = { |
| 270 | + "fail": bigquery.WriteDisposition.WRITE_EMPTY, |
| 271 | + "replace": bigquery.WriteDisposition.WRITE_TRUNCATE, |
| 272 | + "append": bigquery.WriteDisposition.WRITE_APPEND, |
| 273 | + } |
| 274 | + job_config = bigquery.QueryJobConfig( |
| 275 | + write_disposition=dispositions[spec.if_exists], |
| 276 | + destination=spec.table, |
| 277 | + clustering_fields=spec.cluster_cols if spec.cluster_cols else None, |
| 278 | + ) |
257 | 279 |
|
258 | | - ir = sqlglot_ir.SQLGlotIR.from_unparsed_query(sql) |
259 | | - if spec.if_exists == "append": |
260 | | - sql = sg_sql.to_sql(sg_sql.insert(ir.expr.as_select_all(), spec.table)) |
261 | | - else: # for "replace" |
262 | | - assert spec.if_exists == "replace" |
263 | | - sql = sg_sql.to_sql(sg_sql.replace(ir.expr.as_select_all(), spec.table)) |
264 | | - else: |
265 | | - dispositions = { |
266 | | - "fail": bigquery.WriteDisposition.WRITE_EMPTY, |
267 | | - "replace": bigquery.WriteDisposition.WRITE_TRUNCATE, |
268 | | - "append": bigquery.WriteDisposition.WRITE_APPEND, |
269 | | - } |
270 | | - job_config = bigquery.QueryJobConfig( |
271 | | - write_disposition=dispositions[spec.if_exists], |
272 | | - destination=spec.table, |
273 | | - clustering_fields=spec.cluster_cols if spec.cluster_cols else None, |
| 280 | + # Attach data type usage to the job labels |
| 281 | + job_config.labels["bigframes-dtypes"] = compiled.encoded_type_refs |
| 282 | + job_config.labels["bigframes-compiler"] = ( |
| 283 | + f"{compiler_name}-{compiler_id}" if compiler_id else compiler_name |
| 284 | + ) |
| 285 | + # TODO(swast): plumb through the api_name of the user-facing api that |
| 286 | + # caused this query. |
| 287 | + iterator, job = self._run_execute_query( |
| 288 | + sql=sql, |
| 289 | + job_config=job_config, |
| 290 | + session=array_value.session, |
274 | 291 | ) |
| 292 | + return iterator, job |
275 | 293 |
|
276 | | - # Attach data type usage to the job labels |
277 | | - job_config.labels["bigframes-dtypes"] = compiled.encoded_type_refs |
278 | | - # TODO(swast): plumb through the api_name of the user-facing api that |
279 | | - # caused this query. |
280 | | - iterator, job = self._run_execute_query( |
281 | | - sql=sql, |
282 | | - job_config=job_config, |
283 | | - session=array_value.session, |
284 | | - ) |
| 294 | + iterator, job = self._compile_with_fallback(run_with_compiler) |
285 | 295 |
|
286 | 296 | has_special_dtype_col = any( |
287 | 297 | t in (bigframes.dtypes.TIMEDELTA_DTYPE, bigframes.dtypes.OBJ_REF_DTYPE) |
@@ -410,6 +420,43 @@ def _is_trivially_executable(self, array_value: bigframes.core.ArrayValue): |
410 | 420 | self.prepare_plan(array_value.node) |
411 | 421 | ) |
412 | 422 |
|
| 423 | + def _compile( |
| 424 | + self, |
| 425 | + node: nodes.BigFrameNode, |
| 426 | + *, |
| 427 | + ordered: bool = False, |
| 428 | + peek: Optional[int] = None, |
| 429 | + materialize_all_order_keys: bool = False, |
| 430 | + compiler_name: Literal["sqlglot", "ibis"] = "sqlglot", |
| 431 | + ) -> compile.CompileResult: |
| 432 | + return compile.compile_sql( |
| 433 | + compile.CompileRequest( |
| 434 | + node, |
| 435 | + sort_rows=ordered, |
| 436 | + peek_count=peek, |
| 437 | + materialize_all_order_keys=materialize_all_order_keys, |
| 438 | + ), |
| 439 | + compiler_name=compiler_name, |
| 440 | + ) |
| 441 | + |
| 442 | + def _compile_with_fallback(self, run_fn): |
| 443 | + compiler_option = bigframes.options.experiments.sql_compiler |
| 444 | + if compiler_option == "legacy": |
| 445 | + return run_fn("ibis") |
| 446 | + elif compiler_option == "experimental": |
| 447 | + return run_fn("sqlglot") |
| 448 | + else: # stable |
| 449 | + compiler_id = f"{uuid.uuid1().hex[:12]}" |
| 450 | + try: |
| 451 | + return run_fn("sqlglot", compiler_id=compiler_id) |
| 452 | + except google.cloud.exceptions.BadRequest as e: |
| 453 | + msg = bfe.format_message( |
| 454 | + f"Compiler ID {compiler_id}: BadRequest on sqlglot. " |
| 455 | + f"Falling back to ibis. Details: {e.message}" |
| 456 | + ) |
| 457 | + warnings.warn(msg, category=UserWarning) |
| 458 | + return run_fn("ibis", compiler_id=compiler_id) |
| 459 | + |
413 | 460 | def prepare_plan( |
414 | 461 | self, |
415 | 462 | plan: nodes.BigFrameNode, |
@@ -604,34 +651,43 @@ def _execute_plan_gbq( |
604 | 651 | ] |
605 | 652 | cluster_cols = cluster_cols[:_MAX_CLUSTER_COLUMNS] |
606 | 653 |
|
607 | | - compiled = compile.compiler().compile_sql( |
608 | | - compile.CompileRequest( |
| 654 | + def run_with_compiler(compiler_name, compiler_id=None): |
| 655 | + compiled = self._compile( |
609 | 656 | plan, |
610 | | - sort_rows=ordered, |
611 | | - peek_count=peek, |
| 657 | + ordered=ordered, |
| 658 | + peek=peek, |
612 | 659 | materialize_all_order_keys=(cache_spec is not None), |
| 660 | + compiler_name=compiler_name, |
613 | 661 | ) |
614 | | - ) |
615 | | - # might have more columns than og schema, for hidden ordering columns |
616 | | - compiled_schema = compiled.sql_schema |
| 662 | + # might have more columns than og schema, for hidden ordering columns |
| 663 | + compiled_schema = compiled.sql_schema |
617 | 664 |
|
618 | | - destination_table: Optional[bigquery.TableReference] = None |
| 665 | + destination_table: Optional[bigquery.TableReference] = None |
619 | 666 |
|
620 | | - job_config = bigquery.QueryJobConfig() |
621 | | - if create_table: |
622 | | - destination_table = self.storage_manager.create_temp_table( |
623 | | - compiled_schema, cluster_cols |
| 667 | + job_config = bigquery.QueryJobConfig() |
| 668 | + if create_table: |
| 669 | + destination_table = self.storage_manager.create_temp_table( |
| 670 | + compiled_schema, cluster_cols |
| 671 | + ) |
| 672 | + job_config.destination = destination_table |
| 673 | + |
| 674 | + # Attach data type usage to the job labels |
| 675 | + job_config.labels["bigframes-dtypes"] = compiled.encoded_type_refs |
| 676 | + job_config.labels["bigframes-compiler"] = ( |
| 677 | + f"{compiler_name}-{compiler_id}" if compiler_id else compiler_name |
624 | 678 | ) |
625 | | - job_config.destination = destination_table |
626 | | - |
627 | | - # Attach data type usage to the job labels |
628 | | - job_config.labels["bigframes-dtypes"] = compiled.encoded_type_refs |
629 | | - iterator, query_job = self._run_execute_query( |
630 | | - sql=compiled.sql, |
631 | | - job_config=job_config, |
632 | | - query_with_job=(destination_table is not None), |
633 | | - session=plan.session, |
634 | | - ) |
| 679 | + iterator, query_job = self._run_execute_query( |
| 680 | + sql=compiled.sql, |
| 681 | + job_config=job_config, |
| 682 | + query_with_job=(destination_table is not None), |
| 683 | + session=plan.session, |
| 684 | + ) |
| 685 | + return iterator, query_job, compiled |
| 686 | + |
| 687 | + iterator, query_job, compiled = self._compile_with_fallback(run_with_compiler) |
| 688 | + |
| 689 | + # might have more columns than og schema, for hidden ordering columns |
| 690 | + compiled_schema = compiled.sql_schema |
635 | 691 |
|
636 | 692 | # we could actually cache even when caching is not explicitly requested, but being conservative for now |
637 | 693 | result_bq_data = None |
|
0 commit comments