From aca06b866c2c24182a81f1de0aba8d52f209aa13 Mon Sep 17 00:00:00 2001 From: Vishal Gupta Date: Fri, 1 May 2026 12:12:53 +0000 Subject: [PATCH] Update differ to output MCF files --- tools/import_differ/README.md | 2 +- tools/import_differ/differ_utils.py | 22 ++ tools/import_differ/import_differ.py | 301 ++++++++++-------- tools/import_differ/import_differ_test.py | 84 +++-- tools/import_differ/test/current/schema.mcf | 19 +- tools/import_differ/test/previous/schema.mcf | 18 +- .../test/results/differ_summary.json | 16 + .../test/results/import_diff.mcf | 103 ++++++ .../test/results/obs_diff_log.csv | 5 - .../test/results/obs_diff_samples.csv | 4 - .../test/results/obs_diff_summary.csv | 4 - .../test/results/point_analysis_summary.csv | 4 - .../test/results/schema_diff_log.csv | 8 - .../test/results/schema_diff_summary.csv | 2 - .../import_validation_test.py | 25 +- tools/import_validation/runner.py | 101 ++++-- tools/import_validation/runner_test.py | 49 +-- tools/import_validation/validator.py | 99 ++---- tools/import_validation/validator_test.py | 72 ++--- 19 files changed, 576 insertions(+), 362 deletions(-) create mode 100644 tools/import_differ/test/results/differ_summary.json create mode 100644 tools/import_differ/test/results/import_diff.mcf delete mode 100644 tools/import_differ/test/results/obs_diff_log.csv delete mode 100644 tools/import_differ/test/results/obs_diff_samples.csv delete mode 100644 tools/import_differ/test/results/obs_diff_summary.csv delete mode 100644 tools/import_differ/test/results/point_analysis_summary.csv delete mode 100644 tools/import_differ/test/results/schema_diff_log.csv delete mode 100644 tools/import_differ/test/results/schema_diff_summary.csv diff --git a/tools/import_differ/README.md b/tools/import_differ/README.md index 094fdea135..06a4aba117 100644 --- a/tools/import_differ/README.md +++ b/tools/import_differ/README.md @@ -24,7 +24,7 @@ python import_differ.py \ - previous\_data: Path to the previous data (wildcard on
local/GCS supported). - output\_location: Path to the output data folder (local/GCS). - file\_format: Format of the input data (mcf,tfrecord). -- runner\_mode: Runner mode: local (Python) / cloud (Dataflow in Cloud). +- runner\_mode: Runner mode: native (Python) / direct (Java runner) / cloud (Dataflow in Cloud). - project\_id: GCP project Id for the dataflow job. - job\_name: Name of the differ dataflow job. diff --git a/tools/import_differ/differ_utils.py b/tools/import_differ/differ_utils.py index 8ec626efab..3e64e9dfc5 100644 --- a/tools/import_differ/differ_utils.py +++ b/tools/import_differ/differ_utils.py @@ -84,6 +84,28 @@ def write_json_data(data, dest: str, file: str, tmp_dir: str): upload_output_data(path, dest) +def write_mcf_nodes(nodes: list, dest: str, file: str, tmp_dir: str): + """ Writes mcf nodes to a file with the given path.""" + if dest.startswith('gs://'): + path = os.path.join(tmp_dir, file) + else: + path = os.path.join(dest, file) + with open(path, mode='w', encoding='utf-8') as out_file: + for node in nodes: + if 'Node' in node: + out_file.write(f'Node: {node["Node"]}\n') + elif 'dcid' in node: + out_file.write(f'dcid: {node["dcid"]}\n') + + for key, value in node.items(): + if key in ['Node', 'dcid']: + continue + out_file.write(f'{key}: {value}\n') + out_file.write('\n') + if dest.startswith('gs://'): + upload_output_data(path, dest) + + def upload_output_data(src: str, dest: str): client = storage.Client() bucket_name = dest.split('/')[2] diff --git a/tools/import_differ/import_differ.py b/tools/import_differ/import_differ.py index e0e2a1060c..f3d531d341 100644 --- a/tools/import_differ/import_differ.py +++ b/tools/import_differ/import_differ.py @@ -13,11 +13,16 @@ # limitations under the License.
""" Utility to generate a dataset diff for import analysis.""" +import csv +import glob +import json import os import pandas as pd import random +import subprocess import sys import time +from collections import defaultdict from enum import Enum from absl import app @@ -30,8 +35,6 @@ import differ_utils -_SAMPLE_COUNT = 3 - _DATAFLOW_TEMPLATE_URL = 'gs://datcom-templates/templates/flex/differ.json' Diff = Enum('Diff', [ @@ -48,10 +51,8 @@ ('typeOf', 4), ('dcid', 5), ('diff_type', 6), - ('diff_size', 7), - ('observationAbout', 8), - ('key_combined', 9), - ('value_combined', 10), + ('key_combined', 7), + ('value_combined', 8), ]) _FLAGS = flags.FLAGS @@ -65,7 +66,8 @@ 'Path (local/GCS) to the output data folder.') flags.DEFINE_string('file_format', 'mcf', 'Format of the input data (mcf,tfrecord)') -flags.DEFINE_string('runner_mode', 'local', 'Runner mode (local/cloud)') +flags.DEFINE_string('runner_mode', 'native', + 'Runner mode (native/direct/cloud)') flags.DEFINE_string('job_name', 'differ', 'Name of the differ job.') flags.DEFINE_string('project_id', '', 'GCP project id for the dataflow job.') @@ -76,7 +78,7 @@ class ImportDiffer: Usage: $ python import_differ.py --current_data= --previous_data= --output_location= \ - --file_format= --runner_mode= --project_id= --job_name= + --file_format= --runner_mode= --project_id= --job_name= Summary output generated is of the form below showing counts of differences for each variable. @@ -88,11 +90,8 @@ class ImportDiffer: 3 dcid:var4 0 2 0 Detailed diff output is written to files for further analysis. 
- - obs_diff_summary.csv: diff summary for observation analysis - - obs_diff_samples.csv: sample diff for observation analysis - - obs_diff_log.csv: diff log for observations - - schema_diff_summary.csv: diff summary for schema analysis - - schema_diff_log.csv: diff log for schema nodes + - import_diff.mcf: combined MCF diff for observations and schema + - differ_summary.json: consolidated diff statistics """ @@ -103,7 +102,7 @@ def __init__(self, project_id='', job_name='differ', file_format='mcf', - runner_mode='local'): + runner_mode='native'): self.current_data = current_data self.previous_data = previous_data self.output_path = output_location @@ -117,14 +116,6 @@ def _cleanup_data(self, df: pd.DataFrame): df[column.name] = df.get(column.name, 0) df[column.name] = df[column.name].fillna(0).astype(int) - def _get_samples(self, row): - years = sorted(row[Column.observationDate.name]) - if len(years) > _SAMPLE_COUNT: - return [years[0]] + random.sample(years[1:-1], - _SAMPLE_COUNT - 2) + [years[-1]] - else: - return years - # Processes two dataset files to identify changes.
def generate_diff(self, previous_df: pd.DataFrame, current_df: pd.DataFrame) -> pd.DataFrame: @@ -204,7 +195,9 @@ def split_data(self, mcf_nodes: list) -> (pd.DataFrame, pd.DataFrame): obs_list.append({ Column.key_combined.name: key_combined, - Column.value_combined.name: value_combined + Column.value_combined.name: value_combined, + 'Node': node.get('Node', ''), + 'dcid': node.get('dcid', '') }) else: node_id_key = str(node.get('Node', "")) @@ -214,7 +207,7 @@ def split_data(self, mcf_nodes: list) -> (pd.DataFrame, pd.DataFrame): continue values_to_combine = [] keys_to_combine = [node_id_key] - node.pop(Column.dcid.name) + node.pop(Column.dcid.name, None) node.pop('Node', None) value_keys = sorted(node.keys()) for key in value_keys: @@ -231,72 +224,75 @@ def split_data(self, mcf_nodes: list) -> (pd.DataFrame, pd.DataFrame): obs_df = pd.DataFrame(obs_list) return obs_df, schema_df - def observation_diff_analysis( - self, diff: pd.DataFrame) -> (pd.DataFrame, pd.DataFrame): - """ - Performs observation diff analysis to identify data point changes. 
- - Returns: - summary and results from the analysis + def convert_diff_to_mcf_nodes(self, diff_df: pd.DataFrame, + is_obs: bool) -> list: """ - if diff.empty: - return pd.DataFrame(columns=[ - Column.variableMeasured.name, Diff.ADDED.name, - Diff.DELETED.name, Diff.MODIFIED.name - ]), pd.DataFrame(columns=[ - Column.variableMeasured.name, Column.diff_type.name, - Column.observationAbout.name, Column.observationDate.name, - Column.diff_size.name - ]) - - split_df = diff[Column.key_combined.name].str.split(';', expand=True) - diff[Column.variableMeasured.name] = split_df[0] - diff[Column.observationAbout.name] = split_df[1] - diff[Column.observationDate.name] = split_df[2] - - samples = diff.groupby( - [Column.variableMeasured.name, Column.diff_type.name], - observed=True, - as_index=False)[[ - Column.observationAbout.name, Column.observationDate.name - ]].agg(lambda x: x.tolist()) - samples[Column.diff_size.name] = samples.apply( - lambda row: len(row[Column.observationAbout.name]), axis=1) - samples[Column.observationAbout.name] = samples.apply( - lambda row: random.sample( - row[Column.observationAbout.name], - min(_SAMPLE_COUNT, len(row[Column.observationAbout.name]))), - axis=1) - samples[Column.observationDate.name] = samples.apply(self._get_samples, - axis=1) - summary = samples.pivot( - index=Column.variableMeasured.name, columns=Column.diff_type.name, values=Column.diff_size.name)\ - .reset_index().rename_axis(None, axis=1) - self._cleanup_data(summary) - samples.sort_values( - by=[Column.diff_type.name, Column.variableMeasured.name], - inplace=True) - return summary, samples - - def schema_diff_analysis(self, diff: pd.DataFrame) -> pd.DataFrame: - """ - Performs variable diff analysis to identify statvar changes. - - Returns: - summary from the analysis + Converts the diff dataframe back to MCF format nodes. 
""" - if diff.empty: - return pd.DataFrame(columns=[ - Column.diff_type.name, Diff.ADDED.name, Diff.DELETED.name, - Diff.MODIFIED.name - ]) - - result = diff[Column.diff_type.name].value_counts().reset_index() - summary = result.set_index( - Column.diff_type.name).transpose().rename_axis( - None, axis=1).reset_index(drop=True) - self._cleanup_data(summary) - return summary + all_nodes = [] + for diff_type in [ + Diff.ADDED.name, Diff.DELETED.name, Diff.MODIFIED.name + ]: + df_type = diff_df[diff_df[Column.diff_type.name] == diff_type] + if df_type.empty: + continue + + for _, row in df_type.iterrows(): + node = {} + key_combined = str(row[Column.key_combined.name]) + + # Determine which column to use for values and node IDs + suffix = '_x' if diff_type == Diff.DELETED.name else '_y' + + # Helper to get value from row, handles cases with or without suffix + def get_val(base_name): + col_name = base_name + suffix + if col_name in row: + return str(row[col_name]) + return str(row.get(base_name, '')) + + value_combined = get_val(Column.value_combined.name) + + if is_obs: + node_id = get_val('Node') + dcid_id = get_val('dcid') + if node_id and node_id != 'nan': + node['Node'] = node_id + if dcid_id and dcid_id != 'nan': + node['dcid'] = dcid_id + + # Reconstruct observation node + groupby_keys = [ + 'variableMeasured', 'observationAbout', + 'observationDate', 'observationPeriod', + 'measurementMethod', 'unit', 'scalingFactor' + ] + keys = key_combined.split(';') + for i, key in enumerate(groupby_keys): + if i < len(keys) and keys[i] and keys[i] != "nan": + node[key] = keys[i] + + values = value_combined.split(';') + if values and values[0] and values[0] != "nan": + node['value'] = values[0] + + node['typeOf'] = 'StatVarObservation' + else: + # Reconstruct schema node + # key_combined is node_id_key + if key_combined.startswith('dcid:'): + node['dcid'] = key_combined[len('dcid:'):] + else: + node['Node'] = key_combined + + for kv in value_combined.split(';'): + if ':' in 
kv: + k, v = kv.split(':', 1) + node[k] = v + + node['diffType'] = diff_type + all_nodes.append(node) + return all_nodes def run_dataflow_job(self, project: str, job: str, current_data: str, previous_data: str, file_format: str, @@ -347,12 +343,44 @@ def run_dataflow_job(self, project: str, job: str, current_data: str, ) return status - def run_differ(self) -> dict: + def run_direct_job(self, current_data: str, previous_data: str, + file_format: str, output_location: str) -> str: + logging.info(f'Launching differ direct job {self.job_name}') + jar_path = os.path.join(_SCRIPT_DIR, 'differ-bundled-0.1-SNAPSHOT.jar') + if not os.path.exists(jar_path): + logging.error(f'Dataflow jar not found: {jar_path}') + return 'JOB_STATE_FAILED' + + cmd = [ + 'java', '-jar', jar_path, f'--currentData={current_data}', + f'--previousData={previous_data}', + f'--outputLocation={output_location}', '--runner=DirectRunner' + ] + if file_format == 'tfrecord': + logging.info('Using tfrecord file format') + cmd.append('--useOptimizedGraphFormat=true') + else: + logging.info('Using mcf file format') + + logging.info(f"Running command: {' '.join(cmd)}") + result = subprocess.run(cmd, capture_output=True, text=True) + if result.returncode != 0: + logging.error( + f"Direct job failed with return code {result.returncode}") + logging.error(f"Stdout: {result.stdout}") + logging.error(f"Stderr: {result.stderr}") + return 'JOB_STATE_FAILED' + + logging.info(f'Finished differ direct job {self.job_name}.') + return 'JOB_STATE_DONE' + + def run_differ(self): os.makedirs(self.output_path, exist_ok=True) tmp_path = os.path.join(self.output_path, self.job_name) os.makedirs(tmp_path, exist_ok=True) logging.info('Processing input data to generate diff...') + differ_summary = {} if self.runner_mode == 'cloud': # Runs dataflow job in GCP. 
logging.info("Invoking dataflow mode for differ") @@ -361,17 +389,18 @@ def run_differ(self) -> dict: self.previous_data, self.file_format, self.output_path) if status == 'JOB_STATE_FAILED': - raise ExectionError(f'Dataflow job {job_name} failed.') - diff_path = os.path.join(self.output_path, 'obs-diff*') - logging.info("Loading obs diff data from: %s", diff_path) - obs_diff = differ_utils.load_csv_data(diff_path, tmp_path) - diff_path = os.path.join(self.output_path, 'schema-diff*') - logging.info("Loading schema diff data from: %s", diff_path) - schema_diff = differ_utils.load_csv_data(diff_path, tmp_path) - # TODO: populate summary for cloud mode - differ_summary = {} + raise RuntimeError(f'Dataflow job {self.job_name} failed.') + elif self.runner_mode == 'direct': + # Runs dataflow jar directly. + logging.info("Invoking direct mode for differ") + status = self.run_direct_job(self.current_data, self.previous_data, + self.file_format, self.output_path) + if status == 'JOB_STATE_FAILED': + raise RuntimeError(f'Direct job {self.job_name} failed.') + + return else: - # Runs local Python differ. + # Runs native Python differ. 
current_dir = os.path.join(tmp_path, 'current') previous_dir = os.path.join(tmp_path, 'previous') logging.info(f'Loading current data from {self.current_data}') @@ -391,40 +420,56 @@ def run_differ(self) -> dict: logging.info('Generating schema diff...') schema_diff = self.generate_diff(previous_df_schema, current_df_schema) - differ_utils.write_csv_data(obs_diff, self.output_path, - 'obs_diff_log.csv', tmp_path) - differ_utils.write_csv_data(schema_diff, self.output_path, - 'schema_diff_log.csv', tmp_path) + + logging.info('Writing diff to MCF files...') + obs_nodes = self.convert_diff_to_mcf_nodes(obs_diff, True) + schema_nodes = self.convert_diff_to_mcf_nodes(schema_diff, False) + all_nodes = obs_nodes + schema_nodes + if all_nodes: + differ_utils.write_mcf_nodes(all_nodes, self.output_path, + 'import_diff.mcf', tmp_path) + + obs_stats = obs_diff[Column.diff_type.name].value_counts().to_dict() + schema_stats = schema_diff[ + Column.diff_type.name].value_counts().to_dict() + differ_summary = { - 'current_version': self.current_data, - 'previous_version': self.previous_data, - 'current_obs_size': current_df_obs.shape[0], - 'previous_obs_size': previous_df_obs.shape[0], - 'current_schema_size': current_df_schema.shape[0], - 'previous_schema_size': previous_df_schema.shape[0], - 'obs_diff_size': obs_diff.shape[0], - 'schema_diff_size': schema_diff.shape[0] + 'current_version': + self.current_data, + 'previous_version': + self.previous_data, + 'current_obs_count': + int(current_df_obs.shape[0]), + 'previous_obs_count': + int(previous_df_obs.shape[0]), + 'current_schema_count': + int(current_df_schema.shape[0]), + 'previous_schema_count': + int(previous_df_schema.shape[0]), + 'added_obs_count': + int(obs_stats.get(Diff.ADDED.name, 0)), + 'deleted_obs_count': + int(obs_stats.get(Diff.DELETED.name, 0)), + 'modified_obs_count': + int(obs_stats.get(Diff.MODIFIED.name, 0)), + 'added_schema_count': + int(schema_stats.get(Diff.ADDED.name, 0)), + 'deleted_schema_count': + 
int(schema_stats.get(Diff.DELETED.name, 0)), + 'modified_schema_count': + int(schema_stats.get(Diff.MODIFIED.name, 0)), + 'obs_diff_count': + int(obs_diff.shape[0]), + 'schema_diff_count': + int(schema_diff.shape[0]) } + logging.info( + f'Generated observation diff of size {obs_diff.shape[0]}') + logging.info( + f'Generated schema diff of size {schema_diff.shape[0]}') differ_utils.write_json_data(differ_summary, self.output_path, 'differ_summary.json', tmp_path) - - logging.info(f'Generated observation diff of size {obs_diff.shape[0]}') - logging.info(f'Generated schema diff of size {schema_diff.shape[0]}') logging.info(f'Differ summary: {differ_summary}') - - logging.info(f'Performing schema diff analysis') - schema_diff_summary = self.schema_diff_analysis(schema_diff) - logging.info('Performing observation diff analysis...') - obs_diff_summary, obs_diff_samples = self.observation_diff_analysis( - obs_diff) - - logging.info(f'Writing differ output to {self.output_path}') - differ_utils.write_csv_data(schema_diff_summary, self.output_path, - 'schema_diff_summary.csv', tmp_path) - differ_utils.write_csv_data(obs_diff_summary, self.output_path, - 'obs_diff_summary.csv', tmp_path) - differ_utils.write_csv_data(obs_diff_samples, self.output_path, - 'obs_diff_samples.csv', tmp_path) logging.info(f'Differ output written to {self.output_path}') return differ_summary diff --git a/tools/import_differ/import_differ_test.py b/tools/import_differ/import_differ_test.py index a975d0bb22..08d8d4e535 100644 --- a/tools/import_differ/import_differ_test.py +++ b/tools/import_differ/import_differ_test.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+import json import os import pandas as pd import unittest @@ -32,27 +33,68 @@ class TestImportDiffer(unittest.TestCase): def test_diff_analysis(self): current_data = os.path.join(module_dir, 'test', 'current', '*.mcf') previous_data = os.path.join(module_dir, 'test', 'previous', '*.mcf') - output_location = os.path.join(module_dir) - self.differ = import_differ.ImportDiffer(current_data, previous_data, - output_location) - current_mcf = differ_utils.load_data(self.differ.current_data, '.') - previous_mcf = differ_utils.load_data(self.differ.previous_data, '.') - current_obs_df, current_schema_df = self.differ.split_data(current_mcf) - previous_obs_df, previous_schema_df = self.differ.split_data( - previous_mcf) - obs_diff = self.differ.generate_diff(previous_obs_df, current_obs_df) - summary, _ = self.differ.observation_diff_analysis(obs_diff) - expected_summary = pd.read_csv( - os.path.join(module_dir, 'test', 'results', 'obs_diff_summary.csv')) - assert_frame_equal(summary, expected_summary) - - schema_diff = self.differ.generate_diff(previous_schema_df, - current_schema_df) - summary = self.differ.schema_diff_analysis(schema_diff) - expected_summary = pd.read_csv( - os.path.join(module_dir, 'test', 'results', - 'schema_diff_summary.csv')) - assert_frame_equal(summary, expected_summary) + output_location = os.path.join(module_dir, 'test', 'output') + if os.path.exists(output_location): + import shutil + shutil.rmtree(output_location) + os.makedirs(output_location) + + differ = import_differ.ImportDiffer(current_data=current_data, + previous_data=previous_data, + output_location=output_location, + runner_mode='native') + differ.run_differ() + + # Check for expected files + expected_files = ['import_diff.mcf', 'differ_summary.json'] + + for f in expected_files: + file_path = os.path.join(output_location, f) + self.assertTrue(os.path.exists(file_path), + f"File {f} was not generated") + + # Verify content of the combined MCF file + with 
open(os.path.join(output_location, 'import_diff.mcf'), 'r') as f: + content = f.read().strip() + + with open( + os.path.join(module_dir, 'test', 'results', 'import_diff.mcf'), + 'r') as f: + expected_content = f.read().strip() + + # Split into individual nodes, strip whitespace, and sort to avoid ordering issues + actual_nodes = sorted( + [node.strip() for node in content.split('\n\n') if node.strip()]) + expected_nodes = sorted([ + node.strip() + for node in expected_content.split('\n\n') + if node.strip() + ]) + + self.assertListEqual(actual_nodes, expected_nodes) + + # Verify content of the summary file + with open(os.path.join(output_location, 'differ_summary.json'), + 'r') as f: + summary_content = json.load(f) + + with open( + os.path.join(module_dir, 'test', 'results', + 'differ_summary.json'), 'r') as f: + expected_summary_content = json.load(f) + + # The version paths might differ based on where the test is run, + # so we normalize or remove them for the comparison + summary_content.pop('current_version', None) + summary_content.pop('previous_version', None) + expected_summary_content.pop('current_version', None) + expected_summary_content.pop('previous_version', None) + + self.assertDictEqual(summary_content, expected_summary_content) + + if os.path.exists(output_location): + import shutil + shutil.rmtree(output_location) if __name__ == '__main__': diff --git a/tools/import_differ/test/current/schema.mcf b/tools/import_differ/test/current/schema.mcf index 33bd3747d4..e0a3e2ae31 100644 --- a/tools/import_differ/test/current/schema.mcf +++ b/tools/import_differ/test/current/schema.mcf @@ -1,4 +1,4 @@ -dcid: "InterestRate_TreasuryBill_6Month" +Node: dcid:InterestRate_TreasuryBill_6Month name: "InterestRate_TreasuryBill_6Month" typeOf: dcs:StatisticalVariable measuredProperty: dcs:interestRate @@ -6,7 +6,7 @@ populationType: dcs:TreasuryBill maturity: [6 Month] statType: dcs:measuredValue -dcid: "InterestRate_TreasuryBill_1Year" +Node: 
dcid:InterestRate_TreasuryBill_1Year name: "InterestRate_TreasuryBill_1Year" typeOf: dcs:StatisticalVariable measuredProperty: dcs:interestRate @@ -14,7 +14,7 @@ populationType: dcs:TreasuryBill maturity: [1 Year] statType: dcs:measuredValue -dcid: "InterestRate_TreasuryNote_2Year" +Node: dcid:InterestRate_TreasuryNote_2Year name: "InterestRate_TreasuryNote_2Year" typeOf: dcs:StatisticalVariable measuredProperty: dcs:interestRate @@ -22,7 +22,7 @@ populationType: dcs:TreasuryNote maturity: [2 Year] statType: dcs:measuredValue -dcid: "InterestRate_TreasuryNote_3Year" +Node: dcid:InterestRate_TreasuryNote_3Year name: "InterestRate_TreasuryNote_3Year" typeOf: dcs:StatisticalVariable measuredProperty: dcs:interestRate @@ -30,7 +30,7 @@ populationType: dcs:TreasuryNote maturity: [3 Year] statType: dcs:measuredValue -dcid: "InterestRate_TreasuryNote_5Year" +Node: dcid:InterestRate_TreasuryNote_5Year name: "InterestRate_TreasuryNote_5Year" typeOf: dcs:StatisticalVariable measuredProperty: dcs:interestRate @@ -38,7 +38,7 @@ populationType: dcs:TreasuryNote maturity: [5 Year] statType: dcs:measuredValue -dcid: "InterestRate_TreasuryNote_7Year" +Node: dcid:InterestRate_TreasuryNote_7Year name: "InterestRate_TreasuryNote_7Year" typeOf: dcs:StatisticalVariable measuredProperty: dcs:interestRate @@ -46,7 +46,7 @@ populationType: dcs:TreasuryNote maturity: [7 Year] statType: dcs:measuredValue -dcid: "InterestRate_TreasuryNote_10Year" +Node: dcid:InterestRate_TreasuryNote_10Year name: "InterestRate_TreasuryNote_10Year" typeOf: dcs:StatisticalVariable measuredProperty: dcs:interestRate @@ -54,7 +54,7 @@ populationType: dcs:TreasuryNote maturity: [10 Year] statType: dcs:measuredValue -dcid: "InterestRate_TreasuryBond_20Year" +Node: dcid:InterestRate_TreasuryBond_20Year name: "InterestRate_TreasuryBond_20Year" typeOf: dcs:StatisticalVariable measuredProperty: dcs:interestRate @@ -62,11 +62,10 @@ populationType: dcs:TreasuryBond maturity: [20 Year] statType: dcs:measuredValue -dcid: 
"InterestRate_TreasuryBond_30Year" +Node: dcid:InterestRate_TreasuryBond_30Year name: "InterestRate_TreasuryBond_30Year" typeOf: dcs:StatisticalVariable measuredProperty: dcs:interestRate populationType: dcs:TreasuryBond maturity: [30 Year] statType: dcs:measuredValue - diff --git a/tools/import_differ/test/previous/schema.mcf b/tools/import_differ/test/previous/schema.mcf index 4b30a5056f..39fd91f2a0 100644 --- a/tools/import_differ/test/previous/schema.mcf +++ b/tools/import_differ/test/previous/schema.mcf @@ -1,4 +1,4 @@ -dcid: "InterestRate_TreasuryBill_1Month" +Node: dcid:InterestRate_TreasuryBill_1Month name: "InterestRate_TreasuryBill_1Month" typeOf: dcs:StatisticalVariable measuredProperty: dcs:interestRate @@ -6,7 +6,7 @@ populationType: dcs:TreasuryBill maturity: [1 Month] statType: dcs:measuredValue -dcid: "InterestRate_TreasuryBill_3Month" +Node: dcid:InterestRate_TreasuryBill_3Month name: "InterestRate_TreasuryBill_3Month" typeOf: dcs:StatisticalVariable measuredProperty: dcs:interestRate @@ -14,7 +14,7 @@ populationType: dcs:TreasuryBill maturity: [3 Month] statType: dcs:measuredValue -dcid: "InterestRate_TreasuryBill_6Month" +Node: dcid:InterestRate_TreasuryBill_6Month name: "InterestRate_TreasuryBill_6Month" typeOf: dcs:StatisticalVariable measuredProperty: dcs:interestRate @@ -22,7 +22,7 @@ populationType: dcs:TreasuryBill maturity: [6 Month] statType: dcs:measuredValue -dcid: "InterestRate_TreasuryBill_1Year" +Node: dcid:InterestRate_TreasuryBill_1Year name: "InterestRate_TreasuryBill_01Year" typeOf: dcs:StatisticalVariable measuredProperty: dcs:interestRate @@ -30,7 +30,7 @@ populationType: dcs:TreasuryBill maturity: [1 Year] statType: dcs:measuredValue -dcid: "InterestRate_TreasuryNote_2Year" +Node: dcid:InterestRate_TreasuryNote_2Year name: "InterestRate_TreasuryNote_02Year" typeOf: dcs:StatisticalVariable measuredProperty: dcs:interestRate @@ -38,7 +38,7 @@ populationType: dcs:TreasuryNote maturity: [2 Year] statType: dcs:measuredValue -dcid: 
"InterestRate_TreasuryNote_10Year" +Node: dcid:InterestRate_TreasuryNote_10Year name: "InterestRate_TreasuryNote_10Year" typeOf: dcs:StatisticalVariable measuredProperty: dcs:interestRate @@ -46,7 +46,7 @@ populationType: dcs:TreasuryNote maturity: [10 Year] statType: dcs:measuredValue -dcid: "InterestRate_TreasuryBond_20Year" +Node: dcid:InterestRate_TreasuryBond_20Year name: "InterestRate_TreasuryBond_20Year" typeOf: dcs:StatisticalVariable measuredProperty: dcs:interestRate @@ -54,10 +54,10 @@ populationType: dcs:TreasuryBond maturity: [20 Year] statType: dcs:measuredValue -dcid: "InterestRate_TreasuryBond_30Year" +Node: dcid:InterestRate_TreasuryBond_30Year name: "InterestRate_TreasuryBond_30Year" typeOf: dcs:StatisticalVariable measuredProperty: dcs:interestRate populationType: dcs:TreasuryBond maturity: [30 Year] -statType: dcs:measuredValue \ No newline at end of file +statType: dcs:measuredValue diff --git a/tools/import_differ/test/results/differ_summary.json b/tools/import_differ/test/results/differ_summary.json new file mode 100644 index 0000000000..f95e048228 --- /dev/null +++ b/tools/import_differ/test/results/differ_summary.json @@ -0,0 +1,16 @@ +{ + "current_version": "tools/import_differ/test/current/*.mcf", + "previous_version": "tools/import_differ/test/previous/*.mcf", + "current_obs_count": 20, + "previous_obs_count": 21, + "current_schema_count": 9, + "previous_schema_count": 8, + "added_obs_count": 1, + "deleted_obs_count": 2, + "modified_obs_count": 1, + "added_schema_count": 3, + "deleted_schema_count": 2, + "modified_schema_count": 2, + "obs_diff_count": 4, + "schema_diff_count": 7 +} diff --git a/tools/import_differ/test/results/import_diff.mcf b/tools/import_differ/test/results/import_diff.mcf new file mode 100644 index 0000000000..ec6cf0470f --- /dev/null +++ b/tools/import_differ/test/results/import_diff.mcf @@ -0,0 +1,103 @@ +Node: treasury_constant_maturity_rates/E9/c0eb81eb-be8c-d11a-9279-a0391c1b01ef +variableMeasured: 
dcid:InterestRate_TreasuryNote_10Year +observationAbout: dcid:country/USA +observationDate: "2025-01-31" +measurementMethod: dcid:ConstantMaturityRate +unit: dcid:Percent +value: 4.58 +typeOf: StatVarObservation +diffType: ADDED + +Node: treasury_constant_maturity_rates/E2/115adaef-1547-4f5b-cabb-116d14c7a1ed +variableMeasured: dcid:InterestRate_TreasuryBill_3Month +observationAbout: dcid:country/USA +observationDate: "2025-01-30" +measurementMethod: dcid:ConstantMaturityRate +unit: dcid:Percent +value: 4.30 +typeOf: StatVarObservation +diffType: DELETED + +Node: treasury_constant_maturity_rates/E2/c0eb81eb-be8c-d11a-9279-a0391c1b01ef +variableMeasured: dcid:InterestRate_TreasuryBill_3Month +observationAbout: dcid:country/USA +observationDate: "2025-01-31" +measurementMethod: dcid:ConstantMaturityRate +unit: dcid:Percent +value: 4.31 +typeOf: StatVarObservation +diffType: DELETED + +Node: treasury_constant_maturity_rates/E10/115adaef-1547-4f5b-cabb-116d14c7a1ed +variableMeasured: dcid:InterestRate_TreasuryBond_20Year +observationAbout: dcid:country/USA +observationDate: "2025-01-30" +measurementMethod: dcid:ConstantMaturityRate +unit: dcid:Percent +value: 4.85 +typeOf: StatVarObservation +diffType: MODIFIED + +dcid: InterestRate_TreasuryNote_3Year +maturity: [3 Year] +measuredProperty: dcs:interestRate +name: "InterestRate_TreasuryNote_3Year" +populationType: dcs:TreasuryNote +statType: dcs:measuredValue +typeOf: dcs:StatisticalVariable +diffType: ADDED + +dcid: InterestRate_TreasuryNote_5Year +maturity: [5 Year] +measuredProperty: dcs:interestRate +name: "InterestRate_TreasuryNote_5Year" +populationType: dcs:TreasuryNote +statType: dcs:measuredValue +typeOf: dcs:StatisticalVariable +diffType: ADDED + +dcid: InterestRate_TreasuryNote_7Year +maturity: [7 Year] +measuredProperty: dcs:interestRate +name: "InterestRate_TreasuryNote_7Year" +populationType: dcs:TreasuryNote +statType: dcs:measuredValue +typeOf: dcs:StatisticalVariable +diffType: ADDED + +dcid: 
InterestRate_TreasuryBill_1Month +maturity: [1 Month] +measuredProperty: dcs:interestRate +name: "InterestRate_TreasuryBill_1Month" +populationType: dcs:TreasuryBill +statType: dcs:measuredValue +typeOf: dcs:StatisticalVariable +diffType: DELETED + +dcid: InterestRate_TreasuryBill_3Month +maturity: [3 Month] +measuredProperty: dcs:interestRate +name: "InterestRate_TreasuryBill_3Month" +populationType: dcs:TreasuryBill +statType: dcs:measuredValue +typeOf: dcs:StatisticalVariable +diffType: DELETED + +dcid: InterestRate_TreasuryBill_1Year +maturity: [1 Year] +measuredProperty: dcs:interestRate +name: "InterestRate_TreasuryBill_1Year" +populationType: dcs:TreasuryBill +statType: dcs:measuredValue +typeOf: dcs:StatisticalVariable +diffType: MODIFIED + +dcid: InterestRate_TreasuryNote_2Year +maturity: [2 Year] +measuredProperty: dcs:interestRate +name: "InterestRate_TreasuryNote_2Year" +populationType: dcs:TreasuryNote +statType: dcs:measuredValue +typeOf: dcs:StatisticalVariable +diffType: MODIFIED + diff --git a/tools/import_differ/test/results/obs_diff_log.csv b/tools/import_differ/test/results/obs_diff_log.csv deleted file mode 100644 index 8117e19a2c..0000000000 --- a/tools/import_differ/test/results/obs_diff_log.csv +++ /dev/null @@ -1,5 +0,0 @@ -key_combined,value_combined_x,value_combined_y,diff_type -"dcid:InterestRate_TreasuryBill_3Month;dcid:country/USA;""2025-01-30"";;dcid:ConstantMaturityRate;dcid:Percent;",4.30,,DELETED -"dcid:InterestRate_TreasuryBill_3Month;dcid:country/USA;""2025-01-31"";;dcid:ConstantMaturityRate;dcid:Percent;",4.31,,DELETED -"dcid:InterestRate_TreasuryBond_20Year;dcid:country/USA;""2025-01-30"";;dcid:ConstantMaturityRate;dcid:Percent;",4.81,4.85,MODIFIED -"dcid:InterestRate_TreasuryNote_10Year;dcid:country/USA;""2025-01-31"";;dcid:ConstantMaturityRate;dcid:Percent;",,4.58,ADDED diff --git a/tools/import_differ/test/results/obs_diff_samples.csv b/tools/import_differ/test/results/obs_diff_samples.csv deleted file mode 100644 index 
e1b6a38723..0000000000 --- a/tools/import_differ/test/results/obs_diff_samples.csv +++ /dev/null @@ -1,4 +0,0 @@ -variableMeasured,diff_type,observationAbout,observationDate,diff_size -dcid:InterestRate_TreasuryNote_10Year,ADDED,['dcid:country/USA'],"['""2025-01-31""']",1 -dcid:InterestRate_TreasuryBill_3Month,DELETED,"['dcid:country/USA', 'dcid:country/USA']","['""2025-01-30""', '""2025-01-31""']",2 -dcid:InterestRate_TreasuryBond_20Year,MODIFIED,['dcid:country/USA'],"['""2025-01-30""']",1 diff --git a/tools/import_differ/test/results/obs_diff_summary.csv b/tools/import_differ/test/results/obs_diff_summary.csv deleted file mode 100644 index c51a0b0f19..0000000000 --- a/tools/import_differ/test/results/obs_diff_summary.csv +++ /dev/null @@ -1,4 +0,0 @@ -variableMeasured,ADDED,DELETED,MODIFIED -dcid:InterestRate_TreasuryBill_3Month,0,2,0 -dcid:InterestRate_TreasuryBond_20Year,0,0,1 -dcid:InterestRate_TreasuryNote_10Year,1,0,0 diff --git a/tools/import_differ/test/results/point_analysis_summary.csv b/tools/import_differ/test/results/point_analysis_summary.csv deleted file mode 100644 index c51a0b0f19..0000000000 --- a/tools/import_differ/test/results/point_analysis_summary.csv +++ /dev/null @@ -1,4 +0,0 @@ -variableMeasured,ADDED,DELETED,MODIFIED -dcid:InterestRate_TreasuryBill_3Month,0,2,0 -dcid:InterestRate_TreasuryBond_20Year,0,0,1 -dcid:InterestRate_TreasuryNote_10Year,1,0,0 diff --git a/tools/import_differ/test/results/schema_diff_log.csv b/tools/import_differ/test/results/schema_diff_log.csv deleted file mode 100644 index 9e6fdf9959..0000000000 --- a/tools/import_differ/test/results/schema_diff_log.csv +++ /dev/null @@ -1,8 +0,0 @@ -key_combined,value_combined_x,value_combined_y,diff_type -"""InterestRate_TreasuryBill_1Month""","maturity:[1 Month];measuredProperty:dcs:interestRate;name:""InterestRate_TreasuryBill_1Month"";populationType:dcs:TreasuryBill;statType:dcs:measuredValue;typeOf:dcs:StatisticalVariable",,DELETED 
-"""InterestRate_TreasuryBill_1Year""","maturity:[1 Year];measuredProperty:dcs:interestRate;name:""InterestRate_TreasuryBill_01Year"";populationType:dcs:TreasuryBill;statType:dcs:measuredValue;typeOf:dcs:StatisticalVariable","maturity:[1 Year];measuredProperty:dcs:interestRate;name:""InterestRate_TreasuryBill_1Year"";populationType:dcs:TreasuryBill;statType:dcs:measuredValue;typeOf:dcs:StatisticalVariable",MODIFIED -"""InterestRate_TreasuryBill_3Month""","maturity:[3 Month];measuredProperty:dcs:interestRate;name:""InterestRate_TreasuryBill_3Month"";populationType:dcs:TreasuryBill;statType:dcs:measuredValue;typeOf:dcs:StatisticalVariable",,DELETED -"""InterestRate_TreasuryNote_2Year""","maturity:[2 Year];measuredProperty:dcs:interestRate;name:""InterestRate_TreasuryNote_02Year"";populationType:dcs:TreasuryNote;statType:dcs:measuredValue;typeOf:dcs:StatisticalVariable","maturity:[2 Year];measuredProperty:dcs:interestRate;name:""InterestRate_TreasuryNote_2Year"";populationType:dcs:TreasuryNote;statType:dcs:measuredValue;typeOf:dcs:StatisticalVariable",MODIFIED -"""InterestRate_TreasuryNote_3Year""",,"maturity:[3 Year];measuredProperty:dcs:interestRate;name:""InterestRate_TreasuryNote_3Year"";populationType:dcs:TreasuryNote;statType:dcs:measuredValue;typeOf:dcs:StatisticalVariable",ADDED -"""InterestRate_TreasuryNote_5Year""",,"maturity:[5 Year];measuredProperty:dcs:interestRate;name:""InterestRate_TreasuryNote_5Year"";populationType:dcs:TreasuryNote;statType:dcs:measuredValue;typeOf:dcs:StatisticalVariable",ADDED -"""InterestRate_TreasuryNote_7Year""",,"maturity:[7 Year];measuredProperty:dcs:interestRate;name:""InterestRate_TreasuryNote_7Year"";populationType:dcs:TreasuryNote;statType:dcs:measuredValue;typeOf:dcs:StatisticalVariable",ADDED diff --git a/tools/import_differ/test/results/schema_diff_summary.csv b/tools/import_differ/test/results/schema_diff_summary.csv deleted file mode 100644 index 579d63108a..0000000000 --- 
a/tools/import_differ/test/results/schema_diff_summary.csv +++ /dev/null @@ -1,2 +0,0 @@ -ADDED,DELETED,MODIFIED -3,2,2 diff --git a/tools/import_validation/import_validation_test.py b/tools/import_validation/import_validation_test.py index 7d45827648..33538f3ec7 100644 --- a/tools/import_validation/import_validation_test.py +++ b/tools/import_validation/import_validation_test.py @@ -28,9 +28,11 @@ def setUp(self): self.test_dir = tempfile.TemporaryDirectory() self.config_path = os.path.join(self.test_dir.name, 'config.json') self.stats_path = os.path.join(self.test_dir.name, 'stats.csv') - self.differ_path = os.path.join(self.test_dir.name, 'differ.csv') + self.differ_path = os.path.join(self.test_dir.name, 'differ_output') self.output_path = os.path.join(self.test_dir.name, 'output.csv') + os.makedirs(self.differ_path, exist_ok=True) + # Find the project root using the canonical git command result = subprocess.run(['git', 'rev-parse', '--show-toplevel'], capture_output=True, @@ -38,8 +40,13 @@ def setUp(self): check=True) self.project_root = result.stdout.strip() - # Create an empty differ output file, as it is required - pd.DataFrame({'DELETED': []}).to_csv(self.differ_path, index=False) + # Create an empty differ summary, as it is required + with open(os.path.join(self.differ_path, 'differ_summary.json'), + 'w') as f: + json.dump({}, f) + # Create an empty mcf file + with open(os.path.join(self.differ_path, 'import_diff.mcf'), 'w') as f: + f.write("") # Create a dummy stats file, as it is often required pd.DataFrame({ 'StatVar': ['sv1'], @@ -217,6 +224,12 @@ def test_sql_validator_fails(self): 'MaxValue': [99, 101] }).to_csv(self.stats_path, index=False) + # Write to the differ_output directory to ensure differ_df is populated + with open(os.path.join(self.differ_path, 'import_diff_test.mcf'), + 'w') as f: + f.write( + "Node: dcid:sv1\nvariableMeasured: sv1\ndiffType: MODIFIED\n\n") + # 3. 
Run the script result = subprocess.run([ 'python3', '-m', 'tools.import_validation.runner', @@ -256,8 +269,10 @@ def test_empty_differ_file_runs_validation(self): }] }, f) - # 2. Create a differ file with only headers - pd.DataFrame({'DELETED': []}).to_csv(self.differ_path, index=False) + # 2. Re-create an empty differ summary in the directory + with open(os.path.join(self.differ_path, 'differ_summary.json'), + 'w') as f: + json.dump({}, f) # 3. Run the script result = subprocess.run([ diff --git a/tools/import_validation/runner.py b/tools/import_validation/runner.py index f1364518e6..a666c3c1a7 100644 --- a/tools/import_validation/runner.py +++ b/tools/import_validation/runner.py @@ -59,9 +59,11 @@ def __init__(self, validation_config_path: str, differ_output: str, 'MAX_DATE_CONSISTENT': (self.validator.validate_max_date_consistent, 'stats'), 'DELETED_RECORDS_COUNT': - (self.validator.validate_deleted_records_count, 'differ'), + (self.validator.validate_deleted_records_count, 'differ_summary' + ), 'DELETED_RECORDS_PERCENT': - (self.validator.validate_deleted_records_percent, 'differ'), + (self.validator.validate_deleted_records_percent, + 'differ_summary'), 'EMPTY_IMPORT_CHECK': (self.validator.validate_empty_import, 'lint'), 'MISSING_REFS_COUNT': @@ -121,38 +123,21 @@ def _initialize_data_sources(self, stats_summary: str, lint_report: str, logging.warning("stats_summary file exists but is empty: %s", stats_summary) - # Handle differ output (folder or file) - differ_csv_path = None - differ_json_path = None - if differ_output and os.path.exists(differ_output): - if os.path.isdir(differ_output): - differ_csv_path = os.path.join(differ_output, - 'obs_diff_summary.csv') - differ_json_path = os.path.join(differ_output, - 'differ_summary.json') - else: - differ_csv_path = differ_output - - if differ_csv_path and os.path.exists( - differ_csv_path) and os.path.getsize(differ_csv_path) > 0: - self.data_sources['differ'] = pd.read_csv(differ_csv_path) - elif 
differ_csv_path and os.path.exists(differ_csv_path): - logging.warning("differ csv file exists but is empty: %s", - differ_csv_path) - - if differ_json_path and os.path.exists( - differ_json_path) and os.path.getsize(differ_json_path) > 0: - try: - with open(differ_json_path, 'r') as f: - self.data_sources['differ_summary'] = json.load(f) - except Exception as e: - logging.error( - f"JSON parse error while reading differ summary at {differ_json_path}: {e}" - ) - elif differ_json_path and os.path.exists(differ_json_path): - logging.warning("differ summary file exists but is empty: %s", - differ_json_path) + differ_json_path = os.path.join(differ_output, + 'differ_summary.json') + if os.path.exists(differ_json_path) and os.path.getsize( + differ_json_path) > 0: + try: + with open(differ_json_path, 'r') as f: + self.data_sources['differ_summary'] = json.load(f) + except Exception as e: + logging.error( + f'JSON parse error while reading differ summary at {differ_json_path}: {e}' + ) + + self.data_sources['differ'] = self._load_differ_df_from_mcf( + differ_output) if lint_report and os.path.exists(lint_report) and os.path.getsize( lint_report) > 0: @@ -167,6 +152,51 @@ def _initialize_data_sources(self, stats_summary: str, lint_report: str, logging.warning("lint_report file exists but is empty: %s", lint_report) + def _load_differ_df_from_mcf(self, input_dir: str) -> pd.DataFrame: + """Parses MCF diff files and returns a summary DataFrame.""" + import glob + from collections import defaultdict + stats = defaultdict(lambda: {'ADDED': 0, 'DELETED': 0, 'MODIFIED': 0}) + + combined_pattern = os.path.join(input_dir, 'import_diff*.mcf') + combined_files = glob.glob(combined_pattern) + for filepath in combined_files: + with open(filepath, 'r', encoding='utf-8') as f: + current_var = None + current_diff_type = None + for line in f: + line = line.strip() + if not line: # End of node + if current_var and current_diff_type: + stats[current_var][current_diff_type] += 1 + 
current_var = None + current_diff_type = None + continue + if line.startswith('variableMeasured:'): + parts = line.split(':', 1) + if len(parts) > 1: + current_var = parts[1].strip().strip('"\'') + elif line.startswith('diffType:'): + parts = line.split(':', 1) + if len(parts) > 1: + current_diff_type = parts[1].strip() + # Catch last node if file doesn't end with newline + if current_var and current_diff_type: + stats[current_var][current_diff_type] += 1 + + if not stats: + return pd.DataFrame() + + rows = [] + for var, counts in stats.items(): + rows.append({ + 'StatVar': var, + 'ADDED': counts['ADDED'], + 'DELETED': counts['DELETED'], + 'MODIFIED': counts['MODIFIED'] + }) + return pd.DataFrame(rows) + def _determine_required_sources(self) -> set[str]: """ Parses the validation config to determine which data sources are required. @@ -216,9 +246,10 @@ def run_validations(self) -> tuple[bool, list[ValidationResult]]: result = validation_func(self.data_sources['stats'], self.data_sources['differ'], rule_params) - elif validator_name in ['DELETED_RECORDS_PERCENT']: + elif validator_name in [ + 'DELETED_RECORDS_PERCENT', 'DELETED_RECORDS_COUNT' + ]: result = validation_func( - self.data_sources['differ'], self.data_sources.get('differ_summary'), rule_params) else: scope = rule.get('scope', {}) diff --git a/tools/import_validation/runner_test.py b/tools/import_validation/runner_test.py index 76c5bc1d0b..e9cb4d8af9 100644 --- a/tools/import_validation/runner_test.py +++ b/tools/import_validation/runner_test.py @@ -167,15 +167,25 @@ def test_runner_writes_correct_output(self, MockValidator): ValidationStatus.FAILED, 'DELETED_RECORDS_COUNT', message='Too many deletions, found 100', - details={ - 'deleted_records_count': 100, - 'rows_processed': 1, - 'rows_succeeded': 0, - 'rows_failed': 1 - }) + details={'deleted_records_count': 100}) mock_validator_instance.validate_deleted_records_count.return_value = expected_result - # 2. Create test files + # 2. 
Setup files in a directory for differ output + differ_dir = os.path.join(self.test_dir.name, 'differ_out2') + os.makedirs(differ_dir, exist_ok=True) + + # Create a mock MCF file to generate the 'differ' DataFrame + mcf_path = os.path.join(differ_dir, 'import_diff.mcf') + with open(mcf_path, 'w') as f: + for _ in range(100): + f.write( + 'Node: dcid:sv_test\nvariableMeasured: dcid:sv_test\ndiffType: DELETED\n\n' + ) + + differ_json = os.path.join(differ_dir, 'differ_summary.json') + with open(differ_json, 'w') as f: + json.dump({'deleted_obs_count': 100}, f) + with open(self.config_path, 'w') as f: json.dump( { @@ -188,12 +198,11 @@ def test_runner_writes_correct_output(self, MockValidator): } }] }, f) - pd.DataFrame({'DELETED': [100]}).to_csv(self.differ_path, index=False) # 3. Run the runner runner = ValidationRunner(validation_config_path=self.config_path, stats_summary=self.stats_path, - differ_output=self.differ_path, + differ_output=differ_dir, lint_report=self.report_path, validation_output=self.output_path) runner.run_validations() @@ -208,9 +217,6 @@ def test_runner_writes_correct_output(self, MockValidator): 'Too many deletions, found 100') details = json.loads(output_df.iloc[0]['Details']) self.assertEqual(details['deleted_records_count'], 100) - self.assertEqual(details['rows_processed'], 1) - self.assertEqual(details['rows_succeeded'], 0) - self.assertEqual(details['rows_failed'], 1) @patch('tools.import_validation.runner.Validator') def test_runner_uses_custom_name(self, MockValidator): @@ -260,12 +266,17 @@ def test_runner_deleted_records_percent(self, MockValidator): differ_dir = os.path.join(self.test_dir.name, 'differ_out') os.makedirs(differ_dir, exist_ok=True) - differ_csv = os.path.join(differ_dir, 'obs_diff_summary.csv') - pd.DataFrame({'DELETED': [5]}).to_csv(differ_csv, index=False) + # Create a mock MCF file + mcf_path = os.path.join(differ_dir, 'import_diff.mcf') + with open(mcf_path, 'w') as f: + for _ in range(5): + f.write( + 'Node: 
dcid:sv_test\nvariableMeasured: dcid:sv_test\ndiffType: DELETED\n\n' + ) differ_json = os.path.join(differ_dir, 'differ_summary.json') with open(differ_json, 'w') as f: - json.dump({'previous_data_size': 100}, f) + json.dump({'previous_obs_count': 100, 'deleted_obs_count': 5}, f) with open(self.config_path, 'w') as f: json.dump( @@ -291,10 +302,10 @@ def test_runner_deleted_records_percent(self, MockValidator): # 4. Verify # Check if validator was called with correct arguments call_args, _ = mock_validator_instance.validate_deleted_records_percent.call_args - # call_args is (df, summary, params) - self.assertIsInstance(call_args[0], pd.DataFrame) - self.assertEqual(call_args[1]['previous_data_size'], 100) - self.assertEqual(call_args[2]['threshold'], 10) + # call_args is (summary, params) + self.assertEqual(call_args[0]['previous_obs_count'], 100) + self.assertEqual(call_args[0]['deleted_obs_count'], 5) + self.assertEqual(call_args[1]['threshold'], 10) # Check output output_df = pd.read_csv(self.output_path) diff --git a/tools/import_validation/validator.py b/tools/import_validation/validator.py index 712a14787e..4e28836838 100644 --- a/tools/import_validation/validator.py +++ b/tools/import_validation/validator.py @@ -138,52 +138,25 @@ def validate_max_date_latest(self, stats_df: pd.DataFrame, 'rows_failed': 0 }) - def validate_deleted_records_count(self, differ_df: pd.DataFrame, + def validate_deleted_records_count(self, summary: dict, params: dict) -> ValidationResult: """Checks if the total number of deleted points is within a threshold. Args: - differ_df: A DataFrame containing the differ output, expected to have a - 'DELETED' column. + summary: A dictionary containing the differ summary. params: A dictionary containing the validation parameters, which may have a 'threshold' key. Returns: A ValidationResult object. 
""" - if differ_df.empty: - deleted_records_count = 0 - threshold = params.get('threshold', 0) - if deleted_records_count > threshold: - return ValidationResult( - ValidationStatus.FAILED, - 'DELETED_RECORDS_COUNT', - message= - f"Found {deleted_records_count} deleted points, which is over the threshold of {threshold}.", - details={ - 'deleted_records_count': int(deleted_records_count), - 'threshold': threshold, - 'rows_processed': 0, - 'rows_succeeded': 0, - 'rows_failed': 0 - }) - return ValidationResult(ValidationStatus.PASSED, - 'DELETED_RECORDS_COUNT', - details={ - 'rows_processed': 0, - 'rows_succeeded': 0, - 'rows_failed': 0 - }) - - if 'DELETED' not in differ_df.columns: - return ValidationResult( - ValidationStatus.DATA_ERROR, - 'DELETED_RECORDS_COUNT', - message="Input data is missing required column: 'DELETED'.") - - rows_processed = len(differ_df) threshold = params.get('threshold', 0) - deleted_records_count = differ_df['DELETED'].sum() + + if not summary or 'deleted_obs_count' not in summary: + # If summary is missing or empty, assume 0 deleted records. 
+ deleted_records_count = 0 + else: + deleted_records_count = summary['deleted_obs_count'] if deleted_records_count > threshold: return ValidationResult( @@ -193,26 +166,22 @@ def validate_deleted_records_count(self, differ_df: pd.DataFrame, f"Found {deleted_records_count} deleted points, which is over the threshold of {threshold}.", details={ 'deleted_records_count': int(deleted_records_count), - 'threshold': threshold, - 'rows_processed': rows_processed, - 'rows_succeeded': 0, - 'rows_failed': rows_processed + 'threshold': threshold }) - return ValidationResult(ValidationStatus.PASSED, - 'DELETED_RECORDS_COUNT', - details={ - 'rows_processed': rows_processed, - 'rows_succeeded': rows_processed, - 'rows_failed': 0 - }) - def validate_deleted_records_percent(self, differ_df: pd.DataFrame, - summary: dict, + return ValidationResult( + ValidationStatus.PASSED, + 'DELETED_RECORDS_COUNT', + details={ + 'deleted_records_count': int(deleted_records_count), + 'threshold': threshold + }) + + def validate_deleted_records_percent(self, summary: dict, params: dict) -> ValidationResult: """Checks if the percentage of deleted records is within a threshold. Args: - differ_df: A DataFrame containing the differ output. summary: A dictionary containing the differ summary. params: A dictionary containing the validation parameters, which may have a 'threshold' key. @@ -220,43 +189,37 @@ def validate_deleted_records_percent(self, differ_df: pd.DataFrame, Returns: A ValidationResult object. 
""" - if differ_df is None: - return ValidationResult(ValidationStatus.DATA_ERROR, - 'DELETED_RECORDS_PERCENT', - message="Differ DataFrame is missing.") - if summary is None: return ValidationResult(ValidationStatus.DATA_ERROR, 'DELETED_RECORDS_PERCENT', message="Differ summary is missing.") - if 'previous_obs_size' not in summary: + if 'previous_obs_count' not in summary: return ValidationResult( ValidationStatus.DATA_ERROR, 'DELETED_RECORDS_PERCENT', message= - "Differ summary is missing required field: 'previous_obs_size'." + "Differ summary is missing required field: 'previous_obs_count'." ) - previous_obs_size = summary['previous_obs_size'] - - if differ_df.empty: - deleted_records_count = 0 - elif 'DELETED' not in differ_df.columns: + if 'deleted_obs_count' not in summary: return ValidationResult( ValidationStatus.DATA_ERROR, 'DELETED_RECORDS_PERCENT', - message="Input data is missing required column: 'DELETED'.") - else: - deleted_records_count = differ_df['DELETED'].sum() + message= + "Differ summary is missing required field: 'deleted_obs_count'." 
+ ) + + previous_obs_count = summary['previous_obs_count'] + deleted_records_count = summary['deleted_obs_count'] - if previous_obs_size == 0: + if previous_obs_count == 0: if deleted_records_count > 0: percent = 100.0 else: percent = 0.0 else: - percent = (deleted_records_count / previous_obs_size) * 100 + percent = (deleted_records_count / previous_obs_count) * 100 threshold = params.get('threshold', 0) @@ -268,7 +231,7 @@ def validate_deleted_records_percent(self, differ_df: pd.DataFrame, f"Found {percent:.2f}% deleted records, which is over the threshold of {threshold}%.", details={ 'deleted_records_count': int(deleted_records_count), - 'previous_obs_size': int(previous_obs_size), + 'previous_obs_count': int(previous_obs_count), 'percent': percent, 'threshold': threshold }) @@ -278,7 +241,7 @@ def validate_deleted_records_percent(self, differ_df: pd.DataFrame, 'DELETED_RECORDS_PERCENT', details={ 'deleted_records_count': int(deleted_records_count), - 'previous_obs_size': int(previous_obs_size), + 'previous_obs_count': int(previous_obs_count), 'percent': percent, 'threshold': threshold }) diff --git a/tools/import_validation/validator_test.py b/tools/import_validation/validator_test.py index eadaff88fc..9fc0a235f7 100644 --- a/tools/import_validation/validator_test.py +++ b/tools/import_validation/validator_test.py @@ -76,40 +76,41 @@ def setUp(self): self.validator = Validator() def test_deleted_records_count_fails_when_over_threshold(self): - test_df = pd.DataFrame({'DELETED': [1, 1]}) # Total deleted = 2 + summary = {'deleted_obs_count': 2} params = {'threshold': 1} - result = self.validator.validate_deleted_records_count(test_df, params) + result = self.validator.validate_deleted_records_count(summary, params) self.assertEqual(result.status, ValidationStatus.FAILED) self.assertEqual(result.details['deleted_records_count'], 2) self.assertEqual(result.details['threshold'], 1) - self.assertEqual(result.details['rows_processed'], 2) - 
self.assertEqual(result.details['rows_succeeded'], 0) - self.assertEqual(result.details['rows_failed'], 2) def test_deleted_records_count_passes_when_at_threshold(self): - test_df = pd.DataFrame({'DELETED': [1, 1]}) # Total deleted = 2 + summary = {'deleted_obs_count': 2} params = {'threshold': 2} - result = self.validator.validate_deleted_records_count(test_df, params) + result = self.validator.validate_deleted_records_count(summary, params) self.assertEqual(result.status, ValidationStatus.PASSED) - self.assertEqual(result.details['rows_processed'], 2) - self.assertEqual(result.details['rows_succeeded'], 2) - self.assertEqual(result.details['rows_failed'], 0) + self.assertEqual(result.details['deleted_records_count'], 2) + self.assertEqual(result.details['threshold'], 2) - def test_deleted_records_count_passes_on_empty_dataframe(self): - test_df = pd.DataFrame({'DELETED': []}) + def test_deleted_records_count_passes_on_empty_deleted_count(self): + summary = {'deleted_obs_count': 0} params = {'threshold': 0} - result = self.validator.validate_deleted_records_count(test_df, params) + result = self.validator.validate_deleted_records_count(summary, params) self.assertEqual(result.status, ValidationStatus.PASSED) - self.assertEqual(result.details['rows_processed'], 0) - self.assertEqual(result.details['rows_succeeded'], 0) - self.assertEqual(result.details['rows_failed'], 0) + self.assertEqual(result.details['deleted_records_count'], 0) + self.assertEqual(result.details['threshold'], 0) - def test_deleted_records_count_fails_on_missing_column(self): - test_df = pd.DataFrame({'StatVar': ['sv1']}) # Missing 'DELETED' + def test_deleted_records_count_passes_on_missing_column(self): + summary = {'previous_obs_count': 100} # Missing 'deleted_obs_count' params = {'threshold': 1} - result = self.validator.validate_deleted_records_count(test_df, params) - self.assertEqual(result.status, ValidationStatus.DATA_ERROR) - self.assertIn('missing required column', result.message) + 
result = self.validator.validate_deleted_records_count(summary, params) + self.assertEqual(result.status, ValidationStatus.PASSED) + self.assertEqual(result.details['deleted_records_count'], 0) + + def test_deleted_records_count_passes_on_missing_summary(self): + params = {'threshold': 1} + result = self.validator.validate_deleted_records_count(None, params) + self.assertEqual(result.status, ValidationStatus.PASSED) + self.assertEqual(result.details['deleted_records_count'], 0) class TestDeletedRecordsPercentValidation(unittest.TestCase): @@ -120,63 +121,56 @@ def setUp(self): def test_deleted_records_percent_fails_when_over_threshold(self): # 10 records, 2 deleted => 20% - test_df = pd.DataFrame({'DELETED': [1, 1]}) - summary = {'previous_obs_size': 10} + summary = {'previous_obs_count': 10, 'deleted_obs_count': 2} params = {'threshold': 10} # Threshold 10% result = self.validator.validate_deleted_records_percent( - test_df, summary, params) + summary, params) self.assertEqual(result.status, ValidationStatus.FAILED) self.assertEqual(result.details['percent'], 20.0) self.assertEqual(result.details['threshold'], 10) def test_deleted_records_percent_passes_when_below_threshold(self): # 100 records, 1 deleted => 1% - test_df = pd.DataFrame({'DELETED': [1]}) - summary = {'previous_obs_size': 100} + summary = {'previous_obs_count': 100, 'deleted_obs_count': 1} params = {'threshold': 10} result = self.validator.validate_deleted_records_percent( - test_df, summary, params) + summary, params) self.assertEqual(result.status, ValidationStatus.PASSED) self.assertEqual(result.details['percent'], 1.0) def test_deleted_records_percent_passes_when_no_deleted(self): - test_df = pd.DataFrame({'DELETED': []}) - summary = {'previous_obs_size': 100} + summary = {'previous_obs_count': 100, 'deleted_obs_count': 0} params = {'threshold': 10} result = self.validator.validate_deleted_records_percent( - test_df, summary, params) + summary, params) self.assertEqual(result.status, 
ValidationStatus.PASSED) self.assertEqual(result.details['percent'], 0.0) def test_deleted_records_percent_handles_zero_data_size(self): # 0 records, 0 deleted => 0% - test_df = pd.DataFrame({'DELETED': []}) - summary = {'previous_obs_size': 0} + summary = {'previous_obs_count': 0, 'deleted_obs_count': 0} params = {'threshold': 10} result = self.validator.validate_deleted_records_percent( - test_df, summary, params) + summary, params) self.assertEqual(result.status, ValidationStatus.PASSED) self.assertEqual(result.details['percent'], 0.0) def test_deleted_records_percent_fails_on_missing_summary(self): - test_df = pd.DataFrame({'DELETED': [1]}) params = {'threshold': 10} - result = self.validator.validate_deleted_records_percent( - test_df, None, params) + result = self.validator.validate_deleted_records_percent(None, params) self.assertEqual(result.status, ValidationStatus.DATA_ERROR) def test_deleted_records_percent_fails_on_missing_size_in_summary(self): - test_df = pd.DataFrame({'DELETED': [1]}) - summary = {} # Missing previous_obs_size + summary = {} # Missing previous_obs_count params = {'threshold': 10} result = self.validator.validate_deleted_records_percent( - test_df, summary, params) + summary, params) self.assertEqual(result.status, ValidationStatus.DATA_ERROR)