diff --git a/.vale/styles/Infrahub/sentence-case.yml b/.vale/styles/Infrahub/sentence-case.yml index 126e18f6..c27cf7a1 100644 --- a/.vale/styles/Infrahub/sentence-case.yml +++ b/.vale/styles/Infrahub/sentence-case.yml @@ -52,6 +52,7 @@ exceptions: - Jinja - Jinja2 - JWT + - MDX - Namespace - NATS - Node diff --git a/.vale/styles/spelling-exceptions.txt b/.vale/styles/spelling-exceptions.txt index 0a4c0144..ecba179f 100644 --- a/.vale/styles/spelling-exceptions.txt +++ b/.vale/styles/spelling-exceptions.txt @@ -79,6 +79,7 @@ kbps Keycloak Loopbacks markdownlint +MDX max_count memgraph menu_placement diff --git a/AGENTS.md b/AGENTS.md index 00de5ab1..7abf694f 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -7,7 +7,7 @@ Infrahub Python SDK - async/sync client for Infrahub infrastructure management. ```bash uv sync --all-groups --all-extras # Install all deps uv run invoke format # Format code -uv run invoke lint # All linters (code + yamllint + documentation) +uv run invoke lint # Full pipeline: ruff, yamllint, ty, mypy, markdownlint, vale uv run invoke lint-code # All linters for Python code uv run pytest tests/unit/ # Unit tests uv run pytest tests/integration/ # Integration tests diff --git a/changelog/497.fixed.md b/changelog/497.fixed.md new file mode 100644 index 00000000..b32323d1 --- /dev/null +++ b/changelog/497.fixed.md @@ -0,0 +1 @@ +Fixed Python SDK query generation regarding from_pool generated attribute value diff --git a/dev/commands/feedback.md b/dev/commands/feedback.md new file mode 100644 index 00000000..b89896be --- /dev/null +++ b/dev/commands/feedback.md @@ -0,0 +1,92 @@ +# Session Feedback + +Analyze this conversation and identify what documentation or context was missing, incomplete, or incorrect. The goal is to continuously improve the project's knowledge base so future conversations are more efficient. + +## Step 1: Session Analysis + +Reflect on the work done in this conversation. For each area, identify friction points: + +1. **Exploration overhead**: What parts of the codebase did you have to discover by searching that should have been documented? (e.g., patterns, conventions, module responsibilities) +2. **Wrong assumptions**: Did you make incorrect assumptions due to missing or misleading documentation? +3. **Repeated patterns**: Did you discover recurring patterns or conventions that aren't documented anywhere? +4. **Missing context**: What background knowledge would have helped you start faster? (e.g., architecture decisions, data flow, naming conventions) +5. **Tooling gaps**: Were there commands, scripts, or workflows that you had to figure out? + +## Step 2: Documentation Audit + +For each friction point identified, determine the appropriate fix. Check the existing documentation to avoid duplicating what's already there: + +- `AGENTS.md` — Top-level project instructions and component map +- `CLAUDE.md` — Entry point referencing AGENTS.md +- `docs/AGENTS.md` — Documentation site guide +- `infrahub_sdk/ctl/AGENTS.md` — CLI development guide +- `infrahub_sdk/pytest_plugin/AGENTS.md` — Pytest plugin guide +- `tests/AGENTS.md` — Testing guide + +Read the relevant existing files to understand what's already documented before proposing changes. + +## Step 3: Generate Report + +Present the feedback as a structured report with the following sections. Only include sections that have content — skip empty sections. + +### Format + +```markdown +## Session Feedback Report + +### What I Was Working On + + +### Documentation Gaps + + +For each gap: + +- **Topic**: What's missing +- **Where**: Which file should contain this (existing file to update, or new file to create) +- **Why**: How this would have helped during this conversation +- **Suggested content**: A draft of what should be added (be specific and actionable) + +### Documentation Corrections + + +For each correction: + +- **File**: Path to the file +- **Issue**: What's wrong or misleading +- **Fix**: What it should say instead + +### Discovered Patterns + + +For each pattern: + +- **Pattern**: Description of the convention +- **Evidence**: Where in the code this pattern is used (file paths) +- **Where to document**: Which AGENTS.md or guide file should capture this + +### Memory Updates + + +For each update: + +- **Action**: Add / Update / Remove +- **Content**: What to write +- **Reason**: Why this is worth remembering across sessions +``` + +## Step 4: Apply Changes + +After presenting the report, ask the user which changes they want to apply. Present the options: + +1. **Apply all** — Create/update all proposed documentation files and memory +2. **Cherry-pick** — Let the user select which changes to apply +3. **None** — Just keep the report as reference, don't modify any files + + +For approved changes: + +- Edit existing files when updating documentation +- Create new files only when no appropriate existing file exists +- Update `MEMORY.md` with approved memory changes +- Keep all changes minimal and focused — don't over-document diff --git a/dev/commands/pre-ci.md b/dev/commands/pre-ci.md new file mode 100644 index 00000000..7f14d70d --- /dev/null +++ b/dev/commands/pre-ci.md @@ -0,0 +1,36 @@ +Run a subset of fast CI checks locally. These are lightweight validations that catch common issues before pushing. Run all steps and report a summary at the end. + +## Steps + +1. **Format** Python code: + ```bash + uv run invoke format + ``` + +2. **Lint** (YAML, Ruff, ty, mypy, markdownlint, vale): + ```bash + uv run invoke lint + ``` + +3. **Python unit tests**: + ```bash + uv run pytest tests/unit/ + ``` + +4. **Docs unit tests** (vitest): + ```bash + (cd docs && npx --no-install vitest run) + ``` + +5. **Validate generated documentation** (regenerate and check for drift): + ```bash + uv run invoke docs-validate + ``` + +## Instructions + +- Run each step in order using the Bash tool. +- If a step fails, continue with the remaining steps. +- At the end, print a summary table of all steps with pass/fail status. +- Do NOT commit or push anything. + diff --git a/docs/AGENTS.md b/docs/AGENTS.md index f869e84f..02f1fc6c 100644 --- a/docs/AGENTS.md +++ b/docs/AGENTS.md @@ -1,4 +1,4 @@ -# docs/AGENTS.md +# Documentation agents Docusaurus documentation following Diataxis framework. @@ -34,12 +34,12 @@ Sidebar navigation is dynamic: `sidebars-*.ts` files read the filesystem at buil No manual sidebar update is needed when adding a new `.mdx` file. However, to control the display order of a new page, add its doc ID to the ordered list in the corresponding `sidebars-*.ts` file. -## Adding Documentation +## Adding documentation 1. Create MDX file in appropriate directory 2. Add frontmatter with `title` -## MDX Pattern +## MDX pattern Use Tabs for async/sync examples, callouts for notes: diff --git a/docs/docs/python-sdk/guides/client.mdx b/docs/docs/python-sdk/guides/client.mdx index 460036a1..90872a0f 100644 --- a/docs/docs/python-sdk/guides/client.mdx +++ b/docs/docs/python-sdk/guides/client.mdx @@ -251,7 +251,7 @@ Your client is now configured to use the specified default branch instead of `ma ## Hello world example -Let's create a simple "Hello World" example to verify your client configuration works correctly. This example will connect to your Infrahub instance and query the available accounts. +Let's create a "Hello World" example to verify your client configuration works correctly. This example will connect to your Infrahub instance and query the available accounts. 1. Create a new file called `hello_world.py`: diff --git a/docs/docs/python-sdk/guides/python-typing.mdx b/docs/docs/python-sdk/guides/python-typing.mdx index 9bc2c323..77780177 100644 --- a/docs/docs/python-sdk/guides/python-typing.mdx +++ b/docs/docs/python-sdk/guides/python-typing.mdx @@ -131,7 +131,7 @@ infrahubctl graphql generate-return-types queries/get_tags.gql ### Example workflow -1. **Create your GraphQL queries** in `.gql` files preferably in a directory (e.g., `queries/`): +1. **Create your GraphQL queries** in `.gql` files preferably in a directory (for example, `queries/`): ```graphql # queries/get_tags.gql diff --git a/docs/docs/python-sdk/sdk_ref/infrahub_sdk/node/attribute.mdx b/docs/docs/python-sdk/sdk_ref/infrahub_sdk/node/attribute.mdx index a7b82ecb..d08c7fc5 100644 --- a/docs/docs/python-sdk/sdk_ref/infrahub_sdk/node/attribute.mdx +++ b/docs/docs/python-sdk/sdk_ref/infrahub_sdk/node/attribute.mdx @@ -24,3 +24,15 @@ value(self) -> Any ```python value(self, value: Any) -> None ``` + +#### `is_from_pool_attribute` + +```python +is_from_pool_attribute(self) -> bool +``` + +Check whether this attribute's value is sourced from a resource pool. + +**Returns:** + +- True if the attribute value is a resource pool node or was explicitly allocated from a pool. diff --git a/docs/docs/python-sdk/topics/object_file.mdx b/docs/docs/python-sdk/topics/object_file.mdx index 751599f0..75744d89 100644 --- a/docs/docs/python-sdk/topics/object_file.mdx +++ b/docs/docs/python-sdk/topics/object_file.mdx @@ -68,13 +68,13 @@ spec: > Multiple documents in a single YAML file are also supported, each document will be loaded separately. Documents are separated by `---` -### Data Processing Parameters +### Data processing parameters The `parameters` field controls how the data in the object file is processed before loading into Infrahub: -| Parameter | Description | Default | -| -------------- | ------------------------------------------------------------------------------------------------------- | ------- | -| `expand_range` | When set to `true`, range patterns (e.g., `[1-5]`) in string fields are expanded into multiple objects. | `false` | +| Parameter | Description | Default | +| -------------- | -------------------------------------------------------------------------------------------------------------- | ------- | +| `expand_range` | When set to `true`, range patterns (for example, `[1-5]`) in string fields are expanded into multiple objects. | `false` | When `expand_range` is not specified, it defaults to `false`. @@ -208,9 +208,9 @@ Metadata support is planned for future releases. Currently, the Object file does 3. Validate object files before loading them into production environments. 4. Use comments in your YAML files to document complex relationships or dependencies. -## Range Expansion in Object Files +## Range expansion in object files -The Infrahub Python SDK supports **range expansion** for string fields in object files when the `parameters > expand_range` is set to `true`. This feature allows you to specify a range pattern (e.g., `[1-5]`) in any string value, and the SDK will automatically expand it into multiple objects during validation and processing. +The Infrahub Python SDK supports **range expansion** for string fields in object files when the `parameters > expand_range` is set to `true`. This feature allows you to specify a range pattern (for example, `[1-5]`) in any string value, and the SDK will automatically expand it into multiple objects during validation and processing. ```yaml --- @@ -225,7 +225,7 @@ spec: type: Country ``` -### How Range Expansion Works +### How range expansion works - Any string field containing a pattern like `[1-5]`, `[10-15]`, or `[1,3,5]` will be expanded into multiple objects. - If multiple fields in the same object use range expansion, **all expanded lists must have the same length**. If not, validation will fail. @@ -233,7 +233,7 @@ spec: ### Examples -#### Single Field Expansion +#### Single field expansion ```yaml spec: @@ -256,7 +256,7 @@ This will expand to: type: Country ``` -#### Multiple Field Expansion (Matching Lengths) +#### Multiple field expansion (matching lengths) ```yaml spec: @@ -283,7 +283,7 @@ This will expand to: type: Country ``` -#### Error: Mismatched Range Lengths +#### Error: mismatched range lengths If you use ranges of different lengths in multiple fields: diff --git a/infrahub_sdk/node/attribute.py b/infrahub_sdk/node/attribute.py index 8043d567..54dd99aa 100644 --- a/infrahub_sdk/node/attribute.py +++ b/infrahub_sdk/node/attribute.py @@ -2,7 +2,7 @@ import ipaddress from collections.abc import Callable -from typing import TYPE_CHECKING, Any, get_args +from typing import TYPE_CHECKING, Any, NamedTuple, get_args from ..protocols_base import CoreNodeBase from ..uuidt import UUIDT @@ -13,6 +13,33 @@ from ..schema import AttributeSchemaAPI +class _GraphQLPayloadAttribute(NamedTuple): + """Result of resolving an attribute value for a GraphQL mutation. + + Attributes: + payload: Key/value entries to include in the mutation payload + (e.g. ``{"value": ...}`` or ``{"from_pool": ...}``). + variables: GraphQL variable bindings for unsafe string values. + needs_metadata: When ``True``, the payload needs to append property flags/objects + """ + + payload: dict[str, Any] + variables: dict[str, Any] + needs_metadata: bool + + def to_dict(self) -> dict[str, Any]: + return {"data": self.payload, "variables": self.variables} + + def add_properties(self, properties_flag: dict[str, Any], properties_object: dict[str, str | None]) -> None: + if not self.needs_metadata: + return + for prop_name, prop in properties_flag.items(): + self.payload[prop_name] = prop + + for prop_name, prop in properties_object.items(): + self.payload[prop_name] = prop + + class Attribute: """Represents an attribute of a Node, including its schema, value, and properties.""" @@ -25,8 +52,12 @@ def __init__(self, name: str, schema: AttributeSchemaAPI, data: Any | dict) -> N """ self.name = name self._schema = schema + self._from_pool: dict[str, Any] | None = None - if not isinstance(data, dict) or "value" not in data: + if isinstance(data, dict) and "from_pool" in data: + self._from_pool = data.pop("from_pool") + data.setdefault("value", None) + elif not isinstance(data, dict) or "value" not in data: data = {"value": data} self._properties_flag = PROPERTIES_FLAG @@ -76,38 +107,55 @@ def value(self, value: Any) -> None: self._value = value self.value_has_been_mutated = True - def _generate_input_data(self) -> dict | None: - data: dict[str, Any] = {} - variables: dict[str, Any] = {} - - if self.value is None: - if self._schema.optional and self.value_has_been_mutated: - data["value"] = None - return data - - if isinstance(self.value, str): - if SAFE_VALUE.match(self.value): - data["value"] = self.value - else: - var_name = f"value_{UUIDT.new().hex}" - variables[var_name] = self.value - data["value"] = f"${var_name}" - elif isinstance(self.value, get_args(IP_TYPES)): - data["value"] = self.value.with_prefixlen - elif isinstance(self.value, CoreNodeBase) and self.value.is_resource_pool(): - data["from_pool"] = {"id": self.value.id} - else: - data["value"] = self.value - - for prop_name in self._properties_flag: - if getattr(self, prop_name) is not None: - data[prop_name] = getattr(self, prop_name) + def _initialize_graphql_payload(self) -> _GraphQLPayloadAttribute: + """Resolve the attribute value into a GraphQL mutation payload object.""" - for prop_name in self._properties_object: - if getattr(self, prop_name) is not None: - data[prop_name] = getattr(self, prop_name)._generate_input_data() + # Pool-based allocation (dict data or resource-pool node) + if self._from_pool is not None: + return _GraphQLPayloadAttribute(payload={"from_pool": self._from_pool}, variables={}, needs_metadata=True) + if isinstance(self.value, CoreNodeBase) and self.value.is_resource_pool(): + return _GraphQLPayloadAttribute( + payload={"from_pool": {"id": self.value.id}}, variables={}, needs_metadata=True + ) - return {"data": data, "variables": variables} + # Null value + if self.value is None: + data = {"value": None} if (self._schema.optional and self.value_has_been_mutated) else {} + return _GraphQLPayloadAttribute(payload=data, variables={}, needs_metadata=False) + + # Unsafe strings need a variable binding to avoid injection + if isinstance(self.value, str) and not SAFE_VALUE.match(self.value): + var_name = f"value_{UUIDT.new().hex}" + return _GraphQLPayloadAttribute( + payload={"value": f"${var_name}"}, + variables={var_name: self.value}, + needs_metadata=True, + ) + + # Safe strings, IP types, and everything else + value = self.value.with_prefixlen if isinstance(self.value, get_args(IP_TYPES)) else self.value + return _GraphQLPayloadAttribute(payload={"value": value}, variables={}, needs_metadata=True) + + def _generate_input_data(self) -> _GraphQLPayloadAttribute: + """Build the input payload for a GraphQL mutation on this attribute. + + Returns a ResolvedValue object, which contains all the data required. + """ + graphql_payload = self._initialize_graphql_payload() + + properties_flag: dict[str, Any] = { + property_name: getattr(self, property_name) + for property_name in self._properties_flag + if getattr(self, property_name) is not None + } + properties_object: dict[str, str | None] = { + property_name: getattr(self, property_name)._generate_input_data() + for property_name in self._properties_object + if getattr(self, property_name) is not None + } + graphql_payload.add_properties(properties_flag, properties_object) + + return graphql_payload def _generate_query_data(self, property: bool = False, include_metadata: bool = False) -> dict | None: data: dict[str, Any] = {"value": None} @@ -128,7 +176,15 @@ def _generate_query_data(self, property: bool = False, include_metadata: bool = return data def _generate_mutation_query(self) -> dict[str, Any]: - if isinstance(self.value, CoreNodeBase) and self.value.is_resource_pool(): + if self.is_from_pool_attribute(): # If it points to a pool, ask for the value of the pool allocated resource return {self.name: {"value": None}} return {} + + def is_from_pool_attribute(self) -> bool: + """Check whether this attribute's value is sourced from a resource pool. + + Returns: + True if the attribute value is a resource pool node or was explicitly allocated from a pool. + """ + return (isinstance(self.value, CoreNodeBase) and self.value.is_resource_pool()) or self._from_pool is not None diff --git a/infrahub_sdk/node/node.py b/infrahub_sdk/node/node.py index bbc14304..a47209dc 100644 --- a/infrahub_sdk/node/node.py +++ b/infrahub_sdk/node/node.py @@ -286,7 +286,7 @@ def _get_file_for_upload_sync(self) -> PreparedFile: def get_raw_graphql_data(self) -> dict | None: return self._data - def _generate_input_data( # noqa: C901, PLR0915 + def _generate_input_data( # noqa: C901 self, exclude_unmodified: bool = False, exclude_hfid: bool = False, @@ -298,27 +298,18 @@ def _generate_input_data( # noqa: C901, PLR0915 dict[str, Dict]: Representation of an input data in dict format """ - data = {} - variables = {} + data: dict[str, Any] = {} + variables: dict[str, Any] = {} for item_name in self._attributes: attr: Attribute = getattr(self, item_name) if attr._schema.read_only: continue - attr_data = attr._generate_input_data() - - # NOTE, this code has been inherited when we splitted attributes and relationships - # into 2 loops, most likely it's possible to simply it - if attr_data and isinstance(attr_data, dict): - if variable_values := attr_data.get("data"): - data[item_name] = variable_values - else: - data[item_name] = attr_data - if variable_names := attr_data.get("variables"): - variables.update(variable_names) - - elif attr_data and isinstance(attr_data, list): - data[item_name] = attr_data + graphql_payload = attr._generate_input_data() + if graphql_payload.payload: + data[item_name] = graphql_payload.payload + if graphql_payload.variables: + variables.update(graphql_payload.variables) for item_name in self._relationships: allocate_from_pool = False @@ -1124,11 +1115,7 @@ async def _process_mutation_result( for attr_name in self._attributes: attr = getattr(self, attr_name) - if ( - attr_name not in object_response - or not isinstance(attr.value, InfrahubNodeBase) - or not attr.value.is_resource_pool() - ): + if attr_name not in object_response or not attr.is_from_pool_attribute(): continue # Process allocated resource from a pool and update attribute @@ -2011,11 +1998,7 @@ def _process_mutation_result( for attr_name in self._attributes: attr = getattr(self, attr_name) - if ( - attr_name not in object_response - or not isinstance(attr.value, InfrahubNodeBase) - or not attr.value.is_resource_pool() - ): + if attr_name not in object_response or not attr.is_from_pool_attribute(): continue # Process allocated resource from a pool and update attribute diff --git a/tests/AGENTS.md b/tests/AGENTS.md index f3608ead..cce67364 100644 --- a/tests/AGENTS.md +++ b/tests/AGENTS.md @@ -17,6 +17,12 @@ uv run pytest tests/unit/test_client.py # Single file ```text tests/ ├── unit/ # Fast, mocked, no external deps +│ ├── ctl/ # CLI command tests +│ └── sdk/ # SDK tests +│ ├── pool/ # Resource pool allocation tests +│ ├── spec/ # Object spec tests +│ ├── checks/ # InfrahubCheck tests +│ └── ... # Core SDK tests (client, node, schema, etc.) ├── integration/ # Real Infrahub via testcontainers ├── fixtures/ # Test data (JSON, YAML) └── helpers/ # Test utilities diff --git a/tests/unit/sdk/conftest.py b/tests/unit/sdk/conftest.py index 3c3fe933..ad725532 100644 --- a/tests/unit/sdk/conftest.py +++ b/tests/unit/sdk/conftest.py @@ -1015,115 +1015,6 @@ async def ipam_ipprefix_data() -> dict[str, Any]: } -@pytest.fixture -async def ipaddress_pool_schema() -> NodeSchemaAPI: - data = { - "name": "IPAddressPool", - "namespace": "Core", - "description": "A pool of IP address resources", - "label": "IP Address Pool", - "default_filter": "name__value", - "order_by": ["name__value"], - "display_labels": ["name__value"], - "include_in_menu": False, - "branch": BranchSupportType.AGNOSTIC.value, - "inherit_from": ["CoreResourcePool"], - "attributes": [ - { - "name": "default_address_type", - "kind": "Text", - "optional": False, - "description": "The object type to create when reserving a resource in the pool", - }, - { - "name": "default_prefix_length", - "kind": "Number", - "optional": True, - }, - ], - "relationships": [ - { - "name": "resources", - "peer": "BuiltinIPPrefix", - "kind": "Attribute", - "identifier": "ipaddresspool__resource", - "cardinality": "many", - "optional": False, - "order_weight": 4000, - }, - { - "name": "ip_namespace", - "peer": "BuiltinIPNamespace", - "kind": "Attribute", - "identifier": "ipaddresspool__ipnamespace", - "cardinality": "one", - "optional": False, - "order_weight": 5000, - }, - ], - } - return NodeSchema(**data).convert_api() - - -@pytest.fixture -async def ipprefix_pool_schema() -> NodeSchemaAPI: - data = { - "name": "IPPrefixPool", - "namespace": "Core", - "description": "A pool of IP prefix resources", - "label": "IP Prefix Pool", - "include_in_menu": False, - "branch": BranchSupportType.AGNOSTIC.value, - "inherit_from": ["CoreResourcePool"], - "attributes": [ - { - "name": "default_prefix_length", - "kind": "Number", - "description": "The default prefix length as an integer for prefixes allocated from this pool.", - "optional": True, - "order_weight": 5000, - }, - { - "name": "default_member_type", - "kind": "Text", - "enum": ["prefix", "address"], - "default_value": "prefix", - "optional": True, - "order_weight": 3000, - }, - { - "name": "default_prefix_type", - "kind": "Text", - "optional": True, - "order_weight": 4000, - }, - ], - "relationships": [ - { - "name": "resources", - "peer": "BuiltinIPPrefix", - "kind": "Attribute", - "identifier": "prefixpool__resource", - "cardinality": "many", - "branch": BranchSupportType.AGNOSTIC.value, - "optional": False, - "order_weight": 6000, - }, - { - "name": "ip_namespace", - "peer": "BuiltinIPNamespace", - "kind": "Attribute", - "identifier": "prefixpool__ipnamespace", - "cardinality": "one", - "branch": BranchSupportType.AGNOSTIC.value, - "optional": False, - "order_weight": 7000, - }, - ], - } - return NodeSchema(**data).convert_api() - - @pytest.fixture async def address_schema() -> NodeSchemaAPI: data = { @@ -2691,3 +2582,23 @@ async def non_file_object_schema() -> NodeSchemaAPI: "relationships": [], } return NodeSchema(**data).convert_api() + + +@pytest.fixture +async def vlan_schema() -> NodeSchemaAPI: + data = { + "name": "VLAN", + "namespace": "Infra", + "label": "VLAN", + "default_filter": "name__value", + "order_by": ["name__value"], + "display_labels": ["name__value"], + "attributes": [ + {"name": "name", "kind": "Text", "unique": True}, + {"name": "vlan_id", "kind": "Number"}, + {"name": "role", "kind": "Text", "optional": True}, + {"name": "status", "kind": "Text", "optional": True}, + ], + "relationships": [], + } + return NodeSchema(**data).convert_api() diff --git a/tests/unit/sdk/pool/__init__.py b/tests/unit/sdk/pool/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/unit/sdk/pool/conftest.py b/tests/unit/sdk/pool/conftest.py new file mode 100644 index 00000000..9d0fd245 --- /dev/null +++ b/tests/unit/sdk/pool/conftest.py @@ -0,0 +1,116 @@ +from __future__ import annotations + +from typing import Any + +import pytest + +from infrahub_sdk.schema import BranchSupportType, NodeSchema, NodeSchemaAPI + + +@pytest.fixture +async def ipaddress_pool_schema() -> NodeSchemaAPI: + data: dict[str, Any] = { + "name": "IPAddressPool", + "namespace": "Core", + "description": "A pool of IP address resources", + "label": "IP Address Pool", + "default_filter": "name__value", + "order_by": ["name__value"], + "display_labels": ["name__value"], + "include_in_menu": False, + "branch": BranchSupportType.AGNOSTIC.value, + "inherit_from": ["CoreResourcePool"], + "attributes": [ + { + "name": "default_address_type", + "kind": "Text", + "optional": False, + "description": "The object type to create when reserving a resource in the pool", + }, + { + "name": "default_prefix_length", + "kind": "Number", + "optional": True, + }, + ], + "relationships": [ + { + "name": "resources", + "peer": "BuiltinIPPrefix", + "kind": "Attribute", + "identifier": "ipaddresspool__resource", + "cardinality": "many", + "optional": False, + "order_weight": 4000, + }, + { + "name": "ip_namespace", + "peer": "BuiltinIPNamespace", + "kind": "Attribute", + "identifier": "ipaddresspool__ipnamespace", + "cardinality": "one", + "optional": False, + "order_weight": 5000, + }, + ], + } + return NodeSchema(**data).convert_api() + + +@pytest.fixture +async def ipprefix_pool_schema() -> NodeSchemaAPI: + data: dict[str, Any] = { + "name": "IPPrefixPool", + "namespace": "Core", + "description": "A pool of IP prefix resources", + "label": "IP Prefix Pool", + "include_in_menu": False, + "branch": BranchSupportType.AGNOSTIC.value, + "inherit_from": ["CoreResourcePool"], + "attributes": [ + { + "name": "default_prefix_length", + "kind": "Number", + "description": "The default prefix length as an integer for prefixes allocated from this pool.", + "optional": True, + "order_weight": 5000, + }, + { + "name": "default_member_type", + "kind": "Text", + "enum": ["prefix", "address"], + "default_value": "prefix", + "optional": True, + "order_weight": 3000, + }, + { + "name": "default_prefix_type", + "kind": "Text", + "optional": True, + "order_weight": 4000, + }, + ], + "relationships": [ + { + "name": "resources", + "peer": "BuiltinIPPrefix", + "kind": "Attribute", + "identifier": "prefixpool__resource", + "cardinality": "many", + "branch": BranchSupportType.AGNOSTIC.value, + "optional": False, + "order_weight": 6000, + }, + { + "name": "ip_namespace", + "peer": "BuiltinIPNamespace", + "kind": "Attribute", + "identifier": "prefixpool__ipnamespace", + "cardinality": "one", + "branch": BranchSupportType.AGNOSTIC.value, + "optional": False, + "order_weight": 7000, + }, + ], + } + return NodeSchema(**data).convert_api() diff --git a/tests/unit/sdk/pool/test_allocate.py b/tests/unit/sdk/pool/test_allocate.py new file mode 100644 index 00000000..eacc1a7b --- /dev/null +++ b/tests/unit/sdk/pool/test_allocate.py @@ -0,0 +1,220 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING, cast + +import pytest + +from infrahub_sdk.node import InfrahubNode, InfrahubNodeSync + +if TYPE_CHECKING: + from typing import Any + + from pytest_httpx import HTTPXMock + + from infrahub_sdk.protocols_base import CoreNode, CoreNodeSync + from infrahub_sdk.schema import NodeSchemaAPI + from tests.unit.sdk.conftest import BothClients + +client_types = ["standard", "sync"] + + +@pytest.mark.parametrize("client_type", client_types) +async def test_allocate_next_ip_address( + httpx_mock: HTTPXMock, + mock_schema_query_ipam: HTTPXMock, + clients: BothClients, + ipaddress_pool_schema: NodeSchemaAPI, + ipam_ipprefix_schema: NodeSchemaAPI, + ipam_ipprefix_data: dict[str, Any], + client_type: str, +) -> None: + httpx_mock.add_response( + method="POST", + json={ + "data": { + "InfrahubIPAddressPoolGetResource": { + "ok": True, + "node": { + "id": "17da1246-54f1-a9c0-2784-179f0ec5b128", + "kind": "IpamIPAddress", + "identifier": "test", + "display_label": "192.0.2.0/32", + }, + } + } + }, + match_headers={"X-Infrahub-Tracker": "allocate-ip-loopback"}, + is_reusable=True, + ) + httpx_mock.add_response( + method="POST", + json={ + "data": { + "IpamIPAddress": { + "count": 1, + "edges": [ + { + "node": { + "id": "17d9bd8d-8fc2-70b0-278a-179f425e25cb", + "__typename": "IpamIPAddress", + "address": {"value": "192.0.2.0/32"}, + "description": {"value": "test"}, + } + } + ], + } + } + }, + match_headers={"X-Infrahub-Tracker": "query-ipamipaddress-page1"}, + is_reusable=True, + ) + + if client_type == "standard": + ip_prefix = InfrahubNode(client=clients.standard, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) + ip_pool = InfrahubNode( + client=clients.standard, + schema=ipaddress_pool_schema, + data={ + "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", + "name": "Core loopbacks", + "default_address_type": "IpamIPAddress", + "default_prefix_length": 32, + "ip_namespace": "ip_namespace", + "resources": [ip_prefix], + }, + ) + ip_address = await clients.standard.allocate_next_ip_address( + resource_pool=cast("CoreNode", ip_pool), + identifier="test", + prefix_length=32, + address_type="IpamIPAddress", + data={"description": "test"}, + tracker="allocate-ip-loopback", + ) + else: + ip_prefix = InfrahubNodeSync(client=clients.sync, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) + ip_pool = InfrahubNodeSync( + client=clients.sync, + schema=ipaddress_pool_schema, + data={ + "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", + "name": "Core loopbacks", + "default_address_type": "IpamIPAddress", + "default_prefix_length": 32, + "ip_namespace": "ip_namespace", + "resources": [ip_prefix], + }, + ) + ip_address = clients.sync.allocate_next_ip_address( + resource_pool=cast("CoreNodeSync", ip_pool), + identifier="test", + prefix_length=32, + address_type="IpamIPAddress", + data={"description": "test"}, + tracker="allocate-ip-loopback", + ) + + assert ip_address + assert str(cast("InfrahubNodeSync", ip_address).address.value) == "192.0.2.0/32" + assert cast("InfrahubNodeSync", ip_address).description.value == "test" + + +@pytest.mark.parametrize("client_type", client_types) +async def test_allocate_next_ip_prefix( + httpx_mock: HTTPXMock, + mock_schema_query_ipam: HTTPXMock, + clients: BothClients, + ipprefix_pool_schema: NodeSchemaAPI, + ipam_ipprefix_schema: NodeSchemaAPI, + ipam_ipprefix_data: dict[str, Any], + client_type: str, +) -> None: + httpx_mock.add_response( + method="POST", + json={ + "data": { + "InfrahubIPPrefixPoolGetResource": { + "ok": True, + "node": { + "id": "7d9bd8d-8fc2-70b0-278a-179f425e25cb", + "kind": "IpamIPPrefix", + "identifier": "test", + "display_label": "192.0.2.0/31", + }, + } + } + }, + match_headers={"X-Infrahub-Tracker": "allocate-ip-interco"}, + is_reusable=True, + ) + httpx_mock.add_response( + method="POST", + json={ + "data": { + "IpamIPPrefix": { + "count": 1, + "edges": [ + { + "node": { + "id": "17d9bd8d-8fc2-70b0-278a-179f425e25cb", + "__typename": "IpamIPPrefix", + "prefix": {"value": "192.0.2.0/31"}, + "description": {"value": "test"}, + } + } + ], + } + } + }, + match_headers={"X-Infrahub-Tracker": "query-ipamipprefix-page1"}, + is_reusable=True, + ) + + if client_type == "standard": + ip_prefix = InfrahubNode(client=clients.standard, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) + ip_pool = InfrahubNode( + client=clients.standard, + schema=ipprefix_pool_schema, + data={ + "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", + "name": "Core intercos", + "default_prefix_type": "IpamIPPrefix", + "default_prefix_length": 31, + "ip_namespace": "ip_namespace", + "resources": [ip_prefix], + }, + ) + ip_prefix = await clients.standard.allocate_next_ip_prefix( + resource_pool=cast("CoreNode", ip_pool), + identifier="test", + prefix_length=31, + prefix_type="IpamIPPrefix", + data={"description": "test"}, + tracker="allocate-ip-interco", + ) + else: + ip_prefix = InfrahubNodeSync(client=clients.sync, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) + ip_pool = InfrahubNodeSync( + client=clients.sync, + schema=ipprefix_pool_schema, + data={ + "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", + "name": "Core intercos", + "default_prefix_type": "IpamIPPrefix", + "default_prefix_length": 31, + "ip_namespace": "ip_namespace", + "resources": [ip_prefix], + }, + ) + ip_prefix = clients.sync.allocate_next_ip_prefix( + resource_pool=cast("CoreNodeSync", ip_pool), + identifier="test", + prefix_length=31, + prefix_type="IpamIPPrefix", + data={"description": "test"}, + tracker="allocate-ip-interco", + ) + + assert ip_prefix + assert str(cast("InfrahubNodeSync", ip_prefix).prefix.value) == "192.0.2.0/31" # type: ignore[unresolved-attribute] + assert cast("InfrahubNodeSync", ip_prefix).description.value == "test" # type: ignore[unresolved-attribute] diff --git a/tests/unit/sdk/pool/test_attribute_from_pool.py b/tests/unit/sdk/pool/test_attribute_from_pool.py new file mode 100644 index 00000000..75d63f6d --- /dev/null +++ b/tests/unit/sdk/pool/test_attribute_from_pool.py @@ -0,0 +1,204 @@ +""" +When using from_pool on a number attribute (e.g. vlan_id), the SDK should generate: + vlan_id: { from_pool: { id: "...", identifier: "..." } } + +There are two ways to request a pool allocation: +1. Dict-based: {"from_pool": {"id": "...", "identifier": "..."}} +2. Node-based: pass an InfrahubNode pool object as the attribute value +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +from infrahub_sdk.node import InfrahubNode, InfrahubNodeSync + +if TYPE_CHECKING: + from infrahub_sdk import InfrahubClient, InfrahubClientSync + from infrahub_sdk.schema import NodeSchemaAPI + + +POOL_ID = "185b9728-1b76-dda7-d13d-106529b1bcd9" + + +# ────────────────────────────────────────────── +# Dict-based from_pool - async client +# ────────────────────────────────────────────── + + +async def test_number_attribute_from_pool_with_identifier( + client: InfrahubClient, + vlan_schema: NodeSchemaAPI, +) -> None: + """A number attribute with from_pool and identifier should NOT be wrapped in value.""" + data: dict[str, Any] = { + "name": "Example VLAN", + "vlan_id": {"from_pool": {"id": POOL_ID, "identifier": "test"}}, + "role": "user", + "status": "active", + } + node = InfrahubNode(client=client, schema=vlan_schema, data=data) + + # Act + input_data = node._generate_input_data()["data"]["data"] + + assert input_data["name"] == {"value": "Example VLAN"} + assert input_data["role"] == {"value": "user"} + assert input_data["status"] == {"value": "active"} + assert input_data["vlan_id"] == {"from_pool": {"id": POOL_ID, "identifier": "test"}} + assert "value" not in input_data["vlan_id"] + + +async def test_number_attribute_regular_value( + client: InfrahubClient, + vlan_schema: NodeSchemaAPI, +) -> None: + """Regular number values should still be wrapped in value as before.""" + data: dict[str, Any] = { + "name": "Example VLAN", + "vlan_id": 100, + } + node = InfrahubNode(client=client, schema=vlan_schema, data=data) + + # Act + input_data = node._generate_input_data()["data"]["data"] + + assert input_data["name"] == {"value": "Example VLAN"} + assert input_data["vlan_id"] == {"value": 100} + + +async def test_number_attribute_from_pool_mutation_query( + client: InfrahubClient, + vlan_schema: NodeSchemaAPI, +) -> None: + """A from_pool dict attribute should request value back in the mutation query.""" + data: dict[str, Any] = { + "name": "Example VLAN", + "vlan_id": {"from_pool": {"id": POOL_ID, "identifier": "test"}}, + } + node = InfrahubNode(client=client, schema=vlan_schema, data=data) + + # Act + mutation_query = node._generate_mutation_query() + + assert mutation_query["object"]["vlan_id"] == {"value": None} + + +# ────────────────────────────────────────────── +# Dict-based from_pool - sync client +# ────────────────────────────────────────────── + + +async def test_sync_number_attribute_from_pool_with_identifier( + client_sync: InfrahubClientSync, + vlan_schema: NodeSchemaAPI, +) -> None: + """A number attribute with from_pool and identifier should NOT be wrapped in value (sync client).""" + data: dict[str, Any] = { + "name": "Example VLAN", + "vlan_id": {"from_pool": {"id": POOL_ID, "identifier": "test"}}, + "role": "user", + "status": "active", + } + node = InfrahubNodeSync(client=client_sync, schema=vlan_schema, data=data) + + # Act + input_data = node._generate_input_data()["data"]["data"] + + assert input_data["name"] == {"value": "Example VLAN"} + assert input_data["role"] == {"value": "user"} + assert input_data["status"] == {"value": "active"} + assert input_data["vlan_id"] == {"from_pool": {"id": POOL_ID, "identifier": "test"}} + assert "value" not in input_data["vlan_id"] + + +async def test_sync_number_attribute_regular_value( + client_sync: InfrahubClientSync, + vlan_schema: NodeSchemaAPI, +) -> None: + """Regular number values should still be wrapped in value as before (sync client).""" + data: dict[str, Any] = { + "name": "Example VLAN", + "vlan_id": 100, + } + node = InfrahubNodeSync(client=client_sync, schema=vlan_schema, data=data) + + # Act + input_data = node._generate_input_data()["data"]["data"] + + assert input_data["name"] == {"value": "Example VLAN"} + assert input_data["vlan_id"] == {"value": 100} + + +# ────────────────────────────────────────────── +# Node-based from_pool - async client +# ────────────────────────────────────────────── + +NODE_POOL_ID = "185b9728-1b56-dda7-d13d-106535b1bcd9" + + +async def test_attribute_with_pool_node_generates_from_pool( + client: InfrahubClient, + vlan_schema: NodeSchemaAPI, + ipaddress_pool_schema: NodeSchemaAPI, + ipam_ipprefix_schema: NodeSchemaAPI, + ipam_ipprefix_data: dict[str, Any], +) -> None: + """When an attribute value is a CoreNodeBase pool node, _generate_input_data should produce from_pool.""" + ip_prefix = InfrahubNode(client=client, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) + ip_pool = InfrahubNode( + client=client, + schema=ipaddress_pool_schema, + data={ + "id": NODE_POOL_ID, + "name": "Core loopbacks", + "default_address_type": "IpamIPAddress", + "default_prefix_length": 32, + "ip_namespace": "ip_namespace", + "resources": [ip_prefix], + }, + ) + vlan = InfrahubNode( + client=client, + schema=vlan_schema, + data={"name": "Example VLAN", "vlan_id": ip_pool}, + ) + + # Act + input_data = vlan._generate_input_data()["data"]["data"] + + assert input_data["vlan_id"] == {"from_pool": {"id": NODE_POOL_ID}} + assert "value" not in input_data["vlan_id"] + + +async def test_attribute_with_pool_node_generates_mutation_query( + client: InfrahubClient, + vlan_schema: NodeSchemaAPI, + ipaddress_pool_schema: NodeSchemaAPI, + ipam_ipprefix_schema: NodeSchemaAPI, + ipam_ipprefix_data: dict[str, Any], +) -> None: + """When an attribute value is a CoreNodeBase pool node, _generate_mutation_query should request value back.""" + ip_prefix = InfrahubNode(client=client, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) + ip_pool = InfrahubNode( + client=client, + schema=ipaddress_pool_schema, + data={ + "id": NODE_POOL_ID, + "name": "Core loopbacks", + "default_address_type": "IpamIPAddress", + "default_prefix_length": 32, + "ip_namespace": "ip_namespace", + "resources": [ip_prefix], + }, + ) + vlan = InfrahubNode( + client=client, + schema=vlan_schema, + data={"name": "Example VLAN", "vlan_id": ip_pool}, + ) + + # Act + mutation_query = vlan._generate_mutation_query() + + assert mutation_query["object"]["vlan_id"] == {"value": None} diff --git a/tests/unit/sdk/pool/test_pool_queries.py b/tests/unit/sdk/pool/test_pool_queries.py new file mode 100644 index 00000000..4f27cba7 --- /dev/null +++ b/tests/unit/sdk/pool/test_pool_queries.py @@ -0,0 +1,185 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +import pytest + +from infrahub_sdk.node import InfrahubNode, InfrahubNodeSync + +if TYPE_CHECKING: + from typing import Any + + from pytest_httpx import HTTPXMock + + from infrahub_sdk.schema import NodeSchemaAPI + from tests.unit.sdk.conftest import BothClients + +client_types = ["standard", "sync"] + + +@pytest.mark.parametrize("client_type", client_types) +async def test_get_pool_allocated_resources( + httpx_mock: HTTPXMock, + mock_schema_query_ipam: HTTPXMock, + clients: BothClients, + ipaddress_pool_schema: NodeSchemaAPI, + ipam_ipprefix_schema: NodeSchemaAPI, + ipam_ipprefix_data: dict[str, Any], + client_type: str, +) -> None: + httpx_mock.add_response( + method="POST", + json={ + "data": { + "InfrahubResourcePoolAllocated": { + "count": 2, + "edges": [ + { + "node": { + "id": "17d9bd8d-8fc2-70b0-278a-179f425e25cb", + "kind": "IpamIPAddress", + "branch": "main", + "identifier": "ip-1", + } + }, + { + "node": { + "id": "17d9bd8e-31ee-acf0-2786-179fb76f2f67", + "kind": "IpamIPAddress", + "branch": "main", + "identifier": "ip-2", + } + }, + ], + } + } + }, + match_headers={"X-Infrahub-Tracker": "get-allocated-resources-page1"}, + ) + httpx_mock.add_response( + method="POST", + json={ + "data": { + "IpamIPAddress": { + "count": 2, + "edges": [ + {"node": {"id": "17d9bd8d-8fc2-70b0-278a-179f425e25cb", "__typename": "IpamIPAddress"}}, + {"node": {"id": "17d9bd8e-31ee-acf0-2786-179fb76f2f67", "__typename": "IpamIPAddress"}}, + ], + } + } + }, + match_headers={"X-Infrahub-Tracker": "query-ipamipaddress-page1"}, + ) + + if client_type == "standard": + ip_prefix = InfrahubNode(client=clients.standard, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) + ip_pool = InfrahubNode( + client=clients.standard, + schema=ipaddress_pool_schema, + data={ + "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", + "name": "Core loopbacks", + "default_address_type": "IpamIPAddress", + "default_prefix_length": 32, + "ip_namespace": "ip_namespace", + "resources": [ip_prefix], + }, + ) + + resources = await ip_pool.get_pool_allocated_resources(resource=ip_prefix) + assert len(resources) == 2 + assert [resource.id for resource in resources] == [ + "17d9bd8d-8fc2-70b0-278a-179f425e25cb", + "17d9bd8e-31ee-acf0-2786-179fb76f2f67", + ] + else: + ip_prefix = InfrahubNodeSync(client=clients.sync, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) + ip_pool = InfrahubNodeSync( + client=clients.sync, + schema=ipaddress_pool_schema, + data={ + "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", + "name": "Core loopbacks", + "default_address_type": "IpamIPAddress", + "default_prefix_length": 32, + "ip_namespace": "ip_namespace", + "resources": [ip_prefix], + }, + ) + + resources = ip_pool.get_pool_allocated_resources(resource=ip_prefix) + assert len(resources) == 2 + assert [resource.id for resource in resources] == [ + "17d9bd8d-8fc2-70b0-278a-179f425e25cb", + "17d9bd8e-31ee-acf0-2786-179fb76f2f67", + ] + + +@pytest.mark.parametrize("client_type", client_types) +async def test_get_pool_resources_utilization( + httpx_mock: HTTPXMock, + clients: BothClients, + ipaddress_pool_schema: NodeSchemaAPI, + ipam_ipprefix_schema: NodeSchemaAPI, + ipam_ipprefix_data: dict[str, Any], + client_type: str, +) -> None: + httpx_mock.add_response( + method="POST", + json={ + "data": { + "InfrahubResourcePoolUtilization": { + "count": 1, + "edges": [ + { + "node": { + "id": "17d9bd86-3471-a020-2782-179ff078e58f", + "utilization": 93.75, + "utilization_branches": 0, + "utilization_default_branch": 93.75, + } + } + ], + } + } + }, + match_headers={"X-Infrahub-Tracker": "get-pool-utilization"}, + ) + + if client_type == "standard": + ip_prefix = InfrahubNode(client=clients.standard, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) + ip_pool = InfrahubNode( + client=clients.standard, + schema=ipaddress_pool_schema, + data={ + "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", + "name": "Core loopbacks", + "default_address_type": "IpamIPAddress", + "default_prefix_length": 32, + "ip_namespace": "ip_namespace", + "resources": [ip_prefix], + }, + ) + + utilizations = await ip_pool.get_pool_resources_utilization() + assert len(utilizations) == 1 + assert utilizations[0]["utilization"] == 93.75 + else: + ip_prefix = InfrahubNodeSync(client=clients.sync, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) + ip_pool = InfrahubNodeSync( + client=clients.sync, + schema=ipaddress_pool_schema, + data={ + "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", + "name": "Core loopbacks", + "default_address_type": "IpamIPAddress", + "default_prefix_length": 32, + "ip_namespace": "ip_namespace", + "resources": [ip_prefix], + }, + ) + + utilizations = ip_pool.get_pool_resources_utilization() + assert len(utilizations) == 1 + assert utilizations[0]["utilization"] == 93.75 diff --git a/tests/unit/sdk/pool/test_relationship_from_pool.py b/tests/unit/sdk/pool/test_relationship_from_pool.py new file mode 100644 index 00000000..f8c44c6b --- /dev/null +++ b/tests/unit/sdk/pool/test_relationship_from_pool.py @@ -0,0 +1,130 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +import pytest + +from infrahub_sdk.node import InfrahubNode, InfrahubNodeSync + +if TYPE_CHECKING: + from typing import Any + + from infrahub_sdk.schema import NodeSchemaAPI + from tests.unit.sdk.conftest import BothClients + +client_types = ["standard", "sync"] + + +@pytest.mark.parametrize("client_type", client_types) +async def test_create_input_data_with_resource_pool_relationship( + clients: BothClients, + ipaddress_pool_schema: NodeSchemaAPI, + ipam_ipprefix_schema: NodeSchemaAPI, + simple_device_schema: NodeSchemaAPI, + ipam_ipprefix_data: dict[str, Any], + client_type: str, +) -> None: + if client_type == "standard": + ip_prefix = InfrahubNode(client=clients.standard, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) + ip_pool = InfrahubNode( + client=clients.standard, + schema=ipaddress_pool_schema, + data={ + "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", + "name": "Core loopbacks", + "default_address_type": "IpamIPAddress", + "default_prefix_length": 32, + "ip_namespace": "ip_namespace", + "resources": [ip_prefix], + }, + ) + device = InfrahubNode( + client=clients.standard, + schema=simple_device_schema, + data={"name": "device-01", "primary_address": ip_pool, "ip_address_pool": ip_pool}, + ) + else: + ip_prefix = InfrahubNodeSync(client=clients.sync, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) + ip_pool = InfrahubNodeSync( + client=clients.sync, + schema=ipaddress_pool_schema, + data={ + "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", + "name": "Core loopbacks", + "default_address_type": "IpamIPAddress", + "default_prefix_length": 32, + "ip_namespace": "ip_namespace", + "resources": [ip_prefix], + }, + ) + device = InfrahubNodeSync( + client=clients.sync, + schema=simple_device_schema, + data={"name": "device-01", "primary_address": ip_pool, "ip_address_pool": ip_pool}, + ) + + assert device._generate_input_data()["data"] == { + "data": { + "name": {"value": "device-01"}, + "primary_address": {"from_pool": {"id": "pppppppp-pppp-pppp-pppp-pppppppppppp"}}, + "ip_address_pool": {"id": "pppppppp-pppp-pppp-pppp-pppppppppppp"}, + }, + } + + +@pytest.mark.parametrize("client_type", client_types) +async def test_create_mutation_query_with_resource_pool_relationship( + clients: BothClients, + ipaddress_pool_schema: NodeSchemaAPI, + ipam_ipprefix_schema: NodeSchemaAPI, + simple_device_schema: NodeSchemaAPI, + ipam_ipprefix_data: dict[str, Any], + client_type: str, +) -> None: + if client_type == "standard": + ip_prefix = InfrahubNode(client=clients.standard, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) + ip_pool = InfrahubNode( + client=clients.standard, + schema=ipaddress_pool_schema, + data={ + "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", + "name": "Core loopbacks", + "default_address_type": "IpamIPAddress", + "default_prefix_length": 32, + "ip_namespace": "ip_namespace", + "resources": [ip_prefix], + }, + ) + device = InfrahubNode( + client=clients.standard, + schema=simple_device_schema, + data={"name": "device-01", "primary_address": ip_pool, "ip_address_pool": ip_pool}, + ) + else: + ip_prefix = InfrahubNodeSync(client=clients.sync, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) + ip_pool = InfrahubNodeSync( + client=clients.sync, + schema=ipaddress_pool_schema, + data={ + "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", + "name": "Core loopbacks", + "default_address_type": "IpamIPAddress", + "default_prefix_length": 32, + "ip_namespace": "ip_namespace", + "resources": [ip_prefix], + }, + ) + device = InfrahubNodeSync( + client=clients.sync, + schema=simple_device_schema, + data={"name": "device-01", "primary_address": ip_pool, "ip_address_pool": ip_pool}, + ) + + assert device._generate_mutation_query() == { + "object": { + "id": None, + "primary_address": {"node": {"__typename": None, "display_label": None, "id": None}}, + "ip_address_pool": {"node": {"__typename": None, "display_label": None, "id": None}}, + }, + "ok": None, + } diff --git a/tests/unit/sdk/test_attribute_generate_input_data.py b/tests/unit/sdk/test_attribute_generate_input_data.py new file mode 100644 index 00000000..394623fc --- /dev/null +++ b/tests/unit/sdk/test_attribute_generate_input_data.py @@ -0,0 +1,404 @@ +"""Unit tests for Attribute._generate_input_data covering all code paths.""" + +from __future__ import annotations + +from typing import Any + +import pytest + +from infrahub_sdk.node.attribute import Attribute +from infrahub_sdk.protocols_base import CoreNodeBase +from infrahub_sdk.schema import AttributeSchemaAPI +from infrahub_sdk.schema.main import AttributeKind + +# ────────────────────────────────────────────── +# Value resolution: from_pool (dict-based) +# ────────────────────────────────────────────── + + +class TestFromPoolDict: + def test_from_pool_with_id(self) -> None: + pool_data = {"id": "pool-uuid-1"} + attr = Attribute(name="vlan_id", schema=_make_schema(AttributeKind.NUMBER), data={"from_pool": pool_data}) + + result = attr._generate_input_data() + + assert result.payload == {"from_pool": {"id": "pool-uuid-1"}} + assert result.variables == {} + + def test_from_pool_with_id_and_identifier(self) -> None: + pool_data = {"id": "pool-uuid-1", "identifier": "test"} + attr = Attribute(name="vlan_id", schema=_make_schema(AttributeKind.NUMBER), data={"from_pool": pool_data}) + + result = attr._generate_input_data() + + assert result.payload == {"from_pool": {"id": "pool-uuid-1", "identifier": "test"}} + assert result.variables == {} + + def test_from_pool_with_pool_name(self) -> None: + """from_pool can be a plain string (pool name), e.g. from_pool: 'VLAN ID Pool'.""" + attr = Attribute( + name="vlan_id", schema=_make_schema(AttributeKind.NUMBER, optional=True), data={"from_pool": "VLAN ID Pool"} + ) + + result = attr._generate_input_data() + + assert result.payload == {"from_pool": "VLAN ID Pool"} + assert result.variables == {} + assert "value" not in result.payload + + def test_from_pool_value_is_none(self) -> None: + """from_pool pops 'from_pool' and sets Attribute.value to None; value should NOT appear in payload.""" + attr = Attribute( + name="vlan_id", schema=_make_schema(AttributeKind.NUMBER), data={"from_pool": {"id": "pool-uuid-1"}} + ) + + assert attr.value is None + result = attr._generate_input_data() + assert "value" not in result.payload + + +# ────────────────────────────────────────────── +# Value resolution: from_pool (node-based) +# ────────────────────────────────────────────── + + +class TestFromPoolNode: + def test_pool_node_generates_from_pool(self) -> None: + pool_node = _FakeNode(node_id="node-pool-uuid", is_pool=True) + + attr = Attribute(name="vlan_id", schema=_make_schema(AttributeKind.NUMBER), data=pool_node) + + result = attr._generate_input_data() + + assert result.payload == {"from_pool": {"id": "node-pool-uuid"}} + assert result.variables == {} + + def test_non_pool_node_treated_as_regular_value(self) -> None: + """A CoreNodeBase that is NOT a resource pool should go through the normal value path.""" + node = _FakeNode(node_id="regular-node-uuid", is_pool=False) + attr = Attribute(name="vlan_id", schema=_make_schema(AttributeKind.NUMBER), data=node) + + result = attr._generate_input_data() + + assert result.payload == {"value": node} + + +# ────────────────────────────────────────────── +# Value resolution: null values +# ────────────────────────────────────────────── + + +class TestNullValue: + def test_null_value_not_mutated(self) -> None: + """None value that was never mutated → empty payload, no properties.""" + attr = Attribute(name="test_attr", schema=_make_schema(AttributeKind.TEXT), data={"value": None}) + + result = attr._generate_input_data() + + assert result.payload == {} + assert result.variables == {} + assert result.needs_metadata is False + + def test_null_value_mutated_optional(self) -> None: + """None value on an optional attr that was mutated → explicit null.""" + attr = Attribute( + name="test_attr", schema=_make_schema(AttributeKind.TEXT, optional=True), data={"value": "initial"} + ) + attr.value = None # triggers value_has_been_mutated + + result = attr._generate_input_data() + + assert result.payload == {"value": None} + assert result.needs_metadata is False + + def test_null_value_mutated_non_optional(self) -> None: + """None value on a non-optional attr that was mutated → empty payload (same as not mutated).""" + attr = Attribute( + name="test_attr", schema=_make_schema(AttributeKind.TEXT, optional=False), data={"value": "initial"} + ) + attr.value = None + + result = attr._generate_input_data() + + assert result.payload == {} + assert result.needs_metadata is False + + +# ────────────────────────────────────────────── +# Value resolution: strings (safe vs unsafe) +# ────────────────────────────────────────────── + + +class TestStringValues: + @pytest.mark.parametrize( + "value", + [ + pytest.param("simple", id="alphanumeric"), + pytest.param("user.name", id="dots"), + pytest.param("/opt/repos/infrahub", id="filepath"), + pytest.param("https://github.com/opsmill", id="url"), + pytest.param("", id="empty-string"), + ], + ) + def test_safe_string(self, value: str) -> None: + attr = Attribute(name="test_attr", schema=_make_schema(AttributeKind.TEXT), data=value) + + result = attr._generate_input_data() + + assert result.payload == {"value": value} + assert result.variables == {} + + @pytest.mark.parametrize( + "value", + [ + pytest.param('has "quotes"', id="quotes"), + pytest.param("has\nnewline", id="newline"), + pytest.param("special{chars}", id="braces"), + ], + ) + def test_unsafe_string_uses_variable_binding(self, value: str) -> None: + attr = Attribute(name="test_attr", schema=_make_schema(AttributeKind.TEXT), data=value) + + result = attr._generate_input_data() + + # payload["value"] should be a variable reference like "$value_" + assert "value" in result.payload + assert result.payload["value"].startswith("$value_") + # The actual string should be in variables + assert len(result.variables) == 1 + var_name = next(iter(result.variables)) + assert result.variables[var_name] == value + + +# ────────────────────────────────────────────── +# Value resolution: IP types +# ────────────────────────────────────────────── + + +class TestIPValues: + def test_ipv4_interface(self) -> None: + attr = Attribute(name="address", schema=_make_schema(AttributeKind.IPHOST), data={"value": "10.0.0.1/24"}) + + result = attr._generate_input_data() + + assert result.payload["value"] == "10.0.0.1/24" + assert result.variables == {} + + def test_ipv6_interface(self) -> None: + attr = Attribute(name="address", schema=_make_schema(AttributeKind.IPHOST), data={"value": "2001:db8::1/64"}) + + result = attr._generate_input_data() + + assert result.payload["value"] == "2001:db8::1/64" + + def test_ipv4_network(self) -> None: + attr = Attribute(name="network", schema=_make_schema(AttributeKind.IPNETWORK), data={"value": "10.0.0.0/24"}) + + result = attr._generate_input_data() + + assert result.payload["value"] == "10.0.0.0/24" + + def test_ipv6_network(self) -> None: + attr = Attribute(name="network", schema=_make_schema(AttributeKind.IPNETWORK), data={"value": "2001:db8::/32"}) + + result = attr._generate_input_data() + + assert result.payload["value"] == "2001:db8::/32" + + +# ────────────────────────────────────────────── +# Value resolution: other scalars +# ────────────────────────────────────────────── + + +class TestScalarValues: + def test_number_value(self) -> None: + attr = Attribute(name="vlan_id", schema=_make_schema(AttributeKind.NUMBER), data=42) + + result = attr._generate_input_data() + + assert result.payload == {"value": 42} + assert result.variables == {} + + def test_boolean_value(self) -> None: + attr = Attribute(name="enabled", schema=_make_schema(AttributeKind.BOOLEAN), data=True) + + result = attr._generate_input_data() + + assert result.payload == {"value": True} + + +# ────────────────────────────────────────────── +# Property handling +# ────────────────────────────────────────────── + + +class TestProperties: + def test_no_properties_set(self) -> None: + """When no properties are set, payload only has the value.""" + attr = Attribute(name="test_attr", schema=_make_schema(AttributeKind.TEXT), data="hello") + + result = attr._generate_input_data() + + assert result.payload == {"value": "hello"} + + def test_flag_property_is_protected(self) -> None: + attr = Attribute( + name="test_attr", schema=_make_schema(AttributeKind.TEXT), data={"value": "hello", "is_protected": True} + ) + + result = attr._generate_input_data() + + assert result.payload["value"] == "hello" + assert result.payload["is_protected"] is True + + def test_object_property_source(self) -> None: + attr = Attribute( + name="test_attr", + schema=_make_schema(AttributeKind.TEXT), + data={"value": "hello", "source": {"id": "source-uuid", "display_label": "Git", "__typename": "CoreGit"}}, + ) + + result = attr._generate_input_data() + + assert result.payload["value"] == "hello" + assert result.payload["source"] == "source-uuid" + + def test_object_property_owner(self) -> None: + attr = Attribute( + name="test_attr", + schema=_make_schema(AttributeKind.TEXT), + data={ + "value": "hello", + "owner": {"id": "owner-uuid", "display_label": "Admin", "__typename": "CoreAccount"}, + }, + ) + + result = attr._generate_input_data() + + assert result.payload["owner"] == "owner-uuid" + + def test_both_flag_and_object_properties(self) -> None: + attr = Attribute( + name="test_attr", + schema=_make_schema(AttributeKind.TEXT), + data={ + "value": "hello", + "is_protected": True, + "source": {"id": "src-uuid", "display_label": "Git", "__typename": "CoreGit"}, + }, + ) + + result = attr._generate_input_data() + + assert result.payload["value"] == "hello" + assert result.payload["is_protected"] is True + assert result.payload["source"] == "src-uuid" + + def test_properties_not_appended_for_null_value(self) -> None: + """When need_additional_properties is False (null non-mutated), properties are ignored.""" + attr = Attribute( + name="test_attr", + schema=_make_schema(AttributeKind.TEXT), + data={ + "value": None, + "is_protected": True, + "source": {"id": "src-uuid", "display_label": "Git", "__typename": "CoreGit"}, + }, + ) + + result = attr._generate_input_data() + + # Null value, not mutated → empty payload, properties NOT appended + assert result.payload == {} + + def test_properties_appended_for_from_pool(self) -> None: + """from_pool payloads have need_additional_properties=True, so properties are included.""" + attr = Attribute( + name="vlan_id", + schema=_make_schema(AttributeKind.NUMBER), + data={"from_pool": {"id": "pool-uuid"}, "is_protected": True}, + ) + + result = attr._generate_input_data() + + assert result.payload["from_pool"] == {"id": "pool-uuid"} + assert result.payload["is_protected"] is True + + +# ────────────────────────────────────────────── +# Return type: to_dict() integration +# ────────────────────────────────────────────── + + +class TestToDictIntegration: + def test_to_dict_simple_value(self) -> None: + attr = Attribute(name="test_attr", schema=_make_schema(AttributeKind.TEXT), data="hello") + + result = attr._generate_input_data().to_dict() + + assert result == {"data": {"value": "hello"}, "variables": {}} + + def test_to_dict_with_variables(self) -> None: + attr = Attribute(name="test_attr", schema=_make_schema(AttributeKind.TEXT), data='has "quotes"') + + result = attr._generate_input_data().to_dict() + + assert "data" in result + assert "variables" in result + assert len(result["variables"]) == 1 + var_name = next(iter(result["variables"])) + assert result["variables"][var_name] == 'has "quotes"' + assert result["data"]["value"] == f"${var_name}" + + +def _make_schema(kind: AttributeKind = AttributeKind.TEXT, optional: bool = False) -> AttributeSchemaAPI: + return AttributeSchemaAPI(name="test_attr", kind=kind, optional=optional) + + +class _FakeNode(CoreNodeBase): + """Minimal CoreNodeBase implementation for testing.""" + + def __init__(self, node_id: str, is_pool: bool) -> None: + self.id = node_id + self._is_pool = is_pool + self._schema: Any = None + self._internal_id = "" + self.display_label = None + self.typename = None + + @property + def hfid(self) -> list[str] | None: + return None + + @property + def hfid_str(self) -> str | None: + return None + + def get_human_friendly_id(self) -> list[str] | None: + return None + + def get_human_friendly_id_as_string(self, include_kind: bool = False) -> str | None: + return None + + def get_kind(self) -> str: + return "" + + def get_all_kinds(self) -> list[str]: + return [] + + def get_branch(self) -> str: + return "" + + def is_ip_prefix(self) -> bool: + return False + + def is_ip_address(self) -> bool: + return False + + def is_resource_pool(self) -> bool: + return self._is_pool + + def get_raw_graphql_data(self) -> dict | None: + return None diff --git a/tests/unit/sdk/test_client.py b/tests/unit/sdk/test_client.py index 9234f9cc..13346c02 100644 --- a/tests/unit/sdk/test_client.py +++ b/tests/unit/sdk/test_client.py @@ -14,11 +14,9 @@ if TYPE_CHECKING: from collections.abc import Callable, Mapping from inspect import Parameter - from typing import Any from pytest_httpx import HTTPXMock - from infrahub_sdk.schema import NodeSchemaAPI from tests.unit.sdk.conftest import BothClients pytestmark = pytest.mark.httpx_mock(can_send_already_matched_responses=True) @@ -636,208 +634,6 @@ async def test_method_filters_empty( assert len(repos) == 0 -@pytest.mark.parametrize("client_type", client_types) -async def test_allocate_next_ip_address( - httpx_mock: HTTPXMock, - mock_schema_query_ipam: HTTPXMock, - clients: BothClients, - ipaddress_pool_schema: NodeSchemaAPI, - ipam_ipprefix_schema: NodeSchemaAPI, - ipam_ipprefix_data: dict[str, Any], - client_type: str, -) -> None: - httpx_mock.add_response( - method="POST", - json={ - "data": { - "InfrahubIPAddressPoolGetResource": { - "ok": True, - "node": { - "id": "17da1246-54f1-a9c0-2784-179f0ec5b128", - "kind": "IpamIPAddress", - "identifier": "test", - "display_label": "192.0.2.0/32", - }, - } - } - }, - match_headers={"X-Infrahub-Tracker": "allocate-ip-loopback"}, - is_reusable=True, - ) - httpx_mock.add_response( - method="POST", - json={ - "data": { - "IpamIPAddress": { - "count": 1, - "edges": [ - { - "node": { - "id": "17d9bd8d-8fc2-70b0-278a-179f425e25cb", - "__typename": "IpamIPAddress", - "address": {"value": "192.0.2.0/32"}, - "description": {"value": "test"}, - } - } - ], - } - } - }, - match_headers={"X-Infrahub-Tracker": "query-ipamipaddress-page1"}, - is_reusable=True, - ) - - if client_type == "standard": - ip_prefix = InfrahubNode(client=clients.standard, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) - ip_pool = InfrahubNode( - client=clients.standard, - schema=ipaddress_pool_schema, - data={ - "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", - "name": "Core loopbacks", - "default_address_type": "IpamIPAddress", - "default_prefix_length": 32, - "ip_namespace": "ip_namespace", - "resources": [ip_prefix], - }, - ) - ip_address = await clients.standard.allocate_next_ip_address( - resource_pool=ip_pool, - identifier="test", - prefix_length=32, - address_type="IpamIPAddress", - data={"description": "test"}, - tracker="allocate-ip-loopback", - ) - else: - ip_prefix = InfrahubNodeSync(client=clients.sync, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) - ip_pool = InfrahubNodeSync( - client=clients.sync, - schema=ipaddress_pool_schema, - data={ - "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", - "name": "Core loopbacks", - "default_address_type": "IpamIPAddress", - "default_prefix_length": 32, - "ip_namespace": "ip_namespace", - "resources": [ip_prefix], - }, - ) - ip_address = clients.sync.allocate_next_ip_address( - resource_pool=ip_pool, - identifier="test", - prefix_length=32, - address_type="IpamIPAddress", - data={"description": "test"}, - tracker="allocate-ip-loopback", - ) - - assert ip_address - assert str(ip_address.address.value) == "192.0.2.0/32" - assert ip_address.description.value == "test" - - -@pytest.mark.parametrize("client_type", client_types) -async def test_allocate_next_ip_prefix( - httpx_mock: HTTPXMock, - mock_schema_query_ipam: HTTPXMock, - clients: BothClients, - ipprefix_pool_schema: NodeSchemaAPI, - ipam_ipprefix_schema: NodeSchemaAPI, - ipam_ipprefix_data: dict[str, Any], - client_type: str, -) -> None: - httpx_mock.add_response( - method="POST", - json={ - "data": { - "InfrahubIPPrefixPoolGetResource": { - "ok": True, - "node": { - "id": "7d9bd8d-8fc2-70b0-278a-179f425e25cb", - "kind": "IpamIPPrefix", - "identifier": "test", - "display_label": "192.0.2.0/31", - }, - } - } - }, - match_headers={"X-Infrahub-Tracker": "allocate-ip-interco"}, - is_reusable=True, - ) - httpx_mock.add_response( - method="POST", - json={ - "data": { - "IpamIPPrefix": { - "count": 1, - "edges": [ - { - "node": { - "id": "17d9bd8d-8fc2-70b0-278a-179f425e25cb", - "__typename": "IpamIPPrefix", - "prefix": {"value": "192.0.2.0/31"}, - "description": {"value": "test"}, - } - } - ], - } - } - }, - match_headers={"X-Infrahub-Tracker": "query-ipamipprefix-page1"}, - is_reusable=True, - ) - - if client_type == "standard": - ip_prefix = InfrahubNode(client=clients.standard, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) - ip_pool = InfrahubNode( - client=clients.standard, - schema=ipprefix_pool_schema, - data={ - "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", - "name": "Core intercos", - "default_prefix_type": "IpamIPPrefix", - "default_prefix_length": 31, - "ip_namespace": "ip_namespace", - "resources": [ip_prefix], - }, - ) - ip_prefix = await clients.standard.allocate_next_ip_prefix( - resource_pool=ip_pool, - identifier="test", - prefix_length=31, - prefix_type="IpamIPPrefix", - data={"description": "test"}, - tracker="allocate-ip-interco", - ) - else: - ip_prefix = InfrahubNodeSync(client=clients.sync, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) - ip_pool = InfrahubNodeSync( - client=clients.sync, - schema=ipprefix_pool_schema, - data={ - "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", - "name": "Core intercos", - "default_prefix_type": "IpamIPPrefix", - "default_prefix_length": 31, - "ip_namespace": "ip_namespace", - "resources": [ip_prefix], - }, - ) - ip_prefix = clients.sync.allocate_next_ip_prefix( - resource_pool=ip_pool, - identifier="test", - prefix_length=31, - prefix_type="IpamIPPrefix", - data={"description": "test"}, - tracker="allocate-ip-interco", - ) - - assert ip_prefix - assert str(ip_prefix.prefix.value) == "192.0.2.0/31" - assert ip_prefix.description.value == "test" - - EXPECTED_ECHO = """URL: http://mock/graphql/main QUERY: diff --git a/tests/unit/sdk/test_node.py b/tests/unit/sdk/test_node.py index f539ea51..ad3d77eb 100644 --- a/tests/unit/sdk/test_node.py +++ b/tests/unit/sdk/test_node.py @@ -2216,289 +2216,6 @@ async def test_relationships_excluded_input_data( assert node.tags.has_update is False -@pytest.mark.parametrize("client_type", client_types) -async def test_create_input_data_with_resource_pool_relationship( - client: InfrahubClient, - ipaddress_pool_schema: NodeSchemaAPI, - ipam_ipprefix_schema: NodeSchemaAPI, - simple_device_schema: NodeSchemaAPI, - ipam_ipprefix_data: dict[str, Any], - client_type: str, -) -> None: - if client_type == "standard": - ip_prefix = InfrahubNode(client=client, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) - ip_pool = InfrahubNode( - client=client, - schema=ipaddress_pool_schema, - data={ - "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", - "name": "Core loopbacks", - "default_address_type": "IpamIPAddress", - "default_prefix_length": 32, - "ip_namespace": "ip_namespace", - "resources": [ip_prefix], - }, - ) - device = InfrahubNode( - client=client, - schema=simple_device_schema, - data={"name": "device-01", "primary_address": ip_pool, "ip_address_pool": ip_pool}, - ) - else: - ip_prefix = InfrahubNodeSync(client=client, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) - ip_pool = InfrahubNodeSync( - client=client, - schema=ipaddress_pool_schema, - data={ - "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", - "name": "Core loopbacks", - "default_address_type": "IpamIPAddress", - "default_prefix_length": 32, - "ip_namespace": "ip_namespace", - "resources": [ip_prefix], - }, - ) - device = InfrahubNode( - client=client, - schema=simple_device_schema, - data={"name": "device-01", "primary_address": ip_pool, "ip_address_pool": ip_pool}, - ) - - assert device._generate_input_data()["data"] == { - "data": { - "name": {"value": "device-01"}, - "primary_address": {"from_pool": {"id": "pppppppp-pppp-pppp-pppp-pppppppppppp"}}, - "ip_address_pool": {"id": "pppppppp-pppp-pppp-pppp-pppppppppppp"}, - }, - } - - -@pytest.mark.parametrize("client_type", client_types) -async def test_create_mutation_query_with_resource_pool_relationship( - client: InfrahubClient, - ipaddress_pool_schema: NodeSchemaAPI, - ipam_ipprefix_schema: NodeSchemaAPI, - simple_device_schema: NodeSchemaAPI, - ipam_ipprefix_data: dict[str, Any], - client_type: str, -) -> None: - if client_type == "standard": - ip_prefix = InfrahubNode(client=client, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) - ip_pool = InfrahubNode( - client=client, - schema=ipaddress_pool_schema, - data={ - "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", - "name": "Core loopbacks", - "default_address_type": "IpamIPAddress", - "default_prefix_length": 32, - "ip_namespace": "ip_namespace", - "resources": [ip_prefix], - }, - ) - device = InfrahubNode( - client=client, - schema=simple_device_schema, - data={"name": "device-01", "primary_address": ip_pool, "ip_address_pool": ip_pool}, - ) - else: - ip_prefix = InfrahubNodeSync(client=client, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) - ip_pool = InfrahubNodeSync( - client=client, - schema=ipaddress_pool_schema, - data={ - "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", - "name": "Core loopbacks", - "default_address_type": "IpamIPAddress", - "default_prefix_length": 32, - "ip_namespace": "ip_namespace", - "resources": [ip_prefix], - }, - ) - device = InfrahubNode( - client=client, - schema=simple_device_schema, - data={"name": "device-01", "primary_address": ip_pool, "ip_address_pool": ip_pool}, - ) - - assert device._generate_mutation_query() == { - "object": { - "id": None, - "primary_address": {"node": {"__typename": None, "display_label": None, "id": None}}, - "ip_address_pool": {"node": {"__typename": None, "display_label": None, "id": None}}, - }, - "ok": None, - } - - -@pytest.mark.parametrize("client_type", client_types) -async def test_get_pool_allocated_resources( - httpx_mock: HTTPXMock, - mock_schema_query_ipam: HTTPXMock, - clients: BothClients, - ipaddress_pool_schema: NodeSchemaAPI, - ipam_ipprefix_schema: NodeSchemaAPI, - ipam_ipprefix_data: dict[str, Any], - client_type: str, -) -> None: - httpx_mock.add_response( - method="POST", - json={ - "data": { - "InfrahubResourcePoolAllocated": { - "count": 2, - "edges": [ - { - "node": { - "id": "17d9bd8d-8fc2-70b0-278a-179f425e25cb", - "kind": "IpamIPAddress", - "branch": "main", - "identifier": "ip-1", - } - }, - { - "node": { - "id": "17d9bd8e-31ee-acf0-2786-179fb76f2f67", - "kind": "IpamIPAddress", - "branch": "main", - "identifier": "ip-2", - } - }, - ], - } - } - }, - match_headers={"X-Infrahub-Tracker": "get-allocated-resources-page1"}, - ) - httpx_mock.add_response( - method="POST", - json={ - "data": { - "IpamIPAddress": { - "count": 2, - "edges": [ - {"node": {"id": "17d9bd8d-8fc2-70b0-278a-179f425e25cb", "__typename": "IpamIPAddress"}}, - {"node": {"id": "17d9bd8e-31ee-acf0-2786-179fb76f2f67", "__typename": "IpamIPAddress"}}, - ], - } - } - }, - match_headers={"X-Infrahub-Tracker": "query-ipamipaddress-page1"}, - ) - - if client_type == "standard": - ip_prefix = InfrahubNode(client=clients.standard, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) - ip_pool = InfrahubNode( - client=clients.standard, - schema=ipaddress_pool_schema, - data={ - "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", - "name": "Core loopbacks", - "default_address_type": "IpamIPAddress", - "default_prefix_length": 32, - "ip_namespace": "ip_namespace", - "resources": [ip_prefix], - }, - ) - - resources = await ip_pool.get_pool_allocated_resources(resource=ip_prefix) - assert len(resources) == 2 - assert [resource.id for resource in resources] == [ - "17d9bd8d-8fc2-70b0-278a-179f425e25cb", - "17d9bd8e-31ee-acf0-2786-179fb76f2f67", - ] - else: - ip_prefix = InfrahubNodeSync(client=clients.sync, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) - ip_pool = InfrahubNodeSync( - client=clients.sync, - schema=ipaddress_pool_schema, - data={ - "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", - "name": "Core loopbacks", - "default_address_type": "IpamIPAddress", - "default_prefix_length": 32, - "ip_namespace": "ip_namespace", - "resources": [ip_prefix], - }, - ) - - resources = ip_pool.get_pool_allocated_resources(resource=ip_prefix) - assert len(resources) == 2 - assert [resource.id for resource in resources] == [ - "17d9bd8d-8fc2-70b0-278a-179f425e25cb", - "17d9bd8e-31ee-acf0-2786-179fb76f2f67", - ] - - -@pytest.mark.parametrize("client_type", client_types) -async def test_get_pool_resources_utilization( - httpx_mock: HTTPXMock, - clients: BothClients, - ipaddress_pool_schema: NodeSchemaAPI, - ipam_ipprefix_schema: NodeSchemaAPI, - ipam_ipprefix_data: dict[str, Any], - client_type: str, -) -> None: - httpx_mock.add_response( - method="POST", - json={ - "data": { - "InfrahubResourcePoolUtilization": { - "count": 1, - "edges": [ - { - "node": { - "id": "17d9bd86-3471-a020-2782-179ff078e58f", - "utilization": 93.75, - "utilization_branches": 0, - "utilization_default_branch": 93.75, - } - } - ], - } - } - }, - match_headers={"X-Infrahub-Tracker": "get-pool-utilization"}, - ) - - if client_type == "standard": - ip_prefix = InfrahubNode(client=clients.standard, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) - ip_pool = InfrahubNode( - client=clients.standard, - schema=ipaddress_pool_schema, - data={ - "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", - "name": "Core loopbacks", - "default_address_type": "IpamIPAddress", - "default_prefix_length": 32, - "ip_namespace": "ip_namespace", - "resources": [ip_prefix], - }, - ) - - utilizations = await ip_pool.get_pool_resources_utilization() - assert len(utilizations) == 1 - assert utilizations[0]["utilization"] == 93.75 - else: - ip_prefix = InfrahubNodeSync(client=clients.sync, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) - ip_pool = InfrahubNodeSync( - client=clients.sync, - schema=ipaddress_pool_schema, - data={ - "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", - "name": "Core loopbacks", - "default_address_type": "IpamIPAddress", - "default_prefix_length": 32, - "ip_namespace": "ip_namespace", - "resources": [ip_prefix], - }, - ) - - utilizations = ip_pool.get_pool_resources_utilization() - assert len(utilizations) == 1 - assert utilizations[0]["utilization"] == 93.75 - - @pytest.mark.parametrize("client_type", client_types) async def test_from_graphql( clients: BothClients, mock_schema_query_01: HTTPXMock, location_data01: dict[str, Any], client_type: str