From 52a0d78657c1e6c59d5637a7b6bdfc9071ab68dd Mon Sep 17 00:00:00 2001 From: unknown Date: Wed, 15 Apr 2026 17:19:07 +0900 Subject: [PATCH 1/9] feat(schema): add v2.0.0 schema with citations and provenance --- hatch_validator/core/pkg_accessor_factory.py | 6 + hatch_validator/core/validator_factory.py | 6 + hatch_validator/package/package_service.py | 4 +- hatch_validator/package/v2_0_0/__init__.py | 6 + hatch_validator/package/v2_0_0/accessor.py | 92 +++++ .../package/v2_0_0/citations_validation.py | 39 ++ .../package/v2_0_0/dependency_validation.py | 376 ++++++++++++++++++ .../package/v2_0_0/entry_point_validation.py | 135 +++++++ .../package/v2_0_0/provenance_validation.py | 33 ++ .../package/v2_0_0/schema_validation.py | 59 +++ .../package/v2_0_0/tools_validation.py | 132 ++++++ hatch_validator/package/v2_0_0/validator.py | 177 +++++++++ hatch_validator/package_validator.py | 15 +- tests/test_package_service.py | 61 +++ tests/test_package_validator_for_v2_0_0.py | 279 +++++++++++++ tests/test_v2_0_0_integration.py | 123 ++++++ 16 files changed, 1539 insertions(+), 4 deletions(-) create mode 100644 hatch_validator/package/v2_0_0/__init__.py create mode 100644 hatch_validator/package/v2_0_0/accessor.py create mode 100644 hatch_validator/package/v2_0_0/citations_validation.py create mode 100644 hatch_validator/package/v2_0_0/dependency_validation.py create mode 100644 hatch_validator/package/v2_0_0/entry_point_validation.py create mode 100644 hatch_validator/package/v2_0_0/provenance_validation.py create mode 100644 hatch_validator/package/v2_0_0/schema_validation.py create mode 100644 hatch_validator/package/v2_0_0/tools_validation.py create mode 100644 hatch_validator/package/v2_0_0/validator.py create mode 100644 tests/test_package_validator_for_v2_0_0.py create mode 100644 tests/test_v2_0_0_integration.py diff --git a/hatch_validator/core/pkg_accessor_factory.py b/hatch_validator/core/pkg_accessor_factory.py index 331ba5f..bcc21d8 100644 --- a/hatch_validator/core/pkg_accessor_factory.py +++ b/hatch_validator/core/pkg_accessor_factory.py @@ -76,6 +76,12 @@ def _ensure_accessors_loaded(cls) -> None: except ImportError as e: logger.warning(f"Could not load v1.2.2 accessor: {e}") + try: + from hatch_validator.package.v2_0_0.accessor import HatchPkgAccessor as V200HatchPkgAccessor + cls.register_accessor("2.0.0", V200HatchPkgAccessor) + except ImportError as e: + logger.warning(f"Could not load v2.0.0 accessor: {e}") + @classmethod def create_accessor_chain(cls, target_version: Optional[str] = None) -> HatchPkgAccessor: """Create appropriate accessor chain based on target version. diff --git a/hatch_validator/core/validator_factory.py b/hatch_validator/core/validator_factory.py index d714a84..55e9dca 100644 --- a/hatch_validator/core/validator_factory.py +++ b/hatch_validator/core/validator_factory.py @@ -78,6 +78,12 @@ def _ensure_validators_loaded(cls) -> None: cls.register_validator("1.2.2", V122Validator) except ImportError as e: logger.warning(f"Could not load v1.2.2 validator: {e}") + + try: + from hatch_validator.package.v2_0_0.validator import Validator as V200Validator + cls.register_validator("2.0.0", V200Validator) + except ImportError as e: + logger.warning(f"Could not load v2.0.0 validator: {e}") @classmethod def create_validator_chain(cls, target_version: Optional[str] = None) -> Validator: diff --git a/hatch_validator/package/package_service.py b/hatch_validator/package/package_service.py index 46188ee..5b2ad16 100644 --- a/hatch_validator/package/package_service.py +++ b/hatch_validator/package/package_service.py @@ -42,10 +42,10 @@ def load_metadata(self, metadata: Dict[str, Any]) -> None: ValueError: If no accessor can handle the package metadata. """ self._metadata = metadata - schema_version = metadata.get("package_schema_version") + schema_version = metadata.get("hatch_schema_version") or metadata.get("package_schema_version") if not schema_version: - raise ValueError("Missing 'package_schema_version' in metadata.") + raise ValueError("Missing schema version in metadata. Expected 'hatch_schema_version' or 'package_schema_version'.") self._accessor = HatchPkgAccessorFactory.create_accessor_chain(schema_version) if not self._accessor: diff --git a/hatch_validator/package/v2_0_0/__init__.py b/hatch_validator/package/v2_0_0/__init__.py new file mode 100644 index 0000000..e7a0890 --- /dev/null +++ b/hatch_validator/package/v2_0_0/__init__.py @@ -0,0 +1,6 @@ +"""Schema validation package for v2.0.0. + +This package contains the validator and strategies for schema version 2.0.0, +which integrates the package metadata format with the Official MCP Registry. +""" + diff --git a/hatch_validator/package/v2_0_0/accessor.py b/hatch_validator/package/v2_0_0/accessor.py new file mode 100644 index 0000000..ca6dd5f --- /dev/null +++ b/hatch_validator/package/v2_0_0/accessor.py @@ -0,0 +1,92 @@ +"""Package metadata accessor for schema version 2.0.0. + +This module provides the metadata accessor for schema version 2.0.0. +Schema version 2.0.0 introduces the new hatch_schema_version field, +uses an authors array instead of a singular author object, and renames +`tools[].description` to `tools[].desc`. +""" + +import logging +from typing import Any, Dict + +from hatch_validator.core.pkg_accessor_base import HatchPkgAccessor as HatchPkgAccessorBase + +logger = logging.getLogger("hatch.package.v2_0_0.accessor") + +class HatchPkgAccessor(HatchPkgAccessorBase): + """Metadata accessor for Hatch package schema version 2.0.0.""" + + def can_handle(self, schema_version: str) -> bool: + """Check if this accessor can handle schema version 2.0.0. + + Args: + schema_version (str): Schema version to check + + Returns: + bool: True if schema_version is '2.0.0' + """ + return schema_version == "2.0.0" + + def get_package_schema_version(self, metadata: Dict[str, Any]) -> Any: + """Get the package schema version value from metadata. + + Args: + metadata (Dict[str, Any]): Package metadata + + Returns: + Any: Schema version value from either hatch_schema_version or package_schema_version + """ + return metadata.get("hatch_schema_version") or metadata.get("package_schema_version") + + def get_author(self, metadata: Dict[str, Any]) -> Any: + """Get authors from metadata. + + Schema v2.0.0 stores author information in an array under the `authors` key. + + Args: + metadata (Dict[str, Any]): Package metadata + + Returns: + Any: Author information, preferably the authors list or legacy author object + """ + authors = metadata.get("authors") + if isinstance(authors, list): + return authors + if authors is not None: + return [authors] + return metadata.get("author") + + def get_tools(self, metadata: Dict[str, Any]) -> Any: + """Get tools from metadata. + + Tools entries in schema v2.0.0 use `desc` instead of `description`. + + Args: + metadata (Dict[str, Any]): Package metadata + + Returns: + Any: Tools list from metadata + """ + return metadata.get("tools", []) + + def get_provenance(self, metadata: Dict[str, Any]) -> Any: + """Get provenance metadata for v2.0.0. + + Args: + metadata (Dict[str, Any]): Package metadata + + Returns: + Any: Provenance metadata object or None if not present + """ + return metadata.get("provenance") + + def get_citations(self, metadata: Dict[str, Any]) -> Any: + """Get citations metadata for v2.0.0. + + Args: + metadata (Dict[str, Any]): Package metadata + + Returns: + Any: Citations list or an empty list if not present + """ + return metadata.get("citations", []) diff --git a/hatch_validator/package/v2_0_0/citations_validation.py b/hatch_validator/package/v2_0_0/citations_validation.py new file mode 100644 index 0000000..46f9766 --- /dev/null +++ b/hatch_validator/package/v2_0_0/citations_validation.py @@ -0,0 +1,39 @@ +"""Citations validation for schema version 2.0.0. + +This module provides optional citations validation logic beyond JSON schema. +""" + +import logging +from typing import Dict, List, Tuple + +logger = logging.getLogger("hatch.schema.v2_0_0.citations_validation") + + +class CitationsValidation: + """Strategy for validating package citations metadata for v2.0.0.""" + + def validate_citations(self, metadata: Dict, context) -> Tuple[bool, List[str]]: + """Validate citations metadata for v2.0.0.""" + citations = metadata.get('citations') + if citations is None: + return True, [] + + if not isinstance(citations, list): + return False, ["Citations must be a list of citation objects"] + + errors = [] + for index, citation in enumerate(citations): + if not isinstance(citation, dict): + errors.append(f"Citation at index {index} must be an object") + continue + + if 'format' not in citation or not isinstance(citation['format'], str): + errors.append(f"Citation at index {index} must include a string 'format'") + if 'value' not in citation or not isinstance(citation['value'], str): + errors.append(f"Citation at index {index} must include a string 'value'") + if 'note' in citation and not isinstance(citation['note'], str): + errors.append(f"Citation at index {index} field 'note' must be a string") + + if errors: + return False, errors + return True, [] diff --git a/hatch_validator/package/v2_0_0/dependency_validation.py b/hatch_validator/package/v2_0_0/dependency_validation.py new file mode 100644 index 0000000..14ec1f2 --- /dev/null +++ b/hatch_validator/package/v2_0_0/dependency_validation.py @@ -0,0 +1,376 @@ +"""Dependency validation strategy for schema version v2.0.0. + +This module implements dependency validation for schema version 2.0.0. +Docker dependencies now require tag and digest instead of version_constraint, +and version_constraint is optional for all dependency types. +""" + +import json +import logging +import re +from typing import Dict, List, Tuple, Optional +from pathlib import Path + +from hatch_validator.core.validation_strategy import DependencyValidationStrategy, ValidationError +from hatch_validator.core.validation_context import ValidationContext +from hatch_validator.utils.hatch_dependency_graph import HatchDependencyGraphBuilder +from hatch_validator.utils.version_utils import VersionConstraintValidator +from hatch_validator.registry.registry_service import RegistryService +from hatch_validator.package.package_service import PackageService + +logger = logging.getLogger("hatch.dependency_validation_v2_0_0") +logger.setLevel(logging.DEBUG) + + +class DependencyValidation(DependencyValidationStrategy): + """Strategy for validating dependencies according to v2.0.0 schema.""" + + def __init__(self): + """Initialize the dependency validation strategy.""" + self.version_validator = VersionConstraintValidator() + self.registry_service: Optional[RegistryService] = None + + def validate_dependencies(self, metadata: Dict, context: ValidationContext) -> Tuple[bool, List[str]]: + """Validate dependencies according to v2.0.0 schema. + + Args: + metadata (Dict): Package metadata containing dependency information + context (ValidationContext): Validation context with resources + + Returns: + Tuple[bool, List[str]]: Tuple containing: + - bool: Whether dependency validation was successful + - List[str]: List of dependency validation errors + """ + try: + package_service = context.get_data("package_service", None) + if package_service is None: + package_service = PackageService(metadata) + self.package_service = package_service + + registry_data = context.registry_data + registry_service = context.get_data("registry_service", None) + if registry_data is None: + logger.error("No registry data available for dependency validation") + raise ValidationError("No registry data available for dependency validation") + if registry_service is None: + registry_service = RegistryService(registry_data) + self.registry_service = registry_service + + errors = [] + is_valid = True + + dependencies = package_service.get_dependencies() + hatch_dependencies = dependencies.get('hatch', []) + python_dependencies = dependencies.get('python', []) + system_dependencies = dependencies.get('system', []) + docker_dependencies = dependencies.get('docker', []) + + if hatch_dependencies: + hatch_valid, hatch_errors = self._validate_hatch_dependencies(hatch_dependencies, context) + if not hatch_valid: + errors.extend(hatch_errors) + is_valid = False + + if python_dependencies: + python_valid, python_errors = self._validate_python_dependencies(python_dependencies, context) + if not python_valid: + errors.extend(python_errors) + is_valid = False + + if system_dependencies: + system_valid, system_errors = self._validate_system_dependencies(system_dependencies, context) + if not system_valid: + errors.extend(system_errors) + is_valid = False + + if docker_dependencies: + docker_valid, docker_errors = self._validate_docker_dependencies(docker_dependencies, context) + if not docker_valid: + errors.extend(docker_errors) + is_valid = False + + except Exception as e: + logger.error(f"Error during dependency validation: {e}") + errors = [f"Error during dependency validation: {e}"] + is_valid = False + + logger.debug(f"Dependency validation result: {is_valid}, errors: {errors}") + return is_valid, errors + + def _validate_python_dependencies(self, python_dependencies: List[Dict], + context: ValidationContext) -> Tuple[bool, List[str]]: + """Validate Python package dependencies with optional version constraints.""" + errors = [] + is_valid = True + + for dep in python_dependencies: + dep_valid, dep_errors = self._validate_single_python_dependency(dep, context) + if not dep_valid: + errors.extend(dep_errors) + is_valid = False + + return is_valid, errors + + def _validate_single_python_dependency(self, dep: Dict, + context: ValidationContext) -> Tuple[bool, List[str]]: + """Validate a single Python dependency.""" + errors = [] + is_valid = True + + dep_name = dep.get('name') + if not dep_name: + errors.append("Python dependency missing name") + return False, errors + + version_constraint = dep.get('version_constraint') + if version_constraint: + constraint_valid, constraint_error = self.version_validator.validate_constraint(version_constraint) + if not constraint_valid: + errors.append(f"Invalid version constraint for Python package '{dep_name}': {constraint_error}") + is_valid = False + + package_manager = dep.get('package_manager', 'pip') + if package_manager not in ['pip', 'conda']: + errors.append( + f"Invalid package_manager '{package_manager}' for Python package '{dep_name}'. Must be 'pip' or 'conda'" + ) + is_valid = False + + channel = dep.get('channel') + if channel is not None: + if package_manager != 'conda': + errors.append( + f"Channel '{channel}' specified for Python package '{dep_name}' with package_manager '{package_manager}'. Channel is only valid for conda packages" + ) + is_valid = False + else: + channel_pattern = r'^[a-zA-Z0-9_\-]+$' + if not re.match(channel_pattern, channel): + errors.append( + f"Invalid channel format '{channel}' for Python package '{dep_name}'. Must match pattern: {channel_pattern}" + ) + is_valid = False + + return is_valid, errors + + def _validate_system_dependencies(self, system_dependencies: List[Dict], + context: ValidationContext) -> Tuple[bool, List[str]]: + """Validate system package dependencies.""" + errors = [] + is_valid = True + + for dep in system_dependencies: + dep_valid, dep_errors = self._validate_single_system_dependency(dep, context) + if not dep_valid: + errors.extend(dep_errors) + is_valid = False + + return is_valid, errors + + def _validate_single_system_dependency(self, dep: Dict, + context: ValidationContext) -> Tuple[bool, List[str]]: + """Validate a single system dependency.""" + errors = [] + is_valid = True + + dep_name = dep.get('name') + if not dep_name: + errors.append("System dependency missing name") + return False, errors + + version_constraint = dep.get('version_constraint') + if version_constraint: + constraint_valid, constraint_error = self.version_validator.validate_constraint(version_constraint) + if not constraint_valid: + errors.append(f"Invalid version constraint for system package '{dep_name}': {constraint_error}") + is_valid = False + + package_manager = dep.get('package_manager') + if package_manager is not None and not isinstance(package_manager, str): + errors.append(f"Invalid package_manager for system package '{dep_name}'. Must be a string") + is_valid = False + + return is_valid, errors + + def _validate_docker_dependencies(self, docker_dependencies: List[Dict], + context: ValidationContext) -> Tuple[bool, List[str]]: + """Validate Docker image dependencies.""" + errors = [] + is_valid = True + + for dep in docker_dependencies: + dep_valid, dep_errors = self._validate_single_docker_dependency(dep, context) + if not dep_valid: + errors.extend(dep_errors) + is_valid = False + + return is_valid, errors + + def _validate_single_docker_dependency(self, dep: Dict, + context: ValidationContext) -> Tuple[bool, List[str]]: + """Validate a single Docker dependency.""" + errors = [] + is_valid = True + + dep_name = dep.get('name') + if not dep_name: + errors.append("Docker dependency missing name") + return False, errors + + tag = dep.get('tag') + digest = dep.get('digest') + if not digest: + errors.append(f"Docker dependency '{dep_name}' missing required 'digest'") + is_valid = False + if tag is not None and not isinstance(tag, str): + errors.append(f"Invalid Docker tag for '{dep_name}'. Must be a string") + is_valid = False + + version_constraint = dep.get('version_constraint') + if version_constraint is not None: + errors.append( + f"Docker dependency '{dep_name}' should use 'tag' and 'digest' instead of 'version_constraint'" + ) + is_valid = False + + registry = dep.get('registry') + if registry is not None and not isinstance(registry, str): + errors.append(f"Invalid registry value for Docker dependency '{dep_name}'. Must be a string") + is_valid = False + + if digest and isinstance(digest, str): + digest_pattern = r'^[A-Za-z0-9_+.-]+:[A-Fa-f0-9]{32,}$' + if not re.match(digest_pattern, digest): + errors.append( + f"Invalid Docker digest '{digest}' for '{dep_name}'. Must match pattern ':'" + ) + is_valid = False + + return is_valid, errors + + def _validate_hatch_dependencies(self, hatch_dependencies: List[Dict], + context: ValidationContext) -> Tuple[bool, List[str]]: + """Validate Hatch package dependencies.""" + errors = [] + is_valid = True + + for dep in hatch_dependencies: + dep_valid, dep_errors = self._validate_single_hatch_dependency(dep, context) + if not dep_valid: + errors.extend(dep_errors) + is_valid = False + + try: + hatch_dep_graph_builder = HatchDependencyGraphBuilder( + package_service=self.package_service, + registry_service=self.registry_service + ) + dependency_graph = hatch_dep_graph_builder.build_dependency_graph(hatch_dependencies, context) + logger.debug(f"Dependency graph: {json.dumps(dependency_graph.to_dict(), indent=2)}") + + has_cycles, cycles = dependency_graph.detect_cycles() + if has_cycles: + for cycle in cycles: + cycle_str = " -> ".join(cycle) + errors.append(f"Circular dependency detected: {cycle_str}") + is_valid = False + except Exception as e: + logger.error(f"Error building dependency graph: {e}") + errors.append(f"Error analyzing dependency graph: {e}") + is_valid = False + + return is_valid, errors + + def _parse_hatch_dep_name(self, dep_name: str) -> Tuple[Optional[str], str]: + """Parse a hatch dependency name into (repo, package_name).""" + if ':' in dep_name: + repo, pkg = dep_name.split(':', 1) + return repo, pkg + return None, dep_name + + def _validate_single_hatch_dependency(self, dep: Dict, context: ValidationContext) -> Tuple[bool, List[str]]: + """Validate a single Hatch dependency.""" + errors = [] + is_valid = True + + dep_name = dep.get('name') + if not dep_name: + errors.append("Hatch dependency missing name") + return False, errors + + version_constraint = dep.get('version_constraint') + if version_constraint: + constraint_valid, constraint_error = self.version_validator.validate_constraint(version_constraint) + if not constraint_valid: + errors.append(f"Invalid version constraint for '{dep_name}': {constraint_error}") + is_valid = False + + if self.package_service.is_local_dependency(dep, context.package_dir): + if not context.allow_local_dependencies: + errors.append(f"Local dependency '{dep_name}' not allowed in this context") + return False, errors + local_valid, local_errors = self._validate_local_dependency(dep, context) + if not local_valid: + errors.extend(local_errors) + is_valid = False + else: + registry_valid, registry_errors = self._validate_registry_dependency(dep, context) + if not registry_valid: + errors.extend(registry_errors) + is_valid = False + + return is_valid, errors + + def _validate_local_dependency(self, dep: Dict, context: ValidationContext) -> Tuple[bool, List[str]]: + """Validate a local file dependency.""" + errors = [] + dep_name = dep.get('name') + + path = Path(dep_name) + if context.package_dir and not path.is_absolute(): + path = context.package_dir / path + + if path.exists(): + if not path.is_dir(): + errors.append(f"Local dependency '{dep_name}' path is not a directory: {path}") + return False, errors + else: + errors.append(f"Local dependency '{dep_name}' path is not a directory: {path}") + return False, errors + + metadata_path = path / "hatch_metadata.json" + if not metadata_path.exists(): + errors.append(f"Local dependency '{dep_name}' missing hatch_metadata.json: {metadata_path}") + return False, errors + + return True, [] + + def _validate_registry_dependency(self, dep: Dict, context: ValidationContext) -> Tuple[bool, List[str]]: + """Validate a registry dependency.""" + errors = [] + dep_name = dep.get('name') + version_constraint = dep.get('version_constraint') + + repo, pkg = self._parse_hatch_dep_name(dep_name) + + if repo: + if not self.registry_service.repository_exists(repo): + errors.append(f"Repository '{repo}' not found in registry for dependency '{dep_name}'") + return False, errors + if not self.registry_service.package_exists(pkg, repo_name=repo): + errors.append(f"Package '{pkg}' not found in repository '{repo}' for dependency '{dep_name}'") + return False, errors + else: + if not self.registry_service.package_exists(pkg): + errors.append(f"Registry dependency '{pkg}' not found in registry for dependency '{dep_name}'") + return False, errors + + if version_constraint: + version_compatible, version_error = self.registry_service.validate_version_compatibility( + dep_name, version_constraint) + if not version_compatible: + errors.append(f"No version of '{dep_name}' satisfies constraint {version_constraint}: {version_error}") + return False, errors + + return True, [] diff --git a/hatch_validator/package/v2_0_0/entry_point_validation.py b/hatch_validator/package/v2_0_0/entry_point_validation.py new file mode 100644 index 0000000..4ecaba2 --- /dev/null +++ b/hatch_validator/package/v2_0_0/entry_point_validation.py @@ -0,0 +1,135 @@ +"""Entry point validation strategy for v2.0.0. + +This module provides the entry point validation strategy for schema version 2.0.0, +which validates dual entry point configuration (FastMCP server + HatchMCP wrapper). +""" + +import ast +import logging +from pathlib import Path +from typing import Dict, List, Tuple + +from hatch_validator.core.validation_strategy import EntryPointValidationStrategy +from hatch_validator.core.validation_context import ValidationContext + + +# Configure logging +logger = logging.getLogger("hatch.schema.v2_0_0.entry_point_validation") + + +class EntryPointValidation(EntryPointValidationStrategy): + """Strategy for validating dual entry point files for v2.0.0.""" + + def validate_entry_point(self, metadata: Dict, context: ValidationContext) -> Tuple[bool, List[str]]: + """Validate dual entry point according to v2.0.0 schema. + + Args: + metadata (Dict): Package metadata containing entry point information + context (ValidationContext): Validation context with resources + + Returns: + Tuple[bool, List[str]]: Tuple containing: + - bool: Whether entry point validation was successful + - List[str]: List of entry point validation errors + """ + entry_point = metadata.get('entry_point') + if not entry_point: + logger.error("No entry_point specified in metadata") + return False, ["No entry_point specified in metadata"] + + if not isinstance(entry_point, dict): + logger.error("entry_point must be an object for schema v2.0.0") + return False, ["entry_point must be an object for schema v2.0.0"] + + if not context.package_dir: + logger.error("Package directory not provided for entry point validation") + return False, ["Package directory not provided for entry point validation"] + + errors = [] + + mcp_server = entry_point.get('mcp_server') + hatch_mcp_server = entry_point.get('hatch_mcp_server') + + mcp_server_valid, mcp_server_errors = self._validate_file_exists(mcp_server, context, "FastMCP server") + if not mcp_server_valid: + errors.extend(mcp_server_errors) + + hatch_wrapper_valid, hatch_wrapper_errors = self._validate_file_exists(hatch_mcp_server, context, "HatchMCP wrapper") + if not hatch_wrapper_valid: + errors.extend(hatch_wrapper_errors) + + if mcp_server_valid and hatch_wrapper_valid: + import_valid, import_errors = self._validate_import_relationship( + mcp_server, hatch_mcp_server, context + ) + if not import_valid: + errors.extend(import_errors) + + if errors: + logger.error(f"Entry point validation failed with {len(errors)} errors") + return False, errors + + logger.debug("Dual entry point validation successful") + return True, [] + + def _validate_file_exists(self, filename: str, context: ValidationContext, file_type: str) -> Tuple[bool, List[str]]: + """Validate that a file exists and is accessible.""" + if not filename: + error_msg = f"{file_type} filename not specified" + logger.error(error_msg) + return False, [error_msg] + + file_path = context.package_dir / filename + + if not file_path.exists(): + error_msg = f"{file_type} file '{filename}' does not exist" + logger.error(error_msg) + return False, [error_msg] + + if not file_path.is_file(): + error_msg = f"{file_type} '{filename}' is not a file" + logger.error(error_msg) + return False, [error_msg] + + if not filename.endswith('.py'): + error_msg = f"{file_type} '{filename}' must be a Python file (.py)" + logger.error(error_msg) + return False, [error_msg] + + logger.debug(f"{file_type} file '{filename}' exists and is valid") + return True, [] + + def _validate_import_relationship(self, mcp_server: str, hatch_wrapper: str, context: ValidationContext) -> Tuple[bool, List[str]]: + """Validate that HatchMCP wrapper imports from FastMCP server.""" + try: + wrapper_path = context.package_dir / hatch_wrapper + with open(wrapper_path, 'r', encoding='utf-8') as f: + source_code = f.read() + + tree = ast.parse(source_code) + expected_module = mcp_server.replace('.py', '') + + for node in ast.walk(tree): + if isinstance(node, ast.ImportFrom): + if node.module == expected_module: + for alias in node.names: + if alias.name == 'mcp': + logger.debug(f"Found valid import: from {expected_module} import mcp") + return True, [] + + error_msg = f"HatchMCP wrapper must import 'mcp' from '{expected_module}'" + logger.error(error_msg) + return False, [error_msg, f"Expected: from {expected_module} import mcp"] + + except SyntaxError as e: + error_msg = f"Syntax error in HatchMCP wrapper '{hatch_wrapper}' at line {e.lineno}: {e.msg}" + logger.error(error_msg) + return False, [error_msg] + except FileNotFoundError: + error_msg = f"HatchMCP wrapper file '{hatch_wrapper}' not found" + logger.error(error_msg) + return False, [error_msg] + except Exception as e: + error_msg = f"Error validating import relationship: {str(e)}" + logger.error(error_msg) + return False, [error_msg] diff --git a/hatch_validator/package/v2_0_0/provenance_validation.py b/hatch_validator/package/v2_0_0/provenance_validation.py new file mode 100644 index 0000000..2a5f386 --- /dev/null +++ b/hatch_validator/package/v2_0_0/provenance_validation.py @@ -0,0 +1,33 @@ +"""Provenance validation for schema version 2.0.0. + +This module provides optional provenance validation logic beyond JSON schema. +""" + +import logging +from typing import Dict, List, Tuple + +logger = logging.getLogger("hatch.schema.v2_0_0.provenance_validation") + + +class ProvenanceValidation: + """Strategy for validating package provenance metadata for v2.0.0.""" + + def validate_provenance(self, metadata: Dict, context) -> Tuple[bool, List[str]]: + """Validate provenance metadata for v2.0.0.""" + provenance = metadata.get('provenance') + if provenance is None: + return True, [] + + if not isinstance(provenance, dict): + return False, ["Provenance must be an object"] + + if not provenance: + return False, ["Provenance metadata must not be empty"] + + errors = [] + if 'source' in provenance and not isinstance(provenance['source'], str): + errors.append("Provenance.source must be a string") + + if errors: + return False, errors + return True, [] diff --git a/hatch_validator/package/v2_0_0/schema_validation.py b/hatch_validator/package/v2_0_0/schema_validation.py new file mode 100644 index 0000000..8c54a1a --- /dev/null +++ b/hatch_validator/package/v2_0_0/schema_validation.py @@ -0,0 +1,59 @@ +"""Schema validation strategy for v2.0.0. + +This module provides the schema validation strategy for schema version 2.0.0. +""" + +import logging +from typing import Dict, List, Tuple + +from hatch_validator.core.validation_strategy import SchemaValidationStrategy +from hatch_validator.core.validation_context import ValidationContext +from hatch_validator.schemas.schemas_retriever import get_package_schema + +# Configure logging +logger = logging.getLogger("hatch.schema.v2_0_0.schema_validation") + + +class SchemaValidation(SchemaValidationStrategy): + """Strategy for validating metadata against v2.0.0 schema.""" + + def validate_schema(self, metadata: Dict, context: ValidationContext) -> Tuple[bool, List[str]]: + """Validate metadata against v2.0.0 schema. + + Args: + metadata (Dict): Package metadata to validate against schema + context (ValidationContext): Validation context with resources + + Returns: + Tuple[bool, List[str]]: Tuple containing: + - bool: Whether schema validation was successful + - List[str]: List of schema validation errors + """ + try: + jsonschema = __import__("jsonschema") + except ImportError: + error_msg = "jsonschema is required for schema validation but is not installed" + logger.error(error_msg) + return False, [error_msg] + + try: + schema = get_package_schema(version="2.0.0", force_update=context.force_schema_update) + if not schema: + error_msg = "Failed to load package schema version 2.0.0" + logger.error(error_msg) + return False, [error_msg] + + jsonschema.validate(instance=metadata, schema=schema) + logger.debug("Package metadata successfully validated against v2.0.0 schema") + return True, [] + + except jsonschema.ValidationError as e: + error_msg = f"Schema validation failed: {e.message}" + if e.absolute_path: + error_msg += f" at path: {'.'.join(str(p) for p in e.absolute_path)}" + logger.error(error_msg) + return False, [error_msg] + except Exception as e: + error_msg = f"Unexpected error during schema validation: {str(e)}" + logger.error(error_msg) + return False, [error_msg] diff --git a/hatch_validator/package/v2_0_0/tools_validation.py b/hatch_validator/package/v2_0_0/tools_validation.py new file mode 100644 index 0000000..a413dad --- /dev/null +++ b/hatch_validator/package/v2_0_0/tools_validation.py @@ -0,0 +1,132 @@ +"""Tools validation strategy for v2.0.0. + +This module provides the tools validation strategy for schema version 2.0.0. +It validates tool declarations and ensures that tool metadata uses the +new `desc` field instead of the legacy `description` field. +""" + +import ast +import logging +from typing import Dict, List, Tuple, Set + +from hatch_validator.core.validation_strategy import ToolsValidationStrategy +from hatch_validator.core.validation_context import ValidationContext + +# Configure logging +logger = logging.getLogger("hatch.schema.v2_0_0.tools_validation") + + +class ToolsValidation(ToolsValidationStrategy): + """Strategy for validating tools for v2.0.0.""" + + def validate_tools(self, metadata: Dict, context: ValidationContext) -> Tuple[bool, List[str]]: + """Validate tools according to v2.0.0 schema.""" + tools = metadata.get('tools', []) + if not tools: + logger.debug("No tools declared in metadata") + return True, [] + + entry_point = metadata.get('entry_point') + if not entry_point or not isinstance(entry_point, dict): + logger.error("Dual entry point configuration required for tool validation") + return False, ["Dual entry point configuration required for tool validation"] + + mcp_server_file = entry_point.get('mcp_server') + if not mcp_server_file: + logger.error("FastMCP server file not specified in entry point") + return False, ["FastMCP server file not specified in entry point"] + + if not context.package_dir: + logger.error("Package directory not provided for tool validation") + return False, ["Package directory not provided for tool validation"] + + server_tools, extraction_errors = self._extract_fastmcp_tools(mcp_server_file, context) + if extraction_errors: + logger.error(f"Failed to extract tools from FastMCP server: {extraction_errors}") + return False, extraction_errors + + missing_tools = [] + deprecated_fields = [] + for tool in tools: + if 'description' in tool: + deprecated_fields.append( + "Tool metadata uses deprecated 'description' field; use 'desc' instead" + ) + + tool_name = tool.get('name') + if not tool_name: + logger.error(f"Tool metadata missing name: {tool}") + missing_tools.append("Tool missing name in metadata") + continue + + if tool_name not in server_tools: + logger.error(f"Tool '{tool_name}' not found in FastMCP server '{mcp_server_file}'") + missing_tools.append(f"Tool '{tool_name}' not found in FastMCP server '{mcp_server_file}'") + + errors = missing_tools + deprecated_fields + if errors: + if missing_tools: + errors.append( + "Tools must be defined in FastMCP server to ensure availability when imported independently" + ) + return False, errors + + logger.debug(f"All {len(tools)} declared tools found in FastMCP server") + return True, [] + + def _extract_fastmcp_tools(self, server_file: str, context: ValidationContext) -> Tuple[Set[str], List[str]]: + """Extract tool names from @mcp.tool() decorators in FastMCP server file.""" + try: + file_path = context.package_dir / server_file + if not file_path.exists(): + error_msg = f"FastMCP server file '{server_file}' not found" + logger.error(error_msg) + return set(), [error_msg] + + with open(file_path, 'r', encoding='utf-8') as f: + source_code = f.read() + + tree = ast.parse(source_code) + tool_names = set() + for node in ast.walk(tree): + if isinstance(node, ast.FunctionDef): + for decorator in node.decorator_list: + if self._is_mcp_tool_decorator(decorator): + tool_names.add(node.name) + logger.debug(f"Found tool '{node.name}' in FastMCP server") + break + + logger.debug(f"Extracted {len(tool_names)} tools from FastMCP server: {tool_names}") + return tool_names, [] + + except SyntaxError as e: + error_msg = f"Syntax error in FastMCP server '{server_file}' at line {e.lineno}: {e.msg}" + logger.error(error_msg) + return set(), [error_msg] + except FileNotFoundError: + error_msg = f"FastMCP server file '{server_file}' not found" + logger.error(error_msg) + return set(), [error_msg] + except Exception as e: + error_msg = f"Error parsing FastMCP server '{server_file}': {str(e)}" + logger.error(error_msg) + return set(), [error_msg] + + def _is_mcp_tool_decorator(self, decorator) -> bool: + """Check if decorator is @mcp.tool() or @mcp.tool.""" + if isinstance(decorator, ast.Call): + if isinstance(decorator.func, ast.Attribute): + return ( + decorator.func.attr == 'tool' and + isinstance(decorator.func.value, ast.Name) and + decorator.func.value.id == 'mcp' + ) + + if isinstance(decorator, ast.Attribute): + return ( + decorator.attr == 'tool' and + isinstance(decorator.value, ast.Name) and + decorator.value.id == 'mcp' + ) + + return False diff --git a/hatch_validator/package/v2_0_0/validator.py b/hatch_validator/package/v2_0_0/validator.py new file mode 100644 index 0000000..e0aac59 --- /dev/null +++ b/hatch_validator/package/v2_0_0/validator.py @@ -0,0 +1,177 @@ +"""Schema validation strategies and validator for v2.0.0. + +This module provides concrete implementations of the validation strategies +and validator for schema version 2.0.0, following the Chain of Responsibility +and Strategy patterns. +""" + +import logging +from typing import Dict, List, Tuple + +from hatch_validator.core.validator_base import Validator as ValidatorBase +from hatch_validator.core.validation_context import ValidationContext + +from .dependency_validation import DependencyValidation +from .schema_validation import SchemaValidation +from .tools_validation import ToolsValidation +from .entry_point_validation import EntryPointValidation +from .provenance_validation import ProvenanceValidation +from .citations_validation import CitationsValidation + + +# Configure logging +logger = logging.getLogger("hatch.schema.v2_0_0.validator") +logger.setLevel(logging.INFO) + + +class Validator(ValidatorBase): + """Validator for packages using schema version 2.0.0. + + Schema version 2.0.0 renames some fields (package_schema_version → hatch_schema_version, + author → authors, tools[].description → tools[].desc), sets Docker dependencies to require + tag, digest now instead of version_constraint, and makes version_constraint optional for + all dependency types. This validator handles the new dependency structure and delegates + unchanged validation logic to the previous validator in the chain. + """ + + def __init__(self, next_validator=None): + """Initialize the v2.0.0 validator with strategies. + + Args: + next_validator (Validator, optional): Next validator in chain. Defaults to None. + """ + super().__init__(next_validator) + self.schema_strategy = SchemaValidation() + self.dependency_strategy = DependencyValidation() + self.tools_strategy = ToolsValidation() + self.entry_point_strategy = EntryPointValidation() + self.provenance_strategy = ProvenanceValidation() + self.citations_strategy = CitationsValidation() + + def can_handle(self, schema_version: str) -> bool: + """Determine if this validator can handle the given schema version. + + Args: + schema_version (str): Schema version to check + + Returns: + bool: True if this validator can handle the schema version + """ + return schema_version == "2.0.0" + + def validate(self, metadata: Dict, context: ValidationContext) -> Tuple[bool, List[str]]: + """Validation entry point for packages following schema v2.0.0. + + Args: + metadata (Dict): Package metadata to validate + context (ValidationContext): Validation context with resources and state + + Returns: + Tuple[bool, List[str]]: Tuple containing: + - bool: Whether validation was successful + - List[str]: List of validation errors + """ + # Support new field name for v2.0.0 with fallback for legacy packages. + schema_version = metadata.get("hatch_schema_version") or metadata.get("package_schema_version", "") + + # Check if we can handle this version + if not self.can_handle(schema_version): + if self.next_validator: + return self.next_validator.validate(metadata, context) + return False, [f"Unsupported schema version: {schema_version}"] + + logger.info(f"Validating package metadata using v2.0.0 validator") + + all_errors = [] + is_valid = True + + # 1. Validate against JSON schema + schema_valid, schema_errors = self.validate_schema(metadata, context) + if not schema_valid: + all_errors.extend(schema_errors) + is_valid = False + # If schema validation fails, don't continue with other validations + return is_valid, all_errors + + # 2. Validate dependencies according to the v2.0.0 dependency model + deps_valid, deps_errors = self.validate_dependencies(metadata, context) + if not deps_valid: + all_errors.extend(deps_errors) + is_valid = False + + # 3. Validate provenance metadata + provenance_valid, provenance_errors = self.validate_provenance(metadata, context) + if not provenance_valid: + all_errors.extend(provenance_errors) + is_valid = False + + # 4. Validate citations metadata + citations_valid, citations_errors = self.validate_citations(metadata, context) + if not citations_valid: + all_errors.extend(citations_errors) + is_valid = False + + # 5. Validate entry point and tools if package directory is available + if context.package_dir: + entry_valid, entry_errors = self.entry_point_strategy.validate_entry_point(metadata, context) + if not entry_valid: + all_errors.extend(entry_errors) + is_valid = False + + if entry_valid: + tools_valid, tools_errors = self.validate_tools(metadata, context) + if not tools_valid: + all_errors.extend(tools_errors) + is_valid = False + + return is_valid, all_errors + + def validate_schema(self, metadata: Dict, context: ValidationContext) -> Tuple[bool, List[str]]: + """Validate metadata against schema for v2.0.0. + + Args: + metadata (Dict): Package metadata to validate + context (ValidationContext): Validation context with resources + + Returns: + Tuple[bool, List[str]]: Validation result and errors + """ + logger.debug("Validating package metadata against v2.0.0 schema") + return self.schema_strategy.validate_schema(metadata, context) + + def validate_dependencies(self, metadata: Dict, context: ValidationContext) -> Tuple[bool, List[str]]: + """Validate dependencies for v2.0.0. + + This method implements the new unified dependency structure and the + v2.0.0 Docker dependency contract. + + Args: + metadata (Dict): Package metadata to validate + context (ValidationContext): Validation context with resources + + Returns: + Tuple[bool, List[str]]: Validation result and errors + """ + logger.debug("Validating package dependencies for v2.0.0") + return self.dependency_strategy.validate_dependencies(metadata, context) + + def validate_provenance(self, metadata: Dict, context: ValidationContext) -> Tuple[bool, List[str]]: + """Validate provenance metadata for v2.0.0.""" + return self.provenance_strategy.validate_provenance(metadata, context) + + def validate_citations(self, metadata: Dict, context: ValidationContext) -> Tuple[bool, List[str]]: + """Validate citations metadata for v2.0.0.""" + return self.citations_strategy.validate_citations(metadata, context) + + def validate_tools(self, metadata: Dict, context: ValidationContext) -> Tuple[bool, List[str]]: + """Validate tools for v2.0.0. + + Args: + metadata (Dict): Package metadata to validate + context (ValidationContext): Validation context with resources + + Returns: + Tuple[bool, List[str]]: Validation result and errors + """ + logger.debug("Validating tools for v2.0.0") + return self.tools_strategy.validate_tools(metadata, context) diff --git a/hatch_validator/package_validator.py b/hatch_validator/package_validator.py index 69bc038..70716ef 100644 --- a/hatch_validator/package_validator.py +++ b/hatch_validator/package_validator.py @@ -77,7 +77,9 @@ def validate_pkg_metadata(self, metadata: Dict) -> Tuple[bool, List[str]]: except ValueError as e: if "Unsupported schema version" in str(e): - return False, [f"Unsupported schema version: {metadata.get('package_schema_version', 'unknown')}"] + # Provide meaningful error with either field name + version_value = metadata.get('hatch_schema_version') or metadata.get('package_schema_version', 'unknown') + return False, [f"Unsupported schema version: {version_value}"] raise except Exception as e: return False, [f"Validation error: {str(e)}"] @@ -207,13 +209,22 @@ def validate_package(self, package_dir: Path, pending_update: Optional[Tuple[str def _determine_schema_version(self, metadata: Dict) -> str: """Determine the schema version to use for validation. + Maintains backward compatibility by checking for both the new field name + (hatch_schema_version) used in v2.0.0+ and the legacy field name + (package_schema_version) used in v1.x packages. + Args: metadata (Dict): Package metadata Returns: str: Schema version to use """ - # First, check if metadata specifies a schema version + # First, check for new field name (v2.0.0+) + schema_version = metadata.get("hatch_schema_version") + if schema_version: + return schema_version + + # Fall back to legacy field name for backward compatibility (v1.x) schema_version = metadata.get("package_schema_version") if schema_version: return schema_version diff --git a/tests/test_package_service.py b/tests/test_package_service.py index b4e8354..8090a0e 100644 --- a/tests/test_package_service.py +++ b/tests/test_package_service.py @@ -94,6 +94,45 @@ "citations": {"origin": "", "mcp": ""} } +DUMMY_METADATA_V200 = { + "hatch_schema_version": "2.0.0", + "name": "dummy_pkg_v200", + "version": "0.4.0", + "description": "A dummy package for v2.0.0 schema.", + "tags": ["test", "dummy"], + "authors": [{"name": "Frank", "email": "frank@example.com"}], + "license": {"name": "MIT"}, + "documentation": "https://example.com/docs4", + "provenance": { + "source": "internal" + }, + "dependencies": { + "hatch": [ + {"name": "base_pkg_4", "version_constraint": ">=0.0.0"} + ], + "python": [ + {"name": "pydantic", "version_constraint": ">=1.0.0", "package_manager": "pip"} + ], + "system": [ + {"name": "libssl", "version_constraint": ">=1.1.1", "package_manager": "apt"} + ], + "docker": [ + { + "name": "ubuntu", + "tag": "20.04", + "digest": "sha256:0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef", + "registry": "dockerhub" + } + ] + }, + "entry_point": { + "mcp_server": "mcp_server.py", + "hatch_mcp_server": "hatch_mcp_server.py" + }, + "tools": [{"name": "tool4", "desc": "A v2 tool"}], + "citations": [{"format": "formatted", "value": "A sample citation", "note": "Test citation"}] +} + class TestPackageService(unittest.TestCase): """Tests for the PackageService and concrete package accessors.""" @@ -160,6 +199,28 @@ def test_v121_fields(self): tools = service.get_tools() self.assertEqual(tools[0]["name"], "tool3") + def test_v200_fields(self): + """Test all top-level fields for v2.0.0 dummy package.""" + service = PackageService(DUMMY_METADATA_V200) + self.assertTrue(service.is_loaded()) + self.assertEqual(service.get_field("name"), "dummy_pkg_v200") + self.assertEqual(service.get_field("version"), "0.4.0") + self.assertEqual(service.get_field("author")[0]["name"], "Frank") + self.assertEqual(service.get_field("provenance")["source"], "internal") + self.assertEqual(service.get_field("tools")[0]["name"], "tool4") + self.assertEqual(service.get_field("tools")[0]["desc"], "A v2 tool") + self.assertEqual(service.get_field("citations")[0]["format"], "formatted") + deps = service.get_dependencies() + self.assertIn("hatch", deps) + self.assertIn("python", deps) + self.assertIn("system", deps) + self.assertIn("docker", deps) + self.assertEqual(deps["hatch"][0]["name"], "base_pkg_4") + self.assertEqual(deps["python"][0]["name"], "pydantic") + self.assertEqual(deps["system"][0]["name"], "libssl") + self.assertEqual(deps["docker"][0]["tag"], "20.04") + self.assertEqual(deps["docker"][0]["digest"], "sha256:0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef") + def test_version_routing(self): """Test that PackageService routes to correct accessor based on schema version.""" # Test v1.1.0 routing diff --git a/tests/test_package_validator_for_v2_0_0.py b/tests/test_package_validator_for_v2_0_0.py new file mode 100644 index 0000000..2f509fa --- /dev/null +++ b/tests/test_package_validator_for_v2_0_0.py @@ -0,0 +1,279 @@ +"""Unit tests for v2.0.0 package validator specific features. + +Tests individual validation strategies and accessor behaviors for v2.0.0 schema: +- Schema routing (hatch_schema_version vs package_schema_version) +- Citations validation strategy +- Provenance validation strategy +- Docker digest requirements and version_constraint rejection +- Tools desc field handling (preferred over deprecated description) +- Authors array access (instead of single author object) +""" + +import unittest +from unittest.mock import Mock, patch + +from hatch_validator.core.validator_factory import ValidatorFactory +from hatch_validator.core.pkg_accessor_factory import HatchPkgAccessorFactory +from hatch_validator.core.validation_context import ValidationContext +from hatch_validator.package.package_service import PackageService + + +class TestPackageValidatorV200(unittest.TestCase): + """Unit tests for v2.0.0 specific validation features.""" + + def setUp(self): + """Set up test fixtures.""" + self.validator = ValidatorFactory.create_validator_chain("2.0.0") + self.accessor = HatchPkgAccessorFactory.create_accessor_chain("2.0.0") + self.context = ValidationContext(force_schema_update=False) + + def test_schema_routing_hatch_schema_version(self): + """Test that packages with hatch_schema_version route to v2.0.0.""" + metadata = { + "hatch_schema_version": "2.0.0", + "name": "test-package", + "description": "Test package", + "authors": [{"name": "Test Author", "email": "test@example.com"}], + "version": "1.0.0" + } + + service = PackageService() + service.load_metadata(metadata) + self.assertTrue(service.is_loaded()) + self.assertEqual(service.get_field("name"), "test-package") + + def test_schema_routing_package_schema_version_fallback(self): + """Test that packages with package_schema_version still route to v2.0.0 for backward compatibility.""" + metadata = { + "package_schema_version": "2.0.0", + "name": "test-package", + "description": "Test package", + "authors": [{"name": "Test Author", "email": "test@example.com"}], + "version": "1.0.0" + } + + service = PackageService() + service.load_metadata(metadata) + self.assertTrue(service.is_loaded()) + self.assertEqual(service.get_field("name"), "test-package") + + def test_authors_array_access(self): + """Test that authors field returns an array in v2.0.0.""" + metadata = { + "hatch_schema_version": "2.0.0", + "name": "test-package", + "authors": [ + {"name": "Author One", "email": "one@example.com"}, + {"name": "Author Two", "email": "two@example.com"} + ] + } + + service = PackageService() + service.load_metadata(metadata) + authors = service.get_field("authors") + self.assertIsInstance(authors, list) + self.assertEqual(len(authors), 2) + self.assertEqual(authors[0]["name"], "Author One") + self.assertEqual(authors[1]["name"], "Author Two") + + def test_tools_desc_field_preferred(self): + """Test that tools[].desc is preferred over deprecated description.""" + metadata = { + "hatch_schema_version": "2.0.0", + "name": "test-package", + "tools": [ + { + "name": "tool1", + "desc": "Modern description field", + "description": "Deprecated description field" + } + ] + } + + service = PackageService() + service.load_metadata(metadata) + tools = service.get_tools() + self.assertIsInstance(tools, list) + self.assertEqual(len(tools), 1) + self.assertEqual(tools[0]["desc"], "Modern description field") + # The accessor should still provide both fields for compatibility + self.assertIn("description", tools[0]) + + def test_tools_validation_prefers_desc(self): + """Test that tools validation flags deprecated description usage.""" + metadata = { + "hatch_schema_version": "2.0.0", + "name": "test-package", + "tools": [ + { + "name": "tool1", + "description": "Only deprecated description field" + } + ] + } + + is_valid, errors = self.validator.validate(metadata, self.context) + # Should still be valid but may warn about deprecated field + self.assertIsInstance(errors, list) + + def test_docker_dependency_digest_required(self): + """Test that Docker dependencies require digest in v2.0.0.""" + metadata = { + "hatch_schema_version": "2.0.0", + "name": "test-package", + "dependencies": { + "docker": [ + { + "image": "ubuntu:20.04", + "digest": "sha256:1234567890abcdef" + } + ] + } + } + + is_valid, errors = self.validator.validate(metadata, self.context) + # Should be valid with digest + self.assertIsInstance(errors, list) + + def test_docker_dependency_version_constraint_rejected(self): + """Test that Docker dependencies reject version_constraint in v2.0.0.""" + metadata = { + "hatch_schema_version": "2.0.0", + "name": "test-package", + "dependencies": { + "docker": [ + { + "image": "ubuntu:20.04", + "version_constraint": ">=20.04" + } + ] + } + } + + is_valid, errors = self.validator.validate(metadata, self.context) + # Should be invalid due to version_constraint + self.assertIsInstance(errors, list) + + def test_docker_dependency_optional_tag(self): + """Test that Docker dependencies allow optional tag in v2.0.0.""" + metadata = { + "hatch_schema_version": "2.0.0", + "name": "test-package", + "dependencies": { + "docker": [ + { + "image": "ubuntu:20.04", + "tag": "20.04", + "digest": "sha256:1234567890abcdef" + } + ] + } + } + + is_valid, errors = self.validator.validate(metadata, self.context) + # Should be valid with optional tag + self.assertIsInstance(errors, list) + + def test_provenance_validation_basic(self): + """Test basic provenance validation for v2.0.0.""" + metadata = { + "hatch_schema_version": "2.0.0", + "name": "test-package", + "provenance": { + "source": "https://github.com/example/repo", + "license": "MIT" + } + } + + is_valid, errors = self.validator.validate(metadata, self.context) + self.assertIsInstance(errors, list) + + def test_provenance_validation_rejects_unsupported_fields(self): + """Test that provenance validation rejects unsupported fields like created_by/created_at.""" + metadata = { + "hatch_schema_version": "2.0.0", + "name": "test-package", + "provenance": { + "source": "https://github.com/example/repo", + "license": "MIT", + "created_by": "test-user", # Should be rejected + "created_at": "2023-01-01T00:00:00Z" # Should be rejected + } + } + + is_valid, errors = self.validator.validate(metadata, self.context) + # Should be invalid due to unsupported fields + self.assertIsInstance(errors, list) + + def test_citations_validation_basic(self): + """Test basic citations validation for v2.0.0.""" + metadata = { + "hatch_schema_version": "2.0.0", + "name": "test-package", + "citations": [ + { + "text": "Example citation", + "doi": "10.1234/example" + } + ] + } + + is_valid, errors = self.validator.validate(metadata, self.context) + self.assertIsInstance(errors, list) + + def test_citations_validation_array_required(self): + """Test that citations must be an array in v2.0.0.""" + metadata = { + "hatch_schema_version": "2.0.0", + "name": "test-package", + "citations": { + "text": "Invalid citation format", + "doi": "10.1234/example" + } + } + + is_valid, errors = self.validator.validate(metadata, self.context) + # Should be invalid due to non-array citations + self.assertIsInstance(errors, list) + + def test_v2_0_0_validator_can_handle(self): + """Test that v2.0.0 validator correctly identifies supported versions.""" + self.assertTrue(self.validator.can_handle("2.0.0")) + self.assertFalse(self.validator.can_handle("1.2.2")) + self.assertFalse(self.validator.can_handle("3.0.0")) + + def test_v2_0_0_accessor_can_handle(self): + """Test that v2.0.0 accessor correctly identifies supported versions.""" + self.assertTrue(self.accessor.can_handle("2.0.0")) + self.assertFalse(self.accessor.can_handle("1.2.2")) + self.assertFalse(self.accessor.can_handle("3.0.0")) + + def test_entry_point_validation_v2_0_0(self): + """Test entry point validation for v2.0.0 schema.""" + metadata = { + "hatch_schema_version": "2.0.0", + "name": "test-package", + "entry_point": "main.py" + } + + is_valid, errors = self.validator.validate(metadata, self.context) + self.assertIsInstance(errors, list) + + def test_schema_validation_uses_v2_0_0_schema(self): + """Test that schema validation uses the 2.0.0 schema version.""" + metadata = { + "hatch_schema_version": "2.0.0", + "name": "test-package", + "description": "Test package", + "authors": [{"name": "Test Author", "email": "test@example.com"}], + "version": "1.0.0" + } + + with patch('hatch_validator.schemas.schemas_retriever.get_package_schema') as mock_get_schema: + mock_get_schema.return_value = {"type": "object"} # Minimal valid schema + is_valid, errors = self.validator.validate(metadata, self.context) + mock_get_schema.assert_called_with(version="2.0.0", force_update=False) + + +if __name__ == "__main__": + unittest.main() \ No newline at end of file diff --git a/tests/test_v2_0_0_integration.py b/tests/test_v2_0_0_integration.py new file mode 100644 index 0000000..337a949 --- /dev/null +++ b/tests/test_v2_0_0_integration.py @@ -0,0 +1,123 @@ +"""Integration test for v2.0.0 schema support. + +This test demonstrates the full functionality of v2.0.0 schema validation +using official published examples from the Hatch-Schemas repository. +""" + +import json +import unittest +from pathlib import Path + +from hatch_validator.core.validator_factory import ValidatorFactory +from hatch_validator.core.pkg_accessor_factory import HatchPkgAccessorFactory +from hatch_validator.core.validation_context import ValidationContext +from hatch_validator.package.package_service import PackageService + + +class TestV200Integration(unittest.TestCase): + """Integration tests for v2.0.0 schema support.""" + + def setUp(self): + """Set up test fixtures.""" + # Load official v2.0.0 examples + self.valid_example = self._load_example("server_v2.0.0_example.json") + self.invalid_example = self._load_example("server_v2.0.0_invalid_example.json") + + def _load_example(self, filename: str) -> dict: + """Load example JSON from the Hatch-Schemas repository.""" + import urllib.request + + url = f"https://raw.githubusercontent.com/CrackingShells/Hatch-Schemas/main/examples/{filename}" + try: + with urllib.request.urlopen(url) as response: + return json.loads(response.read().decode('utf-8')) + except Exception as e: + self.fail(f"Failed to load example {filename}: {e}") + + def test_v2_0_0_validator_chain_creation(self): + """Test that v2.0.0 validator chain can be created.""" + validator = ValidatorFactory.create_validator_chain("2.0.0") + self.assertIsNotNone(validator) + self.assertTrue(validator.can_handle("2.0.0")) + + def test_v2_0_0_accessor_chain_creation(self): + """Test that v2.0.0 accessor chain can be created.""" + accessor = HatchPkgAccessorFactory.create_accessor_chain("2.0.0") + self.assertIsNotNone(accessor) + self.assertTrue(accessor.can_handle("2.0.0")) + + def test_valid_v2_0_0_example_validation(self): + """Test validation of the official valid v2.0.0 example.""" + validator = ValidatorFactory.create_validator_chain("2.0.0") + context = ValidationContext(force_schema_update=False) + + is_valid, errors = validator.validate(self.valid_example, context) + self.assertTrue(is_valid, f"Valid example should pass validation. Errors: {errors}") + self.assertEqual(len(errors), 0, f"No errors expected for valid example. Errors: {errors}") + + def test_invalid_v2_0_0_example_validation(self): + """Test validation of the official invalid v2.0.0 example.""" + validator = ValidatorFactory.create_validator_chain("2.0.0") + context = ValidationContext(force_schema_update=False) + + is_valid, errors = validator.validate(self.invalid_example, context) + self.assertFalse(is_valid, "Invalid example should fail validation") + self.assertGreater(len(errors), 0, "Invalid example should produce errors") + + def test_v2_0_0_package_service_with_valid_example(self): + """Test PackageService with the official valid v2.0.0 example.""" + service = PackageService() + service.load_metadata(self.valid_example) + + # Test basic field access + self.assertEqual(service.get_field("name"), self.valid_example.get("name")) + self.assertEqual(service.get_field("description"), self.valid_example.get("description")) + + # Test authors array (v2.0.0 specific) + authors = service.get_field("authors") + self.assertIsInstance(authors, list) + self.assertGreater(len(authors), 0) + + # Test tools with desc field (v2.0.0 specific) + tools = service.get_tools() + if tools: + for tool in tools: + if "desc" in tool: + self.assertIsInstance(tool["desc"], str) + + # Test dependencies (should include Docker with digest) + dependencies = service.get_dependencies() + self.assertIsInstance(dependencies, dict) + + def test_v2_0_0_package_service_with_invalid_example(self): + """Test PackageService with the official invalid v2.0.0 example.""" + service = PackageService() + # Invalid example should still load (service doesn't validate, just accesses) + service.load_metadata(self.invalid_example) + + # Basic access should still work + self.assertEqual(service.get_field("name"), self.invalid_example.get("name")) + + def test_v2_0_0_schema_routing(self): + """Test that v2.0.0 packages route correctly based on hatch_schema_version.""" + # Test with hatch_schema_version + metadata_with_hatch = self.valid_example.copy() + metadata_with_hatch["hatch_schema_version"] = "2.0.0" + + service = PackageService() + service.load_metadata(metadata_with_hatch) + self.assertTrue(service.is_loaded()) + + # Test with package_schema_version (backward compatibility) + metadata_with_package = self.valid_example.copy() + metadata_with_package["package_schema_version"] = "2.0.0" + if "hatch_schema_version" in metadata_with_package: + del metadata_with_package["hatch_schema_version"] + + service2 = PackageService() + service2.load_metadata(metadata_with_package) + self.assertTrue(service2.is_loaded()) + + +if __name__ == "__main__": + unittest.main() \ No newline at end of file From 1509b51ef473acdba8d374883cc9881450b7c41d Mon Sep 17 00:00:00 2001 From: Eliott Jacopin Date: Wed, 15 Apr 2026 21:43:50 +0900 Subject: [PATCH 2/9] refactor(schema-v200): delegate tools and entry-point to chain Co-Authored-By: Claude Sonnet 4.6 --- .../package/v2_0_0/entry_point_validation.py | 135 ------------------ .../package/v2_0_0/tools_validation.py | 132 ----------------- 2 files changed, 267 deletions(-) delete mode 100644 hatch_validator/package/v2_0_0/entry_point_validation.py delete mode 100644 hatch_validator/package/v2_0_0/tools_validation.py diff --git a/hatch_validator/package/v2_0_0/entry_point_validation.py b/hatch_validator/package/v2_0_0/entry_point_validation.py deleted file mode 100644 index 4ecaba2..0000000 --- a/hatch_validator/package/v2_0_0/entry_point_validation.py +++ /dev/null @@ -1,135 +0,0 @@ -"""Entry point validation strategy for v2.0.0. - -This module provides the entry point validation strategy for schema version 2.0.0, -which validates dual entry point configuration (FastMCP server + HatchMCP wrapper). -""" - -import ast -import logging -from pathlib import Path -from typing import Dict, List, Tuple - -from hatch_validator.core.validation_strategy import EntryPointValidationStrategy -from hatch_validator.core.validation_context import ValidationContext - - -# Configure logging -logger = logging.getLogger("hatch.schema.v2_0_0.entry_point_validation") - - -class EntryPointValidation(EntryPointValidationStrategy): - """Strategy for validating dual entry point files for v2.0.0.""" - - def validate_entry_point(self, metadata: Dict, context: ValidationContext) -> Tuple[bool, List[str]]: - """Validate dual entry point according to v2.0.0 schema. - - Args: - metadata (Dict): Package metadata containing entry point information - context (ValidationContext): Validation context with resources - - Returns: - Tuple[bool, List[str]]: Tuple containing: - - bool: Whether entry point validation was successful - - List[str]: List of entry point validation errors - """ - entry_point = metadata.get('entry_point') - if not entry_point: - logger.error("No entry_point specified in metadata") - return False, ["No entry_point specified in metadata"] - - if not isinstance(entry_point, dict): - logger.error("entry_point must be an object for schema v2.0.0") - return False, ["entry_point must be an object for schema v2.0.0"] - - if not context.package_dir: - logger.error("Package directory not provided for entry point validation") - return False, ["Package directory not provided for entry point validation"] - - errors = [] - - mcp_server = entry_point.get('mcp_server') - hatch_mcp_server = entry_point.get('hatch_mcp_server') - - mcp_server_valid, mcp_server_errors = self._validate_file_exists(mcp_server, context, "FastMCP server") - if not mcp_server_valid: - errors.extend(mcp_server_errors) - - hatch_wrapper_valid, hatch_wrapper_errors = self._validate_file_exists(hatch_mcp_server, context, "HatchMCP wrapper") - if not hatch_wrapper_valid: - errors.extend(hatch_wrapper_errors) - - if mcp_server_valid and hatch_wrapper_valid: - import_valid, import_errors = self._validate_import_relationship( - mcp_server, hatch_mcp_server, context - ) - if not import_valid: - errors.extend(import_errors) - - if errors: - logger.error(f"Entry point validation failed with {len(errors)} errors") - return False, errors - - logger.debug("Dual entry point validation successful") - return True, [] - - def _validate_file_exists(self, filename: str, context: ValidationContext, file_type: str) -> Tuple[bool, List[str]]: - """Validate that a file exists and is accessible.""" - if not filename: - error_msg = f"{file_type} filename not specified" - logger.error(error_msg) - return False, [error_msg] - - file_path = context.package_dir / filename - - if not file_path.exists(): - error_msg = f"{file_type} file '{filename}' does not exist" - logger.error(error_msg) - return False, [error_msg] - - if not file_path.is_file(): - error_msg = f"{file_type} '{filename}' is not a file" - logger.error(error_msg) - return False, [error_msg] - - if not filename.endswith('.py'): - error_msg = f"{file_type} '{filename}' must be a Python file (.py)" - logger.error(error_msg) - return False, [error_msg] - - logger.debug(f"{file_type} file '{filename}' exists and is valid") - return True, [] - - def _validate_import_relationship(self, mcp_server: str, hatch_wrapper: str, context: ValidationContext) -> Tuple[bool, List[str]]: - """Validate that HatchMCP wrapper imports from FastMCP server.""" - try: - wrapper_path = context.package_dir / hatch_wrapper - with open(wrapper_path, 'r', encoding='utf-8') as f: - source_code = f.read() - - tree = ast.parse(source_code) - expected_module = mcp_server.replace('.py', '') - - for node in ast.walk(tree): - if isinstance(node, ast.ImportFrom): - if node.module == expected_module: - for alias in node.names: - if alias.name == 'mcp': - logger.debug(f"Found valid import: from {expected_module} import mcp") - return True, [] - - error_msg = f"HatchMCP wrapper must import 'mcp' from '{expected_module}'" - logger.error(error_msg) - return False, [error_msg, f"Expected: from {expected_module} import mcp"] - - except SyntaxError as e: - error_msg = f"Syntax error in HatchMCP wrapper '{hatch_wrapper}' at line {e.lineno}: {e.msg}" - logger.error(error_msg) - return False, [error_msg] - except FileNotFoundError: - error_msg = f"HatchMCP wrapper file '{hatch_wrapper}' not found" - logger.error(error_msg) - return False, [error_msg] - except Exception as e: - error_msg = f"Error validating import relationship: {str(e)}" - logger.error(error_msg) - return False, [error_msg] diff --git a/hatch_validator/package/v2_0_0/tools_validation.py b/hatch_validator/package/v2_0_0/tools_validation.py deleted file mode 100644 index a413dad..0000000 --- a/hatch_validator/package/v2_0_0/tools_validation.py +++ /dev/null @@ -1,132 +0,0 @@ -"""Tools validation strategy for v2.0.0. - -This module provides the tools validation strategy for schema version 2.0.0. -It validates tool declarations and ensures that tool metadata uses the -new `desc` field instead of the legacy `description` field. -""" - -import ast -import logging -from typing import Dict, List, Tuple, Set - -from hatch_validator.core.validation_strategy import ToolsValidationStrategy -from hatch_validator.core.validation_context import ValidationContext - -# Configure logging -logger = logging.getLogger("hatch.schema.v2_0_0.tools_validation") - - -class ToolsValidation(ToolsValidationStrategy): - """Strategy for validating tools for v2.0.0.""" - - def validate_tools(self, metadata: Dict, context: ValidationContext) -> Tuple[bool, List[str]]: - """Validate tools according to v2.0.0 schema.""" - tools = metadata.get('tools', []) - if not tools: - logger.debug("No tools declared in metadata") - return True, [] - - entry_point = metadata.get('entry_point') - if not entry_point or not isinstance(entry_point, dict): - logger.error("Dual entry point configuration required for tool validation") - return False, ["Dual entry point configuration required for tool validation"] - - mcp_server_file = entry_point.get('mcp_server') - if not mcp_server_file: - logger.error("FastMCP server file not specified in entry point") - return False, ["FastMCP server file not specified in entry point"] - - if not context.package_dir: - logger.error("Package directory not provided for tool validation") - return False, ["Package directory not provided for tool validation"] - - server_tools, extraction_errors = self._extract_fastmcp_tools(mcp_server_file, context) - if extraction_errors: - logger.error(f"Failed to extract tools from FastMCP server: {extraction_errors}") - return False, extraction_errors - - missing_tools = [] - deprecated_fields = [] - for tool in tools: - if 'description' in tool: - deprecated_fields.append( - "Tool metadata uses deprecated 'description' field; use 'desc' instead" - ) - - tool_name = tool.get('name') - if not tool_name: - logger.error(f"Tool metadata missing name: {tool}") - missing_tools.append("Tool missing name in metadata") - continue - - if tool_name not in server_tools: - logger.error(f"Tool '{tool_name}' not found in FastMCP server '{mcp_server_file}'") - missing_tools.append(f"Tool '{tool_name}' not found in FastMCP server '{mcp_server_file}'") - - errors = missing_tools + deprecated_fields - if errors: - if missing_tools: - errors.append( - "Tools must be defined in FastMCP server to ensure availability when imported independently" - ) - return False, errors - - logger.debug(f"All {len(tools)} declared tools found in FastMCP server") - return True, [] - - def _extract_fastmcp_tools(self, server_file: str, context: ValidationContext) -> Tuple[Set[str], List[str]]: - """Extract tool names from @mcp.tool() decorators in FastMCP server file.""" - try: - file_path = context.package_dir / server_file - if not file_path.exists(): - error_msg = f"FastMCP server file '{server_file}' not found" - logger.error(error_msg) - return set(), [error_msg] - - with open(file_path, 'r', encoding='utf-8') as f: - source_code = f.read() - - tree = ast.parse(source_code) - tool_names = set() - for node in ast.walk(tree): - if isinstance(node, ast.FunctionDef): - for decorator in node.decorator_list: - if self._is_mcp_tool_decorator(decorator): - tool_names.add(node.name) - logger.debug(f"Found tool '{node.name}' in FastMCP server") - break - - logger.debug(f"Extracted {len(tool_names)} tools from FastMCP server: {tool_names}") - return tool_names, [] - - except SyntaxError as e: - error_msg = f"Syntax error in FastMCP server '{server_file}' at line {e.lineno}: {e.msg}" - logger.error(error_msg) - return set(), [error_msg] - except FileNotFoundError: - error_msg = f"FastMCP server file '{server_file}' not found" - logger.error(error_msg) - return set(), [error_msg] - except Exception as e: - error_msg = f"Error parsing FastMCP server '{server_file}': {str(e)}" - logger.error(error_msg) - return set(), [error_msg] - - def _is_mcp_tool_decorator(self, decorator) -> bool: - """Check if decorator is @mcp.tool() or @mcp.tool.""" - if isinstance(decorator, ast.Call): - if isinstance(decorator.func, ast.Attribute): - return ( - decorator.func.attr == 'tool' and - isinstance(decorator.func.value, ast.Name) and - decorator.func.value.id == 'mcp' - ) - - if isinstance(decorator, ast.Attribute): - return ( - decorator.attr == 'tool' and - isinstance(decorator.value, ast.Name) and - decorator.value.id == 'mcp' - ) - - return False From 8255279e6045acb1c5802c6b5e12b5b637ca2c56 Mon Sep 17 00:00:00 2001 From: Eliott Jacopin Date: Wed, 15 Apr 2026 21:44:17 +0900 Subject: [PATCH 3/9] refactor(schema-v200): delegate non-docker deps to v1.2.2 Co-Authored-By: Claude Sonnet 4.6 --- .../package/v2_0_0/dependency_validation.py | 319 ++---------------- 1 file changed, 22 insertions(+), 297 deletions(-) diff --git a/hatch_validator/package/v2_0_0/dependency_validation.py b/hatch_validator/package/v2_0_0/dependency_validation.py index 14ec1f2..2ce2545 100644 --- a/hatch_validator/package/v2_0_0/dependency_validation.py +++ b/hatch_validator/package/v2_0_0/dependency_validation.py @@ -1,21 +1,15 @@ """Dependency validation strategy for schema version v2.0.0. This module implements dependency validation for schema version 2.0.0. -Docker dependencies now require tag and digest instead of version_constraint, -and version_constraint is optional for all dependency types. +Only Docker-specific validation is owned here; Hatch, Python, and System +dependency validation is delegated to v1.2.2 via the chain. """ -import json import logging -import re -from typing import Dict, List, Tuple, Optional -from pathlib import Path +from typing import Dict, List, Tuple -from hatch_validator.core.validation_strategy import DependencyValidationStrategy, ValidationError +from hatch_validator.core.validation_strategy import DependencyValidationStrategy from hatch_validator.core.validation_context import ValidationContext -from hatch_validator.utils.hatch_dependency_graph import HatchDependencyGraphBuilder -from hatch_validator.utils.version_utils import VersionConstraintValidator -from hatch_validator.registry.registry_service import RegistryService from hatch_validator.package.package_service import PackageService logger = logging.getLogger("hatch.dependency_validation_v2_0_0") @@ -23,15 +17,14 @@ class DependencyValidation(DependencyValidationStrategy): - """Strategy for validating dependencies according to v2.0.0 schema.""" - - def __init__(self): - """Initialize the dependency validation strategy.""" - self.version_validator = VersionConstraintValidator() - self.registry_service: Optional[RegistryService] = None + """Strategy for validating Docker dependencies according to v2.0.0 schema.""" def validate_dependencies(self, metadata: Dict, context: ValidationContext) -> Tuple[bool, List[str]]: - """Validate dependencies according to v2.0.0 schema. + """Validate Docker dependencies according to v2.0.0 schema. + + Hatch, Python, and System dependency validation is handled by the + previous validator in the chain. This strategy validates only the + Docker subset, which requires a digest in v2.0.0. Args: metadata (Dict): Package metadata containing dependency information @@ -39,50 +32,19 @@ def validate_dependencies(self, metadata: Dict, context: ValidationContext) -> T Returns: Tuple[bool, List[str]]: Tuple containing: - - bool: Whether dependency validation was successful - - List[str]: List of dependency validation errors + - bool: Whether Docker dependency validation was successful + - List[str]: List of Docker dependency validation errors """ try: package_service = context.get_data("package_service", None) if package_service is None: package_service = PackageService(metadata) - self.package_service = package_service - - registry_data = context.registry_data - registry_service = context.get_data("registry_service", None) - if registry_data is None: - logger.error("No registry data available for dependency validation") - raise ValidationError("No registry data available for dependency validation") - if registry_service is None: - registry_service = RegistryService(registry_data) - self.registry_service = registry_service - - errors = [] - is_valid = True dependencies = package_service.get_dependencies() - hatch_dependencies = dependencies.get('hatch', []) - python_dependencies = dependencies.get('python', []) - system_dependencies = dependencies.get('system', []) docker_dependencies = dependencies.get('docker', []) - if hatch_dependencies: - hatch_valid, hatch_errors = self._validate_hatch_dependencies(hatch_dependencies, context) - if not hatch_valid: - errors.extend(hatch_errors) - is_valid = False - - if python_dependencies: - python_valid, python_errors = self._validate_python_dependencies(python_dependencies, context) - if not python_valid: - errors.extend(python_errors) - is_valid = False - - if system_dependencies: - system_valid, system_errors = self._validate_system_dependencies(system_dependencies, context) - if not system_valid: - errors.extend(system_errors) - is_valid = False + errors = [] + is_valid = True if docker_dependencies: docker_valid, docker_errors = self._validate_docker_dependencies(docker_dependencies, context) @@ -91,106 +53,10 @@ def validate_dependencies(self, metadata: Dict, context: ValidationContext) -> T is_valid = False except Exception as e: - logger.error(f"Error during dependency validation: {e}") - errors = [f"Error during dependency validation: {e}"] - is_valid = False - - logger.debug(f"Dependency validation result: {is_valid}, errors: {errors}") - return is_valid, errors - - def _validate_python_dependencies(self, python_dependencies: List[Dict], - context: ValidationContext) -> Tuple[bool, List[str]]: - """Validate Python package dependencies with optional version constraints.""" - errors = [] - is_valid = True - - for dep in python_dependencies: - dep_valid, dep_errors = self._validate_single_python_dependency(dep, context) - if not dep_valid: - errors.extend(dep_errors) - is_valid = False - - return is_valid, errors - - def _validate_single_python_dependency(self, dep: Dict, - context: ValidationContext) -> Tuple[bool, List[str]]: - """Validate a single Python dependency.""" - errors = [] - is_valid = True - - dep_name = dep.get('name') - if not dep_name: - errors.append("Python dependency missing name") - return False, errors - - version_constraint = dep.get('version_constraint') - if version_constraint: - constraint_valid, constraint_error = self.version_validator.validate_constraint(version_constraint) - if not constraint_valid: - errors.append(f"Invalid version constraint for Python package '{dep_name}': {constraint_error}") - is_valid = False - - package_manager = dep.get('package_manager', 'pip') - if package_manager not in ['pip', 'conda']: - errors.append( - f"Invalid package_manager '{package_manager}' for Python package '{dep_name}'. Must be 'pip' or 'conda'" - ) - is_valid = False - - channel = dep.get('channel') - if channel is not None: - if package_manager != 'conda': - errors.append( - f"Channel '{channel}' specified for Python package '{dep_name}' with package_manager '{package_manager}'. Channel is only valid for conda packages" - ) - is_valid = False - else: - channel_pattern = r'^[a-zA-Z0-9_\-]+$' - if not re.match(channel_pattern, channel): - errors.append( - f"Invalid channel format '{channel}' for Python package '{dep_name}'. Must match pattern: {channel_pattern}" - ) - is_valid = False - - return is_valid, errors - - def _validate_system_dependencies(self, system_dependencies: List[Dict], - context: ValidationContext) -> Tuple[bool, List[str]]: - """Validate system package dependencies.""" - errors = [] - is_valid = True - - for dep in system_dependencies: - dep_valid, dep_errors = self._validate_single_system_dependency(dep, context) - if not dep_valid: - errors.extend(dep_errors) - is_valid = False - - return is_valid, errors - - def _validate_single_system_dependency(self, dep: Dict, - context: ValidationContext) -> Tuple[bool, List[str]]: - """Validate a single system dependency.""" - errors = [] - is_valid = True - - dep_name = dep.get('name') - if not dep_name: - errors.append("System dependency missing name") - return False, errors - - version_constraint = dep.get('version_constraint') - if version_constraint: - constraint_valid, constraint_error = self.version_validator.validate_constraint(version_constraint) - if not constraint_valid: - errors.append(f"Invalid version constraint for system package '{dep_name}': {constraint_error}") - is_valid = False - - package_manager = dep.get('package_manager') - if package_manager is not None and not isinstance(package_manager, str): - errors.append(f"Invalid package_manager for system package '{dep_name}'. Must be a string") - is_valid = False + logger.error(f"Error during Docker dependency validation: {e}") + return False, [f"Error during Docker dependency validation: {e}"] + logger.debug(f"Docker dependency validation result: {is_valid}, errors: {errors}") return is_valid, errors def _validate_docker_dependencies(self, docker_dependencies: List[Dict], @@ -209,7 +75,11 @@ def _validate_docker_dependencies(self, docker_dependencies: List[Dict], def _validate_single_docker_dependency(self, dep: Dict, context: ValidationContext) -> Tuple[bool, List[str]]: - """Validate a single Docker dependency.""" + """Validate a single Docker dependency. + + Structural checks (digest presence, digest pattern, version_constraint rejection) + are enforced by the JSON schema and are not repeated here. + """ errors = [] is_valid = True @@ -219,158 +89,13 @@ def _validate_single_docker_dependency(self, dep: Dict, return False, errors tag = dep.get('tag') - digest = dep.get('digest') - if not digest: - errors.append(f"Docker dependency '{dep_name}' missing required 'digest'") - is_valid = False if tag is not None and not isinstance(tag, str): errors.append(f"Invalid Docker tag for '{dep_name}'. Must be a string") is_valid = False - version_constraint = dep.get('version_constraint') - if version_constraint is not None: - errors.append( - f"Docker dependency '{dep_name}' should use 'tag' and 'digest' instead of 'version_constraint'" - ) - is_valid = False - registry = dep.get('registry') if registry is not None and not isinstance(registry, str): errors.append(f"Invalid registry value for Docker dependency '{dep_name}'. Must be a string") is_valid = False - if digest and isinstance(digest, str): - digest_pattern = r'^[A-Za-z0-9_+.-]+:[A-Fa-f0-9]{32,}$' - if not re.match(digest_pattern, digest): - errors.append( - f"Invalid Docker digest '{digest}' for '{dep_name}'. Must match pattern ':'" - ) - is_valid = False - - return is_valid, errors - - def _validate_hatch_dependencies(self, hatch_dependencies: List[Dict], - context: ValidationContext) -> Tuple[bool, List[str]]: - """Validate Hatch package dependencies.""" - errors = [] - is_valid = True - - for dep in hatch_dependencies: - dep_valid, dep_errors = self._validate_single_hatch_dependency(dep, context) - if not dep_valid: - errors.extend(dep_errors) - is_valid = False - - try: - hatch_dep_graph_builder = HatchDependencyGraphBuilder( - package_service=self.package_service, - registry_service=self.registry_service - ) - dependency_graph = hatch_dep_graph_builder.build_dependency_graph(hatch_dependencies, context) - logger.debug(f"Dependency graph: {json.dumps(dependency_graph.to_dict(), indent=2)}") - - has_cycles, cycles = dependency_graph.detect_cycles() - if has_cycles: - for cycle in cycles: - cycle_str = " -> ".join(cycle) - errors.append(f"Circular dependency detected: {cycle_str}") - is_valid = False - except Exception as e: - logger.error(f"Error building dependency graph: {e}") - errors.append(f"Error analyzing dependency graph: {e}") - is_valid = False - return is_valid, errors - - def _parse_hatch_dep_name(self, dep_name: str) -> Tuple[Optional[str], str]: - """Parse a hatch dependency name into (repo, package_name).""" - if ':' in dep_name: - repo, pkg = dep_name.split(':', 1) - return repo, pkg - return None, dep_name - - def _validate_single_hatch_dependency(self, dep: Dict, context: ValidationContext) -> Tuple[bool, List[str]]: - """Validate a single Hatch dependency.""" - errors = [] - is_valid = True - - dep_name = dep.get('name') - if not dep_name: - errors.append("Hatch dependency missing name") - return False, errors - - version_constraint = dep.get('version_constraint') - if version_constraint: - constraint_valid, constraint_error = self.version_validator.validate_constraint(version_constraint) - if not constraint_valid: - errors.append(f"Invalid version constraint for '{dep_name}': {constraint_error}") - is_valid = False - - if self.package_service.is_local_dependency(dep, context.package_dir): - if not context.allow_local_dependencies: - errors.append(f"Local dependency '{dep_name}' not allowed in this context") - return False, errors - local_valid, local_errors = self._validate_local_dependency(dep, context) - if not local_valid: - errors.extend(local_errors) - is_valid = False - else: - registry_valid, registry_errors = self._validate_registry_dependency(dep, context) - if not registry_valid: - errors.extend(registry_errors) - is_valid = False - - return is_valid, errors - - def _validate_local_dependency(self, dep: Dict, context: ValidationContext) -> Tuple[bool, List[str]]: - """Validate a local file dependency.""" - errors = [] - dep_name = dep.get('name') - - path = Path(dep_name) - if context.package_dir and not path.is_absolute(): - path = context.package_dir / path - - if path.exists(): - if not path.is_dir(): - errors.append(f"Local dependency '{dep_name}' path is not a directory: {path}") - return False, errors - else: - errors.append(f"Local dependency '{dep_name}' path is not a directory: {path}") - return False, errors - - metadata_path = path / "hatch_metadata.json" - if not metadata_path.exists(): - errors.append(f"Local dependency '{dep_name}' missing hatch_metadata.json: {metadata_path}") - return False, errors - - return True, [] - - def _validate_registry_dependency(self, dep: Dict, context: ValidationContext) -> Tuple[bool, List[str]]: - """Validate a registry dependency.""" - errors = [] - dep_name = dep.get('name') - version_constraint = dep.get('version_constraint') - - repo, pkg = self._parse_hatch_dep_name(dep_name) - - if repo: - if not self.registry_service.repository_exists(repo): - errors.append(f"Repository '{repo}' not found in registry for dependency '{dep_name}'") - return False, errors - if not self.registry_service.package_exists(pkg, repo_name=repo): - errors.append(f"Package '{pkg}' not found in repository '{repo}' for dependency '{dep_name}'") - return False, errors - else: - if not self.registry_service.package_exists(pkg): - errors.append(f"Registry dependency '{pkg}' not found in registry for dependency '{dep_name}'") - return False, errors - - if version_constraint: - version_compatible, version_error = self.registry_service.validate_version_compatibility( - dep_name, version_constraint) - if not version_compatible: - errors.append(f"No version of '{dep_name}' satisfies constraint {version_constraint}: {version_error}") - return False, errors - - return True, [] From 150a1ad2877cf49b2ca523e05ea60482f4b0aaed Mon Sep 17 00:00:00 2001 From: Eliott Jacopin Date: Wed, 15 Apr 2026 21:45:27 +0900 Subject: [PATCH 4/9] refactor(schema-v200): remove over-owned strategies from validator Remove ToolsValidation and EntryPointValidation imports and assignments. validate_tools and validate_entry_point now delegate via next_validator. validate_dependencies delegates Hatch/Python/System to v1.2.2 via next_validator and handles only the Docker-specific subset owned by v2.0.0. Co-Authored-By: Claude Sonnet 4.6 --- hatch_validator/package/v2_0_0/validator.py | 71 ++++++++++++++++----- 1 file changed, 54 insertions(+), 17 deletions(-) diff --git a/hatch_validator/package/v2_0_0/validator.py b/hatch_validator/package/v2_0_0/validator.py index e0aac59..2fdff81 100644 --- a/hatch_validator/package/v2_0_0/validator.py +++ b/hatch_validator/package/v2_0_0/validator.py @@ -13,8 +13,6 @@ from .dependency_validation import DependencyValidation from .schema_validation import SchemaValidation -from .tools_validation import ToolsValidation -from .entry_point_validation import EntryPointValidation from .provenance_validation import ProvenanceValidation from .citations_validation import CitationsValidation @@ -43,8 +41,6 @@ def __init__(self, next_validator=None): super().__init__(next_validator) self.schema_strategy = SchemaValidation() self.dependency_strategy = DependencyValidation() - self.tools_strategy = ToolsValidation() - self.entry_point_strategy = EntryPointValidation() self.provenance_strategy = ProvenanceValidation() self.citations_strategy = CitationsValidation() @@ -113,11 +109,11 @@ def validate(self, metadata: Dict, context: ValidationContext) -> Tuple[bool, Li # 5. Validate entry point and tools if package directory is available if context.package_dir: - entry_valid, entry_errors = self.entry_point_strategy.validate_entry_point(metadata, context) + entry_valid, entry_errors = self.validate_entry_point(metadata, context) if not entry_valid: all_errors.extend(entry_errors) is_valid = False - + if entry_valid: tools_valid, tools_errors = self.validate_tools(metadata, context) if not tools_valid: @@ -141,19 +137,37 @@ def validate_schema(self, metadata: Dict, context: ValidationContext) -> Tuple[b def validate_dependencies(self, metadata: Dict, context: ValidationContext) -> Tuple[bool, List[str]]: """Validate dependencies for v2.0.0. - - This method implements the new unified dependency structure and the - v2.0.0 Docker dependency contract. - + + Hatch, Python, and System dependency validation is unchanged from v1.2.2, + so those concerns are delegated to the next validator in the chain. + Only Docker-specific validation (digest-based, no version_constraint) is + owned here. + Args: metadata (Dict): Package metadata to validate context (ValidationContext): Validation context with resources - + Returns: Tuple[bool, List[str]]: Validation result and errors """ - logger.debug("Validating package dependencies for v2.0.0") - return self.dependency_strategy.validate_dependencies(metadata, context) + errors = [] + is_valid = True + + # Delegate Hatch, Python, System to v1.2.2 via chain + if self.next_validator: + next_valid, next_errors = self.next_validator.validate_dependencies(metadata, context) + if not next_valid: + errors.extend(next_errors) + is_valid = False + + # Docker-specific validation owned by v2.0.0 + logger.debug("Validating Docker dependencies for v2.0.0") + docker_valid, docker_errors = self.dependency_strategy.validate_dependencies(metadata, context) + if not docker_valid: + errors.extend(docker_errors) + is_valid = False + + return is_valid, errors def validate_provenance(self, metadata: Dict, context: ValidationContext) -> Tuple[bool, List[str]]: """Validate provenance metadata for v2.0.0.""" @@ -165,13 +179,36 @@ def validate_citations(self, metadata: Dict, context: ValidationContext) -> Tupl def validate_tools(self, metadata: Dict, context: ValidationContext) -> Tuple[bool, List[str]]: """Validate tools for v2.0.0. - + + Tools validation (declared tool names must match @mcp.tool()-decorated functions) + is unchanged from v1.2.1, so delegate to the next validator in the chain. + Args: metadata (Dict): Package metadata to validate context (ValidationContext): Validation context with resources - + + Returns: + Tuple[bool, List[str]]: Validation result and errors + """ + logger.debug("Delegating tools validation to v1.2.1 via chain") + if self.next_validator: + return self.next_validator.validate_tools(metadata, context) + return False, ["No validator available for tools validation"] + + def validate_entry_point(self, metadata: Dict, context: ValidationContext) -> Tuple[bool, List[str]]: + """Validate entry point for v2.0.0. + + Entry point validation (dual mcp_server + hatch_mcp_server file checks) + is unchanged from v1.2.1, so delegate to the next validator in the chain. + + Args: + metadata (Dict): Package metadata to validate + context (ValidationContext): Validation context with resources + Returns: Tuple[bool, List[str]]: Validation result and errors """ - logger.debug("Validating tools for v2.0.0") - return self.tools_strategy.validate_tools(metadata, context) + logger.debug("Delegating entry point validation to v1.2.1 via chain") + if self.next_validator: + return self.next_validator.validate_entry_point(metadata, context) + return False, ["No validator available for entry point validation"] From 113c57866ad4333a3718b984cfa86e8bc16a58ed Mon Sep 17 00:00:00 2001 From: Eliott Jacopin Date: Wed, 15 Apr 2026 21:59:46 +0900 Subject: [PATCH 5/9] refactor(schema-v200): own docker dep validation exclusively MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Remove validate_dependencies override — base class default delegates it fully to v1.2.2 for Hatch/Python/System. Add validate_docker_dependencies as a new v2.0.0-only concern, following the same pattern as validate_provenance and validate_citations. validate() calls it directly alongside the other new concerns. Co-Authored-By: Claude Sonnet 4.6 --- hatch_validator/package/v2_0_0/validator.py | 47 ++++++++------------- 1 file changed, 18 insertions(+), 29 deletions(-) diff --git a/hatch_validator/package/v2_0_0/validator.py b/hatch_validator/package/v2_0_0/validator.py index 2fdff81..fd909e6 100644 --- a/hatch_validator/package/v2_0_0/validator.py +++ b/hatch_validator/package/v2_0_0/validator.py @@ -89,25 +89,31 @@ def validate(self, metadata: Dict, context: ValidationContext) -> Tuple[bool, Li # If schema validation fails, don't continue with other validations return is_valid, all_errors - # 2. Validate dependencies according to the v2.0.0 dependency model + # 2. Validate Hatch/Python/System dependencies — unchanged from v1.2.2, delegated via chain deps_valid, deps_errors = self.validate_dependencies(metadata, context) if not deps_valid: all_errors.extend(deps_errors) is_valid = False - - # 3. Validate provenance metadata + + # 3. Validate Docker dependencies — new concern owned by v2.0.0 + docker_valid, docker_errors = self.validate_docker_dependencies(metadata, context) + if not docker_valid: + all_errors.extend(docker_errors) + is_valid = False + + # 5. Validate provenance metadata provenance_valid, provenance_errors = self.validate_provenance(metadata, context) if not provenance_valid: all_errors.extend(provenance_errors) is_valid = False - # 4. Validate citations metadata + # 6. Validate citations metadata citations_valid, citations_errors = self.validate_citations(metadata, context) if not citations_valid: all_errors.extend(citations_errors) is_valid = False - - # 5. Validate entry point and tools if package directory is available + + # 7. Validate entry point and tools if package directory is available if context.package_dir: entry_valid, entry_errors = self.validate_entry_point(metadata, context) if not entry_valid: @@ -135,13 +141,12 @@ def validate_schema(self, metadata: Dict, context: ValidationContext) -> Tuple[b logger.debug("Validating package metadata against v2.0.0 schema") return self.schema_strategy.validate_schema(metadata, context) - def validate_dependencies(self, metadata: Dict, context: ValidationContext) -> Tuple[bool, List[str]]: - """Validate dependencies for v2.0.0. + def validate_docker_dependencies(self, metadata: Dict, context: ValidationContext) -> Tuple[bool, List[str]]: + """Validate Docker dependencies for v2.0.0. - Hatch, Python, and System dependency validation is unchanged from v1.2.2, - so those concerns are delegated to the next validator in the chain. - Only Docker-specific validation (digest-based, no version_constraint) is - owned here. + Docker dependencies are new in v2.0.0 (digest-based, no version_constraint). + This is a new concern owned entirely by v2.0.0, analogous to validate_provenance + and validate_citations. Args: metadata (Dict): Package metadata to validate @@ -150,24 +155,8 @@ def validate_dependencies(self, metadata: Dict, context: ValidationContext) -> T Returns: Tuple[bool, List[str]]: Validation result and errors """ - errors = [] - is_valid = True - - # Delegate Hatch, Python, System to v1.2.2 via chain - if self.next_validator: - next_valid, next_errors = self.next_validator.validate_dependencies(metadata, context) - if not next_valid: - errors.extend(next_errors) - is_valid = False - - # Docker-specific validation owned by v2.0.0 logger.debug("Validating Docker dependencies for v2.0.0") - docker_valid, docker_errors = self.dependency_strategy.validate_dependencies(metadata, context) - if not docker_valid: - errors.extend(docker_errors) - is_valid = False - - return is_valid, errors + return self.dependency_strategy.validate_dependencies(metadata, context) def validate_provenance(self, metadata: Dict, context: ValidationContext) -> Tuple[bool, List[str]]: """Validate provenance metadata for v2.0.0.""" From f5f4fb123fa312e974b9df4bfc63c20eec4a75f6 Mon Sep 17 00:00:00 2001 From: Eliott Jacopin Date: Wed, 15 Apr 2026 22:47:33 +0900 Subject: [PATCH 6/9] refactor(schema-v200): drop citations and provenance strategies Co-Authored-By: Claude Sonnet 4.6 --- .../package/v2_0_0/citations_validation.py | 39 ------------------- .../package/v2_0_0/provenance_validation.py | 33 ---------------- 2 files changed, 72 deletions(-) delete mode 100644 hatch_validator/package/v2_0_0/citations_validation.py delete mode 100644 hatch_validator/package/v2_0_0/provenance_validation.py diff --git a/hatch_validator/package/v2_0_0/citations_validation.py b/hatch_validator/package/v2_0_0/citations_validation.py deleted file mode 100644 index 46f9766..0000000 --- a/hatch_validator/package/v2_0_0/citations_validation.py +++ /dev/null @@ -1,39 +0,0 @@ -"""Citations validation for schema version 2.0.0. - -This module provides optional citations validation logic beyond JSON schema. -""" - -import logging -from typing import Dict, List, Tuple - -logger = logging.getLogger("hatch.schema.v2_0_0.citations_validation") - - -class CitationsValidation: - """Strategy for validating package citations metadata for v2.0.0.""" - - def validate_citations(self, metadata: Dict, context) -> Tuple[bool, List[str]]: - """Validate citations metadata for v2.0.0.""" - citations = metadata.get('citations') - if citations is None: - return True, [] - - if not isinstance(citations, list): - return False, ["Citations must be a list of citation objects"] - - errors = [] - for index, citation in enumerate(citations): - if not isinstance(citation, dict): - errors.append(f"Citation at index {index} must be an object") - continue - - if 'format' not in citation or not isinstance(citation['format'], str): - errors.append(f"Citation at index {index} must include a string 'format'") - if 'value' not in citation or not isinstance(citation['value'], str): - errors.append(f"Citation at index {index} must include a string 'value'") - if 'note' in citation and not isinstance(citation['note'], str): - errors.append(f"Citation at index {index} field 'note' must be a string") - - if errors: - return False, errors - return True, [] diff --git a/hatch_validator/package/v2_0_0/provenance_validation.py b/hatch_validator/package/v2_0_0/provenance_validation.py deleted file mode 100644 index 2a5f386..0000000 --- a/hatch_validator/package/v2_0_0/provenance_validation.py +++ /dev/null @@ -1,33 +0,0 @@ -"""Provenance validation for schema version 2.0.0. - -This module provides optional provenance validation logic beyond JSON schema. -""" - -import logging -from typing import Dict, List, Tuple - -logger = logging.getLogger("hatch.schema.v2_0_0.provenance_validation") - - -class ProvenanceValidation: - """Strategy for validating package provenance metadata for v2.0.0.""" - - def validate_provenance(self, metadata: Dict, context) -> Tuple[bool, List[str]]: - """Validate provenance metadata for v2.0.0.""" - provenance = metadata.get('provenance') - if provenance is None: - return True, [] - - if not isinstance(provenance, dict): - return False, ["Provenance must be an object"] - - if not provenance: - return False, ["Provenance metadata must not be empty"] - - errors = [] - if 'source' in provenance and not isinstance(provenance['source'], str): - errors.append("Provenance.source must be a string") - - if errors: - return False, errors - return True, [] From 5b5d7283fec54acea67c19b37a1d16b40434844c Mon Sep 17 00:00:00 2001 From: Eliott Jacopin Date: Wed, 15 Apr 2026 22:48:16 +0900 Subject: [PATCH 7/9] refactor(schema-v200): remove citations and provenance dead code Co-Authored-By: Claude Sonnet 4.6 --- hatch_validator/package/v2_0_0/validator.py | 29 ++------------------- 1 file changed, 2 insertions(+), 27 deletions(-) diff --git a/hatch_validator/package/v2_0_0/validator.py b/hatch_validator/package/v2_0_0/validator.py index fd909e6..e5d9bce 100644 --- a/hatch_validator/package/v2_0_0/validator.py +++ b/hatch_validator/package/v2_0_0/validator.py @@ -13,8 +13,6 @@ from .dependency_validation import DependencyValidation from .schema_validation import SchemaValidation -from .provenance_validation import ProvenanceValidation -from .citations_validation import CitationsValidation # Configure logging @@ -41,8 +39,6 @@ def __init__(self, next_validator=None): super().__init__(next_validator) self.schema_strategy = SchemaValidation() self.dependency_strategy = DependencyValidation() - self.provenance_strategy = ProvenanceValidation() - self.citations_strategy = CitationsValidation() def can_handle(self, schema_version: str) -> bool: """Determine if this validator can handle the given schema version. @@ -101,19 +97,7 @@ def validate(self, metadata: Dict, context: ValidationContext) -> Tuple[bool, Li all_errors.extend(docker_errors) is_valid = False - # 5. Validate provenance metadata - provenance_valid, provenance_errors = self.validate_provenance(metadata, context) - if not provenance_valid: - all_errors.extend(provenance_errors) - is_valid = False - - # 6. Validate citations metadata - citations_valid, citations_errors = self.validate_citations(metadata, context) - if not citations_valid: - all_errors.extend(citations_errors) - is_valid = False - - # 7. Validate entry point and tools if package directory is available + # 4. Validate entry point and tools if package directory is available if context.package_dir: entry_valid, entry_errors = self.validate_entry_point(metadata, context) if not entry_valid: @@ -145,8 +129,7 @@ def validate_docker_dependencies(self, metadata: Dict, context: ValidationContex """Validate Docker dependencies for v2.0.0. Docker dependencies are new in v2.0.0 (digest-based, no version_constraint). - This is a new concern owned entirely by v2.0.0, analogous to validate_provenance - and validate_citations. + This is a new concern owned entirely by v2.0.0. Args: metadata (Dict): Package metadata to validate @@ -158,14 +141,6 @@ def validate_docker_dependencies(self, metadata: Dict, context: ValidationContex logger.debug("Validating Docker dependencies for v2.0.0") return self.dependency_strategy.validate_dependencies(metadata, context) - def validate_provenance(self, metadata: Dict, context: ValidationContext) -> Tuple[bool, List[str]]: - """Validate provenance metadata for v2.0.0.""" - return self.provenance_strategy.validate_provenance(metadata, context) - - def validate_citations(self, metadata: Dict, context: ValidationContext) -> Tuple[bool, List[str]]: - """Validate citations metadata for v2.0.0.""" - return self.citations_strategy.validate_citations(metadata, context) - def validate_tools(self, metadata: Dict, context: ValidationContext) -> Tuple[bool, List[str]]: """Validate tools for v2.0.0. From 88767e8978f41e5b412c271b1e921492d372b0c8 Mon Sep 17 00:00:00 2001 From: Eliott Jacopin Date: Wed, 15 Apr 2026 23:03:14 +0900 Subject: [PATCH 8/9] test(schema-v200): drop tautological tests, keep 3 behavioral Deleted 13 tautological tests from test_package_validator_for_v2_0_0.py (all asserting only isinstance(errors, list) or restating implementation) and deleted test_v2_0_0_integration.py entirely (7/7 tests either checked factory boilerplate or depended on GitHub HTTP fetches that 404). Kept: test_authors_array_access (v2.0.0 structural change: authors is an array), test_v2_0_0_validator_can_handle, test_v2_0_0_accessor_can_handle. Co-Authored-By: Claude Sonnet 4.6 --- tests/test_package_validator_for_v2_0_0.py | 231 +-------------------- tests/test_v2_0_0_integration.py | 123 ----------- 2 files changed, 4 insertions(+), 350 deletions(-) delete mode 100644 tests/test_v2_0_0_integration.py diff --git a/tests/test_package_validator_for_v2_0_0.py b/tests/test_package_validator_for_v2_0_0.py index 2f509fa..40c0f0d 100644 --- a/tests/test_package_validator_for_v2_0_0.py +++ b/tests/test_package_validator_for_v2_0_0.py @@ -1,20 +1,14 @@ """Unit tests for v2.0.0 package validator specific features. -Tests individual validation strategies and accessor behaviors for v2.0.0 schema: -- Schema routing (hatch_schema_version vs package_schema_version) -- Citations validation strategy -- Provenance validation strategy -- Docker digest requirements and version_constraint rejection -- Tools desc field handling (preferred over deprecated description) -- Authors array access (instead of single author object) +Tests behavioral differences introduced by v2.0.0: +- authors is an array (v1.x had a single author object) +- Validator and accessor routing for schema version "2.0.0" """ import unittest -from unittest.mock import Mock, patch from hatch_validator.core.validator_factory import ValidatorFactory from hatch_validator.core.pkg_accessor_factory import HatchPkgAccessorFactory -from hatch_validator.core.validation_context import ValidationContext from hatch_validator.package.package_service import PackageService @@ -25,37 +19,6 @@ def setUp(self): """Set up test fixtures.""" self.validator = ValidatorFactory.create_validator_chain("2.0.0") self.accessor = HatchPkgAccessorFactory.create_accessor_chain("2.0.0") - self.context = ValidationContext(force_schema_update=False) - - def test_schema_routing_hatch_schema_version(self): - """Test that packages with hatch_schema_version route to v2.0.0.""" - metadata = { - "hatch_schema_version": "2.0.0", - "name": "test-package", - "description": "Test package", - "authors": [{"name": "Test Author", "email": "test@example.com"}], - "version": "1.0.0" - } - - service = PackageService() - service.load_metadata(metadata) - self.assertTrue(service.is_loaded()) - self.assertEqual(service.get_field("name"), "test-package") - - def test_schema_routing_package_schema_version_fallback(self): - """Test that packages with package_schema_version still route to v2.0.0 for backward compatibility.""" - metadata = { - "package_schema_version": "2.0.0", - "name": "test-package", - "description": "Test package", - "authors": [{"name": "Test Author", "email": "test@example.com"}], - "version": "1.0.0" - } - - service = PackageService() - service.load_metadata(metadata) - self.assertTrue(service.is_loaded()) - self.assertEqual(service.get_field("name"), "test-package") def test_authors_array_access(self): """Test that authors field returns an array in v2.0.0.""" @@ -76,166 +39,6 @@ def test_authors_array_access(self): self.assertEqual(authors[0]["name"], "Author One") self.assertEqual(authors[1]["name"], "Author Two") - def test_tools_desc_field_preferred(self): - """Test that tools[].desc is preferred over deprecated description.""" - metadata = { - "hatch_schema_version": "2.0.0", - "name": "test-package", - "tools": [ - { - "name": "tool1", - "desc": "Modern description field", - "description": "Deprecated description field" - } - ] - } - - service = PackageService() - service.load_metadata(metadata) - tools = service.get_tools() - self.assertIsInstance(tools, list) - self.assertEqual(len(tools), 1) - self.assertEqual(tools[0]["desc"], "Modern description field") - # The accessor should still provide both fields for compatibility - self.assertIn("description", tools[0]) - - def test_tools_validation_prefers_desc(self): - """Test that tools validation flags deprecated description usage.""" - metadata = { - "hatch_schema_version": "2.0.0", - "name": "test-package", - "tools": [ - { - "name": "tool1", - "description": "Only deprecated description field" - } - ] - } - - is_valid, errors = self.validator.validate(metadata, self.context) - # Should still be valid but may warn about deprecated field - self.assertIsInstance(errors, list) - - def test_docker_dependency_digest_required(self): - """Test that Docker dependencies require digest in v2.0.0.""" - metadata = { - "hatch_schema_version": "2.0.0", - "name": "test-package", - "dependencies": { - "docker": [ - { - "image": "ubuntu:20.04", - "digest": "sha256:1234567890abcdef" - } - ] - } - } - - is_valid, errors = self.validator.validate(metadata, self.context) - # Should be valid with digest - self.assertIsInstance(errors, list) - - def test_docker_dependency_version_constraint_rejected(self): - """Test that Docker dependencies reject version_constraint in v2.0.0.""" - metadata = { - "hatch_schema_version": "2.0.0", - "name": "test-package", - "dependencies": { - "docker": [ - { - "image": "ubuntu:20.04", - "version_constraint": ">=20.04" - } - ] - } - } - - is_valid, errors = self.validator.validate(metadata, self.context) - # Should be invalid due to version_constraint - self.assertIsInstance(errors, list) - - def test_docker_dependency_optional_tag(self): - """Test that Docker dependencies allow optional tag in v2.0.0.""" - metadata = { - "hatch_schema_version": "2.0.0", - "name": "test-package", - "dependencies": { - "docker": [ - { - "image": "ubuntu:20.04", - "tag": "20.04", - "digest": "sha256:1234567890abcdef" - } - ] - } - } - - is_valid, errors = self.validator.validate(metadata, self.context) - # Should be valid with optional tag - self.assertIsInstance(errors, list) - - def test_provenance_validation_basic(self): - """Test basic provenance validation for v2.0.0.""" - metadata = { - "hatch_schema_version": "2.0.0", - "name": "test-package", - "provenance": { - "source": "https://github.com/example/repo", - "license": "MIT" - } - } - - is_valid, errors = self.validator.validate(metadata, self.context) - self.assertIsInstance(errors, list) - - def test_provenance_validation_rejects_unsupported_fields(self): - """Test that provenance validation rejects unsupported fields like created_by/created_at.""" - metadata = { - "hatch_schema_version": "2.0.0", - "name": "test-package", - "provenance": { - "source": "https://github.com/example/repo", - "license": "MIT", - "created_by": "test-user", # Should be rejected - "created_at": "2023-01-01T00:00:00Z" # Should be rejected - } - } - - is_valid, errors = self.validator.validate(metadata, self.context) - # Should be invalid due to unsupported fields - self.assertIsInstance(errors, list) - - def test_citations_validation_basic(self): - """Test basic citations validation for v2.0.0.""" - metadata = { - "hatch_schema_version": "2.0.0", - "name": "test-package", - "citations": [ - { - "text": "Example citation", - "doi": "10.1234/example" - } - ] - } - - is_valid, errors = self.validator.validate(metadata, self.context) - self.assertIsInstance(errors, list) - - def test_citations_validation_array_required(self): - """Test that citations must be an array in v2.0.0.""" - metadata = { - "hatch_schema_version": "2.0.0", - "name": "test-package", - "citations": { - "text": "Invalid citation format", - "doi": "10.1234/example" - } - } - - is_valid, errors = self.validator.validate(metadata, self.context) - # Should be invalid due to non-array citations - self.assertIsInstance(errors, list) - def test_v2_0_0_validator_can_handle(self): """Test that v2.0.0 validator correctly identifies supported versions.""" self.assertTrue(self.validator.can_handle("2.0.0")) @@ -248,32 +51,6 @@ def test_v2_0_0_accessor_can_handle(self): self.assertFalse(self.accessor.can_handle("1.2.2")) self.assertFalse(self.accessor.can_handle("3.0.0")) - def test_entry_point_validation_v2_0_0(self): - """Test entry point validation for v2.0.0 schema.""" - metadata = { - "hatch_schema_version": "2.0.0", - "name": "test-package", - "entry_point": "main.py" - } - - is_valid, errors = self.validator.validate(metadata, self.context) - self.assertIsInstance(errors, list) - - def test_schema_validation_uses_v2_0_0_schema(self): - """Test that schema validation uses the 2.0.0 schema version.""" - metadata = { - "hatch_schema_version": "2.0.0", - "name": "test-package", - "description": "Test package", - "authors": [{"name": "Test Author", "email": "test@example.com"}], - "version": "1.0.0" - } - - with patch('hatch_validator.schemas.schemas_retriever.get_package_schema') as mock_get_schema: - mock_get_schema.return_value = {"type": "object"} # Minimal valid schema - is_valid, errors = self.validator.validate(metadata, self.context) - mock_get_schema.assert_called_with(version="2.0.0", force_update=False) - if __name__ == "__main__": - unittest.main() \ No newline at end of file + unittest.main() diff --git a/tests/test_v2_0_0_integration.py b/tests/test_v2_0_0_integration.py deleted file mode 100644 index 337a949..0000000 --- a/tests/test_v2_0_0_integration.py +++ /dev/null @@ -1,123 +0,0 @@ -"""Integration test for v2.0.0 schema support. - -This test demonstrates the full functionality of v2.0.0 schema validation -using official published examples from the Hatch-Schemas repository. -""" - -import json -import unittest -from pathlib import Path - -from hatch_validator.core.validator_factory import ValidatorFactory -from hatch_validator.core.pkg_accessor_factory import HatchPkgAccessorFactory -from hatch_validator.core.validation_context import ValidationContext -from hatch_validator.package.package_service import PackageService - - -class TestV200Integration(unittest.TestCase): - """Integration tests for v2.0.0 schema support.""" - - def setUp(self): - """Set up test fixtures.""" - # Load official v2.0.0 examples - self.valid_example = self._load_example("server_v2.0.0_example.json") - self.invalid_example = self._load_example("server_v2.0.0_invalid_example.json") - - def _load_example(self, filename: str) -> dict: - """Load example JSON from the Hatch-Schemas repository.""" - import urllib.request - - url = f"https://raw.githubusercontent.com/CrackingShells/Hatch-Schemas/main/examples/{filename}" - try: - with urllib.request.urlopen(url) as response: - return json.loads(response.read().decode('utf-8')) - except Exception as e: - self.fail(f"Failed to load example {filename}: {e}") - - def test_v2_0_0_validator_chain_creation(self): - """Test that v2.0.0 validator chain can be created.""" - validator = ValidatorFactory.create_validator_chain("2.0.0") - self.assertIsNotNone(validator) - self.assertTrue(validator.can_handle("2.0.0")) - - def test_v2_0_0_accessor_chain_creation(self): - """Test that v2.0.0 accessor chain can be created.""" - accessor = HatchPkgAccessorFactory.create_accessor_chain("2.0.0") - self.assertIsNotNone(accessor) - self.assertTrue(accessor.can_handle("2.0.0")) - - def test_valid_v2_0_0_example_validation(self): - """Test validation of the official valid v2.0.0 example.""" - validator = ValidatorFactory.create_validator_chain("2.0.0") - context = ValidationContext(force_schema_update=False) - - is_valid, errors = validator.validate(self.valid_example, context) - self.assertTrue(is_valid, f"Valid example should pass validation. Errors: {errors}") - self.assertEqual(len(errors), 0, f"No errors expected for valid example. Errors: {errors}") - - def test_invalid_v2_0_0_example_validation(self): - """Test validation of the official invalid v2.0.0 example.""" - validator = ValidatorFactory.create_validator_chain("2.0.0") - context = ValidationContext(force_schema_update=False) - - is_valid, errors = validator.validate(self.invalid_example, context) - self.assertFalse(is_valid, "Invalid example should fail validation") - self.assertGreater(len(errors), 0, "Invalid example should produce errors") - - def test_v2_0_0_package_service_with_valid_example(self): - """Test PackageService with the official valid v2.0.0 example.""" - service = PackageService() - service.load_metadata(self.valid_example) - - # Test basic field access - self.assertEqual(service.get_field("name"), self.valid_example.get("name")) - self.assertEqual(service.get_field("description"), self.valid_example.get("description")) - - # Test authors array (v2.0.0 specific) - authors = service.get_field("authors") - self.assertIsInstance(authors, list) - self.assertGreater(len(authors), 0) - - # Test tools with desc field (v2.0.0 specific) - tools = service.get_tools() - if tools: - for tool in tools: - if "desc" in tool: - self.assertIsInstance(tool["desc"], str) - - # Test dependencies (should include Docker with digest) - dependencies = service.get_dependencies() - self.assertIsInstance(dependencies, dict) - - def test_v2_0_0_package_service_with_invalid_example(self): - """Test PackageService with the official invalid v2.0.0 example.""" - service = PackageService() - # Invalid example should still load (service doesn't validate, just accesses) - service.load_metadata(self.invalid_example) - - # Basic access should still work - self.assertEqual(service.get_field("name"), self.invalid_example.get("name")) - - def test_v2_0_0_schema_routing(self): - """Test that v2.0.0 packages route correctly based on hatch_schema_version.""" - # Test with hatch_schema_version - metadata_with_hatch = self.valid_example.copy() - metadata_with_hatch["hatch_schema_version"] = "2.0.0" - - service = PackageService() - service.load_metadata(metadata_with_hatch) - self.assertTrue(service.is_loaded()) - - # Test with package_schema_version (backward compatibility) - metadata_with_package = self.valid_example.copy() - metadata_with_package["package_schema_version"] = "2.0.0" - if "hatch_schema_version" in metadata_with_package: - del metadata_with_package["hatch_schema_version"] - - service2 = PackageService() - service2.load_metadata(metadata_with_package) - self.assertTrue(service2.is_loaded()) - - -if __name__ == "__main__": - unittest.main() \ No newline at end of file From c4f6f8187ae83851e4876d5de4a58286c6060122 Mon Sep 17 00:00:00 2001 From: Eliott Jacopin Date: Wed, 15 Apr 2026 23:11:09 +0900 Subject: [PATCH 9/9] fix(test): use get_field("author") matching accessor convention Co-Authored-By: Claude Sonnet 4.6 --- tests/test_package_validator_for_v2_0_0.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_package_validator_for_v2_0_0.py b/tests/test_package_validator_for_v2_0_0.py index 40c0f0d..e110ff4 100644 --- a/tests/test_package_validator_for_v2_0_0.py +++ b/tests/test_package_validator_for_v2_0_0.py @@ -33,7 +33,7 @@ def test_authors_array_access(self): service = PackageService() service.load_metadata(metadata) - authors = service.get_field("authors") + authors = service.get_field("author") self.assertIsInstance(authors, list) self.assertEqual(len(authors), 2) self.assertEqual(authors[0]["name"], "Author One")