Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 51 additions & 5 deletions tools/import_validation/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ class ValidationRunner:
"""

def __init__(self, validation_config_path: str, differ_output: str,
stats_summary: str, lint_report: str, validation_output: str):
stats_summary: str, lint_report: str, validation_output: str,
counters_report: str = None):
self.config = ValidationConfig(validation_config_path)
self.validation_output = validation_output
self.validator = Validator()
Expand All @@ -49,7 +50,8 @@ def __init__(self, validation_config_path: str, differ_output: str,
'stats': pd.DataFrame(),
'differ': pd.DataFrame(),
'differ_summary': {},
'lint': {}
'lint': {},
'counters': {}
}

self.validation_dispatch = {
Expand Down Expand Up @@ -85,12 +87,22 @@ def __init__(self, validation_config_path: str, differ_output: str,
'MAX_VALUE_CHECK':
(self.validator.validate_max_value_check, 'stats'),
'GOLDENS_CHECK': (self.validator.validate_goldens, 'stats'),
'COUNTER_ZERO_CHECK':
(self.validator.validate_counter_zero, 'counters'),
'COUNTER_MAX_THRESHOLD':
(self.validator.validate_counter_max_threshold, 'counters'),
'COUNTER_RATIO_THRESHOLD':
(self.validator.validate_counter_ratio_threshold, 'counters'),
'COUNTER_SUM_INTEGRITY':
(self.validator.validate_counter_sum_integrity, 'counters'),
'COUNTER_MIN_YIELD':
(self.validator.validate_counter_min_yield, 'counters'),
}

self._initialize_data_sources(stats_summary, lint_report, differ_output)
self._initialize_data_sources(stats_summary, lint_report, differ_output, counters_report)

def _initialize_data_sources(self, stats_summary: str, lint_report: str,
differ_output: str):
differ_output: str, counters_report: str = None):
"""
Checks for and loads the required data sources based on the config.
"""
Expand Down Expand Up @@ -167,6 +179,37 @@ def _initialize_data_sources(self, stats_summary: str, lint_report: str,
logging.warning("lint_report file exists but is empty: %s",
lint_report)

if counters_report:
self._load_counters(counters_report)

def _load_counters(self, counters_report: str):
"""Loads counters from a CSV file and stores them in data_sources."""
if os.path.exists(counters_report) and os.path.getsize(
counters_report) > 0:
try:
df = pd.read_csv(counters_report)
def clean_key(x):
if not isinstance(x, str):
return x
if ':' in x:
x = x.split(':', 1)[1]
if '_' in x:
x = x.rsplit('_', 1)[-1]
return x
Comment on lines +191 to +198
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The clean_key logic is too aggressive and will break counter names that contain underscores. For example, a counter named records_count (following the recommended naming convention) would be incorrectly truncated to count. Additionally, rsplit('_', 1) only works if the counter name itself has no underscores. A more robust approach would be to use a regular expression to strip the specific stage prefix pattern (e.g., digit:string_).

                def clean_key(x):
                    if not isinstance(x, str):
                        return x
                    import re
                    # Strip stage prefix like "2:prepare_output_"
                    return re.sub(r'^\d+:[^:]+_', '', x)
References
  1. When naming a variable for a count of items, prefer the pattern plural_noun_count (e.g., records_count) over singular_noun_counts (e.g., record_counts).


df['key'] = df['key'].apply(clean_key)
# Aggregate by summing if there are duplicates
df = df.groupby('key')['value'].sum().reset_index()
self.data_sources['counters'] = dict(
zip(df['key'], df['value']))
except Exception as e:
logging.error(
f"CSV parse error while reading counters report at {counters_report}: {e}"
)
Comment on lines +189 to +208
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

It is recommended to raise an exception if the counters report fails to parse, rather than just logging an error. Continuing with an empty counters dictionary could lead to incorrect validation results (e.g., COUNTER_ZERO_CHECK passing when it should have failed because the data was missing). Raising a ValueError ensures the user is aware that the validation process is incomplete due to corrupted input data, enforcing the correctness of the validation process.

Suggested change
                df['key'] = df['key'].apply(clean_key)
                # Aggregate by summing if there are duplicates
                df = df.groupby('key')['value'].sum().reset_index()
                self.data_sources['counters'] = dict(
                    zip(df['key'], df['value']))
            except Exception as e:
                logging.error(
                    f"CSV parse error while reading counters report at {counters_report}: {e}"
                )
            except (pd.errors.ParserError, pd.errors.EmptyDataError, OSError) as e:
                raise ValueError(
                    f"Failed to parse counters report at {counters_report}: {e}"
                ) from e
References
  1. When a configuration dictionary like resource_limits is provided, it is acceptable to assume it contains all required keys and allow it to fail on a KeyError if a key is missing, rather than defensively using default values. This enforces configuration correctness.

Comment on lines +205 to +208
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Avoid catching broad exceptions. It is better to catch specific errors that pd.read_csv might raise, such as pd.errors.ParserError or pd.errors.EmptyDataError.

Suggested change
except Exception as e:
logging.error(
f"CSV parse error while reading counters report at {counters_report}: {e}"
)
except pd.errors.ParserError as e:
logging.error(
f"CSV parse error while reading counters report at {counters_report}: {e}"
)

elif os.path.exists(counters_report):
logging.warning("counters_report file exists but is empty: %s",
counters_report)

def _determine_required_sources(self) -> set[str]:
"""
Parses the validation config to determine which data sources are required.
Expand Down Expand Up @@ -265,7 +308,8 @@ def main(_):
differ_output=_FLAGS.differ_output,
stats_summary=_FLAGS.stats_summary,
lint_report=_FLAGS.lint_report,
validation_output=_FLAGS.validation_output)
validation_output=_FLAGS.validation_output,
counters_report=_FLAGS.counters_report)
overall_status, _ = runner.run_validations()
if not overall_status:
sys.exit(1)
Expand All @@ -283,6 +327,8 @@ def main(_):
'Path to the stats summary report file.')
flags.DEFINE_string('lint_report', None,
'Path to the mcf lint report file.')
flags.DEFINE_string('counters_report', None,
'Path to the counters report file.')
flags.DEFINE_string('validation_output', None,
'Path to the validation output file.')
flags.mark_flag_as_required('validation_output')
Expand Down
102 changes: 102 additions & 0 deletions tools/import_validation/runner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.
"""Tests for the ValidationRunner."""

import csv
import unittest
from unittest.mock import patch, MagicMock
import pandas as pd
Expand Down Expand Up @@ -350,3 +351,104 @@ def test_init_raises_error_if_required_file_is_missing(self):
lint_report=self.report_path,
validation_output=self.output_path)
self.assertIn("'stats' data source", str(context.exception))


class TestCountersIntegration(unittest.TestCase):
    '''Test Class for counter validations integration in ValidationRunner.'''

    def setUp(self):
        # Every test works in its own scratch directory so files never leak
        # between cases.
        self.test_dir = tempfile.TemporaryDirectory()
        base = self.test_dir.name
        self.config_path = os.path.join(base, 'config.json')
        self.stats_path = os.path.join(base, 'stats.csv')
        self.report_path = os.path.join(base, 'report.json')
        self.differ_path = os.path.join(base, 'differ.csv')
        self.output_path = os.path.join(base, 'output.csv')
        self.counters_path = os.path.join(base, 'counters.csv')

    def tearDown(self):
        self.test_dir.cleanup()

    def _write_config(self, rules):
        # Serialize the given rules list into the validation config file.
        with open(self.config_path, 'w') as f:
            json.dump({'rules': rules}, f)

    def _write_counters(self, rows):
        # Write (key, value) rows into the counters CSV, header included.
        with open(self.counters_path, 'w', newline='') as f:
            writer = csv.writer(f)
            writer.writerow(['key', 'value'])
            writer.writerows(rows)

    def _build_runner(self):
        # Construct a ValidationRunner wired up to the scratch files.
        return ValidationRunner(
            validation_config_path=self.config_path,
            stats_summary=self.stats_path,
            differ_output=self.differ_path,
            lint_report=self.report_path,
            validation_output=self.output_path,
            counters_report=self.counters_path)

    @patch('tools.import_validation.runner.Validator')
    def test_runner_loads_counters_and_calls_validator(self, MockValidator):
        # Arrange: mocked validator reports a passing zero-check.
        validator = MockValidator.return_value
        validator.validate_counter_zero.return_value = ValidationResult(
            ValidationStatus.PASSED, 'COUNTER_ZERO_CHECK')

        self._write_config([{
            'rule_id': 'check_zero_errors',
            'validator': 'COUNTER_ZERO_CHECK',
            'params': {'counter_name': 'invalid-lat-lng'}
        }])
        self._write_counters([['invalid-lat-lng', 0]])

        # Act
        self._build_runner().run_validations()

        # Assert: the counters loaded from CSV were passed to the validator.
        validator.validate_counter_zero.assert_called_once()
        positional, _ = validator.validate_counter_zero.call_args
        self.assertEqual(positional[0]['invalid-lat-lng'], 0)

    @patch('tools.import_validation.runner.Validator')
    def test_runner_strips_counter_prefixes(self, MockValidator):
        # Arrange: mocked validator reports a passing threshold check.
        validator = MockValidator.return_value
        validator.validate_counter_max_threshold.return_value = ValidationResult(
            ValidationStatus.PASSED, 'COUNTER_MAX_THRESHOLD')

        self._write_config([{
            'rule_id': 'check_dropped_points',
            'validator': 'COUNTER_MAX_THRESHOLD',
            'params': {
                'counter_name': 'dropped-points',
                'threshold': 10
            }
        }])
        # Two stage-prefixed rows that should collapse onto the same counter
        # once the runner strips the prefixes.
        self._write_counters([
            ['2:prepare_output_dropped-points', 5],
            ['3:write_statvar_mcf_dropped-points', 3],
        ])

        # Act
        self._build_runner().run_validations()

        # Assert: prefixes stripped and duplicate keys summed (5 + 3 = 8).
        validator.validate_counter_max_threshold.assert_called_once()
        positional, _ = validator.validate_counter_max_threshold.call_args
        self.assertEqual(positional[0]['dropped-points'], 8)


Loading
Loading