8 changes: 0 additions & 8 deletions docs/lakebridge/docs/reconcile/example_config.mdx
@@ -24,10 +24,6 @@ Consider the below tables that we want to reconcile:
:::
```json
{
"source_catalog": "source_catalog",
"source_schema": "source_schema",
"target_catalog": "target_catalog",
"target_schema": "target_schema",
"tables": [
{
"source_name": "product_prod",
@@ -92,10 +88,6 @@ Consider the below tables that we want to reconcile:
:::
```json
{
"source_catalog": "source_catalog",
"source_schema": "source_schema",
"target_catalog": "target_catalog",
"target_schema": "target_schema",
"tables": [
{
"aggregates": [{
Contributor:

I'm not sure what the changes in here are; this looks like static documentation setup rather than content-related?

Contributor Author:

This is the reconcile notebook exported as HTML from the Databricks workspace. The change in the notebook deletes the removed config.

Large diffs are not rendered by default.

2 changes: 0 additions & 2 deletions src/databricks/labs/lakebridge/cli.py
@@ -650,7 +650,6 @@ def reconcile(*, w: WorkspaceClient) -> None:
logger.debug(f"User: {user}")
recon_runner = ReconcileRunner(
ctx.workspace_client,
ctx.installation,
ctx.install_state,
ctx.prompts,
)
@@ -666,7 +665,6 @@ def aggregates_reconcile(*, w: WorkspaceClient) -> None:
logger.debug(f"User: {user}")
recon_runner = ReconcileRunner(
ctx.workspace_client,
ctx.installation,
ctx.install_state,
ctx.prompts,
)
26 changes: 8 additions & 18 deletions src/databricks/labs/lakebridge/config.py
@@ -210,19 +210,17 @@ def v2_migrate(cls, raw: dict[str, Any]) -> dict[str, Any]:
@dataclass
class TableRecon:
__file__ = "recon_config.yml"
__version__ = 1
__version__ = 2

source_schema: str
target_catalog: str
target_schema: str
tables: list[Table]
source_catalog: str | None = None
Comment on lines -215 to -219
Contributor:

If we're changing the schema of something we store in the user's workspace, I think we need:

  • To bump the version number.
  • Tests for the migration/loading path.

Even if loading ignores (now unknown) fields, an older version of the code will choke if it tries to load an instance persisted with this new schema.

Contributor Author:

Since this is a backwards-compatible change, I didn't bump the version. So I added a test covering that.

> an older version of the code will choke if it tries to load an instance persisted with this new schema.

And I am not aware of any forward-compatibility support in blueprint. Or did I misunderstand something?

Contributor:

Here's the issue:

> an older version of the code will choke if it tries to load an instance persisted with this new schema.

The scenario is:

  1. Someone runs with this version of the code.
  2. The file is created.
  3. For whatever reason (downgrade, cut'n'paste to another team-member, different environment), someone runs an older version of the code.

This leads to:

  • If the version is not bumped: the older version will choke because it expects fields that are missing.
  • If the version is bumped: the older version will detect that it can't be loaded and raise an IllegalState error with something like "expected version 1, got 2".

Both are errors, but the second one is controlled and expected, and also much easier to diagnose.

Contributor Author:

Got it, just pushed version 2.
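
To make the discussion above concrete, here is a small, self-contained sketch of what the new `v1_migrate` hook does to a version-1 payload before `TableRecon` is constructed. The function body is copied from the diff below; the payload mirrors the JSON removed from `example_config.mdx`, with the single table entry abbreviated for illustration.

```python
from typing import Any


def v1_migrate(raw: dict[str, Any]) -> dict[str, Any]:
    """Standalone copy of TableRecon.v1_migrate: drop the top-level
    catalog/schema keys that no longer exist on the dataclass and mark
    the payload as version 2."""
    old_keys = ["source_catalog", "source_schema", "target_catalog", "target_schema"]
    for key in old_keys:
        raw.pop(key, None)
    raw["version"] = 2
    return raw


# A version-1 recon table config as it would have been persisted in the
# workspace (table entry abbreviated).
raw_v1 = {
    "source_catalog": "source_catalog",
    "source_schema": "source_schema",
    "target_catalog": "target_catalog",
    "target_schema": "target_schema",
    "tables": [{"source_name": "product_prod"}],
    "version": 1,
}

migrated = v1_migrate(raw_v1)
assert migrated["version"] == 2
assert "source_catalog" not in migrated  # only "tables" (and the version) remain
```

Because the stored version is now 2, an older release that still expects version 1 fails with the controlled "expected version 1, got 2" style error the reviewer describes, rather than choking on missing fields.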


def __post_init__(self):
self.source_schema = self.source_schema.lower()
self.target_schema = self.target_schema.lower()
self.target_catalog = self.target_catalog.lower()
self.source_catalog = self.source_catalog.lower() if self.source_catalog else self.source_catalog
@classmethod
def v1_migrate(cls, raw: dict[str, Any]) -> dict[str, Any]:
old_keys = ["source_catalog", "source_schema", "target_catalog", "target_schema"]
for key in old_keys:
raw.pop(key, None)
raw["version"] = 2
return raw


@dataclass
@@ -246,12 +244,6 @@ class ValidationResult:
exception_msg: str | None


@dataclass
class ReconcileTablesConfig:
filter_type: str # all/include/exclude
tables_list: list[str] # [*, table1, table2]


@dataclass
class ReconcileMetadataConfig:
catalog: str = "remorph"
@@ -269,8 +261,6 @@ class ReconcileConfig:
secret_scope: str
database_config: DatabaseConfig
metadata_config: ReconcileMetadataConfig
job_id: str | None = None
tables: ReconcileTablesConfig | None = None
Comment on lines -272 to -273
Contributor:

Also here, regarding the version/schema change.

Contributor Author:

we can bump the version but I would not (here at least).

  • this was never set in the old code and has no impact on the existing installations
  • Implement databricks creds manager #2123 bumps the version, so there's no need to do it twice
  • these are optional values, so the older code will continue working (see the sketch below)
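
A minimal sketch of the backward-compatibility argument in the last bullet: when fields are optional with `None` defaults, a payload written by the new code (which simply omits them) still constructs the old dataclass. The class below is a trimmed, hypothetical stand-in for the pre-change `ReconcileConfig`, not the real definition, and plain dataclass construction is used purely for illustration rather than as a claim about how blueprint loads installations.

```python
from dataclasses import dataclass


@dataclass
class OldReconcileConfig:
    # Hypothetical, trimmed stand-in for the pre-change ReconcileConfig;
    # only the fields relevant to this thread are shown.
    data_source: str
    report_type: str
    job_id: str | None = None          # removed field, optional with a default
    tables: list[str] | None = None    # stand-in for ReconcileTablesConfig | None


# A payload written by the new code no longer carries job_id/tables ...
new_payload = {"data_source": "snowflake", "report_type": "all"}

# ... yet the old dataclass still constructs, falling back to its defaults.
config = OldReconcileConfig(**new_payload)
assert config.job_id is None and config.tables is None
```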



@dataclass
56 changes: 2 additions & 54 deletions src/databricks/labs/lakebridge/reconcile/runner.py
@@ -1,14 +1,10 @@
import logging
import webbrowser

from databricks.labs.blueprint.installation import Installation
from databricks.labs.blueprint.installation import SerdeError
from databricks.labs.blueprint.installer import InstallState
from databricks.labs.blueprint.tui import Prompts
from databricks.sdk import WorkspaceClient
from databricks.sdk.errors import NotFound, PermissionDenied

from databricks.labs.lakebridge.config import ReconcileConfig, TableRecon
from databricks.labs.lakebridge.deployment.recon import RECON_JOB_NAME
from databricks.labs.lakebridge.reconcile.recon_config import RECONCILE_OPERATION_NAME

@@ -21,18 +17,15 @@ class ReconcileRunner:
def __init__(
self,
ws: WorkspaceClient,
installation: Installation,
install_state: InstallState,
prompts: Prompts,
):
self._ws = ws
self._installation = installation
self._install_state = install_state
self._prompts = prompts

def run(self, operation_name=RECONCILE_OPERATION_NAME):
reconcile_config = self._get_verified_recon_config()
job_id = self._get_recon_job_id(reconcile_config)
job_id = self._get_recon_job_id()
logger.info(f"Triggering the reconcile job with job_id: `{job_id}`")
wait = self._ws.jobs.run_now(job_id, job_parameters={"operation_name": operation_name})
if not wait.run_id:
@@ -45,52 +38,7 @@ def run(self, operation_name=RECONCILE_OPERATION_NAME):
if self._prompts.confirm(f"Would you like to open the job run URL `{job_run_url}` in the browser?"):
webbrowser.open(job_run_url)

def _get_verified_recon_config(self) -> ReconcileConfig:
try:
recon_config = self._installation.load(ReconcileConfig)
except NotFound as err:
raise SystemExit("Cannot find existing `reconcile` installation. Please try reinstalling.") from err
except (PermissionDenied, SerdeError, ValueError, AttributeError) as e:
install_dir = self._installation.install_folder()
raise SystemExit(
f"Existing `reconcile` installation at {install_dir} is corrupted. Please try reinstalling."
) from e

self._verify_recon_table_config(recon_config)
return recon_config

def _verify_recon_table_config(self, recon_config):
source_catalog_or_schema = (
recon_config.database_config.source_catalog
if recon_config.database_config.source_catalog
else recon_config.database_config.source_schema
)
# Filename pattern for recon table config `recon_config_<SOURCE>_<CATALOG_OR_SCHEMA>_<FILTER_TYPE>.json`
# Example: recon_config_snowflake_sample_data_all.json
filename = f"recon_config_{recon_config.data_source}_{source_catalog_or_schema}_{recon_config.report_type}.json"
try:
logger.debug(f"Loading recon table config `{filename}` from workspace.")
self._installation.load(TableRecon, filename=filename)
except NotFound as e:
err_msg = (
"Cannot find recon table configuration in existing `reconcile` installation. "
f"Please provide the configuration file {filename} in the workspace."
)
logger.error(f"{err_msg}. For more details, please refer to the docs {_RECON_DOCS_URL}")
raise SystemExit(err_msg) from e
except (PermissionDenied, SerdeError, ValueError, AttributeError) as e:
install_dir = self._installation.install_folder()
err_msg = (
f"Cannot load corrupted recon table configuration from {install_dir}/{filename}. "
f"Please validate the file."
)
logger.error(f"{err_msg}. For more details, please refer to the docs {_RECON_DOCS_URL}")
raise SystemExit(err_msg) from e

def _get_recon_job_id(self, reconcile_config: ReconcileConfig) -> int:
if reconcile_config.job_id:
logger.debug("Reconcile job id found in the reconcile config.")
return int(reconcile_config.job_id)
def _get_recon_job_id(self) -> int:
if RECON_JOB_NAME in self._install_state.jobs:
logger.debug("Reconcile job id found in the install state.")
return int(self._install_state.jobs[RECON_JOB_NAME])
20 changes: 0 additions & 20 deletions tests/integration/reconcile/query_builder/test_execute.py
@@ -666,10 +666,6 @@ def mock_for_report_type_data(
normalized_table_conf_with_opts.drop_columns = ["`s_acctbal`"]
normalized_table_conf_with_opts.column_thresholds = None
table_recon = TableRecon(
source_catalog="org",
source_schema="data",
target_catalog="org",
target_schema="data",
tables=[normalized_table_conf_with_opts],
)
src_schema, tgt_schema = table_schema_ansi_ansi
@@ -877,10 +873,6 @@ def mock_for_report_type_schema(
normalized_table_conf_with_opts, table_schema_ansi_ansi, query_store, mock_spark, setup_metadata_table
):
table_recon = TableRecon(
source_catalog="org",
source_schema="data",
target_catalog="org",
target_schema="data",
tables=[normalized_table_conf_with_opts],
)
src_schema, tgt_schema = table_schema_ansi_ansi
@@ -1077,10 +1069,6 @@ def mock_for_report_type_all(
normalized_table_conf_with_opts.drop_columns = ["`s_acctbal`"]
normalized_table_conf_with_opts.column_thresholds = None
table_recon = TableRecon(
source_catalog="org",
source_schema="data",
target_catalog="org",
target_schema="data",
tables=[normalized_table_conf_with_opts],
)
src_schema, tgt_schema = table_schema_oracle_ansi
@@ -1341,10 +1329,6 @@ def mock_for_report_type_row(
normalized_table_conf_with_opts.drop_columns = ["`s_acctbal`"]
normalized_table_conf_with_opts.column_thresholds = None
table_recon = TableRecon(
source_catalog="org",
source_schema="data",
target_catalog="org",
target_schema="data",
tables=[normalized_table_conf_with_opts],
)
src_schema, tgt_schema = table_schema_ansi_ansi
@@ -1570,10 +1554,6 @@ def mock_for_recon_exception(normalized_table_conf_with_opts, setup_metadata_tab
normalized_table_conf_with_opts.column_thresholds = None
normalized_table_conf_with_opts.join_columns = None
table_recon = TableRecon(
source_catalog="org",
source_schema="data",
target_catalog="org",
target_schema="data",
tables=[normalized_table_conf_with_opts],
)
source = MockDataSource({}, {})
1 change: 1 addition & 0 deletions tests/unit/contexts/test_application.py
@@ -72,6 +72,7 @@ def test_workspace_context_attributes_not_none(ws):
"schema": "reconcile",
"volume": "reconcile_volume",
},
"job_id": "12345", # removed as it was never used
"version": 1,
},
"state.json": {