diff --git a/coriolis/tests/integration/base.py b/coriolis/tests/integration/base.py index 0fd65553..cf27bade 100644 --- a/coriolis/tests/integration/base.py +++ b/coriolis/tests/integration/base.py @@ -22,11 +22,14 @@ from keystoneauth1 import session as ks_session from keystoneauth1 import token_endpoint from oslo_config import cfg +from oslo_db.sqlalchemy import models from oslo_log import log as logging +import oslo_messaging as messaging from coriolis import constants from coriolis import context from coriolis.db import api as db_api +from coriolis.providers import factory as providers_factory from coriolis.tests.integration import harness from coriolis.tests.integration import utils as test_utils from coriolis.tests import test_base @@ -34,6 +37,18 @@ CONF = cfg.CONF LOG = logging.getLogger(__name__) +# Statuses that represent a completed allocation attempt. +MINION_ALLOCATED_TERMINAL = { + constants.MINION_POOL_STATUS_ALLOCATED, + constants.MINION_POOL_STATUS_ERROR, +} + +# Statuses that represent a completed deallocation attempt. +MINION_DEALLOCATED_TERMINAL = { + constants.MINION_POOL_STATUS_DEALLOCATED, + constants.MINION_POOL_STATUS_ERROR, +} + class CoriolisIntegrationTestBase(test_base.CoriolisBaseTestCase): """Base class for integration tests.""" @@ -55,7 +70,36 @@ def setUpClass(cls): def setUp(self): super().setUp() - patcher = mock.patch("psutil.Process.send_signal") + to_patch = [ + # Prevent the test runner from being killed by Coriolis sending + # SIGINT to in-process workers. + "psutil.Process.send_signal", + # When removing minion pools, this is also called. + # There is no Keystone, so it needs to be mocked. + "coriolis.keystone.delete_trust", + ] + for thing in to_patch: + patcher = mock.patch(thing) + patcher.start() + self.addCleanup(patcher.stop) + + # fake:// oslo_messaging doesn't serialize objects. After calls, some + # actions may remain as objects, yet they are expected to be dicts. + self._patch_rpc_client_method("call") + self._patch_rpc_client_method("cast") + + def _patch_rpc_client_method(self, method): + original_call = getattr(messaging.RPCClient, method) + + def _call(self, ctxt, method, **kwargs): + for key, value in kwargs.items(): + if isinstance(value, models.ModelBase): + kwargs[key] = dict(value.items()) + + return original_call(self, ctxt, method, **kwargs) + + patcher = mock.patch.object( + messaging.RPCClient, method, _call) patcher.start() self.addCleanup(patcher.stop) @@ -83,7 +127,7 @@ def _create_endpoint(self, **kwargs): return endpoint - def _create_transfer(self, src_id, dst_id, instances): + def _create_transfer(self, src_id, dst_id, instances, **kwargs): """Create a Replica transfer object and return its ID.""" transfer = self._client.transfers.create( origin_endpoint_id=src_id, @@ -96,11 +140,78 @@ def _create_transfer(self, src_id, dst_id, instances): storage_mappings={}, notes="integration test replica", skip_os_morphing=True, + **kwargs, ) self.addCleanup(self._client.transfers.delete, transfer.id) return transfer + def _create_pool( + self, endpoint_id, name="test-pool", skip_allocation=True): + pool = self._client.minion_pools.create( + name=name, + endpoint=endpoint_id, + platform=constants.PROVIDER_PLATFORM_DESTINATION, + os_type=constants.OS_TYPE_LINUX, + environment_options={}, + minimum_minions=1, + maximum_minions=1, + minion_max_idle_time=3600, + minion_retention_strategy=( + constants.MINION_POOL_MACHINE_RETENTION_STRATEGY_DELETE), + skip_allocation=skip_allocation, + ) + self.addCleanup(self._safe_delete_pool, pool.id) + + return pool + + def _safe_delete_pool(self, pool_id): + """Delete pool, force-deallocating first if needed.""" + try: + pool = self._client.minion_pools.get(pool_id) + except Exception: + return + + if pool.status not in MINION_DEALLOCATED_TERMINAL: + try: + self._client.minion_pools.deallocate_minion_pool( + pool_id, force=True) + self._wait_for_pool(pool_id, MINION_DEALLOCATED_TERMINAL) + except Exception: + pass + + try: + self._client.minion_pools.delete(pool_id) + except Exception: + pass + + def _wait_for_pool(self, pool_id, terminal_statuses, timeout=180): + """Poll the DB until *pool_id* reaches one of *terminal_statuses*. + + :returns: minion pool ORM object. + :raises: AssertionError on timeout. + """ + ctxt = self._get_db_context() + deadline = time.monotonic() + timeout + while time.monotonic() < deadline: + pool = db_api.get_minion_pool(ctxt, pool_id) + if pool and pool.status in terminal_statuses: + return pool + time.sleep(1) + pool = db_api.get_minion_pool(ctxt, pool_id) + last = pool.status if pool else "not found" + self.fail( + "Pool %s did not reach one of %r within %ds (last: %s)" + % (pool_id, terminal_statuses, timeout, last) + ) + + def _get_db_context(self): + return context.RequestContext( + user='int-test', + project_id=harness._TEST_PROJECT_ID, + is_admin=True, + ) + @staticmethod def _ignoreExc(func, ignored_exc=Exception): """Wrap the given function, ignoring exceptions.""" @@ -116,6 +227,7 @@ def f(*args, **kwargs): class ReplicaIntegrationTestBase(CoriolisIntegrationTestBase): _CREATE_SCSI_DBG_DEVS = True + _CREATE_MINION_POOLS = False @classmethod def setUpClass(cls): @@ -138,6 +250,7 @@ def setUp(self): self._src_device = None self._dst_device = None + self._pool_id = None if self._CREATE_SCSI_DBG_DEVS: self._src_device = test_utils.add_scsi_debug_device() @@ -169,11 +282,26 @@ def setUp(self): }, ) + # Create minion pool if needed. + if self._CREATE_MINION_POOLS: + pool = self._create_pool( + self._dst_endpoint.id, "transfer-pool", skip_allocation=False) + self._pool_id = pool.id + + pool_obj = self._wait_for_pool(pool.id, MINION_ALLOCATED_TERMINAL) + + self.assertEqual( + constants.MINION_POOL_STATUS_ALLOCATED, + pool_obj.status, + "Pool did not reach ALLOCATED (got %s)" % pool_obj.status, + ) + # Create transfer replica. self._transfer = self._create_transfer( self._src_endpoint.id, self._dst_endpoint.id, instances=[self._src_device], + destination_minion_pool_id=self._pool_id, ) # mock a few commands that are going to be ran through ssh; they won't @@ -196,13 +324,6 @@ def _execute_and_wait(self, transfer_id, timeout=300): transfer_id, shutdown_instances=False) self.assertExecutionCompleted(execution.id, timeout=timeout) - def _get_db_context(self): - return context.RequestContext( - user='int-test', - project_id=harness._TEST_PROJECT_ID, - is_admin=True, - ) - def wait_for_execution(self, execution_id, timeout=300, desired_statuses=None): """Block until *execution_id* reaches a terminal state. @@ -324,3 +445,83 @@ def _slow_call(*args, **kwargs): ) patcher.start() self.addCleanup(patcher.stop) + + +class MinionPoolTestBase(CoriolisIntegrationTestBase): + """Base class for minion pool integration tests. + + Skips the entire test class when the import provider does not advertise + ``PROVIDER_TYPE_DESTINATION_MINION_POOL`` support. + """ + + @classmethod + def setUpClass(cls): + super().setUpClass() + + available = providers_factory.get_available_providers() + imp_types = available.get(cls._imp_platform, {}).get("types", []) + if constants.PROVIDER_TYPE_DESTINATION_MINION_POOL not in imp_types: + raise unittest.SkipTest( + "Import provider '%s' does not support minion pools" + % cls._imp_platform + ) + + +class MinionPoolReplicaTestBase( + MinionPoolTestBase, ReplicaIntegrationTestBase): + """Base class for replica integration tests using minion pools. + + Extends the assertions to also verify that the minions in the pool have + been used, and that the minions and the pool returns to an available state. + """ + + _CREATE_MINION_POOLS = True + + def _execute_and_wait(self, transfer_id, timeout=300): + super()._execute_and_wait(transfer_id, timeout=timeout) + self.assertPoolAllocated(self._pool_id) + self.assertMachinesAvailable(self._pool_id) + + def assertExecutionCompleted(self, execution_id, timeout=300): + super().assertExecutionCompleted(execution_id, timeout=timeout) + self.assertPoolAllocated(self._pool_id) + self.assertMachinesAvailable(self._pool_id) + + def assertDeploymentCompleted(self, deployment_id, timeout=300): + super().assertDeploymentCompleted(deployment_id, timeout=timeout) + self.assertPoolAllocated(self._pool_id) + self.assertMachinesAvailable(self._pool_id) + + def assertPoolAllocated(self, pool_id): + """Assert the pool is healthy and still in ALLOCATED status.""" + ctxt = self._get_db_context() + pool = db_api.get_minion_pool(ctxt, pool_id) + self.assertIsNotNone(pool, "Pool %s not found" % pool_id) + self.assertEqual( + constants.MINION_POOL_STATUS_ALLOCATED, + pool.status, + "Pool %s is not ALLOCATED (got %s)" % (pool_id, pool.status), + ) + + def assertMachinesAvailable(self, pool_id): + """Assert all machines in the pool are AVAILABLE and have been used.""" + ctxt = self._get_db_context() + pool = db_api.get_minion_pool(ctxt, pool_id, include_machines=True) + self.assertIsNotNone(pool, "Pool %s not found" % pool_id) + self.assertTrue( + pool.minion_machines, + "Pool %s has no minion machines" % pool_id, + ) + for machine in pool.minion_machines: + self.assertEqual( + constants.MINION_MACHINE_STATUS_AVAILABLE, + machine.allocation_status, + "Machine %s in pool %s is not AVAILABLE (got %s)" + % (machine.id, pool_id, machine.allocation_status), + ) + self.assertIsNotNone( + machine.last_used_at, + "Machine %s in pool %s has no last_used_at; " + "it may not have been used by the transfer" + % (machine.id, pool_id), + ) diff --git a/coriolis/tests/integration/harness.py b/coriolis/tests/integration/harness.py index db8f6a4e..07206660 100644 --- a/coriolis/tests/integration/harness.py +++ b/coriolis/tests/integration/harness.py @@ -25,7 +25,6 @@ import socket import subprocess import tempfile -from unittest import mock import uuid from cheroot.workers import threadpool as cheroot_threadpool @@ -49,10 +48,12 @@ from coriolis.db.sqlalchemy import migration as db_migration from coriolis.deployer_manager.rpc import server as deployer_manager_rpc_server from coriolis import exception +from coriolis.minion_manager.rpc import server as minion_manager_rpc_server from coriolis import policy as policy_module from coriolis import rpc as rpc_module from coriolis.scheduler.rpc import server as scheduler_rpc_server from coriolis import service +from coriolis.taskflow import runner as taskflow_runner from coriolis.tasks import factory as task_runners_factory from coriolis.tests.integration import utils as test_utils from coriolis.transfer_cron.rpc import server as transfer_cron_rpc_server @@ -204,6 +205,36 @@ def _run(): return result +class _InProcessTaskflowRunner(taskflow_runner.TaskFlowRunner): + """Runs taskflow flows in a daemon thread instead of a child process. + + The default runner uses multiprocessing.spawn, which creates an isolated + process with its own fake:// transport instance (no registered services). + Running in-thread lets pool tasks reach the conductor, scheduler, and + worker that are already live in this process. + """ + + def run_flow_in_background(self, flow, store=None): + coriolis_utils.start_thread( + target=self._run_flow, + args=(flow,), + kwargs={"store": store}, + daemon=True, + ) + + +class _InProcessMinionManagerServerEndpoint( + minion_manager_rpc_server.MinionManagerServerEndpoint): + """Minion manager endpoint that runs pool task flows in-thread.""" + + @property + def _taskflow_runner(self): + return _InProcessTaskflowRunner( + constants.MINION_MANAGER_MAIN_MESSAGING_TOPIC, + max_workers=25, + ) + + class _IntegrationHarness: """Shared Integration tests infrastructure; created once per process. @@ -259,6 +290,13 @@ def __init__(self): 'retry_interval', 1, group='database') cfg.CONF.set_override( 'lock_path', self.lock_path, group='oslo_concurrency') + + # Disable automatic pool-refresh cron jobs (they would try to contact + # Keystone for trust maintenance). + cfg.CONF.set_override( + 'minion_pool_default_refresh_period_minutes', 0, + group='minion_manager') + coriolis_utils.setup_logging() test_utils.init_scsi_debug() @@ -275,6 +313,7 @@ def __init__(self): self._scheduler_svc = None self._transfer_cron_svc = None self._deployer_manager_svc = None + self._minion_manager_svc = None self._worker_svc = None self._worker_host_svc = None @@ -315,7 +354,6 @@ def _start_coriolis_services(self): # Conductor: must start first so the worker can register with it. conductor_endpoint = conductor_rpc_server.ConductorServerEndpoint() conductor_endpoint._licensing_client = None - conductor_endpoint._minion_manager_client_instance = mock.MagicMock() self._conductor_svc = service.MessagingService( constants.CONDUCTOR_MAIN_MESSAGING_TOPIC, [conductor_endpoint], @@ -354,6 +392,18 @@ def _start_coriolis_services(self): ) self._deployer_manager_svc.start() + # Minion manager: runs pool lifecycle task flows in-thread so that + # they can reach the in-process conductor, scheduler, and worker over + # the fake:// transport. + self._minion_manager_svc = service.MessagingService( + constants.MINION_MANAGER_MAIN_MESSAGING_TOPIC, + [_InProcessMinionManagerServerEndpoint()], + minion_manager_rpc_server.VERSION, + worker_count=1, + init_rpc=False, + ) + self._minion_manager_svc.start() + # Worker: constructor calls _register_worker_service() which makes a # blocking RPC call to the conductor, so the conductor must already be # listening. @@ -432,8 +482,9 @@ def _teardown(self): pass for svc in [self._worker_host_svc, self._worker_svc, - self._deployer_manager_svc, self._transfer_cron_svc, - self._scheduler_svc, self._conductor_svc]: + self._minion_manager_svc, self._deployer_manager_svc, + self._transfer_cron_svc, self._scheduler_svc, + self._conductor_svc]: if not svc: continue try: diff --git a/coriolis/tests/integration/providers/test_provider/exp.py b/coriolis/tests/integration/providers/test_provider/exp.py index 0933e1a6..c0fa964a 100644 --- a/coriolis/tests/integration/providers/test_provider/exp.py +++ b/coriolis/tests/integration/providers/test_provider/exp.py @@ -187,7 +187,7 @@ def deploy_replica_source_resources( pkey_path = connection_info["pkey_path"] container_name = "coriolis-replicator-%s" % uuid.uuid4().hex[:8] - container_id = test_utils.start_container( + container_id = test_utils.run_container( test_utils.DATA_MINION_IMAGE, container_name, is_systemd=True, @@ -218,7 +218,7 @@ def deploy_replica_source_resources( }, } except Exception: - test_utils.stop_container(container_id) + test_utils.remove_container(container_id) raise def delete_replica_source_resources( @@ -226,7 +226,7 @@ def delete_replica_source_resources( migr_resources_dict): container_id = (migr_resources_dict or {}).get("container_id") if container_id: - test_utils.stop_container(container_id) + test_utils.remove_container(container_id) def replicate_disks( self, ctxt, connection_info, source_environment, instance_name, diff --git a/coriolis/tests/integration/providers/test_provider/imp.py b/coriolis/tests/integration/providers/test_provider/imp.py index e4e9f479..0060429c 100644 --- a/coriolis/tests/integration/providers/test_provider/imp.py +++ b/coriolis/tests/integration/providers/test_provider/imp.py @@ -16,6 +16,7 @@ import paramiko from coriolis.providers import backup_writers +from coriolis.providers.base import BaseDestinationMinionPoolProvider from coriolis.providers.base import BaseEndpointDestinationOptionsProvider from coriolis.providers.base import BaseEndpointNetworksProvider from coriolis.providers.base import BaseEndpointProvider @@ -24,6 +25,7 @@ from coriolis.providers.base import BaseReplicaImportValidationProvider from coriolis.providers.base import BaseUpdateDestinationReplicaProvider from coriolis.tests.integration import utils as test_utils +from coriolis import utils as coriolis_utils LOG = logging.getLogger(__name__) @@ -38,7 +40,8 @@ class TestImportProvider( BaseEndpointStorageProvider, BaseUpdateDestinationReplicaProvider, BaseReplicaImportProvider, - BaseReplicaImportValidationProvider): + BaseReplicaImportValidationProvider, + BaseDestinationMinionPoolProvider): """Destination-side provider backed by a local `scsi_debug` block device. ``connection_info`` (the destination endpoint's connection info) has the @@ -144,16 +147,29 @@ def deploy_replica_disks( def deploy_replica_target_resources( self, ctxt, connection_info, target_environment, volumes_info): + result = self._create_minion( + "coriolis-writer", connection_info, volumes_info) + + return { + "volumes_info": volumes_info, + "connection_info": result["backup_writer_connection_info"], + "migr_resources": {"container_id": result["container_id"]}, + } + + def _create_minion( + self, name_prefix, connection_info, volumes_info, + device_cgroup_rules=None): pkey_path = connection_info["pkey_path"] dest_devices = [vol["volume_dev"] for vol in volumes_info] - container_name = "coriolis-writer-%s" % uuid.uuid4().hex[:8] + container_name = "%s-%s" % (name_prefix, uuid.uuid4().hex[:8]) - container_id = test_utils.start_container( + container_id = test_utils.run_container( test_utils.DATA_MINION_IMAGE, container_name, is_systemd=True, ssh_key=f"{pkey_path}.pub", devices=dest_devices, + device_cgroup_rules=device_cgroup_rules, ) try: @@ -172,15 +188,15 @@ def deploy_replica_target_resources( writer_conn_details = bootstrapper.setup_writer() return { - "volumes_info": volumes_info, - "connection_info": { + "container_id": container_id, + "ssh_connection_info": ssh_conn_info, + "backup_writer_connection_info": { "backend": "http_backup_writer", "connection_details": writer_conn_details, }, - "migr_resources": {"container_id": container_id}, } except Exception: - test_utils.stop_container(container_id) + test_utils.remove_container(container_id) raise def delete_replica_target_resources( @@ -188,7 +204,7 @@ def delete_replica_target_resources( migr_resources_dict): container_id = (migr_resources_dict or {}).get("container_id") if container_id: - test_utils.stop_container(container_id) + test_utils.remove_container(container_id) def delete_replica_disks( self, ctxt, connection_info, target_environment, volumes_info): @@ -265,3 +281,119 @@ def validate_replica_import_input( def validate_replica_deployment_input( self, ctxt, connection_info, target_environment, export_info): return {} + + # BaseDestinationMinionPoolProvider + + def get_minion_pool_environment_schema(self): + return self.get_target_environment_schema() + + def get_minion_pool_options( + self, ctxt, connection_info, env=None, option_names=None): + return self.get_target_environment_options( + ctxt, connection_info, env, option_names) + + def validate_minion_compatibility_for_transfer( + self, ctxt, connection_info, export_info, environment_options, + minion_properties): + pass + + def validate_minion_pool_environment_options( + self, ctxt, connection_info, environment_options): + pass + + def set_up_pool_shared_resources( + self, ctxt, connection_info, environment_options, pool_identifier): + return {} + + def tear_down_pool_shared_resources( + self, ctxt, connection_info, environment_options, + pool_shared_resources): + pass + + def create_minion( + self, ctxt, connection_info, environment_options, pool_identifier, + pool_os_type, pool_shared_resources, new_minion_identifier): + # Devices are hotplugged after container creation via mknod / nsenter. + # We must pre-authorize all block devices through the + # --device-cgroup-rule option, otherwise any device added will be + # inaccessible ("operation not permitted" error on open). + result = self._create_minion( + "coriolis-pool-minion", connection_info, [], + device_cgroup_rules=["b *:* rwm"]) + + backup_writer_conn_info = result["backup_writer_connection_info"] + return { + "connection_info": result["ssh_connection_info"], + "backup_writer_connection_info": backup_writer_conn_info, + "minion_provider_properties": { + "container_id": result["container_id"], + }, + } + + def delete_minion(self, ctxt, connection_info, minion_properties): + container_id = (minion_properties or {}).get("container_id") + if container_id: + test_utils.remove_container(container_id) + + def shutdown_minion(self, ctxt, connection_info, minion_properties): + container_id = (minion_properties or {}).get("container_id") + if container_id: + test_utils.stop_container(container_id) + + def start_minion(self, ctxt, connection_info, minion_properties): + container_id = (minion_properties or {}).get("container_id") + if container_id: + test_utils.start_container(container_id) + + def attach_volumes_to_minion( + self, ctxt, connection_info, minion_properties, + minion_connection_info, volumes_info): + container_id = minion_properties["container_id"] + for vol in volumes_info: + device_path = vol["volume_dev"] + test_utils.hotplug_device_to_container(container_id, device_path) + + return { + "minion_properties": minion_properties, + "volumes_info": volumes_info, + } + + def detach_volumes_from_minion( + self, ctxt, connection_info, minion_properties, + minion_connection_info, volumes_info): + container_id = (minion_properties or {}).get("container_id") + if not container_id: + return + + for vol in (volumes_info or []): + dev_path = vol.get("volume_dev") + if not dev_path: + continue + + test_utils.unplug_device_from_container(container_id, dev_path) + + return { + "minion_properties": minion_properties, + "volumes_info": volumes_info, + } + + def healthcheck_minion( + self, ctxt, connection_info, minion_properties, + minion_connection_info): + ip = minion_connection_info.get("ip") + port = minion_connection_info.get("port", 22) + username = minion_connection_info.get("username", "root") + pkey = minion_connection_info.get("pkey") + + client = coriolis_utils.connect_ssh(ip, port, username, pkey=pkey) + client.close() + + def validate_osmorphing_minion_compatibility_for_transfer( + self, ctxt, connection_info, export_info, environment_options, + minion_properties): + pass + + def get_additional_os_morphing_info( + self, ctxt, connection_info, target_environment, + instance_deployment_info): + return {} diff --git a/coriolis/tests/integration/test_deployment.py b/coriolis/tests/integration/test_deployment.py index 18b7e00f..79a1b09b 100644 --- a/coriolis/tests/integration/test_deployment.py +++ b/coriolis/tests/integration/test_deployment.py @@ -86,3 +86,8 @@ def test_cancel_deployment(self): constants.EXECUTION_STATUS_CANCELED_FOR_DEBUGGING, ], ) + + +class MinionPoolReplicaDeploymentTests( + base.MinionPoolReplicaTestBase, ReplicaDeploymentIntegrationTest): + """Replica deployment that uses a pre-allocated destination minion pool.""" diff --git a/coriolis/tests/integration/test_minion_pools.py b/coriolis/tests/integration/test_minion_pools.py new file mode 100644 index 00000000..581acaa7 --- /dev/null +++ b/coriolis/tests/integration/test_minion_pools.py @@ -0,0 +1,86 @@ +# Copyright 2026 Cloudbase Solutions Srl +# All Rights Reserved. + +"""Integration tests for the Minion Pool lifecycle API. + +Exercises minion-pool operations via the Coriolis REST API: +- CRUD without allocation (create skip_allocation=True, list, get, update, + delete) +- Full allocation lifecycle (allocate -> wait for ALLOCATED -> refresh -> + deallocate -> wait for DEALLOCATED -> delete) +""" + +from coriolis import constants +from coriolis.tests.integration import base + + +class MinionPoolLifecycleTest(base.MinionPoolTestBase): + + def setUp(self): + super().setUp() + + self._endpoint = self._create_endpoint( + name="pool-dst", + endpoint_type=self._imp_platform, + connection_info={ + "devices": [], + "pkey_path": self._harness.ssh_key_path, + }, + ) + + def test_minion_pool_crud(self): + # Create + pool = self._create_pool(self._endpoint.id) + + self.assertEqual("test-pool", pool.name) + self.assertEqual( + constants.MINION_POOL_STATUS_DEALLOCATED, pool.status) + + # List + pools = self._client.minion_pools.list() + + pool_ids = [p.id for p in pools] + self.assertIn(pool.id, pool_ids) + + # Get + fetched = self._client.minion_pools.get(pool.id) + + self.assertEqual(pool.id, fetched.id) + self.assertEqual("test-pool", fetched.name) + + # Update + updated = self._client.minion_pools.update( + pool.id, {"notes": "updated notes"}) + + self.assertEqual("updated notes", updated.notes) + + # Delete + self._client.minion_pools.delete(pool.id) + + pools = self._client.minion_pools.list() + self.assertNotIn(pool.id, [p.id for p in pools]) + + def test_allocate_deallocate(self): + pool = self._create_pool(self._endpoint.id) + self.assertEqual( + constants.MINION_POOL_STATUS_DEALLOCATED, pool.status) + + # Allocate + self._client.minion_pools.allocate_minion_pool(pool.id) + + final = self._wait_for_pool(pool.id, base.MINION_ALLOCATED_TERMINAL) + self.assertEqual( + constants.MINION_POOL_STATUS_ALLOCATED, + final.status, + "Pool allocation ended in unexpected status '%s'" % final.status, + ) + + # Deallocate + self._client.minion_pools.deallocate_minion_pool(pool.id) + + final = self._wait_for_pool(pool.id, base.MINION_DEALLOCATED_TERMINAL) + self.assertEqual( + constants.MINION_POOL_STATUS_DEALLOCATED, + final.status, + "Pool deallocation ended in unexpected status '%s'" % final.status, + ) diff --git a/coriolis/tests/integration/transfers/test_executions.py b/coriolis/tests/integration/transfers/test_executions.py index cd3e37c2..f77576e2 100644 --- a/coriolis/tests/integration/transfers/test_executions.py +++ b/coriolis/tests/integration/transfers/test_executions.py @@ -116,3 +116,8 @@ def _test_cancel_running_execution(self, force): "Expected a canceled/error status after cancel, got %s" % final.status, ) + + +class MinionPoolTransferExecutionsTests( + base.MinionPoolReplicaTestBase, TransferExecutionsTests): + """Transfer executions that use a pre-allocated destination minion pool.""" diff --git a/coriolis/tests/integration/transfers/test_transfer.py b/coriolis/tests/integration/transfers/test_transfer.py index e917bc8c..23e8625b 100644 --- a/coriolis/tests/integration/transfers/test_transfer.py +++ b/coriolis/tests/integration/transfers/test_transfer.py @@ -73,3 +73,13 @@ def test_incremental_replica_transfer(self): test_utils.devices_match(self._src_device, self._dst_device), "Destination does not match source after incremental transfer", ) + + +class MinionPoolTransferTest( + base.MinionPoolReplicaTestBase, ReplicaTransferIntegrationTest): + """Transfer execution that uses a pre-allocated destination minion pool.""" + + def test_transfer(self): + super().test_transfer() + self.assertPoolAllocated(self._pool_id) + self.assertMachinesAvailable(self._pool_id) diff --git a/coriolis/tests/integration/utils.py b/coriolis/tests/integration/utils.py index a45e6a3e..8bbc8d2f 100644 --- a/coriolis/tests/integration/utils.py +++ b/coriolis/tests/integration/utils.py @@ -182,7 +182,12 @@ def wait_for_ssh(host, port, username, pkey_path, timeout=30): # Docker utils -def _start_container(image, name, extra_args=None): +def start_container(container_id): + """Start a stopped Docker container.""" + _run(["docker", "start", container_id], check=False) + + +def _run_container(image, name, extra_args=None): cmd = ["docker", "run", "--detach", "--name", name] if extra_args: cmd.extend(extra_args) @@ -191,9 +196,9 @@ def _start_container(image, name, extra_args=None): return result.stdout.decode().strip() -def start_container( +def run_container( image, name, is_systemd=False, ssh_key=None, volumes=None, devices=None, - extra_args=None, + device_cgroup_rules=None, extra_args=None, ): """Start a detached Docker container and return its container ID. @@ -204,11 +209,15 @@ def start_container( :param ssh_key: SSH key to add as a volume to the authorized_keys. :param volumes: List of volumes to attach to the container. :param devices: List of devices to attach to the container. + :param device_cgroup_rules: List of device cgroup rules (e.g. + ``["b *:* rwm"]``). This is needed for device hotplug after container + creation. :param extra_args: Optional list of extra ``docker run`` arguments. :returns: container ID string (stripped). """ volumes = volumes or [] devices = devices or [] + device_cgroup_rules = device_cgroup_rules or [] extra_args = extra_args or [] sec_opts = [] caps = [] @@ -228,21 +237,29 @@ def start_container( for device in devices: extra_args += ["--device", f"{device}:{device}"] + for rule in device_cgroup_rules: + extra_args += ["--device-cgroup-rule", rule] + for cap in caps: extra_args += ["--cap-add", cap] for sec_opt in sec_opts: extra_args += ["--security-opt", sec_opt] - return _start_container(image, name, extra_args) + return _run_container(image, name, extra_args) def stop_container(container_id): + """Stop a Docker container.""" + _run(["docker", "stop", "--time", "5", container_id], check=False) + + +def remove_container(container_id): """Stop and remove a Docker container, ignoring errors. :param container_id: container ID or name to stop / remove. """ - _run(["docker", "stop", "--time", "5", container_id], check=False) + stop_container(container_id) _run(["docker", "rm", "--force", container_id], check=False) @@ -257,3 +274,32 @@ def get_container_ip(container_id): "{{.NetworkSettings.IPAddress}}", container_id]) return result.stdout.decode().strip() + + +def _get_container_pid(container_id): + """Return the host PID of the init process of *container_id*.""" + result = _run( + ["docker", "inspect", "--format", "{{.State.Pid}}", container_id]) + return int(result.stdout.decode().strip()) + + +def hotplug_device_to_container(container_id, device_path): + """Create a device node for *device_path* in *container_id*'s namespace.""" + pid = _get_container_pid(container_id) + stat_result = os.stat(device_path) + major = os.major(stat_result.st_rdev) + minor = os.minor(stat_result.st_rdev) + + _run([ + "nsenter", "--target", str(pid), "--mount", "--", + "mknod", device_path, "b", str(major), str(minor), + ]) + + +def unplug_device_from_container(container_id, device_path): + """Remove a device node from *container_id*'s mount namespace.""" + pid = _get_container_pid(container_id) + _run([ + "nsenter", "--target", str(pid), "--mount", "--", + "rm", "-f", device_path, + ], check=False)