From 893861e53a9f48f5c1bc0b8dc7ec10c8ca56a37e Mon Sep 17 00:00:00 2001 From: Valeriia Ziukina Date: Mon, 26 Jan 2026 10:56:18 +0100 Subject: [PATCH 1/4] fix(vpcep): tests added --- otcextensions/sdk/apig/v2/_proxy.py | 25 +++ otcextensions/sdk/apig/v2/vpc_endpoint.py | 35 ++++ otcextensions/sdk/vlb/v3/_proxy.py | 15 ++ otcextensions/sdk/vlb/v3/availability_zone.py | 2 +- otcextensions/sdk/vpcep/v1/_proxy.py | 4 +- otcextensions/sdk/vpcep/v1/connection.py | 6 +- otcextensions/sdk/vpcep/v1/quota.py | 2 +- otcextensions/tests/functional/base.py | 84 ++++++++ .../sdk/apig/v2/test_vpc_endpoint.py | 31 +++ .../tests/functional/sdk/vpcep/__init__.py | 183 ++++++++++++++++++ .../sdk/vpcep/v1/test_connection.py | 77 ++++++++ .../functional/sdk/vpcep/v1/test_endpoint.py | 69 +++++++ .../sdk/vpcep/v1/test_public_service.py | 26 +++ .../functional/sdk/vpcep/v1/test_quota.py | 37 ++++ .../functional/sdk/vpcep/v1/test_service.py | 105 +++++++++- .../sdk/vpcep/v1/test_target_service.py | 32 +++ .../functional/sdk/vpcep/v1/test_whitelist.py | 94 +++++++++ 17 files changed, 813 insertions(+), 14 deletions(-) create mode 100644 otcextensions/sdk/apig/v2/vpc_endpoint.py create mode 100644 otcextensions/tests/functional/sdk/apig/v2/test_vpc_endpoint.py create mode 100644 otcextensions/tests/functional/sdk/vpcep/v1/test_connection.py create mode 100644 otcextensions/tests/functional/sdk/vpcep/v1/test_endpoint.py create mode 100644 otcextensions/tests/functional/sdk/vpcep/v1/test_public_service.py create mode 100644 otcextensions/tests/functional/sdk/vpcep/v1/test_quota.py create mode 100644 otcextensions/tests/functional/sdk/vpcep/v1/test_target_service.py create mode 100644 otcextensions/tests/functional/sdk/vpcep/v1/test_whitelist.py diff --git a/otcextensions/sdk/apig/v2/_proxy.py b/otcextensions/sdk/apig/v2/_proxy.py index d59166ac7..f966ab6e2 100644 --- a/otcextensions/sdk/apig/v2/_proxy.py +++ b/otcextensions/sdk/apig/v2/_proxy.py @@ -47,6 +47,7 @@ from otcextensions.sdk.apig.v2 import ssl_domain as _ssl_domain from otcextensions.sdk.apig.v2 import tag as _tag from otcextensions.sdk.apig.v2 import config as _config +from otcextensions.sdk.apig.v2 import vpc_endpoint as _vpc_endpoint class Proxy(proxy.Proxy): @@ -3469,3 +3470,27 @@ def configs_for_gateway(self, gateway_id, **attrs): paginated=False, base_path=base_path, **attrs,) + + # ======== VPC Endpoint Management Methods ======== + + def vpc_endpoints(self, gateway, **attrs): + """List all VPC endpoints + + This method retrieves all VPC endpoints associated with the specified + API Gateway instance. + + :param gateway: The ID or an instance of + :class:`~otcextensions.sdk.apig.v2.gateway.Gateway` + :param attrs: Optional query parameters for filtering the list, + such as limit, offset, id, marker_id, status + + :returns: A generator of + :class:`~otcextensions.sdk.apig.v2.vpc_endpoint.VpcEndpoint` + instances + """ + gateway = self._get_resource(_gateway.Gateway, gateway) + return self._list( + _vpc_endpoint.VpcEndpoint, + gateway_id=gateway.id, + **attrs + ) diff --git a/otcextensions/sdk/apig/v2/vpc_endpoint.py b/otcextensions/sdk/apig/v2/vpc_endpoint.py new file mode 100644 index 000000000..af5612c67 --- /dev/null +++ b/otcextensions/sdk/apig/v2/vpc_endpoint.py @@ -0,0 +1,35 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +from openstack import resource + + +class VpcEndpoint(resource.Resource): + resources_key = "connections" + base_path = f'apigw/instances/%(gateway_id)s/vpc-endpoint/connections' + + _query_mapping = resource.QueryParameters('limit', 'offset', 'id', + 'marker_id', 'status') + + # capabilities + allow_fetch = True + allow_commit = True + allow_list = True + + gateway_id = resource.URI('gateway_id') + + # Properties + id = resource.Body("id") + marker_id = resource.Body("marker_id", type=int) + created_at = resource.Body("created_at") + updated_at = resource.Body("updated_at") + domain_id = resource.Body("domain_id") + status = resource.Body("status") diff --git a/otcextensions/sdk/vlb/v3/_proxy.py b/otcextensions/sdk/vlb/v3/_proxy.py index affd97d8c..9613065f3 100644 --- a/otcextensions/sdk/vlb/v3/_proxy.py +++ b/otcextensions/sdk/vlb/v3/_proxy.py @@ -133,6 +133,21 @@ def wait_for_load_balancer(self, name_or_id, status='ACTIVE', return resource.wait_for_status(self, lb, status, failures, interval, wait, attribute='provisioning_status') + def wait_for_delete_load_balancer(self, name_or_id, interval=2, wait=300): + """Wait for a load balancer to be deleted. + + :param name_or_id: The name or ID of a load balancer. + :param interval: Number of seconds to wait between checks. + Default is 2. + :param wait: Maximum number of seconds to wait for deletion. + Default is 300. + :returns: True if the resource is deleted, False if timeout. + :raises: :class:`~openstack.exceptions.ResourceTimeout` if transition + to delete failed to occur in the specified seconds. + """ + lb = self._get_resource(_lb.LoadBalancer, name_or_id) + return resource.wait_for_delete(self, lb, interval, wait) + def get_load_balancer_statuses(self, loadbalancer_id): """Get specific load balancer statuses by load balancer id. diff --git a/otcextensions/sdk/vlb/v3/availability_zone.py b/otcextensions/sdk/vlb/v3/availability_zone.py index 4238137cb..0bce2cdec 100644 --- a/otcextensions/sdk/vlb/v3/availability_zone.py +++ b/otcextensions/sdk/vlb/v3/availability_zone.py @@ -33,7 +33,7 @@ def list(cls, session, paginated=True, base_path=None, if not cls.allow_list: raise exceptions.MethodNotSupported(cls, "list") session = cls._get_session(session) - microversion = cls._get_microversion(session) + microversion = cls._get_microversion(session, action='list') if base_path is None: base_path = cls.base_path diff --git a/otcextensions/sdk/vpcep/v1/_proxy.py b/otcextensions/sdk/vpcep/v1/_proxy.py index 3df43b64c..72c219a35 100644 --- a/otcextensions/sdk/vpcep/v1/_proxy.py +++ b/otcextensions/sdk/vpcep/v1/_proxy.py @@ -210,10 +210,8 @@ def manage_service_connections(self, service, action, endpoints=[]): :param action: action can be ``accept`` or ``reject``. :param endpoints: List of VPC Endpoints Id. - :returns: A generator of connection objects. - :rtype: :class:`~otcextensions.sdk.vpcep.connection.Connection` + :returns: List of connection objects. """ - endpoint_service = self._get_resource(_service.Service, service) connection = self._get_resource( _connection.Connection, diff --git a/otcextensions/sdk/vpcep/v1/connection.py b/otcextensions/sdk/vpcep/v1/connection.py index 6146853f8..a1a90655a 100644 --- a/otcextensions/sdk/vpcep/v1/connection.py +++ b/otcextensions/sdk/vpcep/v1/connection.py @@ -54,8 +54,10 @@ def _action(self, session, action, endpoints=[]): body = {'endpoints': endpoints, 'action': action} response = session.post(uri, json=body) exceptions.raise_from_response(response) - for raw_resource in response.json()[self.resources_key]: - yield Connection.existing(**raw_resource) + return [ + Connection.existing(**raw_resource) + for raw_resource in response.json()[self.resources_key] + ] def accept(self, session, endpoints=[]): """Accept connections.""" diff --git a/otcextensions/sdk/vpcep/v1/quota.py b/otcextensions/sdk/vpcep/v1/quota.py index 98e697583..904b96c37 100644 --- a/otcextensions/sdk/vpcep/v1/quota.py +++ b/otcextensions/sdk/vpcep/v1/quota.py @@ -74,7 +74,7 @@ def list(cls, session, paginated=False, base_path=None, **params): if not cls.allow_list: raise exceptions.MethodNotSupported(cls, "list") session = cls._get_session(session) - microversion = cls._get_microversion(session) + microversion = cls._get_microversion(session, action='list') if base_path is None: base_path = cls.base_path diff --git a/otcextensions/tests/functional/base.py b/otcextensions/tests/functional/base.py index 10ba085f3..8dffba2a5 100644 --- a/otcextensions/tests/functional/base.py +++ b/otcextensions/tests/functional/base.py @@ -11,6 +11,7 @@ # under the License. import os +import uuid from keystoneauth1 import exceptions as _exceptions @@ -72,3 +73,86 @@ def setUp(self): except _exceptions.EndpointNotFound: self.skipTest('Service {service_type} not found in cloud'.format( service_type=service_type)) + + +class NetworkBaseFunctionalTest(BaseFunctionalTest): + def create_network(self, prefix='sdk-test'): + cidr = '192.168.0.0/16' + ipv4 = 4 + uuid_v4 = uuid.uuid4().hex[:8] + router_name = prefix + '-router-' + uuid_v4 + net_name = prefix + '-net-' + uuid_v4 + subnet_name = prefix + '-subnet-' + uuid_v4 + + network = self.conn.network.create_network(name=net_name) + self.assertEqual(net_name, network.name) + net_id = network.id + subnet = self.conn.network.create_subnet( + name=subnet_name, + ip_version=ipv4, + network_id=net_id, + cidr=cidr + ) + self.assertEqual(subnet_name, subnet.name) + subnet_id = subnet.id + + router = self.conn.network.create_router(name=router_name) + self.assertEqual(router_name, router.name) + router_id = router.id + interface = router.add_interface( + self.conn.network, + subnet_id=subnet_id + ) + self.assertEqual(interface['subnet_id'], subnet_id) + self.assertIn('port_id', interface) + return { + 'router_id': router_id, + 'subnet_id': subnet_id, + 'network_id': net_id + } + + def destroy_network(self, params: dict): + router_id = params.get('router_id') + subnet_id = params.get('subnet_id') + network_id = params.get('network_id') + router = self.conn.network.get_router(router_id) + + interface = router.remove_interface( + self.conn.network, + subnet_id=subnet_id + ) + self.assertEqual(interface['subnet_id'], subnet_id) + self.assertIn('port_id', interface) + sot = self.conn.network.delete_router( + router_id, + ignore_missing=False + ) + self.assertIsNone(sot) + sot = self.conn.network.delete_subnet( + subnet_id, + ignore_missing=False + ) + self.assertIsNone(sot) + sot = self.conn.network.delete_network( + network_id, + ignore_missing=False + ) + self.assertIsNone(sot) + + def create_port(self, network_id, prefix='sdk-test'): + uuid_v4 = uuid.uuid4().hex[:8] + port_name = prefix + '-port-' + uuid_v4 + + port = self.conn.network.create_port( + name=port_name, + network_id=network_id + ) + self.assertEqual(port_name, port.name) + return port + + def destroy_port(self, port): + sot = self.conn.network.delete_port( + port, + ignore_missing=False + ) + self.assertIsNone(sot) diff --git a/otcextensions/tests/functional/sdk/apig/v2/test_vpc_endpoint.py b/otcextensions/tests/functional/sdk/apig/v2/test_vpc_endpoint.py new file mode 100644 index 000000000..ec6af312b --- /dev/null +++ b/otcextensions/tests/functional/sdk/apig/v2/test_vpc_endpoint.py @@ -0,0 +1,31 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from otcextensions.tests.functional.sdk.apig import TestApiG + + +class TestVpcEndpoint(TestApiG): + gateway = "4a5d39b75bc341e89033c65e97ad5bca" + + def setUp(self): + super(TestVpcEndpoint, self).setUp() + + def test_01_list_vpc_endpoint(self): + self.conn.vpcep.create_endpoint( + subnet_id='1bc849dc-4426-455a-8207-fb5f726c6ff7', + endpoint_service_id='b002616f-f386-4da1-8118-04d5e090200f', + vpc_id='80a6ba4b-44d7-4c57-be40-5013a1e55273', + + ) + list(self.client.vpc_endpoints( + gateway=TestVpcEndpoint.gateway, + )) diff --git a/otcextensions/tests/functional/sdk/vpcep/__init__.py b/otcextensions/tests/functional/sdk/vpcep/__init__.py index e69de29bb..f2880af38 100644 --- a/otcextensions/tests/functional/sdk/vpcep/__init__.py +++ b/otcextensions/tests/functional/sdk/vpcep/__init__.py @@ -0,0 +1,183 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import atexit +import time +import uuid + +import openstack + +from otcextensions.tests.functional import base + +_logger = openstack._log.setup_logging('openstack') + +# Module-level cache for shared ELB +_shared_load_balancer = None +_shared_vpc_id = None +_shared_subnet_id = None + + +def _cleanup_shared_elb(): + """Cleanup shared ELB at module exit.""" + global _shared_load_balancer + if _shared_load_balancer: + try: + from openstack import connection + from otcextensions import sdk + conn = connection.Connection(config=base.TEST_CLOUD_REGION) + sdk.register_otc_extensions(conn) + conn.vlb.delete_load_balancer(_shared_load_balancer.id) + conn.vlb.wait_for_delete_load_balancer( + _shared_load_balancer.id, interval=5, wait=300) + _logger.info('Deleted shared ELB') + except Exception as e: + _logger.warning(f'Error deleting shared ELB: {e}') + finally: + _shared_load_balancer = None + + +atexit.register(_cleanup_shared_elb) + + +class TestVpcepBase(base.BaseFunctionalTest): + """Base class for VPCEP tests that don't need ELB + (quota, public_service).""" + + def setUp(self): + super(TestVpcepBase, self).setUp() + self.client = self.conn.vpcep + + +class TestVpcep(base.NetworkBaseFunctionalTest): + """Base class for VPCEP functional tests that need ELB. + + Uses lazy initialization with module-level caching to share ELB + between tests. ELB is cleaned up at module exit via atexit. + """ + + def setUp(self): + super(TestVpcep, self).setUp() + self.client = self.conn.vpcep + + self._ensure_load_balancer() + + self.load_balancer = _shared_load_balancer + self.vpc_id = _shared_vpc_id + self.subnet_id = _shared_subnet_id + + if not self.load_balancer: + self.skipTest('ELB not available') + + def _ensure_load_balancer(self): + """Lazily create shared ELB if not exists.""" + global _shared_load_balancer, _shared_vpc_id, _shared_subnet_id + + if _shared_load_balancer: + return + + self._find_existing_network() + if not _shared_vpc_id or not _shared_subnet_id: + return + + azs = list(self.conn.vlb.availability_zones()) + az = azs[0].code if azs else 'eu-de-01' + + lb_name = 'sdk-vpcep-elb-' + uuid.uuid4().hex[:8] + + attrs = { + 'name': lb_name, + 'description': 'ELB for VPCEP functional tests', + 'vip_subnet_cidr_id': _shared_subnet_id, + 'vpc_id': _shared_vpc_id, + 'availability_zone_list': [az], + 'guaranteed': True, + 'provider': 'vlb', + } + + _logger.info(f'Creating ELB with attrs: {attrs}') + _shared_load_balancer = self.conn.vlb.create_load_balancer(**attrs) + _logger.info(f'Created ELB: {_shared_load_balancer.id}') + + self.conn.vlb.wait_for_load_balancer( + _shared_load_balancer.id, + status='ACTIVE', + interval=5, + wait=300 + ) + + _shared_load_balancer = self.conn.vlb.get_load_balancer( + _shared_load_balancer.id + ) + _shared_vpc_id = _shared_load_balancer.vpc_id + _logger.info(f'ELB active, port_id: {_shared_load_balancer.port_id}, ' + f'vpc_id: {_shared_vpc_id}') + + def _find_existing_network(self): + """Find existing VPC and subnet for ELB creation.""" + global _shared_vpc_id, _shared_subnet_id + + vpcs = list(self.conn.vpc.vpcs()) + if not vpcs: + _logger.warning('No VPCs found') + return + + for vpc in vpcs: + subnets = list(self.conn.vpc.subnets(vpc_id=vpc.id)) + for subnet in subnets: + if subnet.status == 'ACTIVE' and subnet.neutron_subnet_id: + _shared_vpc_id = vpc.id + _shared_subnet_id = subnet.neutron_subnet_id + _logger.info( + f'Using VPC {vpc.name} ({vpc.id}) ' + f'subnet {subnet.name} ({subnet.neutron_subnet_id})' + ) + return + + _logger.warning('No suitable VPC/subnet found') + + def create_service_helper(self, name=None, approval=False): + """Create a VPCEP service using the shared ELB and wait for it.""" + if not name: + name = 'svc' + uuid.uuid4().hex[:8] + + attrs = { + 'service_name': name, + 'port_id': self.load_balancer.port_id, + 'vpc_id': self.vpc_id, + 'server_type': 'LB', + 'ports': [{'client_port': 80, 'server_port': 80, + 'protocol': 'TCP'}], + 'approval_enabled': approval, + 'service_type': 'interface' + } + + service = self.client.create_service(**attrs) + self.addCleanup(self._cleanup_service, service.id) + + for _ in range(40): + service = self.client.get_service(service.id) + if service.status == 'available': + return service + time.sleep(3) + raise Exception(f'Service {service.id} not available') + + def _cleanup_service(self, service_id): + """Delete service with retries (endpoints may still be deleting).""" + for _ in range(10): + try: + self.client.delete_service(service_id, ignore_missing=True) + return + except Exception as e: + if 'EndPoint.300' in str(e): + time.sleep(3) + else: + _logger.warning(f'Error deleting service {service_id}:{e}') + return diff --git a/otcextensions/tests/functional/sdk/vpcep/v1/test_connection.py b/otcextensions/tests/functional/sdk/vpcep/v1/test_connection.py new file mode 100644 index 000000000..8d3b0ee83 --- /dev/null +++ b/otcextensions/tests/functional/sdk/vpcep/v1/test_connection.py @@ -0,0 +1,77 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import uuid +import time +from otcextensions.tests.functional.sdk.vpcep import TestVpcep + + +class TestConnection(TestVpcep): + + def setUp(self): + super(TestConnection, self).setUp() + self.network_data = self.create_network() + self.addCleanup(self.destroy_network, self.network_data) + self.port = self.create_port(self.network_data['network_id']) + self.addCleanup(self.destroy_port, self.port.id) + self.service = self.create_service_helper(approval=True) + self.endpoint_name = 'sdk-vpcep-test-endpoint-' + uuid.uuid4().hex[:8] + ep_attrs = { + 'router_id': self.network_data['router_id'], + 'network_id': self.network_data['network_id'], + 'endpoint_service_id': self.service.id, + 'enable_dns': False + } + self.endpoint = self.client.create_endpoint(**ep_attrs) + self.addCleanup(self.client.delete_endpoint, self.endpoint.id) + + def _get_connection(self): + for i in range(10): + connections = list(self.client.service_connections( + self.service.id)) + my_conn = next( + (c for c in connections if c.id == self.endpoint.id), None) + if my_conn: + return my_conn + time.sleep(2) + return None + + def test_list_service_connections(self): + """Test listing service connections.""" + my_conn = self._get_connection() + self.assertIsNotNone(my_conn, "Connection not found") + self.assertEqual('pendingAcceptance', my_conn.status) + + def test_accept_service_connections(self): + """Test accepting a service connection.""" + my_conn = self._get_connection() + self.assertIsNotNone(my_conn, "Connection not found") + self.assertEqual('pendingAcceptance', my_conn.status) + self.client.manage_service_connections( + self.service.id, action='accept', endpoints=[self.endpoint.id]) + time.sleep(2) + connections = list(self.client.service_connections(self.service.id)) + my_conn = next((c for c in connections if c.id == self.endpoint.id), + None) + self.assertEqual('accepted', my_conn.status) + + def test_reject_service_connections(self): + """Test rejecting a service connection.""" + my_conn = self._get_connection() + self.assertIsNotNone(my_conn, "Connection not found") + self.assertEqual('pendingAcceptance', my_conn.status) + self.client.manage_service_connections( + self.service.id, action='reject', endpoints=[self.endpoint.id]) + time.sleep(2) + connections = list(self.client.service_connections(self.service.id)) + my_conn = next((c for c in connections if c.id == self.endpoint.id), + None) + self.assertEqual('rejected', my_conn.status) diff --git a/otcextensions/tests/functional/sdk/vpcep/v1/test_endpoint.py b/otcextensions/tests/functional/sdk/vpcep/v1/test_endpoint.py new file mode 100644 index 000000000..106ed4170 --- /dev/null +++ b/otcextensions/tests/functional/sdk/vpcep/v1/test_endpoint.py @@ -0,0 +1,69 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import time +import uuid +from otcextensions.tests.functional.sdk.vpcep import TestVpcep + + +class TestEndpoint(TestVpcep): + def setUp(self): + super(TestEndpoint, self).setUp() + self.network_data = self.create_network() + self.addCleanup(self.destroy_network, self.network_data) + self.port = self.create_port(self.network_data['network_id']) + self.addCleanup(self.destroy_port, self.port.id) + self.service = self.create_service_helper() + self.endpoint_name = 'sdk-vpcep-test-endpoint-' + uuid.uuid4().hex[:8] + + def _create_endpoint(self, remove=True): + attrs = { + 'router_id': self.network_data['router_id'], + 'network_id': self.network_data['network_id'], + 'endpoint_service_id': self.service.id, + 'enable_dns': False, + 'tags': [{'key': 'test-key', 'value': 'test-value'}] + } + endpoint = self.client.create_endpoint(**attrs) + self.assertIsNotNone(endpoint) + if remove: + self.addCleanup(self.client.delete_endpoint, endpoint.id) + return endpoint + + def test_create_endpoint(self): + """Test creating an Endpoint and verifying its attributes.""" + ep = self._create_endpoint() + self.assertEqual(self.service.id, ep.endpoint_service_id) + self.assertEqual(self.network_data['router_id'], ep.router_id) + self.assertTrue(len(ep.tags) > 0) + self.assertEqual('test-key', ep.tags[0].key) + + def test_list_endpoints(self): + """Test listing Endpoints.""" + ep = self._create_endpoint() + eps = list(self.client.endpoints()) + self.assertGreater(len(eps), 0) + found = any([e.id == ep.id for e in eps]) + self.assertTrue(found) + + def test_get_endpoint(self): + """Test retrieving a single Endpoint.""" + ep = self._create_endpoint() + got = self.client.get_endpoint(ep.id) + self.assertEqual(ep.id, got.id) + + def test_delete_endpoint(self): + """Test deleting an Endpoint.""" + ep = self._create_endpoint(remove=False) + self.client.delete_endpoint(ep.id) + time.sleep(5) + eps = list(self.client.endpoints()) + self.assertFalse(any(e.id == ep.id for e in eps)) diff --git a/otcextensions/tests/functional/sdk/vpcep/v1/test_public_service.py b/otcextensions/tests/functional/sdk/vpcep/v1/test_public_service.py new file mode 100644 index 000000000..ac8a654e0 --- /dev/null +++ b/otcextensions/tests/functional/sdk/vpcep/v1/test_public_service.py @@ -0,0 +1,26 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +from otcextensions.tests.functional.sdk.vpcep import TestVpcepBase + + +class TestPublicService(TestVpcepBase): + """Tests for public services catalog (no dependencies).""" + + def test_list_public_services(self): + """Test listing public Endpoint Services.""" + services = list(self.client.public_services()) + self.assertIsInstance(services, list) + if services: + svc = services[0] + self.assertTrue(hasattr(svc, 'service_name')) + self.assertTrue(hasattr(svc, 'service_type')) + self.assertTrue(hasattr(svc, 'owner')) diff --git a/otcextensions/tests/functional/sdk/vpcep/v1/test_quota.py b/otcextensions/tests/functional/sdk/vpcep/v1/test_quota.py new file mode 100644 index 000000000..c8b2f614f --- /dev/null +++ b/otcextensions/tests/functional/sdk/vpcep/v1/test_quota.py @@ -0,0 +1,37 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +from otcextensions.tests.functional.sdk.vpcep import TestVpcepBase + + +class TestQuota(TestVpcepBase): + def test_list_endpoint_service_quota(self): + """Test listing Endpoint Service quotas.""" + quotas = list(self.client.resource_quota(type='endpoint_service')) + self.assertGreater(len(quotas), 0) + q = quotas[0] + self.assertTrue(hasattr(q, 'type')) + self.assertTrue(hasattr(q, 'quota')) + self.assertTrue(hasattr(q, 'used')) + self.assertEqual('endpoint_service', q.type) + + def test_list_endpoint_quota(self): + """Test listing Endpoint quotas.""" + quotas_ep = list(self.client.resource_quota(type='endpoint')) + self.assertGreater(len(quotas_ep), 0) + self.assertEqual('endpoint', quotas_ep[0].type) + + def test_list_quota(self): + """Test listing all quotas without type filter.""" + quotas = list(self.client.resource_quota()) + self.assertEqual(2, len(quotas)) + types = {q.type for q in quotas} + self.assertEqual({'endpoint', 'endpoint_service'}, types) diff --git a/otcextensions/tests/functional/sdk/vpcep/v1/test_service.py b/otcextensions/tests/functional/sdk/vpcep/v1/test_service.py index e978bee0f..167859666 100644 --- a/otcextensions/tests/functional/sdk/vpcep/v1/test_service.py +++ b/otcextensions/tests/functional/sdk/vpcep/v1/test_service.py @@ -9,16 +9,107 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. -from openstack import _log +import time +import uuid -from otcextensions.tests.functional import base +from otcextensions.tests.functional.sdk.vpcep import TestVpcep -_logger = _log.setup_logging('openstack') +class TestService(TestVpcep): -class TestService(base.BaseFunctionalTest): + def setUp(self): + super(TestService, self).setUp() + self.service_name = 'svc' + uuid.uuid4().hex[:8] - def test_initialize(self): - client = self.conn.vpcep + def _create_service(self, remove=True, approval=False): + """Create a test service using shared ELB.""" + attrs = { + 'service_name': self.service_name, + 'port_id': self.load_balancer.port_id, + 'vpc_id': self.vpc_id, + 'server_type': 'LB', + 'ports': [{'client_port': 80, 'server_port': 80, + 'protocol': 'TCP'}], + 'is_approval_enabled': approval, + 'service_type': 'interface' + } + service = self.client.create_service(**attrs) + self.assertIsNotNone(service) - self.assertIsNotNone(client) + if remove: + self.addCleanup(self._cleanup_service, service.id) + self._wait_for_service_status(service.id, 'available') + + return service + + def _wait_for_service_status(self, service_id, status, timeout=60): + """Wait for service to reach expected status.""" + start = time.time() + while time.time() - start < timeout: + svc = self.client.get_service(service_id) + if svc.status == status: + return svc + time.sleep(2) + raise Exception(f'Service {service_id} did not reach {status}') + + def test_create_service(self): + """Test creating an Endpoint Service.""" + service = self._create_service() + self.assertIn(self.service_name, service.service_name) + self.assertEqual(self.load_balancer.port_id, service.port_id) + + def test_list_services(self): + """Test listing Endpoint Services.""" + service = self._create_service() + services = list(self.client.services()) + self.assertGreater(len(services), 0) + + found = any(s.id == service.id for s in services) + self.assertTrue(found) + + filtered = list(self.client.services(id=service.id)) + self.assertEqual(1, len(filtered)) + self.assertEqual(service.id, filtered[0].id) + + filtered_name = list(self.client.services(name=service.service_name)) + self.assertEqual(1, len(filtered_name)) + self.assertEqual(service.service_name, filtered_name[0].service_name) + + def test_get_service(self): + """Test retrieving a single Endpoint Service.""" + service = self._create_service() + s = self.client.get_service(service.id) + self.assertIsNotNone(s) + self.assertEqual(service.id, s.id) + + def test_find_service(self): + """Test finding an Endpoint Service.""" + service = self._create_service() + s = self.client.find_service(service.service_name) + self.assertIsNotNone(s) + self.assertEqual(service.id, s.id) + + def test_update_service(self): + """Test updating an Endpoint Service.""" + service = self._create_service() + updated_name = self.service_name + '_upd' + s = self.client.update_service( + service.id, + service_name=updated_name, + is_approval_enabled=True + ) + self.assertIn(updated_name, s.service_name) + self.assertTrue(s.is_approval_enabled) + + fetched = self.client.get_service(service.id) + self.assertIn(updated_name, fetched.service_name) + self.assertTrue(fetched.is_approval_enabled) + + def test_delete_service(self): + """Test deleting an Endpoint Service.""" + service = self._create_service(remove=False) + self.client.delete_service(service.id) + time.sleep(5) + + services = list(self.client.services(id=service.id)) + self.assertEqual(0, len(services)) diff --git a/otcextensions/tests/functional/sdk/vpcep/v1/test_target_service.py b/otcextensions/tests/functional/sdk/vpcep/v1/test_target_service.py new file mode 100644 index 000000000..376b6b343 --- /dev/null +++ b/otcextensions/tests/functional/sdk/vpcep/v1/test_target_service.py @@ -0,0 +1,32 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +from otcextensions.tests.functional.sdk.vpcep import TestVpcep + + +class TestTargetService(TestVpcep): + """Tests for target service lookup (requires Service).""" + + def setUp(self): + super(TestTargetService, self).setUp() + self.service = self.create_service_helper() + + def test_get_target_service_by_id(self): + """Test getting a Target Service by ID.""" + target = self.client.get_target_service(self.service.id) + self.assertIsNotNone(target) + self.assertEqual(self.service.id, target.id) + + def test_get_target_service_by_name(self): + """Test getting a Target Service by name.""" + target = self.client.get_target_service(self.service.service_name) + self.assertIsNotNone(target) + self.assertEqual(self.service.service_name, target.service_name) diff --git a/otcextensions/tests/functional/sdk/vpcep/v1/test_whitelist.py b/otcextensions/tests/functional/sdk/vpcep/v1/test_whitelist.py new file mode 100644 index 000000000..32251fe37 --- /dev/null +++ b/otcextensions/tests/functional/sdk/vpcep/v1/test_whitelist.py @@ -0,0 +1,94 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import time + +from otcextensions.tests.functional.sdk.vpcep import TestVpcep + + +class TestWhitelist(TestVpcep): + + def setUp(self): + super(TestWhitelist, self).setUp() + self.network_data = self.create_network() + self.addCleanup(self.destroy_network, self.network_data) + self.port = self.create_port(self.network_data['network_id']) + self.addCleanup(self.destroy_port, self.port.id) + self.service = self.create_service_helper(approval=True) + self.target_domain = self.conn.current_project_id + if not self.target_domain and hasattr(self.conn, 'session'): + self.target_domain = self.conn.session.get_project_id() + + def _add_whitelist(self, domains): + return list(self.client.manage_service_whitelist(self.service.id, + action='add', + domains=domains)) + + def test_add_service_whitelist(self): + """Test adding a domain to the whitelist.""" + if not self.target_domain: + self.skipTest("Cannot determine current project" + " ID for whitelist test") + + domains = [self.target_domain] + added = self._add_whitelist(domains) + found = any([domains[0] in (w.permission or '') for w in added]) + self.assertTrue(found, "Domain not found in added whitelist") + + def test_add_service_whitelist_duplicate(self): + """Test adding a duplicate domain to the whitelist.""" + if not self.target_domain: + self.skipTest("Cannot determine current" + " project ID for whitelist test") + + domains = [self.target_domain] + self._add_whitelist(domains) + + self._add_whitelist(domains) + + listed = list(self.client.service_whitelist(self.service.id)) + found = any([domains[0] in (w.permission or '') for w in listed]) + self.assertTrue(found, "Domain not found " + "in whitelist after duplicate add") + + def test_list_service_whitelist(self): + """Test listing the whitelist.""" + if not self.target_domain: + self.skipTest("Cannot determine current project ID" + " for whitelist test") + + domains = [self.target_domain] + self._add_whitelist(domains) + + listed = list(self.client.service_whitelist(self.service.id)) + self.assertGreater(len(listed), 0) + found = any([domains[0] in (w.permission or '') for w in listed]) + self.assertTrue(found, "Domain not found in listed whitelist") + + def test_remove_service_whitelist(self): + """Test removing a domain from the whitelist.""" + if not self.target_domain: + self.skipTest("Cannot determine current project ID" + " for whitelist test") + + domains = [self.target_domain] + self._add_whitelist(domains) + + list(self.client.manage_service_whitelist(self.service.id, + action='remove', + domains=domains)) + + time.sleep(5) + + listed_after = list(self.client.service_whitelist(self.service.id)) + found = any([domains[0] in (w.permission or '') for w in listed_after]) + self.assertFalse(found, + "Domain still found in whitelist after removal") From 84128f09d926a70f67e5b3a10bce0e5e8813ec70 Mon Sep 17 00:00:00 2001 From: Valeriia Ziukina Date: Mon, 26 Jan 2026 10:59:02 +0100 Subject: [PATCH 2/4] fix(vpcep): tests added reno --- releasenotes/notes/vpcep-tests-added-e6cdff9537de1d0d.yaml | 4 ++++ 1 file changed, 4 insertions(+) create mode 100644 releasenotes/notes/vpcep-tests-added-e6cdff9537de1d0d.yaml diff --git a/releasenotes/notes/vpcep-tests-added-e6cdff9537de1d0d.yaml b/releasenotes/notes/vpcep-tests-added-e6cdff9537de1d0d.yaml new file mode 100644 index 000000000..015cc5c28 --- /dev/null +++ b/releasenotes/notes/vpcep-tests-added-e6cdff9537de1d0d.yaml @@ -0,0 +1,4 @@ +--- +other: + - | + Functional tests for VPCEP v1 added From 7ac0bed1e8554a5d3672c6928eea0405b689c795 Mon Sep 17 00:00:00 2001 From: Valeriia Ziukina Date: Mon, 26 Jan 2026 11:30:56 +0100 Subject: [PATCH 3/4] fix(vpcep): fix --- otcextensions/sdk/vlb/v3/availability_zone.py | 2 +- otcextensions/sdk/vpcep/v1/quota.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/otcextensions/sdk/vlb/v3/availability_zone.py b/otcextensions/sdk/vlb/v3/availability_zone.py index 0bce2cdec..4238137cb 100644 --- a/otcextensions/sdk/vlb/v3/availability_zone.py +++ b/otcextensions/sdk/vlb/v3/availability_zone.py @@ -33,7 +33,7 @@ def list(cls, session, paginated=True, base_path=None, if not cls.allow_list: raise exceptions.MethodNotSupported(cls, "list") session = cls._get_session(session) - microversion = cls._get_microversion(session, action='list') + microversion = cls._get_microversion(session) if base_path is None: base_path = cls.base_path diff --git a/otcextensions/sdk/vpcep/v1/quota.py b/otcextensions/sdk/vpcep/v1/quota.py index 904b96c37..98e697583 100644 --- a/otcextensions/sdk/vpcep/v1/quota.py +++ b/otcextensions/sdk/vpcep/v1/quota.py @@ -74,7 +74,7 @@ def list(cls, session, paginated=False, base_path=None, **params): if not cls.allow_list: raise exceptions.MethodNotSupported(cls, "list") session = cls._get_session(session) - microversion = cls._get_microversion(session, action='list') + microversion = cls._get_microversion(session) if base_path is None: base_path = cls.base_path From cf019ca2cd16ee4abf51af4076d222c6bdb03fe1 Mon Sep 17 00:00:00 2001 From: Valeriia Ziukina Date: Mon, 26 Jan 2026 16:13:58 +0100 Subject: [PATCH 4/4] fix(vpcep): fix due to comments --- .../tests/functional/sdk/apig/v2/test_vpc_endpoint.py | 9 ++++----- .../tests/functional/sdk/vpcep/v1/test_service.py | 9 --------- 2 files changed, 4 insertions(+), 14 deletions(-) diff --git a/otcextensions/tests/functional/sdk/apig/v2/test_vpc_endpoint.py b/otcextensions/tests/functional/sdk/apig/v2/test_vpc_endpoint.py index ec6af312b..a7c1d3d9c 100644 --- a/otcextensions/tests/functional/sdk/apig/v2/test_vpc_endpoint.py +++ b/otcextensions/tests/functional/sdk/apig/v2/test_vpc_endpoint.py @@ -14,17 +14,16 @@ class TestVpcEndpoint(TestApiG): - gateway = "4a5d39b75bc341e89033c65e97ad5bca" + gateway = "" def setUp(self): super(TestVpcEndpoint, self).setUp() def test_01_list_vpc_endpoint(self): self.conn.vpcep.create_endpoint( - subnet_id='1bc849dc-4426-455a-8207-fb5f726c6ff7', - endpoint_service_id='b002616f-f386-4da1-8118-04d5e090200f', - vpc_id='80a6ba4b-44d7-4c57-be40-5013a1e55273', - + subnet_id='', + endpoint_service_id='', + vpc_id='', ) list(self.client.vpc_endpoints( gateway=TestVpcEndpoint.gateway, diff --git a/otcextensions/tests/functional/sdk/vpcep/v1/test_service.py b/otcextensions/tests/functional/sdk/vpcep/v1/test_service.py index 167859666..16694807b 100644 --- a/otcextensions/tests/functional/sdk/vpcep/v1/test_service.py +++ b/otcextensions/tests/functional/sdk/vpcep/v1/test_service.py @@ -104,12 +104,3 @@ def test_update_service(self): fetched = self.client.get_service(service.id) self.assertIn(updated_name, fetched.service_name) self.assertTrue(fetched.is_approval_enabled) - - def test_delete_service(self): - """Test deleting an Endpoint Service.""" - service = self._create_service(remove=False) - self.client.delete_service(service.id) - time.sleep(5) - - services = list(self.client.services(id=service.id)) - self.assertEqual(0, len(services))