From 928e72360546dac9c02d9d61a10af0ca982f1a67 Mon Sep 17 00:00:00 2001 From: Pol Michel Date: Fri, 20 Feb 2026 12:14:40 +0100 Subject: [PATCH 01/10] new Claude command /pre-ci running locally a subset of the commands that will be executed by the CI. --- dev/commands/pre-ci.md | 35 +++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) create mode 100644 dev/commands/pre-ci.md diff --git a/dev/commands/pre-ci.md b/dev/commands/pre-ci.md new file mode 100644 index 00000000..18bc431f --- /dev/null +++ b/dev/commands/pre-ci.md @@ -0,0 +1,35 @@ +Run a subset of fast CI checks locally. These are lightweight validations that catch common issues before pushing. Execute each step sequentially and stop on the first failure. Report a summary at the end. + +## Steps + +1. **Format** Python code: + ```bash + uv run invoke format + ``` + +2. **Lint** (YAML, Ruff, ty, mypy, markdownlint, vale): + ```bash + uv run invoke lint + ``` + +3. **Python unit tests**: + ```bash + uv run pytest tests/unit/ + ``` + +4. **Docs unit tests** (vitest): + ```bash + cd docs && npx --no-install vitest run + ``` + +5. **Validate generated documentation** (regenerate and check for drift): + ```bash + uv run invoke docs-validate + ``` + +## Instructions + +- Run each step in order using the Bash tool. +- If a step fails, stop immediately and report the failure with the relevant output. +- At the end, print a summary table of all steps with pass/fail status. +- Do NOT commit or push anything. From cf066492ba90912e3d55a52d4dcb3360e6ceed1f Mon Sep 17 00:00:00 2001 From: Pol Michel Date: Mon, 23 Feb 2026 10:23:46 +0100 Subject: [PATCH 02/10] new Claude command /feedback to identify missing project documentation --- dev/commands/feedback.md | 86 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 86 insertions(+) create mode 100644 dev/commands/feedback.md diff --git a/dev/commands/feedback.md b/dev/commands/feedback.md new file mode 100644 index 00000000..aba5e730 --- /dev/null +++ b/dev/commands/feedback.md @@ -0,0 +1,86 @@ +# Session Feedback + +Analyze the current session and identify what documentation or context was missing, incomplete, or incorrect. The goal is to continuously improve the project's knowledge base so future sessions are more efficient. + +## Step 1: Session Analysis + +Reflect on the work done in this session. For each area, identify friction points: + +1. **Exploration overhead**: What parts of the codebase did you have to discover by searching that should have been documented? (e.g., patterns, conventions, module responsibilities) +2. **Wrong assumptions**: Did you make incorrect assumptions due to missing or misleading documentation? +3. **Repeated patterns**: Did you discover recurring patterns or conventions that aren't documented anywhere? +4. **Missing context**: What background knowledge would have helped you start faster? (e.g., architecture decisions, data flow, naming conventions) +5. **Tooling gaps**: Were there commands, scripts, or workflows that you had to figure out? + +## Step 2: Documentation Audit + +For each friction point identified, determine the appropriate fix. 
Check the existing documentation to avoid duplicating what's already there: + +- `AGENTS.md` — Top-level project instructions and component map +- `CLAUDE.md` — Entry point referencing AGENTS.md +- `docs/AGENTS.md` — Documentation site guide +- `infrahub_sdk/ctl/AGENTS.md` — CLI development guide +- `infrahub_sdk/pytest_plugin/AGENTS.md` — Pytest plugin guide +- `tests/AGENTS.md` — Testing guide + +Read the relevant existing files to understand what's already documented before proposing changes. + +## Step 3: Generate Report + +Present the feedback as a structured report with the following sections. Only include sections that have content — skip empty sections. + +### Format + +```markdown +## Session Feedback Report + +### What I Was Working On + + +### Documentation Gaps + + +For each gap: +- **Topic**: What's missing +- **Where**: Which file should contain this (existing file to update, or new file to create) +- **Why**: How this would have helped during this session +- **Suggested content**: A draft of what should be added (be specific and actionable) + +### Documentation Corrections + + +For each correction: +- **File**: Path to the file +- **Issue**: What's wrong or misleading +- **Fix**: What it should say instead + +### Discovered Patterns + + +For each pattern: +- **Pattern**: Description of the convention +- **Evidence**: Where in the code this pattern is used (file paths) +- **Where to document**: Which AGENTS.md or guide file should capture this + +### Memory Updates + + +For each update: +- **Action**: Add / Update / Remove +- **Content**: What to write +- **Reason**: Why this is worth remembering across sessions +``` + +## Step 4: Apply Changes + +After presenting the report, ask the user which changes they want to apply. Present the options: + +1. **Apply all** — Create/update all proposed documentation files and memory +2. **Cherry-pick** — Let the user select which changes to apply +3. **None** — Just keep the report as reference, don't modify any files + +For approved changes: +- Edit existing files when updating documentation +- Create new files only when no appropriate existing file exists +- Update `MEMORY.md` with approved memory changes +- Keep all changes minimal and focused — don't over-document From 03fe2dce5a060ca562bd3980fd51c0f779643b2f Mon Sep 17 00:00:00 2001 From: Pol Michel Date: Mon, 23 Feb 2026 10:36:45 +0100 Subject: [PATCH 03/10] would be more convenient to have all the results at once after /pre-ci command since they are independent checks --- dev/commands/pre-ci.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dev/commands/pre-ci.md b/dev/commands/pre-ci.md index 18bc431f..efb2de8c 100644 --- a/dev/commands/pre-ci.md +++ b/dev/commands/pre-ci.md @@ -1,4 +1,4 @@ -Run a subset of fast CI checks locally. These are lightweight validations that catch common issues before pushing. Execute each step sequentially and stop on the first failure. Report a summary at the end. +Run a subset of fast CI checks locally. These are lightweight validations that catch common issues before pushing. Run all steps and report a summary at the end. ## Steps @@ -30,6 +30,6 @@ Run a subset of fast CI checks locally. These are lightweight validations that c ## Instructions - Run each step in order using the Bash tool. -- If a step fails, stop immediately and report the failure with the relevant output. +- If a step fails, continue with the remaining steps. - At the end, print a summary table of all steps with pass/fail status. 
- Do NOT commit or push anything. From bc144dd0269cf664a7ab25c6d426db885b40e73b Mon Sep 17 00:00:00 2001 From: Pol Michel Date: Tue, 24 Feb 2026 20:58:49 +0100 Subject: [PATCH 04/10] encapsulating the 4th command into a subshell to avoid changing the working directory --- dev/commands/pre-ci.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dev/commands/pre-ci.md b/dev/commands/pre-ci.md index efb2de8c..91e8fd68 100644 --- a/dev/commands/pre-ci.md +++ b/dev/commands/pre-ci.md @@ -19,7 +19,7 @@ Run a subset of fast CI checks locally. These are lightweight validations that c 4. **Docs unit tests** (vitest): ```bash - cd docs && npx --no-install vitest run + (cd docs && npx --no-install vitest run) ``` 5. **Validate generated documentation** (regenerate and check for drift): From 514d7d31992cb4a4b35cc1de01c48185e1d4fba5 Mon Sep 17 00:00:00 2001 From: Pol Michel Date: Tue, 24 Feb 2026 20:59:34 +0100 Subject: [PATCH 05/10] new blank line to make linters happy --- dev/commands/feedback.md | 1 + 1 file changed, 1 insertion(+) diff --git a/dev/commands/feedback.md b/dev/commands/feedback.md index aba5e730..a870afd6 100644 --- a/dev/commands/feedback.md +++ b/dev/commands/feedback.md @@ -80,6 +80,7 @@ After presenting the report, ask the user which changes they want to apply. Pres 3. **None** — Just keep the report as reference, don't modify any files For approved changes: + - Edit existing files when updating documentation - Create new files only when no appropriate existing file exists - Update `MEMORY.md` with approved memory changes From 8abbde24a8229a6d69aa10a92f53c33eac525688 Mon Sep 17 00:00:00 2001 From: Pol Michel Date: Tue, 24 Feb 2026 21:05:55 +0100 Subject: [PATCH 06/10] replaced ambiguous term "session" with "conversation" into feedback command --- dev/commands/feedback.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/dev/commands/feedback.md b/dev/commands/feedback.md index a870afd6..e4a4dbed 100644 --- a/dev/commands/feedback.md +++ b/dev/commands/feedback.md @@ -1,10 +1,10 @@ # Session Feedback -Analyze the current session and identify what documentation or context was missing, incomplete, or incorrect. The goal is to continuously improve the project's knowledge base so future sessions are more efficient. +Analyze this conversation and identify what documentation or context was missing, incomplete, or incorrect. The goal is to continuously improve the project's knowledge base so future conversations are more efficient. ## Step 1: Session Analysis -Reflect on the work done in this session. For each area, identify friction points: +Reflect on the work done in this conversation. For each area, identify friction points: 1. **Exploration overhead**: What parts of the codebase did you have to discover by searching that should have been documented? (e.g., patterns, conventions, module responsibilities) 2. **Wrong assumptions**: Did you make incorrect assumptions due to missing or misleading documentation? @@ -35,7 +35,7 @@ Present the feedback as a structured report with the following sections. Only in ## Session Feedback Report ### What I Was Working On - + ### Documentation Gaps @@ -43,7 +43,7 @@ Present the feedback as a structured report with the following sections. 
Only in For each gap: - **Topic**: What's missing - **Where**: Which file should contain this (existing file to update, or new file to create) -- **Why**: How this would have helped during this session +- **Why**: How this would have helped during this conversation - **Suggested content**: A draft of what should be added (be specific and actionable) ### Documentation Corrections From 6387a79193de1048530ed12b1ef19ef41ea9d445 Mon Sep 17 00:00:00 2001 From: Pol Michel Date: Wed, 25 Feb 2026 10:14:06 +0100 Subject: [PATCH 07/10] additional blank lines --- dev/commands/feedback.md | 5 +++++ dev/commands/pre-ci.md | 1 + 2 files changed, 6 insertions(+) diff --git a/dev/commands/feedback.md b/dev/commands/feedback.md index e4a4dbed..b89896be 100644 --- a/dev/commands/feedback.md +++ b/dev/commands/feedback.md @@ -41,6 +41,7 @@ Present the feedback as a structured report with the following sections. Only in For each gap: + - **Topic**: What's missing - **Where**: Which file should contain this (existing file to update, or new file to create) - **Why**: How this would have helped during this conversation @@ -50,6 +51,7 @@ For each gap: For each correction: + - **File**: Path to the file - **Issue**: What's wrong or misleading - **Fix**: What it should say instead @@ -58,6 +60,7 @@ For each correction: For each pattern: + - **Pattern**: Description of the convention - **Evidence**: Where in the code this pattern is used (file paths) - **Where to document**: Which AGENTS.md or guide file should capture this @@ -66,6 +69,7 @@ For each pattern: For each update: + - **Action**: Add / Update / Remove - **Content**: What to write - **Reason**: Why this is worth remembering across sessions @@ -79,6 +83,7 @@ After presenting the report, ask the user which changes they want to apply. Pres 2. **Cherry-pick** — Let the user select which changes to apply 3. **None** — Just keep the report as reference, don't modify any files + For approved changes: - Edit existing files when updating documentation diff --git a/dev/commands/pre-ci.md b/dev/commands/pre-ci.md index 91e8fd68..7f14d70d 100644 --- a/dev/commands/pre-ci.md +++ b/dev/commands/pre-ci.md @@ -33,3 +33,4 @@ Run a subset of fast CI checks locally. These are lightweight validations that c - If a step fails, continue with the remaining steps. - At the end, print a summary table of all steps with pass/fail status. - Do NOT commit or push anything. 
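As an aside for readers (not part of the patch itself): the pre-ci.md command above describes a run-everything-then-summarize loop. A minimal Python sketch of that behaviour, using only the shell commands already listed in the steps, could look like the following; the step labels are illustrative.

```python
# Illustrative sketch of the behaviour pre-ci.md asks for: run every check,
# never stop early, then print a pass/fail summary. The commands are copied
# from the steps above; the step labels are made up for display purposes.
import subprocess

STEPS = {
    "format": "uv run invoke format",
    "lint": "uv run invoke lint",
    "python unit tests": "uv run pytest tests/unit/",
    "docs unit tests": "(cd docs && npx --no-install vitest run)",
    "docs validation": "uv run invoke docs-validate",
}

# shell=True so the subshell used for the docs step works as written
results = {name: subprocess.run(cmd, shell=True).returncode == 0 for name, cmd in STEPS.items()}

for name, passed in results.items():
    print(f"{name:<20} {'PASS' if passed else 'FAIL'}")
```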
+ From d748a00ee0d029772d9d0362b71d83225efc8d3f Mon Sep 17 00:00:00 2001 From: Pol Michel <40861490+polmichel@users.noreply.github.com> Date: Thu, 26 Feb 2026 21:46:54 +0100 Subject: [PATCH 08/10] IHS-156 / #497 : support from_pool attributes on Python SDK queries (#850) * IHS-156 fix SDK from_pool attribute management before querying GraphQL API * IHS-156 refactor the test cases to have a better view on the tests perimeter * towncrier regarding Github issue #497 * IHS-156 update AGENTS doc using the command /feedback * IHS-156 is_from_pool_attribute typing and naming * IHS-156 refactor generate_input_data * IHS-156 remove Jira issue related comment in the tests * IHS-156 removed a part of fixtures to upper level * IHS-156 tested all cases of generated_input_data * IHS-156 fixed a bug in test_relationship_from_pool.py * IHS-156 side effect bugs regarding from_pool attributes * IHS-156 last feedbacks regarding documentation and variable naming * IHS-156 fix typing error * IHS-156 renamed payload_dict -> payload --- AGENTS.md | 2 +- changelog/497.fixed.md | 1 + .../sdk_ref/infrahub_sdk/node/attribute.mdx | 12 + infrahub_sdk/node/attribute.py | 122 ++++-- infrahub_sdk/node/node.py | 37 +- tests/AGENTS.md | 6 + tests/unit/sdk/conftest.py | 129 +----- tests/unit/sdk/pool/__init__.py | 0 tests/unit/sdk/pool/conftest.py | 114 +++++ tests/unit/sdk/pool/test_allocate.py | 219 ++++++++++ .../unit/sdk/pool/test_attribute_from_pool.py | 204 +++++++++ tests/unit/sdk/pool/test_pool_queries.py | 185 ++++++++ .../sdk/pool/test_relationship_from_pool.py | 130 ++++++ .../sdk/test_attribute_generate_input_data.py | 395 ++++++++++++++++++ tests/unit/sdk/test_client.py | 204 --------- tests/unit/sdk/test_node.py | 283 ------------- 16 files changed, 1386 insertions(+), 657 deletions(-) create mode 100644 changelog/497.fixed.md create mode 100644 tests/unit/sdk/pool/__init__.py create mode 100644 tests/unit/sdk/pool/conftest.py create mode 100644 tests/unit/sdk/pool/test_allocate.py create mode 100644 tests/unit/sdk/pool/test_attribute_from_pool.py create mode 100644 tests/unit/sdk/pool/test_pool_queries.py create mode 100644 tests/unit/sdk/pool/test_relationship_from_pool.py create mode 100644 tests/unit/sdk/test_attribute_generate_input_data.py diff --git a/AGENTS.md b/AGENTS.md index 00de5ab1..7abf694f 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -7,7 +7,7 @@ Infrahub Python SDK - async/sync client for Infrahub infrastructure management. 
```bash uv sync --all-groups --all-extras # Install all deps uv run invoke format # Format code -uv run invoke lint # All linters (code + yamllint + documentation) +uv run invoke lint # Full pipeline: ruff, yamllint, ty, mypy, markdownlint, vale uv run invoke lint-code # All linters for Python code uv run pytest tests/unit/ # Unit tests uv run pytest tests/integration/ # Integration tests diff --git a/changelog/497.fixed.md b/changelog/497.fixed.md new file mode 100644 index 00000000..b32323d1 --- /dev/null +++ b/changelog/497.fixed.md @@ -0,0 +1 @@ +Fixed Python SDK query generation regarding from_pool generated attribute value diff --git a/docs/docs/python-sdk/sdk_ref/infrahub_sdk/node/attribute.mdx b/docs/docs/python-sdk/sdk_ref/infrahub_sdk/node/attribute.mdx index a7b82ecb..d08c7fc5 100644 --- a/docs/docs/python-sdk/sdk_ref/infrahub_sdk/node/attribute.mdx +++ b/docs/docs/python-sdk/sdk_ref/infrahub_sdk/node/attribute.mdx @@ -24,3 +24,15 @@ value(self) -> Any ```python value(self, value: Any) -> None ``` + +#### `is_from_pool_attribute` + +```python +is_from_pool_attribute(self) -> bool +``` + +Check whether this attribute's value is sourced from a resource pool. + +**Returns:** + +- True if the attribute value is a resource pool node or was explicitly allocated from a pool. diff --git a/infrahub_sdk/node/attribute.py b/infrahub_sdk/node/attribute.py index 8043d567..54dd99aa 100644 --- a/infrahub_sdk/node/attribute.py +++ b/infrahub_sdk/node/attribute.py @@ -2,7 +2,7 @@ import ipaddress from collections.abc import Callable -from typing import TYPE_CHECKING, Any, get_args +from typing import TYPE_CHECKING, Any, NamedTuple, get_args from ..protocols_base import CoreNodeBase from ..uuidt import UUIDT @@ -13,6 +13,33 @@ from ..schema import AttributeSchemaAPI +class _GraphQLPayloadAttribute(NamedTuple): + """Result of resolving an attribute value for a GraphQL mutation. + + Attributes: + payload: Key/value entries to include in the mutation payload + (e.g. ``{"value": ...}`` or ``{"from_pool": ...}``). + variables: GraphQL variable bindings for unsafe string values. 
+ needs_metadata: When ``True``, the payload needs to append property flags/objects + """ + + payload: dict[str, Any] + variables: dict[str, Any] + needs_metadata: bool + + def to_dict(self) -> dict[str, Any]: + return {"data": self.payload, "variables": self.variables} + + def add_properties(self, properties_flag: dict[str, Any], properties_object: dict[str, str | None]) -> None: + if not self.needs_metadata: + return + for prop_name, prop in properties_flag.items(): + self.payload[prop_name] = prop + + for prop_name, prop in properties_object.items(): + self.payload[prop_name] = prop + + class Attribute: """Represents an attribute of a Node, including its schema, value, and properties.""" @@ -25,8 +52,12 @@ def __init__(self, name: str, schema: AttributeSchemaAPI, data: Any | dict) -> N """ self.name = name self._schema = schema + self._from_pool: dict[str, Any] | None = None - if not isinstance(data, dict) or "value" not in data: + if isinstance(data, dict) and "from_pool" in data: + self._from_pool = data.pop("from_pool") + data.setdefault("value", None) + elif not isinstance(data, dict) or "value" not in data: data = {"value": data} self._properties_flag = PROPERTIES_FLAG @@ -76,38 +107,55 @@ def value(self, value: Any) -> None: self._value = value self.value_has_been_mutated = True - def _generate_input_data(self) -> dict | None: - data: dict[str, Any] = {} - variables: dict[str, Any] = {} - - if self.value is None: - if self._schema.optional and self.value_has_been_mutated: - data["value"] = None - return data - - if isinstance(self.value, str): - if SAFE_VALUE.match(self.value): - data["value"] = self.value - else: - var_name = f"value_{UUIDT.new().hex}" - variables[var_name] = self.value - data["value"] = f"${var_name}" - elif isinstance(self.value, get_args(IP_TYPES)): - data["value"] = self.value.with_prefixlen - elif isinstance(self.value, CoreNodeBase) and self.value.is_resource_pool(): - data["from_pool"] = {"id": self.value.id} - else: - data["value"] = self.value - - for prop_name in self._properties_flag: - if getattr(self, prop_name) is not None: - data[prop_name] = getattr(self, prop_name) + def _initialize_graphql_payload(self) -> _GraphQLPayloadAttribute: + """Resolve the attribute value into a GraphQL mutation payload object.""" - for prop_name in self._properties_object: - if getattr(self, prop_name) is not None: - data[prop_name] = getattr(self, prop_name)._generate_input_data() + # Pool-based allocation (dict data or resource-pool node) + if self._from_pool is not None: + return _GraphQLPayloadAttribute(payload={"from_pool": self._from_pool}, variables={}, needs_metadata=True) + if isinstance(self.value, CoreNodeBase) and self.value.is_resource_pool(): + return _GraphQLPayloadAttribute( + payload={"from_pool": {"id": self.value.id}}, variables={}, needs_metadata=True + ) - return {"data": data, "variables": variables} + # Null value + if self.value is None: + data = {"value": None} if (self._schema.optional and self.value_has_been_mutated) else {} + return _GraphQLPayloadAttribute(payload=data, variables={}, needs_metadata=False) + + # Unsafe strings need a variable binding to avoid injection + if isinstance(self.value, str) and not SAFE_VALUE.match(self.value): + var_name = f"value_{UUIDT.new().hex}" + return _GraphQLPayloadAttribute( + payload={"value": f"${var_name}"}, + variables={var_name: self.value}, + needs_metadata=True, + ) + + # Safe strings, IP types, and everything else + value = self.value.with_prefixlen if isinstance(self.value, get_args(IP_TYPES)) 
else self.value + return _GraphQLPayloadAttribute(payload={"value": value}, variables={}, needs_metadata=True) + + def _generate_input_data(self) -> _GraphQLPayloadAttribute: + """Build the input payload for a GraphQL mutation on this attribute. + + Returns a ResolvedValue object, which contains all the data required. + """ + graphql_payload = self._initialize_graphql_payload() + + properties_flag: dict[str, Any] = { + property_name: getattr(self, property_name) + for property_name in self._properties_flag + if getattr(self, property_name) is not None + } + properties_object: dict[str, str | None] = { + property_name: getattr(self, property_name)._generate_input_data() + for property_name in self._properties_object + if getattr(self, property_name) is not None + } + graphql_payload.add_properties(properties_flag, properties_object) + + return graphql_payload def _generate_query_data(self, property: bool = False, include_metadata: bool = False) -> dict | None: data: dict[str, Any] = {"value": None} @@ -128,7 +176,15 @@ def _generate_query_data(self, property: bool = False, include_metadata: bool = return data def _generate_mutation_query(self) -> dict[str, Any]: - if isinstance(self.value, CoreNodeBase) and self.value.is_resource_pool(): + if self.is_from_pool_attribute(): # If it points to a pool, ask for the value of the pool allocated resource return {self.name: {"value": None}} return {} + + def is_from_pool_attribute(self) -> bool: + """Check whether this attribute's value is sourced from a resource pool. + + Returns: + True if the attribute value is a resource pool node or was explicitly allocated from a pool. + """ + return (isinstance(self.value, CoreNodeBase) and self.value.is_resource_pool()) or self._from_pool is not None diff --git a/infrahub_sdk/node/node.py b/infrahub_sdk/node/node.py index 0c85c3ad..9d024cbb 100644 --- a/infrahub_sdk/node/node.py +++ b/infrahub_sdk/node/node.py @@ -216,7 +216,7 @@ def is_resource_pool(self) -> bool: def get_raw_graphql_data(self) -> dict | None: return self._data - def _generate_input_data( # noqa: C901, PLR0915 + def _generate_input_data( # noqa: C901 self, exclude_unmodified: bool = False, exclude_hfid: bool = False, @@ -228,27 +228,18 @@ def _generate_input_data( # noqa: C901, PLR0915 dict[str, Dict]: Representation of an input data in dict format """ - data = {} - variables = {} + data: dict[str, Any] = {} + variables: dict[str, Any] = {} for item_name in self._attributes: attr: Attribute = getattr(self, item_name) if attr._schema.read_only: continue - attr_data = attr._generate_input_data() - - # NOTE, this code has been inherited when we splitted attributes and relationships - # into 2 loops, most likely it's possible to simply it - if attr_data and isinstance(attr_data, dict): - if variable_values := attr_data.get("data"): - data[item_name] = variable_values - else: - data[item_name] = attr_data - if variable_names := attr_data.get("variables"): - variables.update(variable_names) - - elif attr_data and isinstance(attr_data, list): - data[item_name] = attr_data + graphql_payload = attr._generate_input_data() + if graphql_payload.payload: + data[item_name] = graphql_payload.payload + if graphql_payload.variables: + variables.update(graphql_payload.variables) for item_name in self._relationships: allocate_from_pool = False @@ -1011,11 +1002,7 @@ async def _process_mutation_result( for attr_name in self._attributes: attr = getattr(self, attr_name) - if ( - attr_name not in object_response - or not isinstance(attr.value, InfrahubNodeBase) - 
or not attr.value.is_resource_pool() - ): + if attr_name not in object_response or not attr.is_from_pool_attribute(): continue # Process allocated resource from a pool and update attribute @@ -1819,11 +1806,7 @@ def _process_mutation_result( for attr_name in self._attributes: attr = getattr(self, attr_name) - if ( - attr_name not in object_response - or not isinstance(attr.value, InfrahubNodeBase) - or not attr.value.is_resource_pool() - ): + if attr_name not in object_response or not attr.is_from_pool_attribute(): continue # Process allocated resource from a pool and update attribute diff --git a/tests/AGENTS.md b/tests/AGENTS.md index f3608ead..cce67364 100644 --- a/tests/AGENTS.md +++ b/tests/AGENTS.md @@ -17,6 +17,12 @@ uv run pytest tests/unit/test_client.py # Single file ```text tests/ ├── unit/ # Fast, mocked, no external deps +│ ├── ctl/ # CLI command tests +│ └── sdk/ # SDK tests +│ ├── pool/ # Resource pool allocation tests +│ ├── spec/ # Object spec tests +│ ├── checks/ # InfrahubCheck tests +│ └── ... # Core SDK tests (client, node, schema, etc.) ├── integration/ # Real Infrahub via testcontainers ├── fixtures/ # Test data (JSON, YAML) └── helpers/ # Test utilities diff --git a/tests/unit/sdk/conftest.py b/tests/unit/sdk/conftest.py index 8fb9ecf2..d28c23b9 100644 --- a/tests/unit/sdk/conftest.py +++ b/tests/unit/sdk/conftest.py @@ -1015,115 +1015,6 @@ async def ipam_ipprefix_data() -> dict[str, Any]: } -@pytest.fixture -async def ipaddress_pool_schema() -> NodeSchemaAPI: - data = { - "name": "IPAddressPool", - "namespace": "Core", - "description": "A pool of IP address resources", - "label": "IP Address Pool", - "default_filter": "name__value", - "order_by": ["name__value"], - "display_labels": ["name__value"], - "include_in_menu": False, - "branch": BranchSupportType.AGNOSTIC.value, - "inherit_from": ["CoreResourcePool"], - "attributes": [ - { - "name": "default_address_type", - "kind": "Text", - "optional": False, - "description": "The object type to create when reserving a resource in the pool", - }, - { - "name": "default_prefix_length", - "kind": "Number", - "optional": True, - }, - ], - "relationships": [ - { - "name": "resources", - "peer": "BuiltinIPPrefix", - "kind": "Attribute", - "identifier": "ipaddresspool__resource", - "cardinality": "many", - "optional": False, - "order_weight": 4000, - }, - { - "name": "ip_namespace", - "peer": "BuiltinIPNamespace", - "kind": "Attribute", - "identifier": "ipaddresspool__ipnamespace", - "cardinality": "one", - "optional": False, - "order_weight": 5000, - }, - ], - } - return NodeSchema(**data).convert_api() - - -@pytest.fixture -async def ipprefix_pool_schema() -> NodeSchemaAPI: - data = { - "name": "IPPrefixPool", - "namespace": "Core", - "description": "A pool of IP prefix resources", - "label": "IP Prefix Pool", - "include_in_menu": False, - "branch": BranchSupportType.AGNOSTIC.value, - "inherit_from": ["CoreResourcePool"], - "attributes": [ - { - "name": "default_prefix_length", - "kind": "Number", - "description": "The default prefix length as an integer for prefixes allocated from this pool.", - "optional": True, - "order_weight": 5000, - }, - { - "name": "default_member_type", - "kind": "Text", - "enum": ["prefix", "address"], - "default_value": "prefix", - "optional": True, - "order_weight": 3000, - }, - { - "name": "default_prefix_type", - "kind": "Text", - "optional": True, - "order_weight": 4000, - }, - ], - "relationships": [ - { - "name": "resources", - "peer": "BuiltinIPPrefix", - "kind": "Attribute", - 
"identifier": "prefixpool__resource", - "cardinality": "many", - "branch": BranchSupportType.AGNOSTIC.value, - "optional": False, - "order_weight": 6000, - }, - { - "name": "ip_namespace", - "peer": "BuiltinIPNamespace", - "kind": "Attribute", - "identifier": "prefixpool__ipnamespace", - "cardinality": "one", - "branch": BranchSupportType.AGNOSTIC.value, - "optional": False, - "order_weight": 7000, - }, - ], - } - return NodeSchema(**data).convert_api() - - @pytest.fixture async def address_schema() -> NodeSchemaAPI: data = { @@ -2645,3 +2536,23 @@ async def nested_device_with_interfaces_schema() -> NodeSchemaAPI: ], } return NodeSchema(**data).convert_api() + + +@pytest.fixture +async def vlan_schema() -> NodeSchemaAPI: + data = { + "name": "VLAN", + "namespace": "Infra", + "label": "VLAN", + "default_filter": "name__value", + "order_by": ["name__value"], + "display_labels": ["name__value"], + "attributes": [ + {"name": "name", "kind": "Text", "unique": True}, + {"name": "vlan_id", "kind": "Number"}, + {"name": "role", "kind": "Text", "optional": True}, + {"name": "status", "kind": "Text", "optional": True}, + ], + "relationships": [], + } + return NodeSchema(**data).convert_api() diff --git a/tests/unit/sdk/pool/__init__.py b/tests/unit/sdk/pool/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/unit/sdk/pool/conftest.py b/tests/unit/sdk/pool/conftest.py new file mode 100644 index 00000000..e8276be6 --- /dev/null +++ b/tests/unit/sdk/pool/conftest.py @@ -0,0 +1,114 @@ +from __future__ import annotations + +import pytest + +from infrahub_sdk.schema import BranchSupportType, NodeSchema, NodeSchemaAPI + + +@pytest.fixture +async def ipaddress_pool_schema() -> NodeSchemaAPI: + data = { + "name": "IPAddressPool", + "namespace": "Core", + "description": "A pool of IP address resources", + "label": "IP Address Pool", + "default_filter": "name__value", + "order_by": ["name__value"], + "display_labels": ["name__value"], + "include_in_menu": False, + "branch": BranchSupportType.AGNOSTIC.value, + "inherit_from": ["CoreResourcePool"], + "attributes": [ + { + "name": "default_address_type", + "kind": "Text", + "optional": False, + "description": "The object type to create when reserving a resource in the pool", + }, + { + "name": "default_prefix_length", + "kind": "Number", + "optional": True, + }, + ], + "relationships": [ + { + "name": "resources", + "peer": "BuiltinIPPrefix", + "kind": "Attribute", + "identifier": "ipaddresspool__resource", + "cardinality": "many", + "optional": False, + "order_weight": 4000, + }, + { + "name": "ip_namespace", + "peer": "BuiltinIPNamespace", + "kind": "Attribute", + "identifier": "ipaddresspool__ipnamespace", + "cardinality": "one", + "optional": False, + "order_weight": 5000, + }, + ], + } + return NodeSchema(**data).convert_api() + + +@pytest.fixture +async def ipprefix_pool_schema() -> NodeSchemaAPI: + data = { + "name": "IPPrefixPool", + "namespace": "Core", + "description": "A pool of IP prefix resources", + "label": "IP Prefix Pool", + "include_in_menu": False, + "branch": BranchSupportType.AGNOSTIC.value, + "inherit_from": ["CoreResourcePool"], + "attributes": [ + { + "name": "default_prefix_length", + "kind": "Number", + "description": "The default prefix length as an integer for prefixes allocated from this pool.", + "optional": True, + "order_weight": 5000, + }, + { + "name": "default_member_type", + "kind": "Text", + "enum": ["prefix", "address"], + "default_value": "prefix", + "optional": True, + "order_weight": 3000, + }, + { 
+ "name": "default_prefix_type", + "kind": "Text", + "optional": True, + "order_weight": 4000, + }, + ], + "relationships": [ + { + "name": "resources", + "peer": "BuiltinIPPrefix", + "kind": "Attribute", + "identifier": "prefixpool__resource", + "cardinality": "many", + "branch": BranchSupportType.AGNOSTIC.value, + "optional": False, + "order_weight": 6000, + }, + { + "name": "ip_namespace", + "peer": "BuiltinIPNamespace", + "kind": "Attribute", + "identifier": "prefixpool__ipnamespace", + "cardinality": "one", + "branch": BranchSupportType.AGNOSTIC.value, + "optional": False, + "order_weight": 7000, + }, + ], + } + return NodeSchema(**data).convert_api() diff --git a/tests/unit/sdk/pool/test_allocate.py b/tests/unit/sdk/pool/test_allocate.py new file mode 100644 index 00000000..1ed53500 --- /dev/null +++ b/tests/unit/sdk/pool/test_allocate.py @@ -0,0 +1,219 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +import pytest + +from infrahub_sdk.node import InfrahubNode, InfrahubNodeSync + +if TYPE_CHECKING: + from typing import Any + + from pytest_httpx import HTTPXMock + + from infrahub_sdk.schema import NodeSchemaAPI + from tests.unit.sdk.conftest import BothClients + +client_types = ["standard", "sync"] + + +@pytest.mark.parametrize("client_type", client_types) +async def test_allocate_next_ip_address( + httpx_mock: HTTPXMock, + mock_schema_query_ipam: HTTPXMock, + clients: BothClients, + ipaddress_pool_schema: NodeSchemaAPI, + ipam_ipprefix_schema: NodeSchemaAPI, + ipam_ipprefix_data: dict[str, Any], + client_type: str, +) -> None: + httpx_mock.add_response( + method="POST", + json={ + "data": { + "InfrahubIPAddressPoolGetResource": { + "ok": True, + "node": { + "id": "17da1246-54f1-a9c0-2784-179f0ec5b128", + "kind": "IpamIPAddress", + "identifier": "test", + "display_label": "192.0.2.0/32", + }, + } + } + }, + match_headers={"X-Infrahub-Tracker": "allocate-ip-loopback"}, + is_reusable=True, + ) + httpx_mock.add_response( + method="POST", + json={ + "data": { + "IpamIPAddress": { + "count": 1, + "edges": [ + { + "node": { + "id": "17d9bd8d-8fc2-70b0-278a-179f425e25cb", + "__typename": "IpamIPAddress", + "address": {"value": "192.0.2.0/32"}, + "description": {"value": "test"}, + } + } + ], + } + } + }, + match_headers={"X-Infrahub-Tracker": "query-ipamipaddress-page1"}, + is_reusable=True, + ) + + if client_type == "standard": + ip_prefix = InfrahubNode(client=clients.standard, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) + ip_pool = InfrahubNode( + client=clients.standard, + schema=ipaddress_pool_schema, + data={ + "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", + "name": "Core loopbacks", + "default_address_type": "IpamIPAddress", + "default_prefix_length": 32, + "ip_namespace": "ip_namespace", + "resources": [ip_prefix], + }, + ) + ip_address = await clients.standard.allocate_next_ip_address( + resource_pool=ip_pool, + identifier="test", + prefix_length=32, + address_type="IpamIPAddress", + data={"description": "test"}, + tracker="allocate-ip-loopback", + ) + else: + ip_prefix = InfrahubNodeSync(client=clients.sync, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) + ip_pool = InfrahubNodeSync( + client=clients.sync, + schema=ipaddress_pool_schema, + data={ + "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", + "name": "Core loopbacks", + "default_address_type": "IpamIPAddress", + "default_prefix_length": 32, + "ip_namespace": "ip_namespace", + "resources": [ip_prefix], + }, + ) + ip_address = clients.sync.allocate_next_ip_address( + 
resource_pool=ip_pool, + identifier="test", + prefix_length=32, + address_type="IpamIPAddress", + data={"description": "test"}, + tracker="allocate-ip-loopback", + ) + + assert ip_address + assert str(ip_address.address.value) == "192.0.2.0/32" + assert ip_address.description.value == "test" + + +@pytest.mark.parametrize("client_type", client_types) +async def test_allocate_next_ip_prefix( + httpx_mock: HTTPXMock, + mock_schema_query_ipam: HTTPXMock, + clients: BothClients, + ipprefix_pool_schema: NodeSchemaAPI, + ipam_ipprefix_schema: NodeSchemaAPI, + ipam_ipprefix_data: dict[str, Any], + client_type: str, +) -> None: + httpx_mock.add_response( + method="POST", + json={ + "data": { + "InfrahubIPPrefixPoolGetResource": { + "ok": True, + "node": { + "id": "7d9bd8d-8fc2-70b0-278a-179f425e25cb", + "kind": "IpamIPPrefix", + "identifier": "test", + "display_label": "192.0.2.0/31", + }, + } + } + }, + match_headers={"X-Infrahub-Tracker": "allocate-ip-interco"}, + is_reusable=True, + ) + httpx_mock.add_response( + method="POST", + json={ + "data": { + "IpamIPPrefix": { + "count": 1, + "edges": [ + { + "node": { + "id": "17d9bd8d-8fc2-70b0-278a-179f425e25cb", + "__typename": "IpamIPPrefix", + "prefix": {"value": "192.0.2.0/31"}, + "description": {"value": "test"}, + } + } + ], + } + } + }, + match_headers={"X-Infrahub-Tracker": "query-ipamipprefix-page1"}, + is_reusable=True, + ) + + if client_type == "standard": + ip_prefix = InfrahubNode(client=clients.standard, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) + ip_pool = InfrahubNode( + client=clients.standard, + schema=ipprefix_pool_schema, + data={ + "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", + "name": "Core intercos", + "default_prefix_type": "IpamIPPrefix", + "default_prefix_length": 31, + "ip_namespace": "ip_namespace", + "resources": [ip_prefix], + }, + ) + ip_prefix = await clients.standard.allocate_next_ip_prefix( + resource_pool=ip_pool, + identifier="test", + prefix_length=31, + prefix_type="IpamIPPrefix", + data={"description": "test"}, + tracker="allocate-ip-interco", + ) + else: + ip_prefix = InfrahubNodeSync(client=clients.sync, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) + ip_pool = InfrahubNodeSync( + client=clients.sync, + schema=ipprefix_pool_schema, + data={ + "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", + "name": "Core intercos", + "default_prefix_type": "IpamIPPrefix", + "default_prefix_length": 31, + "ip_namespace": "ip_namespace", + "resources": [ip_prefix], + }, + ) + ip_prefix = clients.sync.allocate_next_ip_prefix( + resource_pool=ip_pool, + identifier="test", + prefix_length=31, + prefix_type="IpamIPPrefix", + data={"description": "test"}, + tracker="allocate-ip-interco", + ) + + assert ip_prefix + assert str(ip_prefix.prefix.value) == "192.0.2.0/31" + assert ip_prefix.description.value == "test" diff --git a/tests/unit/sdk/pool/test_attribute_from_pool.py b/tests/unit/sdk/pool/test_attribute_from_pool.py new file mode 100644 index 00000000..75d63f6d --- /dev/null +++ b/tests/unit/sdk/pool/test_attribute_from_pool.py @@ -0,0 +1,204 @@ +""" +When using from_pool on a number attribute (e.g. vlan_id), the SDK should generate: + vlan_id: { from_pool: { id: "...", identifier: "..." } } + +There are two ways to request a pool allocation: +1. Dict-based: {"from_pool": {"id": "...", "identifier": "..."}} +2. 
Node-based: pass an InfrahubNode pool object as the attribute value +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +from infrahub_sdk.node import InfrahubNode, InfrahubNodeSync + +if TYPE_CHECKING: + from infrahub_sdk import InfrahubClient, InfrahubClientSync + from infrahub_sdk.schema import NodeSchemaAPI + + +POOL_ID = "185b9728-1b76-dda7-d13d-106529b1bcd9" + + +# ────────────────────────────────────────────── +# Dict-based from_pool - async client +# ────────────────────────────────────────────── + + +async def test_number_attribute_from_pool_with_identifier( + client: InfrahubClient, + vlan_schema: NodeSchemaAPI, +) -> None: + """A number attribute with from_pool and identifier should NOT be wrapped in value.""" + data: dict[str, Any] = { + "name": "Example VLAN", + "vlan_id": {"from_pool": {"id": POOL_ID, "identifier": "test"}}, + "role": "user", + "status": "active", + } + node = InfrahubNode(client=client, schema=vlan_schema, data=data) + + # Act + input_data = node._generate_input_data()["data"]["data"] + + assert input_data["name"] == {"value": "Example VLAN"} + assert input_data["role"] == {"value": "user"} + assert input_data["status"] == {"value": "active"} + assert input_data["vlan_id"] == {"from_pool": {"id": POOL_ID, "identifier": "test"}} + assert "value" not in input_data["vlan_id"] + + +async def test_number_attribute_regular_value( + client: InfrahubClient, + vlan_schema: NodeSchemaAPI, +) -> None: + """Regular number values should still be wrapped in value as before.""" + data: dict[str, Any] = { + "name": "Example VLAN", + "vlan_id": 100, + } + node = InfrahubNode(client=client, schema=vlan_schema, data=data) + + # Act + input_data = node._generate_input_data()["data"]["data"] + + assert input_data["name"] == {"value": "Example VLAN"} + assert input_data["vlan_id"] == {"value": 100} + + +async def test_number_attribute_from_pool_mutation_query( + client: InfrahubClient, + vlan_schema: NodeSchemaAPI, +) -> None: + """A from_pool dict attribute should request value back in the mutation query.""" + data: dict[str, Any] = { + "name": "Example VLAN", + "vlan_id": {"from_pool": {"id": POOL_ID, "identifier": "test"}}, + } + node = InfrahubNode(client=client, schema=vlan_schema, data=data) + + # Act + mutation_query = node._generate_mutation_query() + + assert mutation_query["object"]["vlan_id"] == {"value": None} + + +# ────────────────────────────────────────────── +# Dict-based from_pool - sync client +# ────────────────────────────────────────────── + + +async def test_sync_number_attribute_from_pool_with_identifier( + client_sync: InfrahubClientSync, + vlan_schema: NodeSchemaAPI, +) -> None: + """A number attribute with from_pool and identifier should NOT be wrapped in value (sync client).""" + data: dict[str, Any] = { + "name": "Example VLAN", + "vlan_id": {"from_pool": {"id": POOL_ID, "identifier": "test"}}, + "role": "user", + "status": "active", + } + node = InfrahubNodeSync(client=client_sync, schema=vlan_schema, data=data) + + # Act + input_data = node._generate_input_data()["data"]["data"] + + assert input_data["name"] == {"value": "Example VLAN"} + assert input_data["role"] == {"value": "user"} + assert input_data["status"] == {"value": "active"} + assert input_data["vlan_id"] == {"from_pool": {"id": POOL_ID, "identifier": "test"}} + assert "value" not in input_data["vlan_id"] + + +async def test_sync_number_attribute_regular_value( + client_sync: InfrahubClientSync, + vlan_schema: NodeSchemaAPI, +) -> None: + """Regular 
number values should still be wrapped in value as before (sync client).""" + data: dict[str, Any] = { + "name": "Example VLAN", + "vlan_id": 100, + } + node = InfrahubNodeSync(client=client_sync, schema=vlan_schema, data=data) + + # Act + input_data = node._generate_input_data()["data"]["data"] + + assert input_data["name"] == {"value": "Example VLAN"} + assert input_data["vlan_id"] == {"value": 100} + + +# ────────────────────────────────────────────── +# Node-based from_pool - async client +# ────────────────────────────────────────────── + +NODE_POOL_ID = "185b9728-1b56-dda7-d13d-106535b1bcd9" + + +async def test_attribute_with_pool_node_generates_from_pool( + client: InfrahubClient, + vlan_schema: NodeSchemaAPI, + ipaddress_pool_schema: NodeSchemaAPI, + ipam_ipprefix_schema: NodeSchemaAPI, + ipam_ipprefix_data: dict[str, Any], +) -> None: + """When an attribute value is a CoreNodeBase pool node, _generate_input_data should produce from_pool.""" + ip_prefix = InfrahubNode(client=client, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) + ip_pool = InfrahubNode( + client=client, + schema=ipaddress_pool_schema, + data={ + "id": NODE_POOL_ID, + "name": "Core loopbacks", + "default_address_type": "IpamIPAddress", + "default_prefix_length": 32, + "ip_namespace": "ip_namespace", + "resources": [ip_prefix], + }, + ) + vlan = InfrahubNode( + client=client, + schema=vlan_schema, + data={"name": "Example VLAN", "vlan_id": ip_pool}, + ) + + # Act + input_data = vlan._generate_input_data()["data"]["data"] + + assert input_data["vlan_id"] == {"from_pool": {"id": NODE_POOL_ID}} + assert "value" not in input_data["vlan_id"] + + +async def test_attribute_with_pool_node_generates_mutation_query( + client: InfrahubClient, + vlan_schema: NodeSchemaAPI, + ipaddress_pool_schema: NodeSchemaAPI, + ipam_ipprefix_schema: NodeSchemaAPI, + ipam_ipprefix_data: dict[str, Any], +) -> None: + """When an attribute value is a CoreNodeBase pool node, _generate_mutation_query should request value back.""" + ip_prefix = InfrahubNode(client=client, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) + ip_pool = InfrahubNode( + client=client, + schema=ipaddress_pool_schema, + data={ + "id": NODE_POOL_ID, + "name": "Core loopbacks", + "default_address_type": "IpamIPAddress", + "default_prefix_length": 32, + "ip_namespace": "ip_namespace", + "resources": [ip_prefix], + }, + ) + vlan = InfrahubNode( + client=client, + schema=vlan_schema, + data={"name": "Example VLAN", "vlan_id": ip_pool}, + ) + + # Act + mutation_query = vlan._generate_mutation_query() + + assert mutation_query["object"]["vlan_id"] == {"value": None} diff --git a/tests/unit/sdk/pool/test_pool_queries.py b/tests/unit/sdk/pool/test_pool_queries.py new file mode 100644 index 00000000..4f27cba7 --- /dev/null +++ b/tests/unit/sdk/pool/test_pool_queries.py @@ -0,0 +1,185 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +import pytest + +from infrahub_sdk.node import InfrahubNode, InfrahubNodeSync + +if TYPE_CHECKING: + from typing import Any + + from pytest_httpx import HTTPXMock + + from infrahub_sdk.schema import NodeSchemaAPI + from tests.unit.sdk.conftest import BothClients + +client_types = ["standard", "sync"] + + +@pytest.mark.parametrize("client_type", client_types) +async def test_get_pool_allocated_resources( + httpx_mock: HTTPXMock, + mock_schema_query_ipam: HTTPXMock, + clients: BothClients, + ipaddress_pool_schema: NodeSchemaAPI, + ipam_ipprefix_schema: NodeSchemaAPI, + ipam_ipprefix_data: dict[str, Any], + 
client_type: str, +) -> None: + httpx_mock.add_response( + method="POST", + json={ + "data": { + "InfrahubResourcePoolAllocated": { + "count": 2, + "edges": [ + { + "node": { + "id": "17d9bd8d-8fc2-70b0-278a-179f425e25cb", + "kind": "IpamIPAddress", + "branch": "main", + "identifier": "ip-1", + } + }, + { + "node": { + "id": "17d9bd8e-31ee-acf0-2786-179fb76f2f67", + "kind": "IpamIPAddress", + "branch": "main", + "identifier": "ip-2", + } + }, + ], + } + } + }, + match_headers={"X-Infrahub-Tracker": "get-allocated-resources-page1"}, + ) + httpx_mock.add_response( + method="POST", + json={ + "data": { + "IpamIPAddress": { + "count": 2, + "edges": [ + {"node": {"id": "17d9bd8d-8fc2-70b0-278a-179f425e25cb", "__typename": "IpamIPAddress"}}, + {"node": {"id": "17d9bd8e-31ee-acf0-2786-179fb76f2f67", "__typename": "IpamIPAddress"}}, + ], + } + } + }, + match_headers={"X-Infrahub-Tracker": "query-ipamipaddress-page1"}, + ) + + if client_type == "standard": + ip_prefix = InfrahubNode(client=clients.standard, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) + ip_pool = InfrahubNode( + client=clients.standard, + schema=ipaddress_pool_schema, + data={ + "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", + "name": "Core loopbacks", + "default_address_type": "IpamIPAddress", + "default_prefix_length": 32, + "ip_namespace": "ip_namespace", + "resources": [ip_prefix], + }, + ) + + resources = await ip_pool.get_pool_allocated_resources(resource=ip_prefix) + assert len(resources) == 2 + assert [resource.id for resource in resources] == [ + "17d9bd8d-8fc2-70b0-278a-179f425e25cb", + "17d9bd8e-31ee-acf0-2786-179fb76f2f67", + ] + else: + ip_prefix = InfrahubNodeSync(client=clients.sync, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) + ip_pool = InfrahubNodeSync( + client=clients.sync, + schema=ipaddress_pool_schema, + data={ + "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", + "name": "Core loopbacks", + "default_address_type": "IpamIPAddress", + "default_prefix_length": 32, + "ip_namespace": "ip_namespace", + "resources": [ip_prefix], + }, + ) + + resources = ip_pool.get_pool_allocated_resources(resource=ip_prefix) + assert len(resources) == 2 + assert [resource.id for resource in resources] == [ + "17d9bd8d-8fc2-70b0-278a-179f425e25cb", + "17d9bd8e-31ee-acf0-2786-179fb76f2f67", + ] + + +@pytest.mark.parametrize("client_type", client_types) +async def test_get_pool_resources_utilization( + httpx_mock: HTTPXMock, + clients: BothClients, + ipaddress_pool_schema: NodeSchemaAPI, + ipam_ipprefix_schema: NodeSchemaAPI, + ipam_ipprefix_data: dict[str, Any], + client_type: str, +) -> None: + httpx_mock.add_response( + method="POST", + json={ + "data": { + "InfrahubResourcePoolUtilization": { + "count": 1, + "edges": [ + { + "node": { + "id": "17d9bd86-3471-a020-2782-179ff078e58f", + "utilization": 93.75, + "utilization_branches": 0, + "utilization_default_branch": 93.75, + } + } + ], + } + } + }, + match_headers={"X-Infrahub-Tracker": "get-pool-utilization"}, + ) + + if client_type == "standard": + ip_prefix = InfrahubNode(client=clients.standard, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) + ip_pool = InfrahubNode( + client=clients.standard, + schema=ipaddress_pool_schema, + data={ + "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", + "name": "Core loopbacks", + "default_address_type": "IpamIPAddress", + "default_prefix_length": 32, + "ip_namespace": "ip_namespace", + "resources": [ip_prefix], + }, + ) + + utilizations = await ip_pool.get_pool_resources_utilization() + assert len(utilizations) == 1 + assert 
utilizations[0]["utilization"] == 93.75 + else: + ip_prefix = InfrahubNodeSync(client=clients.sync, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) + ip_pool = InfrahubNodeSync( + client=clients.sync, + schema=ipaddress_pool_schema, + data={ + "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", + "name": "Core loopbacks", + "default_address_type": "IpamIPAddress", + "default_prefix_length": 32, + "ip_namespace": "ip_namespace", + "resources": [ip_prefix], + }, + ) + + utilizations = ip_pool.get_pool_resources_utilization() + assert len(utilizations) == 1 + assert utilizations[0]["utilization"] == 93.75 diff --git a/tests/unit/sdk/pool/test_relationship_from_pool.py b/tests/unit/sdk/pool/test_relationship_from_pool.py new file mode 100644 index 00000000..9ce543dc --- /dev/null +++ b/tests/unit/sdk/pool/test_relationship_from_pool.py @@ -0,0 +1,130 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +import pytest + +from infrahub_sdk.node import InfrahubNode, InfrahubNodeSync + +if TYPE_CHECKING: + from typing import Any + + from infrahub_sdk import InfrahubClient + from infrahub_sdk.schema import NodeSchemaAPI + +client_types = ["standard", "sync"] + + +@pytest.mark.parametrize("client_type", client_types) +async def test_create_input_data_with_resource_pool_relationship( + client: InfrahubClient, + ipaddress_pool_schema: NodeSchemaAPI, + ipam_ipprefix_schema: NodeSchemaAPI, + simple_device_schema: NodeSchemaAPI, + ipam_ipprefix_data: dict[str, Any], + client_type: str, +) -> None: + if client_type == "standard": + ip_prefix = InfrahubNode(client=client, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) + ip_pool = InfrahubNode( + client=client, + schema=ipaddress_pool_schema, + data={ + "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", + "name": "Core loopbacks", + "default_address_type": "IpamIPAddress", + "default_prefix_length": 32, + "ip_namespace": "ip_namespace", + "resources": [ip_prefix], + }, + ) + device = InfrahubNode( + client=client, + schema=simple_device_schema, + data={"name": "device-01", "primary_address": ip_pool, "ip_address_pool": ip_pool}, + ) + else: + ip_prefix = InfrahubNodeSync(client=client, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) + ip_pool = InfrahubNodeSync( + client=client, + schema=ipaddress_pool_schema, + data={ + "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", + "name": "Core loopbacks", + "default_address_type": "IpamIPAddress", + "default_prefix_length": 32, + "ip_namespace": "ip_namespace", + "resources": [ip_prefix], + }, + ) + device = InfrahubNodeSync( + client=client, + schema=simple_device_schema, + data={"name": "device-01", "primary_address": ip_pool, "ip_address_pool": ip_pool}, + ) + + assert device._generate_input_data()["data"] == { + "data": { + "name": {"value": "device-01"}, + "primary_address": {"from_pool": {"id": "pppppppp-pppp-pppp-pppp-pppppppppppp"}}, + "ip_address_pool": {"id": "pppppppp-pppp-pppp-pppp-pppppppppppp"}, + }, + } + + +@pytest.mark.parametrize("client_type", client_types) +async def test_create_mutation_query_with_resource_pool_relationship( + client: InfrahubClient, + ipaddress_pool_schema: NodeSchemaAPI, + ipam_ipprefix_schema: NodeSchemaAPI, + simple_device_schema: NodeSchemaAPI, + ipam_ipprefix_data: dict[str, Any], + client_type: str, +) -> None: + if client_type == "standard": + ip_prefix = InfrahubNode(client=client, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) + ip_pool = InfrahubNode( + client=client, + schema=ipaddress_pool_schema, + data={ + "id": 
"pppppppp-pppp-pppp-pppp-pppppppppppp", + "name": "Core loopbacks", + "default_address_type": "IpamIPAddress", + "default_prefix_length": 32, + "ip_namespace": "ip_namespace", + "resources": [ip_prefix], + }, + ) + device = InfrahubNode( + client=client, + schema=simple_device_schema, + data={"name": "device-01", "primary_address": ip_pool, "ip_address_pool": ip_pool}, + ) + else: + ip_prefix = InfrahubNodeSync(client=client, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) + ip_pool = InfrahubNodeSync( + client=client, + schema=ipaddress_pool_schema, + data={ + "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", + "name": "Core loopbacks", + "default_address_type": "IpamIPAddress", + "default_prefix_length": 32, + "ip_namespace": "ip_namespace", + "resources": [ip_prefix], + }, + ) + device = InfrahubNode( + client=client, + schema=simple_device_schema, + data={"name": "device-01", "primary_address": ip_pool, "ip_address_pool": ip_pool}, + ) + + assert device._generate_mutation_query() == { + "object": { + "id": None, + "primary_address": {"node": {"__typename": None, "display_label": None, "id": None}}, + "ip_address_pool": {"node": {"__typename": None, "display_label": None, "id": None}}, + }, + "ok": None, + } diff --git a/tests/unit/sdk/test_attribute_generate_input_data.py b/tests/unit/sdk/test_attribute_generate_input_data.py new file mode 100644 index 00000000..a50a2fe9 --- /dev/null +++ b/tests/unit/sdk/test_attribute_generate_input_data.py @@ -0,0 +1,395 @@ +"""Unit tests for Attribute._generate_input_data covering all code paths.""" + +from __future__ import annotations + +from typing import Any + +import pytest + +from infrahub_sdk.node.attribute import Attribute +from infrahub_sdk.protocols_base import CoreNodeBase +from infrahub_sdk.schema import AttributeSchemaAPI + +# ────────────────────────────────────────────── +# Value resolution: from_pool (dict-based) +# ────────────────────────────────────────────── + + +class TestFromPoolDict: + def test_from_pool_with_id(self) -> None: + pool_data = {"id": "pool-uuid-1"} + attr = Attribute(name="vlan_id", schema=_make_schema("Number"), data={"from_pool": pool_data}) + + result = attr._generate_input_data() + + assert result.payload == {"from_pool": {"id": "pool-uuid-1"}} + assert result.variables == {} + + def test_from_pool_with_id_and_identifier(self) -> None: + pool_data = {"id": "pool-uuid-1", "identifier": "test"} + attr = Attribute(name="vlan_id", schema=_make_schema("Number"), data={"from_pool": pool_data}) + + result = attr._generate_input_data() + + assert result.payload == {"from_pool": {"id": "pool-uuid-1", "identifier": "test"}} + assert result.variables == {} + + def test_from_pool_with_pool_name(self) -> None: + """from_pool can be a plain string (pool name), e.g. 
from_pool: 'VLAN ID Pool'.""" + attr = Attribute( + name="vlan_id", schema=_make_schema("Number", optional=True), data={"from_pool": "VLAN ID Pool"} + ) + + result = attr._generate_input_data() + + assert result.payload == {"from_pool": "VLAN ID Pool"} + assert result.variables == {} + assert "value" not in result.payload + + def test_from_pool_value_is_none(self) -> None: + """from_pool pops 'from_pool' and sets Attribute.value to None; value should NOT appear in payload.""" + attr = Attribute(name="vlan_id", schema=_make_schema("Number"), data={"from_pool": {"id": "pool-uuid-1"}}) + + assert attr.value is None + result = attr._generate_input_data() + assert "value" not in result.payload + + +# ────────────────────────────────────────────── +# Value resolution: from_pool (node-based) +# ────────────────────────────────────────────── + + +class TestFromPoolNode: + def test_pool_node_generates_from_pool(self) -> None: + pool_node = _FakeNode(node_id="node-pool-uuid", is_pool=True) + + attr = Attribute(name="vlan_id", schema=_make_schema("Number"), data=pool_node) + + result = attr._generate_input_data() + + assert result.payload == {"from_pool": {"id": "node-pool-uuid"}} + assert result.variables == {} + + def test_non_pool_node_treated_as_regular_value(self) -> None: + """A CoreNodeBase that is NOT a resource pool should go through the normal value path.""" + node = _FakeNode(node_id="regular-node-uuid", is_pool=False) + attr = Attribute(name="vlan_id", schema=_make_schema("Number"), data=node) + + result = attr._generate_input_data() + + assert result.payload == {"value": node} + + +# ────────────────────────────────────────────── +# Value resolution: null values +# ────────────────────────────────────────────── + + +class TestNullValue: + def test_null_value_not_mutated(self) -> None: + """None value that was never mutated → empty payload, no properties.""" + attr = Attribute(name="test_attr", schema=_make_schema("Text"), data={"value": None}) + + result = attr._generate_input_data() + + assert result.payload == {} + assert result.variables == {} + assert result.needs_metadata is False + + def test_null_value_mutated_optional(self) -> None: + """None value on an optional attr that was mutated → explicit null.""" + attr = Attribute(name="test_attr", schema=_make_schema("Text", optional=True), data={"value": "initial"}) + attr.value = None # triggers value_has_been_mutated + + result = attr._generate_input_data() + + assert result.payload == {"value": None} + assert result.needs_metadata is False + + def test_null_value_mutated_non_optional(self) -> None: + """None value on a non-optional attr that was mutated → empty payload (same as not mutated).""" + attr = Attribute(name="test_attr", schema=_make_schema("Text", optional=False), data={"value": "initial"}) + attr.value = None + + result = attr._generate_input_data() + + assert result.payload == {} + assert result.needs_metadata is False + + +# ────────────────────────────────────────────── +# Value resolution: strings (safe vs unsafe) +# ────────────────────────────────────────────── + + +class TestStringValues: + @pytest.mark.parametrize( + "value", + [ + pytest.param("simple", id="alphanumeric"), + pytest.param("user.name", id="dots"), + pytest.param("/opt/repos/infrahub", id="filepath"), + pytest.param("https://github.com/opsmill", id="url"), + pytest.param("", id="empty-string"), + ], + ) + def test_safe_string(self, value: str) -> None: + attr = Attribute(name="test_attr", schema=_make_schema("Text"), data=value) + + result = 
attr._generate_input_data() + + assert result.payload == {"value": value} + assert result.variables == {} + + @pytest.mark.parametrize( + "value", + [ + pytest.param('has "quotes"', id="quotes"), + pytest.param("has\nnewline", id="newline"), + pytest.param("special{chars}", id="braces"), + ], + ) + def test_unsafe_string_uses_variable_binding(self, value: str) -> None: + attr = Attribute(name="test_attr", schema=_make_schema("Text"), data=value) + + result = attr._generate_input_data() + + # payload["value"] should be a variable reference like "$value_" + assert "value" in result.payload + assert result.payload["value"].startswith("$value_") + # The actual string should be in variables + assert len(result.variables) == 1 + var_name = next(iter(result.variables)) + assert result.variables[var_name] == value + + +# ────────────────────────────────────────────── +# Value resolution: IP types +# ────────────────────────────────────────────── + + +class TestIPValues: + def test_ipv4_interface(self) -> None: + attr = Attribute(name="address", schema=_make_schema("IPHost"), data={"value": "10.0.0.1/24"}) + + result = attr._generate_input_data() + + assert result.payload["value"] == "10.0.0.1/24" + assert result.variables == {} + + def test_ipv6_interface(self) -> None: + attr = Attribute(name="address", schema=_make_schema("IPHost"), data={"value": "2001:db8::1/64"}) + + result = attr._generate_input_data() + + assert result.payload["value"] == "2001:db8::1/64" + + def test_ipv4_network(self) -> None: + attr = Attribute(name="network", schema=_make_schema("IPNetwork"), data={"value": "10.0.0.0/24"}) + + result = attr._generate_input_data() + + assert result.payload["value"] == "10.0.0.0/24" + + def test_ipv6_network(self) -> None: + attr = Attribute(name="network", schema=_make_schema("IPNetwork"), data={"value": "2001:db8::/32"}) + + result = attr._generate_input_data() + + assert result.payload["value"] == "2001:db8::/32" + + +# ────────────────────────────────────────────── +# Value resolution: other scalars +# ────────────────────────────────────────────── + + +class TestScalarValues: + def test_number_value(self) -> None: + attr = Attribute(name="vlan_id", schema=_make_schema("Number"), data=42) + + result = attr._generate_input_data() + + assert result.payload == {"value": 42} + assert result.variables == {} + + def test_boolean_value(self) -> None: + attr = Attribute(name="enabled", schema=_make_schema("Boolean"), data=True) + + result = attr._generate_input_data() + + assert result.payload == {"value": True} + + +# ────────────────────────────────────────────── +# Property handling +# ────────────────────────────────────────────── + + +class TestProperties: + def test_no_properties_set(self) -> None: + """When no properties are set, payload only has the value.""" + attr = Attribute(name="test_attr", schema=_make_schema("Text"), data="hello") + + result = attr._generate_input_data() + + assert result.payload == {"value": "hello"} + + def test_flag_property_is_protected(self) -> None: + attr = Attribute(name="test_attr", schema=_make_schema("Text"), data={"value": "hello", "is_protected": True}) + + result = attr._generate_input_data() + + assert result.payload["value"] == "hello" + assert result.payload["is_protected"] is True + + def test_object_property_source(self) -> None: + attr = Attribute( + name="test_attr", + schema=_make_schema("Text"), + data={"value": "hello", "source": {"id": "source-uuid", "display_label": "Git", "__typename": "CoreGit"}}, + ) + + result = 
attr._generate_input_data() + + assert result.payload["value"] == "hello" + assert result.payload["source"] == "source-uuid" + + def test_object_property_owner(self) -> None: + attr = Attribute( + name="test_attr", + schema=_make_schema("Text"), + data={ + "value": "hello", + "owner": {"id": "owner-uuid", "display_label": "Admin", "__typename": "CoreAccount"}, + }, + ) + + result = attr._generate_input_data() + + assert result.payload["owner"] == "owner-uuid" + + def test_both_flag_and_object_properties(self) -> None: + attr = Attribute( + name="test_attr", + schema=_make_schema("Text"), + data={ + "value": "hello", + "is_protected": True, + "source": {"id": "src-uuid", "display_label": "Git", "__typename": "CoreGit"}, + }, + ) + + result = attr._generate_input_data() + + assert result.payload["value"] == "hello" + assert result.payload["is_protected"] is True + assert result.payload["source"] == "src-uuid" + + def test_properties_not_appended_for_null_value(self) -> None: + """When need_additional_properties is False (null non-mutated), properties are ignored.""" + attr = Attribute( + name="test_attr", + schema=_make_schema("Text"), + data={ + "value": None, + "is_protected": True, + "source": {"id": "src-uuid", "display_label": "Git", "__typename": "CoreGit"}, + }, + ) + + result = attr._generate_input_data() + + # Null value, not mutated → empty payload, properties NOT appended + assert result.payload == {} + + def test_properties_appended_for_from_pool(self) -> None: + """from_pool payloads have need_additional_properties=True, so properties are included.""" + attr = Attribute( + name="vlan_id", + schema=_make_schema("Number"), + data={"from_pool": {"id": "pool-uuid"}, "is_protected": True}, + ) + + result = attr._generate_input_data() + + assert result.payload["from_pool"] == {"id": "pool-uuid"} + assert result.payload["is_protected"] is True + + +# ────────────────────────────────────────────── +# Return type: to_dict() integration +# ────────────────────────────────────────────── + + +class TestToDictIntegration: + def test_to_dict_simple_value(self) -> None: + attr = Attribute(name="test_attr", schema=_make_schema("Text"), data="hello") + + result = attr._generate_input_data().to_dict() + + assert result == {"data": {"value": "hello"}, "variables": {}} + + def test_to_dict_with_variables(self) -> None: + attr = Attribute(name="test_attr", schema=_make_schema("Text"), data='has "quotes"') + + result = attr._generate_input_data().to_dict() + + assert "data" in result + assert "variables" in result + assert len(result["variables"]) == 1 + var_name = next(iter(result["variables"])) + assert result["variables"][var_name] == 'has "quotes"' + assert result["data"]["value"] == f"${var_name}" + + +def _make_schema(kind: str = "Text", optional: bool = False) -> AttributeSchemaAPI: + return AttributeSchemaAPI(name="test_attr", kind=kind, optional=optional) + + +class _FakeNode(CoreNodeBase): + """Minimal CoreNodeBase implementation for testing.""" + + def __init__(self, node_id: str, is_pool: bool) -> None: + self.id = node_id + self._is_pool = is_pool + self._schema: Any = None + self._internal_id = "" + self.display_label = None + self.typename = None + + @property + def hfid(self) -> list[str] | None: + return None + + @property + def hfid_str(self) -> str | None: + return None + + def get_human_friendly_id(self) -> list[str] | None: + return None + + def get_human_friendly_id_as_string(self, include_kind: bool = False) -> str | None: + return None + + def get_kind(self) -> str: + return 
"" + + def get_all_kinds(self) -> list[str]: + return [] + + def get_branch(self) -> str: + return "" + + def is_ip_prefix(self) -> bool: + return False + + def is_ip_address(self) -> bool: + return False + + def is_resource_pool(self) -> bool: + return self._is_pool + + def get_raw_graphql_data(self) -> dict | None: + return None diff --git a/tests/unit/sdk/test_client.py b/tests/unit/sdk/test_client.py index e9cce23e..1e883f95 100644 --- a/tests/unit/sdk/test_client.py +++ b/tests/unit/sdk/test_client.py @@ -14,11 +14,9 @@ if TYPE_CHECKING: from collections.abc import Callable, Mapping from inspect import Parameter - from typing import Any from pytest_httpx import HTTPXMock - from infrahub_sdk.schema import NodeSchemaAPI from tests.unit.sdk.conftest import BothClients pytestmark = pytest.mark.httpx_mock(can_send_already_matched_responses=True) @@ -636,208 +634,6 @@ async def test_method_filters_empty( assert len(repos) == 0 -@pytest.mark.parametrize("client_type", client_types) -async def test_allocate_next_ip_address( - httpx_mock: HTTPXMock, - mock_schema_query_ipam: HTTPXMock, - clients: BothClients, - ipaddress_pool_schema: NodeSchemaAPI, - ipam_ipprefix_schema: NodeSchemaAPI, - ipam_ipprefix_data: dict[str, Any], - client_type: str, -) -> None: - httpx_mock.add_response( - method="POST", - json={ - "data": { - "InfrahubIPAddressPoolGetResource": { - "ok": True, - "node": { - "id": "17da1246-54f1-a9c0-2784-179f0ec5b128", - "kind": "IpamIPAddress", - "identifier": "test", - "display_label": "192.0.2.0/32", - }, - } - } - }, - match_headers={"X-Infrahub-Tracker": "allocate-ip-loopback"}, - is_reusable=True, - ) - httpx_mock.add_response( - method="POST", - json={ - "data": { - "IpamIPAddress": { - "count": 1, - "edges": [ - { - "node": { - "id": "17d9bd8d-8fc2-70b0-278a-179f425e25cb", - "__typename": "IpamIPAddress", - "address": {"value": "192.0.2.0/32"}, - "description": {"value": "test"}, - } - } - ], - } - } - }, - match_headers={"X-Infrahub-Tracker": "query-ipamipaddress-page1"}, - is_reusable=True, - ) - - if client_type == "standard": - ip_prefix = InfrahubNode(client=clients.standard, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) - ip_pool = InfrahubNode( - client=clients.standard, - schema=ipaddress_pool_schema, - data={ - "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", - "name": "Core loopbacks", - "default_address_type": "IpamIPAddress", - "default_prefix_length": 32, - "ip_namespace": "ip_namespace", - "resources": [ip_prefix], - }, - ) - ip_address = await clients.standard.allocate_next_ip_address( - resource_pool=ip_pool, - identifier="test", - prefix_length=32, - address_type="IpamIPAddress", - data={"description": "test"}, - tracker="allocate-ip-loopback", - ) - else: - ip_prefix = InfrahubNodeSync(client=clients.sync, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) - ip_pool = InfrahubNodeSync( - client=clients.sync, - schema=ipaddress_pool_schema, - data={ - "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", - "name": "Core loopbacks", - "default_address_type": "IpamIPAddress", - "default_prefix_length": 32, - "ip_namespace": "ip_namespace", - "resources": [ip_prefix], - }, - ) - ip_address = clients.sync.allocate_next_ip_address( - resource_pool=ip_pool, - identifier="test", - prefix_length=32, - address_type="IpamIPAddress", - data={"description": "test"}, - tracker="allocate-ip-loopback", - ) - - assert ip_address - assert str(ip_address.address.value) == "192.0.2.0/32" - assert ip_address.description.value == "test" - - 
-@pytest.mark.parametrize("client_type", client_types) -async def test_allocate_next_ip_prefix( - httpx_mock: HTTPXMock, - mock_schema_query_ipam: HTTPXMock, - clients: BothClients, - ipprefix_pool_schema: NodeSchemaAPI, - ipam_ipprefix_schema: NodeSchemaAPI, - ipam_ipprefix_data: dict[str, Any], - client_type: str, -) -> None: - httpx_mock.add_response( - method="POST", - json={ - "data": { - "InfrahubIPPrefixPoolGetResource": { - "ok": True, - "node": { - "id": "7d9bd8d-8fc2-70b0-278a-179f425e25cb", - "kind": "IpamIPPrefix", - "identifier": "test", - "display_label": "192.0.2.0/31", - }, - } - } - }, - match_headers={"X-Infrahub-Tracker": "allocate-ip-interco"}, - is_reusable=True, - ) - httpx_mock.add_response( - method="POST", - json={ - "data": { - "IpamIPPrefix": { - "count": 1, - "edges": [ - { - "node": { - "id": "17d9bd8d-8fc2-70b0-278a-179f425e25cb", - "__typename": "IpamIPPrefix", - "prefix": {"value": "192.0.2.0/31"}, - "description": {"value": "test"}, - } - } - ], - } - } - }, - match_headers={"X-Infrahub-Tracker": "query-ipamipprefix-page1"}, - is_reusable=True, - ) - - if client_type == "standard": - ip_prefix = InfrahubNode(client=clients.standard, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) - ip_pool = InfrahubNode( - client=clients.standard, - schema=ipprefix_pool_schema, - data={ - "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", - "name": "Core intercos", - "default_prefix_type": "IpamIPPrefix", - "default_prefix_length": 31, - "ip_namespace": "ip_namespace", - "resources": [ip_prefix], - }, - ) - ip_prefix = await clients.standard.allocate_next_ip_prefix( - resource_pool=ip_pool, - identifier="test", - prefix_length=31, - prefix_type="IpamIPPrefix", - data={"description": "test"}, - tracker="allocate-ip-interco", - ) - else: - ip_prefix = InfrahubNodeSync(client=clients.sync, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) - ip_pool = InfrahubNodeSync( - client=clients.sync, - schema=ipprefix_pool_schema, - data={ - "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", - "name": "Core intercos", - "default_prefix_type": "IpamIPPrefix", - "default_prefix_length": 31, - "ip_namespace": "ip_namespace", - "resources": [ip_prefix], - }, - ) - ip_prefix = clients.sync.allocate_next_ip_prefix( - resource_pool=ip_pool, - identifier="test", - prefix_length=31, - prefix_type="IpamIPPrefix", - data={"description": "test"}, - tracker="allocate-ip-interco", - ) - - assert ip_prefix - assert str(ip_prefix.prefix.value) == "192.0.2.0/31" - assert ip_prefix.description.value == "test" - - EXPECTED_ECHO = """URL: http://mock/graphql/main QUERY: diff --git a/tests/unit/sdk/test_node.py b/tests/unit/sdk/test_node.py index 8dc18c9b..3db48edf 100644 --- a/tests/unit/sdk/test_node.py +++ b/tests/unit/sdk/test_node.py @@ -2211,289 +2211,6 @@ async def test_relationships_excluded_input_data( assert node.tags.has_update is False -@pytest.mark.parametrize("client_type", client_types) -async def test_create_input_data_with_resource_pool_relationship( - client: InfrahubClient, - ipaddress_pool_schema: NodeSchemaAPI, - ipam_ipprefix_schema: NodeSchemaAPI, - simple_device_schema: NodeSchemaAPI, - ipam_ipprefix_data: dict[str, Any], - client_type: str, -) -> None: - if client_type == "standard": - ip_prefix = InfrahubNode(client=client, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) - ip_pool = InfrahubNode( - client=client, - schema=ipaddress_pool_schema, - data={ - "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", - "name": "Core loopbacks", - "default_address_type": "IpamIPAddress", - 
"default_prefix_length": 32, - "ip_namespace": "ip_namespace", - "resources": [ip_prefix], - }, - ) - device = InfrahubNode( - client=client, - schema=simple_device_schema, - data={"name": "device-01", "primary_address": ip_pool, "ip_address_pool": ip_pool}, - ) - else: - ip_prefix = InfrahubNodeSync(client=client, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) - ip_pool = InfrahubNodeSync( - client=client, - schema=ipaddress_pool_schema, - data={ - "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", - "name": "Core loopbacks", - "default_address_type": "IpamIPAddress", - "default_prefix_length": 32, - "ip_namespace": "ip_namespace", - "resources": [ip_prefix], - }, - ) - device = InfrahubNode( - client=client, - schema=simple_device_schema, - data={"name": "device-01", "primary_address": ip_pool, "ip_address_pool": ip_pool}, - ) - - assert device._generate_input_data()["data"] == { - "data": { - "name": {"value": "device-01"}, - "primary_address": {"from_pool": {"id": "pppppppp-pppp-pppp-pppp-pppppppppppp"}}, - "ip_address_pool": {"id": "pppppppp-pppp-pppp-pppp-pppppppppppp"}, - }, - } - - -@pytest.mark.parametrize("client_type", client_types) -async def test_create_mutation_query_with_resource_pool_relationship( - client: InfrahubClient, - ipaddress_pool_schema: NodeSchemaAPI, - ipam_ipprefix_schema: NodeSchemaAPI, - simple_device_schema: NodeSchemaAPI, - ipam_ipprefix_data: dict[str, Any], - client_type: str, -) -> None: - if client_type == "standard": - ip_prefix = InfrahubNode(client=client, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) - ip_pool = InfrahubNode( - client=client, - schema=ipaddress_pool_schema, - data={ - "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", - "name": "Core loopbacks", - "default_address_type": "IpamIPAddress", - "default_prefix_length": 32, - "ip_namespace": "ip_namespace", - "resources": [ip_prefix], - }, - ) - device = InfrahubNode( - client=client, - schema=simple_device_schema, - data={"name": "device-01", "primary_address": ip_pool, "ip_address_pool": ip_pool}, - ) - else: - ip_prefix = InfrahubNodeSync(client=client, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) - ip_pool = InfrahubNodeSync( - client=client, - schema=ipaddress_pool_schema, - data={ - "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", - "name": "Core loopbacks", - "default_address_type": "IpamIPAddress", - "default_prefix_length": 32, - "ip_namespace": "ip_namespace", - "resources": [ip_prefix], - }, - ) - device = InfrahubNode( - client=client, - schema=simple_device_schema, - data={"name": "device-01", "primary_address": ip_pool, "ip_address_pool": ip_pool}, - ) - - assert device._generate_mutation_query() == { - "object": { - "id": None, - "primary_address": {"node": {"__typename": None, "display_label": None, "id": None}}, - "ip_address_pool": {"node": {"__typename": None, "display_label": None, "id": None}}, - }, - "ok": None, - } - - -@pytest.mark.parametrize("client_type", client_types) -async def test_get_pool_allocated_resources( - httpx_mock: HTTPXMock, - mock_schema_query_ipam: HTTPXMock, - clients: BothClients, - ipaddress_pool_schema: NodeSchemaAPI, - ipam_ipprefix_schema: NodeSchemaAPI, - ipam_ipprefix_data: dict[str, Any], - client_type: str, -) -> None: - httpx_mock.add_response( - method="POST", - json={ - "data": { - "InfrahubResourcePoolAllocated": { - "count": 2, - "edges": [ - { - "node": { - "id": "17d9bd8d-8fc2-70b0-278a-179f425e25cb", - "kind": "IpamIPAddress", - "branch": "main", - "identifier": "ip-1", - } - }, - { - "node": { - "id": 
"17d9bd8e-31ee-acf0-2786-179fb76f2f67", - "kind": "IpamIPAddress", - "branch": "main", - "identifier": "ip-2", - } - }, - ], - } - } - }, - match_headers={"X-Infrahub-Tracker": "get-allocated-resources-page1"}, - ) - httpx_mock.add_response( - method="POST", - json={ - "data": { - "IpamIPAddress": { - "count": 2, - "edges": [ - {"node": {"id": "17d9bd8d-8fc2-70b0-278a-179f425e25cb", "__typename": "IpamIPAddress"}}, - {"node": {"id": "17d9bd8e-31ee-acf0-2786-179fb76f2f67", "__typename": "IpamIPAddress"}}, - ], - } - } - }, - match_headers={"X-Infrahub-Tracker": "query-ipamipaddress-page1"}, - ) - - if client_type == "standard": - ip_prefix = InfrahubNode(client=clients.standard, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) - ip_pool = InfrahubNode( - client=clients.standard, - schema=ipaddress_pool_schema, - data={ - "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", - "name": "Core loopbacks", - "default_address_type": "IpamIPAddress", - "default_prefix_length": 32, - "ip_namespace": "ip_namespace", - "resources": [ip_prefix], - }, - ) - - resources = await ip_pool.get_pool_allocated_resources(resource=ip_prefix) - assert len(resources) == 2 - assert [resource.id for resource in resources] == [ - "17d9bd8d-8fc2-70b0-278a-179f425e25cb", - "17d9bd8e-31ee-acf0-2786-179fb76f2f67", - ] - else: - ip_prefix = InfrahubNodeSync(client=clients.sync, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) - ip_pool = InfrahubNodeSync( - client=clients.sync, - schema=ipaddress_pool_schema, - data={ - "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", - "name": "Core loopbacks", - "default_address_type": "IpamIPAddress", - "default_prefix_length": 32, - "ip_namespace": "ip_namespace", - "resources": [ip_prefix], - }, - ) - - resources = ip_pool.get_pool_allocated_resources(resource=ip_prefix) - assert len(resources) == 2 - assert [resource.id for resource in resources] == [ - "17d9bd8d-8fc2-70b0-278a-179f425e25cb", - "17d9bd8e-31ee-acf0-2786-179fb76f2f67", - ] - - -@pytest.mark.parametrize("client_type", client_types) -async def test_get_pool_resources_utilization( - httpx_mock: HTTPXMock, - clients: BothClients, - ipaddress_pool_schema: NodeSchemaAPI, - ipam_ipprefix_schema: NodeSchemaAPI, - ipam_ipprefix_data: dict[str, Any], - client_type: str, -) -> None: - httpx_mock.add_response( - method="POST", - json={ - "data": { - "InfrahubResourcePoolUtilization": { - "count": 1, - "edges": [ - { - "node": { - "id": "17d9bd86-3471-a020-2782-179ff078e58f", - "utilization": 93.75, - "utilization_branches": 0, - "utilization_default_branch": 93.75, - } - } - ], - } - } - }, - match_headers={"X-Infrahub-Tracker": "get-pool-utilization"}, - ) - - if client_type == "standard": - ip_prefix = InfrahubNode(client=clients.standard, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) - ip_pool = InfrahubNode( - client=clients.standard, - schema=ipaddress_pool_schema, - data={ - "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", - "name": "Core loopbacks", - "default_address_type": "IpamIPAddress", - "default_prefix_length": 32, - "ip_namespace": "ip_namespace", - "resources": [ip_prefix], - }, - ) - - utilizations = await ip_pool.get_pool_resources_utilization() - assert len(utilizations) == 1 - assert utilizations[0]["utilization"] == 93.75 - else: - ip_prefix = InfrahubNodeSync(client=clients.sync, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) - ip_pool = InfrahubNodeSync( - client=clients.sync, - schema=ipaddress_pool_schema, - data={ - "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", - "name": "Core loopbacks", - 
"default_address_type": "IpamIPAddress", - "default_prefix_length": 32, - "ip_namespace": "ip_namespace", - "resources": [ip_prefix], - }, - ) - - utilizations = ip_pool.get_pool_resources_utilization() - assert len(utilizations) == 1 - assert utilizations[0]["utilization"] == 93.75 - - @pytest.mark.parametrize("client_type", client_types) async def test_from_graphql( clients: BothClients, mock_schema_query_01: HTTPXMock, location_data01: dict[str, Any], client_type: str From dd38027052c3c78b78a7f6d68e46518b9735aeeb Mon Sep 17 00:00:00 2001 From: Pol Michel Date: Thu, 26 Feb 2026 23:57:57 +0100 Subject: [PATCH 09/10] Adapted documentation to new rules in infrahub-develop --- .vale/styles/Infrahub/sentence-case.yml | 1 + .vale/styles/spelling-exceptions.txt | 1 + docs/AGENTS.md | 6 +++--- docs/docs/python-sdk/guides/client.mdx | 2 +- docs/docs/python-sdk/guides/python-typing.mdx | 2 +- docs/docs/python-sdk/topics/object_file.mdx | 20 +++++++++---------- 6 files changed, 17 insertions(+), 15 deletions(-) diff --git a/.vale/styles/Infrahub/sentence-case.yml b/.vale/styles/Infrahub/sentence-case.yml index 126e18f6..c27cf7a1 100644 --- a/.vale/styles/Infrahub/sentence-case.yml +++ b/.vale/styles/Infrahub/sentence-case.yml @@ -52,6 +52,7 @@ exceptions: - Jinja - Jinja2 - JWT + - MDX - Namespace - NATS - Node diff --git a/.vale/styles/spelling-exceptions.txt b/.vale/styles/spelling-exceptions.txt index 0a4c0144..ecba179f 100644 --- a/.vale/styles/spelling-exceptions.txt +++ b/.vale/styles/spelling-exceptions.txt @@ -79,6 +79,7 @@ kbps Keycloak Loopbacks markdownlint +MDX max_count memgraph menu_placement diff --git a/docs/AGENTS.md b/docs/AGENTS.md index f869e84f..02f1fc6c 100644 --- a/docs/AGENTS.md +++ b/docs/AGENTS.md @@ -1,4 +1,4 @@ -# docs/AGENTS.md +# Documentation agents Docusaurus documentation following Diataxis framework. @@ -34,12 +34,12 @@ Sidebar navigation is dynamic: `sidebars-*.ts` files read the filesystem at buil No manual sidebar update is needed when adding a new `.mdx` file. However, to control the display order of a new page, add its doc ID to the ordered list in the corresponding `sidebars-*.ts` file. -## Adding Documentation +## Adding documentation 1. Create MDX file in appropriate directory 2. Add frontmatter with `title` -## MDX Pattern +## MDX pattern Use Tabs for async/sync examples, callouts for notes: diff --git a/docs/docs/python-sdk/guides/client.mdx b/docs/docs/python-sdk/guides/client.mdx index 460036a1..90872a0f 100644 --- a/docs/docs/python-sdk/guides/client.mdx +++ b/docs/docs/python-sdk/guides/client.mdx @@ -251,7 +251,7 @@ Your client is now configured to use the specified default branch instead of `ma ## Hello world example -Let's create a simple "Hello World" example to verify your client configuration works correctly. This example will connect to your Infrahub instance and query the available accounts. +Let's create a "Hello World" example to verify your client configuration works correctly. This example will connect to your Infrahub instance and query the available accounts. 1. Create a new file called `hello_world.py`: diff --git a/docs/docs/python-sdk/guides/python-typing.mdx b/docs/docs/python-sdk/guides/python-typing.mdx index 9bc2c323..77780177 100644 --- a/docs/docs/python-sdk/guides/python-typing.mdx +++ b/docs/docs/python-sdk/guides/python-typing.mdx @@ -131,7 +131,7 @@ infrahubctl graphql generate-return-types queries/get_tags.gql ### Example workflow -1. 
**Create your GraphQL queries** in `.gql` files preferably in a directory (e.g., `queries/`): +1. **Create your GraphQL queries** in `.gql` files preferably in a directory (for example, `queries/`): ```graphql # queries/get_tags.gql diff --git a/docs/docs/python-sdk/topics/object_file.mdx b/docs/docs/python-sdk/topics/object_file.mdx index 751599f0..75744d89 100644 --- a/docs/docs/python-sdk/topics/object_file.mdx +++ b/docs/docs/python-sdk/topics/object_file.mdx @@ -68,13 +68,13 @@ spec: > Multiple documents in a single YAML file are also supported, each document will be loaded separately. Documents are separated by `---` -### Data Processing Parameters +### Data processing parameters The `parameters` field controls how the data in the object file is processed before loading into Infrahub: -| Parameter | Description | Default | -| -------------- | ------------------------------------------------------------------------------------------------------- | ------- | -| `expand_range` | When set to `true`, range patterns (e.g., `[1-5]`) in string fields are expanded into multiple objects. | `false` | +| Parameter | Description | Default | +| -------------- | -------------------------------------------------------------------------------------------------------------- | ------- | +| `expand_range` | When set to `true`, range patterns (for example, `[1-5]`) in string fields are expanded into multiple objects. | `false` | When `expand_range` is not specified, it defaults to `false`. @@ -208,9 +208,9 @@ Metadata support is planned for future releases. Currently, the Object file does 3. Validate object files before loading them into production environments. 4. Use comments in your YAML files to document complex relationships or dependencies. -## Range Expansion in Object Files +## Range expansion in object files -The Infrahub Python SDK supports **range expansion** for string fields in object files when the `parameters > expand_range` is set to `true`. This feature allows you to specify a range pattern (e.g., `[1-5]`) in any string value, and the SDK will automatically expand it into multiple objects during validation and processing. +The Infrahub Python SDK supports **range expansion** for string fields in object files when the `parameters > expand_range` is set to `true`. This feature allows you to specify a range pattern (for example, `[1-5]`) in any string value, and the SDK will automatically expand it into multiple objects during validation and processing. ```yaml --- @@ -225,7 +225,7 @@ spec: type: Country ``` -### How Range Expansion Works +### How range expansion works - Any string field containing a pattern like `[1-5]`, `[10-15]`, or `[1,3,5]` will be expanded into multiple objects. - If multiple fields in the same object use range expansion, **all expanded lists must have the same length**. If not, validation will fail. 
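The expansion rule documented above can be sketched outside the SDK. The snippet below is an illustrative approximation only, covering just the numeric `[start-end]` form (not comma lists like `[1,3,5]`); the helper names `expand_range` and `expand_object` are hypothetical and are not part of the infrahub_sdk API.

```python
# Illustrative sketch of the documented expansion rule, not the SDK implementation.
# Covers only the numeric [start-end] form; helper names are hypothetical.
import re
from typing import Any

RANGE_PATTERN = re.compile(r"\[(\d+)-(\d+)\]")


def expand_range(value: str) -> list[str]:
    """Expand 'atl-[1-3]' into ['atl-1', 'atl-2', 'atl-3']; return [value] unchanged if no range."""
    match = RANGE_PATTERN.search(value)
    if not match:
        return [value]
    start, end = int(match.group(1)), int(match.group(2))
    return [RANGE_PATTERN.sub(str(i), value, count=1) for i in range(start, end + 1)]


def expand_object(data: dict[str, Any]) -> list[dict[str, Any]]:
    """Expand every string field; all expanded fields must produce the same number of items."""
    expanded = {key: expand_range(val) if isinstance(val, str) else [val] for key, val in data.items()}
    lengths = {len(values) for values in expanded.values() if len(values) > 1}
    if len(lengths) > 1:
        raise ValueError("range patterns in the same object must expand to the same length")
    count = lengths.pop() if lengths else 1
    return [
        {key: values[index] if len(values) > 1 else values[0] for key, values in expanded.items()}
        for index in range(count)
    ]


print(expand_object({"name": "atl-[1-3]", "description": "site [1-3]"}))
# [{'name': 'atl-1', 'description': 'site 1'}, ..., {'name': 'atl-3', 'description': 'site 3'}]
```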
@@ -233,7 +233,7 @@ spec: ### Examples -#### Single Field Expansion +#### Single field expansion ```yaml spec: @@ -256,7 +256,7 @@ This will expand to: type: Country ``` -#### Multiple Field Expansion (Matching Lengths) +#### Multiple field expansion (matching lengths) ```yaml spec: @@ -283,7 +283,7 @@ This will expand to: type: Country ``` -#### Error: Mismatched Range Lengths +#### Error: mismatched range lengths If you use ranges of different lengths in multiple fields: From 52d475ea9599a06557ddd8c0b8a50bc1cc576b9c Mon Sep 17 00:00:00 2001 From: Pol Michel Date: Fri, 27 Feb 2026 00:06:59 +0100 Subject: [PATCH 10/10] Adapted test typing of merged tests from develop --- tests/unit/sdk/pool/conftest.py | 6 +- tests/unit/sdk/pool/test_allocate.py | 19 +++--- .../sdk/pool/test_relationship_from_pool.py | 32 +++++----- .../sdk/test_attribute_generate_input_data.py | 63 +++++++++++-------- 4 files changed, 66 insertions(+), 54 deletions(-) diff --git a/tests/unit/sdk/pool/conftest.py b/tests/unit/sdk/pool/conftest.py index e8276be6..9d0fd245 100644 --- a/tests/unit/sdk/pool/conftest.py +++ b/tests/unit/sdk/pool/conftest.py @@ -1,5 +1,7 @@ from __future__ import annotations +from typing import Any + import pytest from infrahub_sdk.schema import BranchSupportType, NodeSchema, NodeSchemaAPI @@ -7,7 +9,7 @@ @pytest.fixture async def ipaddress_pool_schema() -> NodeSchemaAPI: - data = { + data: dict[str, Any] = { "name": "IPAddressPool", "namespace": "Core", "description": "A pool of IP address resources", @@ -57,7 +59,7 @@ async def ipaddress_pool_schema() -> NodeSchemaAPI: @pytest.fixture async def ipprefix_pool_schema() -> NodeSchemaAPI: - data = { + data: dict[str, Any] = { "name": "IPPrefixPool", "namespace": "Core", "description": "A pool of IP prefix resources", diff --git a/tests/unit/sdk/pool/test_allocate.py b/tests/unit/sdk/pool/test_allocate.py index 1ed53500..eacc1a7b 100644 --- a/tests/unit/sdk/pool/test_allocate.py +++ b/tests/unit/sdk/pool/test_allocate.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, cast import pytest @@ -11,6 +11,7 @@ from pytest_httpx import HTTPXMock + from infrahub_sdk.protocols_base import CoreNode, CoreNodeSync from infrahub_sdk.schema import NodeSchemaAPI from tests.unit.sdk.conftest import BothClients @@ -83,7 +84,7 @@ async def test_allocate_next_ip_address( }, ) ip_address = await clients.standard.allocate_next_ip_address( - resource_pool=ip_pool, + resource_pool=cast("CoreNode", ip_pool), identifier="test", prefix_length=32, address_type="IpamIPAddress", @@ -105,7 +106,7 @@ async def test_allocate_next_ip_address( }, ) ip_address = clients.sync.allocate_next_ip_address( - resource_pool=ip_pool, + resource_pool=cast("CoreNodeSync", ip_pool), identifier="test", prefix_length=32, address_type="IpamIPAddress", @@ -114,8 +115,8 @@ async def test_allocate_next_ip_address( ) assert ip_address - assert str(ip_address.address.value) == "192.0.2.0/32" - assert ip_address.description.value == "test" + assert str(cast("InfrahubNodeSync", ip_address).address.value) == "192.0.2.0/32" + assert cast("InfrahubNodeSync", ip_address).description.value == "test" @pytest.mark.parametrize("client_type", client_types) @@ -184,7 +185,7 @@ async def test_allocate_next_ip_prefix( }, ) ip_prefix = await clients.standard.allocate_next_ip_prefix( - resource_pool=ip_pool, + resource_pool=cast("CoreNode", ip_pool), identifier="test", prefix_length=31, prefix_type="IpamIPPrefix", @@ -206,7 +207,7 @@ async def 
test_allocate_next_ip_prefix( }, ) ip_prefix = clients.sync.allocate_next_ip_prefix( - resource_pool=ip_pool, + resource_pool=cast("CoreNodeSync", ip_pool), identifier="test", prefix_length=31, prefix_type="IpamIPPrefix", @@ -215,5 +216,5 @@ async def test_allocate_next_ip_prefix( ) assert ip_prefix - assert str(ip_prefix.prefix.value) == "192.0.2.0/31" - assert ip_prefix.description.value == "test" + assert str(cast("InfrahubNodeSync", ip_prefix).prefix.value) == "192.0.2.0/31" # type: ignore[unresolved-attribute] + assert cast("InfrahubNodeSync", ip_prefix).description.value == "test" # type: ignore[unresolved-attribute] diff --git a/tests/unit/sdk/pool/test_relationship_from_pool.py b/tests/unit/sdk/pool/test_relationship_from_pool.py index 9ce543dc..f8c44c6b 100644 --- a/tests/unit/sdk/pool/test_relationship_from_pool.py +++ b/tests/unit/sdk/pool/test_relationship_from_pool.py @@ -9,15 +9,15 @@ if TYPE_CHECKING: from typing import Any - from infrahub_sdk import InfrahubClient from infrahub_sdk.schema import NodeSchemaAPI + from tests.unit.sdk.conftest import BothClients client_types = ["standard", "sync"] @pytest.mark.parametrize("client_type", client_types) async def test_create_input_data_with_resource_pool_relationship( - client: InfrahubClient, + clients: BothClients, ipaddress_pool_schema: NodeSchemaAPI, ipam_ipprefix_schema: NodeSchemaAPI, simple_device_schema: NodeSchemaAPI, @@ -25,9 +25,9 @@ async def test_create_input_data_with_resource_pool_relationship( client_type: str, ) -> None: if client_type == "standard": - ip_prefix = InfrahubNode(client=client, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) + ip_prefix = InfrahubNode(client=clients.standard, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) ip_pool = InfrahubNode( - client=client, + client=clients.standard, schema=ipaddress_pool_schema, data={ "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", @@ -39,14 +39,14 @@ async def test_create_input_data_with_resource_pool_relationship( }, ) device = InfrahubNode( - client=client, + client=clients.standard, schema=simple_device_schema, data={"name": "device-01", "primary_address": ip_pool, "ip_address_pool": ip_pool}, ) else: - ip_prefix = InfrahubNodeSync(client=client, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) + ip_prefix = InfrahubNodeSync(client=clients.sync, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) ip_pool = InfrahubNodeSync( - client=client, + client=clients.sync, schema=ipaddress_pool_schema, data={ "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", @@ -58,7 +58,7 @@ async def test_create_input_data_with_resource_pool_relationship( }, ) device = InfrahubNodeSync( - client=client, + client=clients.sync, schema=simple_device_schema, data={"name": "device-01", "primary_address": ip_pool, "ip_address_pool": ip_pool}, ) @@ -74,7 +74,7 @@ async def test_create_input_data_with_resource_pool_relationship( @pytest.mark.parametrize("client_type", client_types) async def test_create_mutation_query_with_resource_pool_relationship( - client: InfrahubClient, + clients: BothClients, ipaddress_pool_schema: NodeSchemaAPI, ipam_ipprefix_schema: NodeSchemaAPI, simple_device_schema: NodeSchemaAPI, @@ -82,9 +82,9 @@ async def test_create_mutation_query_with_resource_pool_relationship( client_type: str, ) -> None: if client_type == "standard": - ip_prefix = InfrahubNode(client=client, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) + ip_prefix = InfrahubNode(client=clients.standard, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) ip_pool = InfrahubNode( 
- client=client, + client=clients.standard, schema=ipaddress_pool_schema, data={ "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", @@ -96,14 +96,14 @@ async def test_create_mutation_query_with_resource_pool_relationship( }, ) device = InfrahubNode( - client=client, + client=clients.standard, schema=simple_device_schema, data={"name": "device-01", "primary_address": ip_pool, "ip_address_pool": ip_pool}, ) else: - ip_prefix = InfrahubNodeSync(client=client, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) + ip_prefix = InfrahubNodeSync(client=clients.sync, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) ip_pool = InfrahubNodeSync( - client=client, + client=clients.sync, schema=ipaddress_pool_schema, data={ "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", @@ -114,8 +114,8 @@ async def test_create_mutation_query_with_resource_pool_relationship( "resources": [ip_prefix], }, ) - device = InfrahubNode( - client=client, + device = InfrahubNodeSync( + client=clients.sync, schema=simple_device_schema, data={"name": "device-01", "primary_address": ip_pool, "ip_address_pool": ip_pool}, ) diff --git a/tests/unit/sdk/test_attribute_generate_input_data.py b/tests/unit/sdk/test_attribute_generate_input_data.py index a50a2fe9..394623fc 100644 --- a/tests/unit/sdk/test_attribute_generate_input_data.py +++ b/tests/unit/sdk/test_attribute_generate_input_data.py @@ -9,6 +9,7 @@ from infrahub_sdk.node.attribute import Attribute from infrahub_sdk.protocols_base import CoreNodeBase from infrahub_sdk.schema import AttributeSchemaAPI +from infrahub_sdk.schema.main import AttributeKind # ────────────────────────────────────────────── # Value resolution: from_pool (dict-based) @@ -18,7 +19,7 @@ class TestFromPoolDict: def test_from_pool_with_id(self) -> None: pool_data = {"id": "pool-uuid-1"} - attr = Attribute(name="vlan_id", schema=_make_schema("Number"), data={"from_pool": pool_data}) + attr = Attribute(name="vlan_id", schema=_make_schema(AttributeKind.NUMBER), data={"from_pool": pool_data}) result = attr._generate_input_data() @@ -27,7 +28,7 @@ def test_from_pool_with_id(self) -> None: def test_from_pool_with_id_and_identifier(self) -> None: pool_data = {"id": "pool-uuid-1", "identifier": "test"} - attr = Attribute(name="vlan_id", schema=_make_schema("Number"), data={"from_pool": pool_data}) + attr = Attribute(name="vlan_id", schema=_make_schema(AttributeKind.NUMBER), data={"from_pool": pool_data}) result = attr._generate_input_data() @@ -37,7 +38,7 @@ def test_from_pool_with_id_and_identifier(self) -> None: def test_from_pool_with_pool_name(self) -> None: """from_pool can be a plain string (pool name), e.g. 
from_pool: 'VLAN ID Pool'.""" attr = Attribute( - name="vlan_id", schema=_make_schema("Number", optional=True), data={"from_pool": "VLAN ID Pool"} + name="vlan_id", schema=_make_schema(AttributeKind.NUMBER, optional=True), data={"from_pool": "VLAN ID Pool"} ) result = attr._generate_input_data() @@ -48,7 +49,9 @@ def test_from_pool_with_pool_name(self) -> None: def test_from_pool_value_is_none(self) -> None: """from_pool pops 'from_pool' and sets Attribute.value to None; value should NOT appear in payload.""" - attr = Attribute(name="vlan_id", schema=_make_schema("Number"), data={"from_pool": {"id": "pool-uuid-1"}}) + attr = Attribute( + name="vlan_id", schema=_make_schema(AttributeKind.NUMBER), data={"from_pool": {"id": "pool-uuid-1"}} + ) assert attr.value is None result = attr._generate_input_data() @@ -64,7 +67,7 @@ class TestFromPoolNode: def test_pool_node_generates_from_pool(self) -> None: pool_node = _FakeNode(node_id="node-pool-uuid", is_pool=True) - attr = Attribute(name="vlan_id", schema=_make_schema("Number"), data=pool_node) + attr = Attribute(name="vlan_id", schema=_make_schema(AttributeKind.NUMBER), data=pool_node) result = attr._generate_input_data() @@ -74,7 +77,7 @@ def test_pool_node_generates_from_pool(self) -> None: def test_non_pool_node_treated_as_regular_value(self) -> None: """A CoreNodeBase that is NOT a resource pool should go through the normal value path.""" node = _FakeNode(node_id="regular-node-uuid", is_pool=False) - attr = Attribute(name="vlan_id", schema=_make_schema("Number"), data=node) + attr = Attribute(name="vlan_id", schema=_make_schema(AttributeKind.NUMBER), data=node) result = attr._generate_input_data() @@ -89,7 +92,7 @@ def test_non_pool_node_treated_as_regular_value(self) -> None: class TestNullValue: def test_null_value_not_mutated(self) -> None: """None value that was never mutated → empty payload, no properties.""" - attr = Attribute(name="test_attr", schema=_make_schema("Text"), data={"value": None}) + attr = Attribute(name="test_attr", schema=_make_schema(AttributeKind.TEXT), data={"value": None}) result = attr._generate_input_data() @@ -99,7 +102,9 @@ def test_null_value_not_mutated(self) -> None: def test_null_value_mutated_optional(self) -> None: """None value on an optional attr that was mutated → explicit null.""" - attr = Attribute(name="test_attr", schema=_make_schema("Text", optional=True), data={"value": "initial"}) + attr = Attribute( + name="test_attr", schema=_make_schema(AttributeKind.TEXT, optional=True), data={"value": "initial"} + ) attr.value = None # triggers value_has_been_mutated result = attr._generate_input_data() @@ -109,7 +114,9 @@ def test_null_value_mutated_optional(self) -> None: def test_null_value_mutated_non_optional(self) -> None: """None value on a non-optional attr that was mutated → empty payload (same as not mutated).""" - attr = Attribute(name="test_attr", schema=_make_schema("Text", optional=False), data={"value": "initial"}) + attr = Attribute( + name="test_attr", schema=_make_schema(AttributeKind.TEXT, optional=False), data={"value": "initial"} + ) attr.value = None result = attr._generate_input_data() @@ -135,7 +142,7 @@ class TestStringValues: ], ) def test_safe_string(self, value: str) -> None: - attr = Attribute(name="test_attr", schema=_make_schema("Text"), data=value) + attr = Attribute(name="test_attr", schema=_make_schema(AttributeKind.TEXT), data=value) result = attr._generate_input_data() @@ -151,7 +158,7 @@ def test_safe_string(self, value: str) -> None: ], ) def 
test_unsafe_string_uses_variable_binding(self, value: str) -> None: - attr = Attribute(name="test_attr", schema=_make_schema("Text"), data=value) + attr = Attribute(name="test_attr", schema=_make_schema(AttributeKind.TEXT), data=value) result = attr._generate_input_data() @@ -171,7 +178,7 @@ def test_unsafe_string_uses_variable_binding(self, value: str) -> None: class TestIPValues: def test_ipv4_interface(self) -> None: - attr = Attribute(name="address", schema=_make_schema("IPHost"), data={"value": "10.0.0.1/24"}) + attr = Attribute(name="address", schema=_make_schema(AttributeKind.IPHOST), data={"value": "10.0.0.1/24"}) result = attr._generate_input_data() @@ -179,21 +186,21 @@ def test_ipv4_interface(self) -> None: assert result.variables == {} def test_ipv6_interface(self) -> None: - attr = Attribute(name="address", schema=_make_schema("IPHost"), data={"value": "2001:db8::1/64"}) + attr = Attribute(name="address", schema=_make_schema(AttributeKind.IPHOST), data={"value": "2001:db8::1/64"}) result = attr._generate_input_data() assert result.payload["value"] == "2001:db8::1/64" def test_ipv4_network(self) -> None: - attr = Attribute(name="network", schema=_make_schema("IPNetwork"), data={"value": "10.0.0.0/24"}) + attr = Attribute(name="network", schema=_make_schema(AttributeKind.IPNETWORK), data={"value": "10.0.0.0/24"}) result = attr._generate_input_data() assert result.payload["value"] == "10.0.0.0/24" def test_ipv6_network(self) -> None: - attr = Attribute(name="network", schema=_make_schema("IPNetwork"), data={"value": "2001:db8::/32"}) + attr = Attribute(name="network", schema=_make_schema(AttributeKind.IPNETWORK), data={"value": "2001:db8::/32"}) result = attr._generate_input_data() @@ -207,7 +214,7 @@ def test_ipv6_network(self) -> None: class TestScalarValues: def test_number_value(self) -> None: - attr = Attribute(name="vlan_id", schema=_make_schema("Number"), data=42) + attr = Attribute(name="vlan_id", schema=_make_schema(AttributeKind.NUMBER), data=42) result = attr._generate_input_data() @@ -215,7 +222,7 @@ def test_number_value(self) -> None: assert result.variables == {} def test_boolean_value(self) -> None: - attr = Attribute(name="enabled", schema=_make_schema("Boolean"), data=True) + attr = Attribute(name="enabled", schema=_make_schema(AttributeKind.BOOLEAN), data=True) result = attr._generate_input_data() @@ -230,14 +237,16 @@ def test_boolean_value(self) -> None: class TestProperties: def test_no_properties_set(self) -> None: """When no properties are set, payload only has the value.""" - attr = Attribute(name="test_attr", schema=_make_schema("Text"), data="hello") + attr = Attribute(name="test_attr", schema=_make_schema(AttributeKind.TEXT), data="hello") result = attr._generate_input_data() assert result.payload == {"value": "hello"} def test_flag_property_is_protected(self) -> None: - attr = Attribute(name="test_attr", schema=_make_schema("Text"), data={"value": "hello", "is_protected": True}) + attr = Attribute( + name="test_attr", schema=_make_schema(AttributeKind.TEXT), data={"value": "hello", "is_protected": True} + ) result = attr._generate_input_data() @@ -247,7 +256,7 @@ def test_flag_property_is_protected(self) -> None: def test_object_property_source(self) -> None: attr = Attribute( name="test_attr", - schema=_make_schema("Text"), + schema=_make_schema(AttributeKind.TEXT), data={"value": "hello", "source": {"id": "source-uuid", "display_label": "Git", "__typename": "CoreGit"}}, ) @@ -259,7 +268,7 @@ def test_object_property_source(self) -> None: def 
test_object_property_owner(self) -> None: attr = Attribute( name="test_attr", - schema=_make_schema("Text"), + schema=_make_schema(AttributeKind.TEXT), data={ "value": "hello", "owner": {"id": "owner-uuid", "display_label": "Admin", "__typename": "CoreAccount"}, @@ -273,7 +282,7 @@ def test_object_property_owner(self) -> None: def test_both_flag_and_object_properties(self) -> None: attr = Attribute( name="test_attr", - schema=_make_schema("Text"), + schema=_make_schema(AttributeKind.TEXT), data={ "value": "hello", "is_protected": True, @@ -291,7 +300,7 @@ def test_properties_not_appended_for_null_value(self) -> None: """When need_additional_properties is False (null non-mutated), properties are ignored.""" attr = Attribute( name="test_attr", - schema=_make_schema("Text"), + schema=_make_schema(AttributeKind.TEXT), data={ "value": None, "is_protected": True, @@ -308,7 +317,7 @@ def test_properties_appended_for_from_pool(self) -> None: """from_pool payloads have need_additional_properties=True, so properties are included.""" attr = Attribute( name="vlan_id", - schema=_make_schema("Number"), + schema=_make_schema(AttributeKind.NUMBER), data={"from_pool": {"id": "pool-uuid"}, "is_protected": True}, ) @@ -325,14 +334,14 @@ def test_properties_appended_for_from_pool(self) -> None: class TestToDictIntegration: def test_to_dict_simple_value(self) -> None: - attr = Attribute(name="test_attr", schema=_make_schema("Text"), data="hello") + attr = Attribute(name="test_attr", schema=_make_schema(AttributeKind.TEXT), data="hello") result = attr._generate_input_data().to_dict() assert result == {"data": {"value": "hello"}, "variables": {}} def test_to_dict_with_variables(self) -> None: - attr = Attribute(name="test_attr", schema=_make_schema("Text"), data='has "quotes"') + attr = Attribute(name="test_attr", schema=_make_schema(AttributeKind.TEXT), data='has "quotes"') result = attr._generate_input_data().to_dict() @@ -344,7 +353,7 @@ def test_to_dict_with_variables(self) -> None: assert result["data"]["value"] == f"${var_name}" -def _make_schema(kind: str = "Text", optional: bool = False) -> AttributeSchemaAPI: +def _make_schema(kind: AttributeKind = AttributeKind.TEXT, optional: bool = False) -> AttributeSchemaAPI: return AttributeSchemaAPI(name="test_attr", kind=kind, optional=optional)
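The typing changes in this last patch follow two patterns: pass enum members such as `AttributeKind.TEXT` where the parameter is typed with the enum rather than a bare string, and use `typing.cast` to tell the type checker that a concrete node satisfies the protocol a function expects. The sketch below illustrates both patterns with simplified stand-ins; none of the classes or the `allocate` function here are the SDK's real types.

```python
# Simplified stand-ins to illustrate the typing pattern; not the SDK's real API.
from __future__ import annotations

import enum
from typing import Protocol, cast


class AttributeKind(enum.Enum):
    TEXT = "Text"
    NUMBER = "Number"


class NodeProtocol(Protocol):
    id: str

    def get_kind(self) -> str: ...


class ConcreteNode:
    def __init__(self, node_id: str, kind: AttributeKind) -> None:
        self.id = node_id
        self.kind = kind

    def get_kind(self) -> str:
        return self.kind.value


def allocate(resource_pool: NodeProtocol) -> str:
    return resource_pool.id


# Enum member instead of the string "Text", matching the parameter's declared type.
pool = ConcreteNode(node_id="pool-1", kind=AttributeKind.TEXT)

# cast() only narrows the type for the checker; it performs no runtime conversion.
print(allocate(resource_pool=cast("NodeProtocol", pool)))
```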