diff --git a/coriolis/api-refs/api_samples/transfer/openstack-transfer-create-resp.json b/coriolis/api-refs/api_samples/transfer/openstack-transfer-create-resp.json index 5baf549a..96ccfd03 100644 --- a/coriolis/api-refs/api_samples/transfer/openstack-transfer-create-resp.json +++ b/coriolis/api-refs/api_samples/transfer/openstack-transfer-create-resp.json @@ -49,6 +49,7 @@ } }, "executions": [], - "scenario": "replica" + "scenario": "replica", + "clustered": false } } diff --git a/coriolis/api-refs/api_samples/transfer/openstack-transfer-get-resp.json b/coriolis/api-refs/api_samples/transfer/openstack-transfer-get-resp.json index 55749ec6..5e48ef5b 100644 --- a/coriolis/api-refs/api_samples/transfer/openstack-transfer-get-resp.json +++ b/coriolis/api-refs/api_samples/transfer/openstack-transfer-get-resp.json @@ -42,6 +42,7 @@ "origin_minion_pool_id": null, "destination_minion_pool_id": null, "instance_osmorphing_minion_pool_mappings": {}, + "clustered": false, "executions": [ { "created_at": "2019-07-11T10:06:47.000000", diff --git a/coriolis/api-refs/api_samples/transfer/transfer-list-resp.json b/coriolis/api-refs/api_samples/transfer/transfer-list-resp.json index 14c909b3..c94b6164 100644 --- a/coriolis/api-refs/api_samples/transfer/transfer-list-resp.json +++ b/coriolis/api-refs/api_samples/transfer/transfer-list-resp.json @@ -74,7 +74,8 @@ "instances": {} }, "id": "0460aa4d-6b16-4c98-bd56-27ee186e4a22", - "scenario": "replica" + "scenario": "replica", + "clustered": false } ] } diff --git a/coriolis/api-refs/api_samples/transfer/transfer-update-resp.json b/coriolis/api-refs/api_samples/transfer/transfer-update-resp.json index 8bcdbbf0..8332a19f 100644 --- a/coriolis/api-refs/api_samples/transfer/transfer-update-resp.json +++ b/coriolis/api-refs/api_samples/transfer/transfer-update-resp.json @@ -133,7 +133,8 @@ "ubuntu-xenial": "echo 'anything you need'" } }, - "scenario": "replica" + "scenario": "replica", + "clustered": false } } } diff --git a/coriolis/api-refs/source/parameters.yaml b/coriolis/api-refs/source/parameters.yaml index 3b332f37..68b5475e 100644 --- a/coriolis/api-refs/source/parameters.yaml +++ b/coriolis/api-refs/source/parameters.yaml @@ -130,6 +130,15 @@ connection_info_schema: in: body type: object required: false +clustered: + description: | + Present on transfer responses. ``true`` when more than one instance is + listed (multi-instance scheduling: sync barriers and shared-disk + coordination). Set by the server at creation from ``instances``; not + accepted on create. + in: body + type: boolean + required: false deployment_cancel: description: | Object containing information about the type of deployment cancellation. diff --git a/coriolis/api-refs/source/transfer.inc b/coriolis/api-refs/source/transfer.inc index fa17913b..e9c86d21 100644 --- a/coriolis/api-refs/source/transfer.inc +++ b/coriolis/api-refs/source/transfer.inc @@ -51,6 +51,7 @@ Response - instance_osmorphing_minion_pool_mappings : instance_osmorphing_minion_pool_mappings - user_scripts : user_scripts - scenario: scenario_type + - clustered : clustered **Example of Transfer List Response** @@ -111,6 +112,7 @@ Response - instance_osmorphing_minion_pool_mappings : instance_osmorphing_minion_pool_mappings - user_scripts : user_scripts - scenario: scenario_type + - clustered : clustered **Example of Transfer Show Response** @@ -183,6 +185,7 @@ Response - instance_osmorphing_minion_pool_mappings : instance_osmorphing_minion_pool_mappings - user_scripts : user_scripts - scenario: scenario_type + - clustered : clustered **Example of Transfer Create Response** diff --git a/coriolis/conductor/rpc/server.py b/coriolis/conductor/rpc/server.py index 98bec5e1..b449a55e 100644 --- a/coriolis/conductor/rpc/server.py +++ b/coriolis/conductor/rpc/server.py @@ -755,6 +755,11 @@ def _begin_tasks( ctxt, task, origin_endpoint, destination_endpoint, retry_count=scheduling_retry_count, retry_period=scheduling_retry_period) + instance_task_info = task_info.get(task.instance, {}) + # NOTE: keep task_info in line with what we store after + # update_transfer_action_info_for_instance + # (export vs volumes) + utils.sync_instance_volumes_with_export(instance_task_info) worker_rpc.begin_task( ctxt, task_id=task.id, @@ -762,7 +767,7 @@ def _begin_tasks( origin=origin, destination=destination, instance=task.instance, - task_info=task_info.get(task.instance, {})) + task_info=instance_task_info) except Exception: LOG.warn( "Error occured while starting new task '%s'. " @@ -786,6 +791,42 @@ def _begin_tasks( "No tasks were started at the beginning of execution '%s'" % ( execution.id)) + @staticmethod + def _apply_clustered_replicate_peer_deploy_dependencies( + transfer, execution + ): + """Order REPLICATE_DISKS after every peer's DEPLOY_TRANSFER_DISKS. + + For clustered transfers, deploy populates volumes_info (including + replicate_disk_data for shared disks). Without this, a waiter's + REPLICATE_DISKS can start before another instance's deploy finishes, + so both sides still replicate. + """ + if not getattr(transfer, "clustered", False): + return + if len(transfer.instances) < 2: + return + deploy_id_by_instance = { + t.instance: t.id for t in execution.tasks + if t.task_type == constants.TASK_TYPE_DEPLOY_TRANSFER_DISKS} + for t in execution.tasks: + if t.task_type != constants.TASK_TYPE_REPLICATE_DISKS: + continue + seen = set() + new_deps = [] + for dep_id in (t.depends_on or []): + if dep_id not in seen: + seen.add(dep_id) + new_deps.append(dep_id) + for inst in transfer.instances: + if inst == t.instance: + continue + peer_deploy_id = deploy_id_by_instance.get(inst) + if peer_deploy_id and peer_deploy_id not in seen: + seen.add(peer_deploy_id) + new_deps.append(peer_deploy_id) + t.depends_on = new_deps + def _check_execution_tasks_sanity( self, execution, initial_task_info): """ Checks whether the given execution's tasks are: @@ -794,10 +835,10 @@ def _check_execution_tasks_sanity( """ all_instances_in_tasks = { t.instance for t in execution.tasks} - instances_tasks_mapping = { - instance: [ - t for t in execution.tasks if t.instance == instance] - for instance in all_instances_in_tasks} + if not all_instances_in_tasks: + return + if initial_task_info is None: + initial_task_info = {} def _check_task_cls_param_requirements(task, instance_task_info_keys): task_cls = tasks_factory.get_task_runner_class(task.task_type) @@ -813,108 +854,85 @@ def _check_task_cls_param_requirements(task, instance_task_info_keys): missing_params)) return task_cls.get_returned_task_info_properties() - for instance, instance_tasks in instances_tasks_mapping.items(): - task_info_keys = set(initial_task_info.get( - instance, {}).keys()) - # mapping between the ID and associated object of processed tasks: - processed_tasks = {} - tasks_to_process = { - t.id: t for t in instance_tasks} - while tasks_to_process: - queued_tasks = [] - # gather all tasks which will be queued to run in parallel: - for task in tasks_to_process.values(): - if task.status in ( - constants.TASK_STATUS_SCHEDULED, - constants.TASK_STATUS_ON_ERROR_ONLY): - if not task.depends_on: - queued_tasks.append(task) - else: - missing_deps = [ - dep_id - for dep_id in task.depends_on - if dep_id not in tasks_to_process and ( - dep_id not in processed_tasks)] - if missing_deps: - raise exception.TaskDependencyException( - "Task '%s' (type '%s') for instance '%s' " - "has non-existent tasks referenced as " - "dependencies: %s" % ( - task.id, task.task_type, - instance, missing_deps)) - if all( - [dep_id in processed_tasks - for dep_id in task.depends_on]): - queued_tasks.append(task) - else: - raise exception.InvalidTaskState( - "Invalid initial state '%s' for task '%s' " - "of type '%s'." % ( - task.status, task.id, task.task_type)) - - # check if nothing was left queued: - if not queued_tasks: - remaining_tasks_deps_map = { - (tid, t.task_type): t.depends_on - for tid, t in tasks_to_process.items()} - processed_tasks_type_map = { - tid: t.task_type - for tid, t in processed_tasks.items()} - raise exception.ExecutionDeadlockException( - "Execution '%s' (type '%s') is bound to be deadlocked:" - " there are leftover tasks for instance '%s' which " - "will never get queued. Already processed tasks are: " - "%s. Tasks left: %s" % ( - execution.id, execution.type, instance, - processed_tasks_type_map, remaining_tasks_deps_map - )) - - # mapping for task_info fields modified by each task: - modified_fields_by_queued_tasks = {} - # check that each task has what it needs and - # register what they return/modify: - for task in queued_tasks: - for new_field in _check_task_cls_param_requirements( - task, task_info_keys): - if new_field not in modified_fields_by_queued_tasks: - modified_fields_by_queued_tasks[new_field] = [ - task] - else: - modified_fields_by_queued_tasks[new_field].append( - task) - - # check if any queued tasks would manipulate the same fields: - conflicting_fields = { - new_field: [t.task_type for t in tasks] - for new_field, tasks in ( - modified_fields_by_queued_tasks.items()) - if len(tasks) > 1} - if conflicting_fields: - raise exception.TaskFieldsConflict( - "There are fields which will encounter a state " - "conflict following the parallelized execution of " - "tasks for execution '%s' (type '%s') for instance " - "'%s'. Conflicting fields and tasks will be: : %s" % ( - execution.id, execution.type, instance, - conflicting_fields)) - - # register queued tasks as processed before continuing: - for task in queued_tasks: - processed_tasks[task.id] = task - tasks_to_process.pop(task.id) - # update current state fields at this point: - task_info_keys = task_info_keys.union(set( - modified_fields_by_queued_tasks.keys())) - LOG.debug( - "Successfully processed following tasks for instance '%s' " - "for execution %s (type '%s') for any state conflict " - "checks: %s", instance, execution.id, execution.type, - [(t.id, t.task_type) for t in queued_tasks]) + all_tasks_by_id = {t.id: t for t in execution.tasks} + task_info_by_instance = { + inst: set((initial_task_info.get(inst) or {}).keys()) + for inst in all_instances_in_tasks} + + tasks_to_process = {t.id: t for t in execution.tasks} + if not tasks_to_process: + return + processed_tasks = {} + while tasks_to_process: + queued_tasks = [] + for task in tasks_to_process.values(): + if task.status not in ( + constants.TASK_STATUS_SCHEDULED, + constants.TASK_STATUS_ON_ERROR_ONLY): + raise exception.InvalidTaskState( + "Invalid initial state '%s' for task '%s' " + "of type '%s'." % ( + task.status, task.id, task.task_type)) + if not task.depends_on: + queued_tasks.append(task) + else: + dep_ids = list(task.depends_on) + missing = [ + dep_id for dep_id in dep_ids + if dep_id not in all_tasks_by_id] + if missing: + raise exception.TaskDependencyException( + "Task '%s' (type '%s') for instance '%s' " + "has non-existent tasks referenced as " + "dependencies: %s" % ( + task.id, task.task_type, task.instance, + missing)) + if all( + dep_id in processed_tasks + for dep_id in dep_ids): + queued_tasks.append(task) + + if not queued_tasks: + raise exception.ExecutionDeadlockException( + "Execution '%s' (type '%s') is bound to be deadlocked: " + "cannot schedule a next wave. Remaining: %s. " + "Processed: %s" % ( + execution.id, execution.type, + {t.id: t.depends_on + for t in tasks_to_process.values()}, + list(processed_tasks))) + + new_fields_by_task = {} + for task in queued_tasks: + new_fields_by_task[task] = _check_task_cls_param_requirements( + task, task_info_by_instance[task.instance]) + modified_by_inst_field = {} + for task, new_fields in new_fields_by_task.items(): + for new_field in new_fields: + key = (task.instance, new_field) + modified_by_inst_field.setdefault(key, []).append(task) + conflicts = { + (inst, field): [t.task_type for t in tlist] + for (inst, field), tlist in modified_by_inst_field.items() + if len(tlist) > 1} + if conflicts: + raise exception.TaskFieldsConflict( + "There are fields which will encounter a state " + "conflict for execution '%s' (type '%s') (instance+field) " + "and tasks: %s" % ( + execution.id, execution.type, conflicts)) + + for _task, new_fields in new_fields_by_task.items(): + for new_field in new_fields: + task_info_by_instance[_task.instance].add(new_field) + for task in queued_tasks: + processed_tasks[task.id] = task + tasks_to_process.pop(task.id, None) LOG.debug( - "Successfully checked all tasks for instance '%s' as part of " - "execution '%s' (type '%s') for any state conflicts: %s", - instance, execution.id, execution.type, - [(t.id, t.task_type) for t in instance_tasks]) + "Sanity check wave for execution '%s': %s", + execution.id, + [(t.id, t.task_type, t.instance) for t in queued_tasks]) + LOG.debug( "Successfully checked all tasks for execution '%s' (type '%s') " "for ordering or state conflicts.", @@ -943,6 +961,11 @@ def execute_transfer_tasks(self, ctxt, transfer_id, shutdown_instances, dest_env['network_map'] = transfer.network_map dest_env['storage_mappings'] = transfer.storage_mappings + # Serialized DEPLOY for clustered transfers so the first instance + # (owner) materializes libvirt volumes before other instances run + # deploy_replica_disks; _setup can then copy pool/volume to waiters. + prev_clustered_deploy_task_id = None + for instance in execution.action.instances: # NOTE: we default/convert the volumes info to an empty list # to preserve backwards-compatibility with older versions @@ -1025,9 +1048,16 @@ def execute_transfer_tasks(self, ctxt, transfer_id, shutdown_instances, disk_deployment_depends_on.append( validate_transfer_destination_inputs_task.id) + disk_deploy_deps = list(disk_deployment_depends_on) + if ( + getattr(transfer, "clustered", False) + and prev_clustered_deploy_task_id is not None): + disk_deploy_deps.append(prev_clustered_deploy_task_id) deploy_transfer_disks_task = self._create_task( instance, constants.TASK_TYPE_DEPLOY_TRANSFER_DISKS, - execution, depends_on=disk_deployment_depends_on) + execution, depends_on=disk_deploy_deps) + if getattr(transfer, "clustered", False): + prev_clustered_deploy_task_id = deploy_transfer_disks_task.id shutdown_deps = [] deploy_transfer_source_resources_task = None @@ -1112,6 +1142,8 @@ def execute_transfer_tasks(self, ctxt, transfer_id, shutdown_instances, replicate_disks_task.id], on_error=True) + self._apply_clustered_replicate_peer_deploy_dependencies( + transfer, execution) self._check_execution_tasks_sanity(execution, transfer.info) # update the action info for all of the Transfers: @@ -1325,6 +1357,7 @@ def create_instances_transfer(self, ctxt, transfer_scenario, network_map, storage_mappings, notes=None, user_scripts=None, clone_disks=True, skip_os_morphing=False): + clustered = len(instances) > 1 supported_scenarios = [ constants.TRANSFER_SCENARIO_REPLICA, constants.TRANSFER_SCENARIO_LIVE_MIGRATION] @@ -1361,6 +1394,7 @@ def create_instances_transfer(self, ctxt, transfer_scenario, transfer.user_scripts = user_scripts or {} transfer.clone_disks = clone_disks transfer.skip_os_morphing = skip_os_morphing + transfer.clustered = clustered self._check_minion_pools_for_action(ctxt, transfer) @@ -1775,6 +1809,7 @@ def deploy_transfer_instances( deployment.user_scripts = user_scripts deployment.clone_disks = clone_disks deployment.skip_os_morphing = skip_os_morphing + deployment.clustered = bool(getattr(transfer, 'clustered', False)) deployment.deployer_id = wait_for_execution deployment.trust_id = trust_id deployment.last_execution_status = init_status @@ -2126,13 +2161,15 @@ def _cancel_tasks_execution( exception_details=( "This task was unscheduled during the cancellation " "of the parent tasks execution.")) + # NOTE: SYNCING covers tasks waiting on a clustered + # barrier; there is no running worker to cancel, so we + # unschedule like PENDING/STARTING. (Peers stuck in + # SYNCING when another instance fails are handled in + # _abort_peer_sync_barrier_tasks_on_error, not here.) elif task.status in ( constants.TASK_STATUS_PENDING, - constants.TASK_STATUS_STARTING): - # any PENDING/STARTING tasks means that they did not have a - # host assigned to them yet, and presuming the host does not - # start executing the task until it marks itself as the runner, - # we can just mark the task as unscheduled: + constants.TASK_STATUS_STARTING, + constants.TASK_STATUS_SYNCING): LOG.debug( "Setting currently '%s' task '%s' to '%s' as part of the " "cancellation of execution '%s'", @@ -2222,6 +2259,30 @@ def _cancel_tasks_execution( "No new tasks were started for execution '%s' following " "state advancement after cancellation.", execution.id) + def _abort_peer_sync_barrier_tasks_on_error( + self, ctxt, execution, errored_task, error_message): + """Mark peer tasks stuck in SYNCING on the same barrier as failed.""" + if errored_task.task_type not in constants.TASK_TYPES_TO_SYNC: + return + action = db_api.get_action( + ctxt, execution.action_id, include_task_info=True) + if not bool(getattr(action, "clustered", False)): + return + execution = db_api.get_tasks_execution(ctxt, execution.id) + for peer in execution.tasks: + if peer.id == errored_task.id: + continue + if peer.status != constants.TASK_STATUS_SYNCING: + continue + if peer.task_type != errored_task.task_type: + continue + db_api.set_task_status( + ctxt, peer.id, constants.TASK_STATUS_ERROR, + exception_details=( + "Aborted: peer task '%s' failed during clustered " + "execution. Original error: %s" % ( + errored_task.id, error_message))) + def _update_reservation_fulfillment_for_execution(self, ctxt, execution): """ Updates the reservation fulfillment status for the parent transfer action of the given execution based on its type. @@ -2585,6 +2646,8 @@ def _start_task(task): task_info = {} else: task_info = action.info[task.instance] + # NOTE: same sync as in update_transfer_action_info_for_instance + utils.sync_instance_volumes_with_export(task_info) db_api.set_task_status( ctxt, task.id, constants.TASK_STATUS_PENDING) try: @@ -2819,6 +2882,323 @@ def _update_volumes_info_for_deployment_parent_transfer( self._update_transfer_volumes_info( ctxt, transfer_id, instance, updated_task_info) + def _clustered_shared_disk_export_maps(self, action): + """Build per-instance export disk maps and shared cluster identities. + + Returns ``(instance_disk_maps, shared_identities)`` where each + ``instance_disk_maps[instance_id]`` maps ``cluster_disk_identity`` + to that disk's export ``id`` from ``export_info``. + """ + identity_counts = {} + explicitly_shareable = {} + instance_disk_maps = {} + for instance_id in action.instances: + export_info = action.info.get(instance_id, {}).get( + "export_info", {}) + disks = export_info.get("devices", {}).get("disks", []) + disk_map = {} + for disk in disks: + ident = utils.cluster_disk_identity(disk or {}) + if not ident: + continue + disk_map[ident] = disk.get("id") + identity_counts[ident] = identity_counts.get(ident, 0) + 1 + if disk.get("shareable"): + explicitly_shareable[ident] = True + instance_disk_maps[instance_id] = disk_map + + shared_identities = { + ident for ident, count in identity_counts.items() + if count > 1 or explicitly_shareable.get(ident, False)} + return instance_disk_maps, shared_identities + + def _clustered_shared_disk_owners( + self, instance_disk_maps, shared_identities, instance_order): + """Pick one owner instance per shared disk identity. + + The owner is the first entry in ``instance_order`` (transfer instance + list order) that has that disk in ``instance_disk_maps``. + """ + owners = {} + for ident in shared_identities: + for inst in instance_order: + if ident in instance_disk_maps.get(inst, {}): + owners[ident] = inst + break + return owners + + def _promote_clustered_shared_disk_shareable_in_export_info( + self, ctxt, execution): + """Promote shareable on export disks when clustered and shared.""" + action = db_api.get_action( + ctxt, execution.action_id, include_task_info=True) + if not getattr(action, "clustered", False): + return + if not action.instances: + return + + _, shared_idents = self._clustered_shared_disk_export_maps(action) + if not shared_idents: + return + + for instance_id in action.instances: + export_info = action.info.get(instance_id, {}).get("export_info") + if not export_info: + continue + disks = export_info.get("devices", {}).get("disks") + if not disks: + continue + updated = False + for disk in disks: + ident = utils.cluster_disk_identity(disk or {}) + if ident in shared_idents and not disk.get("shareable"): + disk["shareable"] = True + updated = True + if updated: + LOG.info( + "Promoted shareable=True for clustered shared disk(s) in " + "export_info for instance '%s' (action '%s').", + instance_id, execution.action_id) + db_api.update_transfer_action_info_for_instance( + ctxt, execution.action_id, instance_id, + {"export_info": export_info}) + + def _setup_shared_disk_volumes_info(self, ctxt, execution): + """Configure volumes_info for shared disks before parallel replicate. + + After the DEPLOY_TRANSFER_DISKS sync barrier, non-owner instances get + ``replicate_disk_data=False`` and inherit the owner's volume entry; + ``shareable`` is set where needed. + """ + action = db_api.get_action( + ctxt, execution.action_id, include_task_info=True) + + instance_disk_maps, shared_identities = ( + self._clustered_shared_disk_export_maps(action)) + if not shared_identities: + return + + owners = self._clustered_shared_disk_owners( + instance_disk_maps, shared_identities, action.instances) + + owner_volumes_cache = {} + for instance_id in action.instances: + disk_map = instance_disk_maps.get(instance_id, {}) + shared_idents_for_inst = [ + i for i in disk_map if i in shared_identities] + if not shared_idents_for_inst: + continue + + is_owner_for_all = all( + owners.get(i) == instance_id for i in shared_idents_for_inst) + if is_owner_for_all: + continue + + volumes_info = list( + action.info.get(instance_id, {}).get("volumes_info", []) or []) + + updated = False + for ident in shared_idents_for_inst: + owner_id = owners.get(ident) + if owner_id == instance_id or not owner_id: + continue + + waiter_disk_id = disk_map[ident] + owner_disk_id = instance_disk_maps.get( + owner_id, {}).get(ident) + if not owner_disk_id: + continue + + if owner_id not in owner_volumes_cache: + owner_volumes_cache[owner_id] = { + utils.cluster_disk_identity(v): v + for v in action.info.get(owner_id, {}).get( + "volumes_info", []) + if utils.cluster_disk_identity(v)} + owner_volume = owner_volumes_cache[owner_id].get(ident) + if not owner_volume: + continue + + inherited = copy.deepcopy(owner_volume) + inherited["disk_id"] = waiter_disk_id + inherited[ + constants.VOLUME_INFO_REPLICATE_DISK_DATA] = False + inherited["shareable"] = True + inherited["volume_dev"] = "" + + target_vol = None + for vol in volumes_info: + if utils.cluster_disk_identity(vol) == ident: + target_vol = vol + break + if target_vol is None: + norm_wid = utils.normalized_volume_disk_path_key( + waiter_disk_id) + for vol in volumes_info: + if utils.normalized_volume_disk_path_key( + vol.get("disk_id")) == norm_wid: + target_vol = vol + break + + if target_vol is not None: + target_vol.update(inherited) + else: + volumes_info.append(inherited) + updated = True + + if updated: + seen_disk_ids = set() + deduped = [] + for vol in volumes_info: + did = utils.normalized_volume_disk_path_key( + vol.get("disk_id")) + if did is not None and did in seen_disk_ids: + LOG.warning( + "Removing duplicate volumes_info entry for " + "disk_id '%s' on instance '%s'.", + vol.get("disk_id"), instance_id) + continue + if did is not None: + seen_disk_ids.add(did) + deduped.append(vol) + volumes_info = deduped + + LOG.info( + "Pre-set shared disk volumes_info for instance '%s' " + "to skip replication of shared disks owned by other " + "instances.", instance_id) + db_api.update_transfer_action_info_for_instance( + ctxt, execution.action_id, instance_id, + {"volumes_info": volumes_info}) + + # Re-read from DB so that volumes_info saved by the waiter loop above + # (which works on a local copy) is visible to the promotion sections. + action = db_api.get_action( + ctxt, execution.action_id, include_task_info=True) + + for owner_id in set(owners.values()): + owner_volumes = action.info.get(owner_id, {}).get( + "volumes_info", []) + owner_updated = False + for vol in owner_volumes: + vol_key = utils.cluster_disk_identity(vol) + if not vol_key or owners.get(vol_key) != owner_id: + continue + if not vol.get("shareable"): + vol["shareable"] = True + owner_updated = True + if owner_updated: + db_api.update_transfer_action_info_for_instance( + ctxt, execution.action_id, owner_id, + {"volumes_info": owner_volumes}) + + for instance_id in action.instances: + disk_map = instance_disk_maps.get(instance_id, {}) + volumes_info = action.info.get(instance_id, {}).get( + "volumes_info", []) + if not volumes_info: + continue + promoted = False + for vol in volumes_info: + vol_key = utils.cluster_disk_identity(vol) + if not vol_key or vol_key not in shared_identities: + continue + if vol_key not in disk_map: + continue + if not vol.get("shareable"): + vol["shareable"] = True + promoted = True + if promoted: + LOG.info( + "Ensured shareable=True on shared-disk volumes for " + "instance '%s' before parallel minion attach.", + instance_id) + db_api.update_transfer_action_info_for_instance( + ctxt, execution.action_id, instance_id, + {"volumes_info": volumes_info}) + + def _handle_task_sync_barrier(self, ctxt, task, execution, action): + """Implements a cross-instance sync barrier for clustered actions. + + If the completed task's type is in TASK_TYPES_TO_SYNC and the + action is clustered, the task is set to SYNCING instead of + remaining COMPLETED. Once all instances' tasks of the same type + reach SYNCING, _handle_synced_tasks runs the type-specific sync + logic, then all tasks are set to COMPLETED and their execution + states are advanced. + + Returns True if the barrier was activated (caller should return + early), False otherwise. + """ + if task.task_type not in constants.TASK_TYPES_TO_SYNC: + return False + if not bool(getattr(action, "clustered", False)): + return False + if task.task_type == constants.TASK_TYPE_DEPLOY_TRANSFER_DISKS: + # Deploy tasks are ordered per instance (chained depends_on) so + # libvirt can reuse the owner volume. The barrier would + # set the first deploy to SYNCING; the next instance + # cannot start until the first is COMPLETED — deadlock. + # _setup_shared_disk_volumes_info already runs from + # task_completed after each deploy. + return False + + db_api.set_task_status( + ctxt, task.id, constants.TASK_STATUS_SYNCING) + + peer_tasks = [ + t for t in execution.tasks + if t.task_type == task.task_type and t.id != task.id] + all_syncing = all( + t.status == constants.TASK_STATUS_SYNCING for t in peer_tasks) + if not all_syncing: + LOG.info( + "Task '%s' (type '%s') for instance '%s' is now SYNCING. " + "Waiting for peer tasks of other instances to reach SYNCING.", + task.id, task.task_type, task.instance) + return True + + LOG.info( + "All tasks of type '%s' across all instances have reached " + "SYNCING for execution '%s'. Running sync handler.", + task.task_type, execution.id) + + action = db_api.get_action( + ctxt, execution.action_id, include_task_info=True) + self._handle_synced_tasks(ctxt, task.task_type, execution, action) + + synced_tasks = [task] + peer_tasks + for synced_task in synced_tasks: + db_api.set_task_status( + ctxt, synced_task.id, constants.TASK_STATUS_COMPLETED) + + for synced_task in synced_tasks: + self._handle_post_task_actions( + ctxt, synced_task, execution, + action.info.get(synced_task.instance, {})) + self._advance_execution_state( + ctxt, execution, instance=synced_task.instance, requery=True) + + return True + + def _handle_synced_tasks(self, ctxt, task_type, execution, action): + """Runs type-specific logic after all instances reach a sync barrier. + + Called once all tasks of a given type across all instances have + reached SYNCING. The handler can inspect and modify action info + (e.g. volumes_info) for all instances. + """ + if task_type == constants.TASK_TYPE_GET_INSTANCE_INFO: + self._promote_clustered_shared_disk_shareable_in_export_info( + ctxt, execution) + elif task_type == constants.TASK_TYPE_DEPLOY_TRANSFER_DISKS: + self._setup_shared_disk_volumes_info(ctxt, execution) + elif task_type == constants.TASK_TYPE_SHUTDOWN_INSTANCE: + LOG.info( + "All instances have been shut down for clustered execution " + "'%s'. No additional sync handling required for shutdown.", + execution.id) + def _handle_post_task_actions(self, ctxt, task, execution, task_info): task_type = task.task_type @@ -3135,11 +3515,35 @@ def task_completed(self, ctxt, task_id, task_result): # NOTE: refresh the execution just in case: execution = db_api.get_tasks_execution(ctxt, task.execution_id) + + # Copy owner's libvirt / replicate flags to waiters as soon as the + # owner finishes DEPLOY, before the next instance's deploy runs. + if ( + task_result + and task.task_type == ( + constants.TASK_TYPE_DEPLOY_TRANSFER_DISKS) + and getattr(action, "clustered", False)): + self._setup_shared_disk_volumes_info(ctxt, execution) + + if self._handle_task_sync_barrier( + ctxt, task, execution, action): + return + self._handle_post_task_actions( ctxt, task, execution, updated_task_info) - newly_started_tasks = self._advance_execution_state( - ctxt, execution, instance=task.instance, requery=False) + # Clustered transfers use cross-instance depends_on + # (chained deploy, peer deploy before replicate). + # Advancing only task.instance leaves peer instances' + # SCHEDULED work unconsidered when the peer is the + # one that must run next (e.g. second instance's deploy after the + # first's deploy completes). + if getattr(action, "clustered", False): + newly_started_tasks = self._advance_execution_state( + ctxt, execution, requery=False) + else: + newly_started_tasks = self._advance_execution_state( + ctxt, execution, instance=task.instance, requery=False) if newly_started_tasks: LOG.info( "The following tasks were started for execution '%s' " @@ -3365,8 +3769,12 @@ def set_task_error(self, ctxt, task_id, exception_details): "Some tasks are running in parallel with the " "OSMorphing task, a debug setup cannot be safely " "achieved. Proceeding with cleanup tasks as usual.") + self._abort_peer_sync_barrier_tasks_on_error( + ctxt, execution, task, exception_details) self._cancel_tasks_execution(ctxt, execution) else: + self._abort_peer_sync_barrier_tasks_on_error( + ctxt, execution, task, exception_details) self._cancel_tasks_execution(ctxt, execution) @task_synchronized diff --git a/coriolis/constants.py b/coriolis/constants.py index 270bf47d..2f97380f 100644 --- a/coriolis/constants.py +++ b/coriolis/constants.py @@ -50,11 +50,13 @@ TASK_STATUS_CANCELED_FROM_DEADLOCK = "STRANDED_AFTER_DEADLOCK" TASK_STATUS_ON_ERROR_ONLY = "EXECUTE_ON_ERROR_ONLY" TASK_STATUS_FAILED_TO_SCHEDULE = "FAILED_TO_SCHEDULE" +TASK_STATUS_SYNCING = "SYNCING" ACTIVE_TASK_STATUSES = [ TASK_STATUS_PENDING, TASK_STATUS_STARTING, TASK_STATUS_RUNNING, + TASK_STATUS_SYNCING, TASK_STATUS_CANCELLING, TASK_STATUS_CANCELLING_AFTER_COMPLETION ] @@ -161,6 +163,11 @@ TASK_TYPE_POWER_ON_DESTINATION_MINION = "POWER_ON_DESTINATION_MINION" TASK_TYPE_POWER_OFF_DESTINATION_MINION = "POWER_OFF_DESTINATION_MINION" +TASK_TYPES_TO_SYNC = [ + TASK_TYPE_GET_INSTANCE_INFO, + TASK_TYPE_DEPLOY_TRANSFER_DISKS, + TASK_TYPE_SHUTDOWN_INSTANCE, +] MINION_POOL_OPERATIONS_TASKS = [ TASK_TYPE_VALIDATE_SOURCE_MINION_POOL_OPTIONS, @@ -219,6 +226,7 @@ DISK_FORMAT_QCOW2 = 'qcow2' DISK_FORMAT_VHD = 'vhd' DISK_FORMAT_VHDX = 'vhdx' +VOLUME_INFO_REPLICATE_DISK_DATA = "replicate_disk_data" DISK_ALLOCATION_TYPE_STATIC = "static" DISK_ALLOCATION_TYPE_DYNAMIC = "dynamic" diff --git a/coriolis/db/api.py b/coriolis/db/api.py index 52f674bb..f49a1414 100644 --- a/coriolis/db/api.py +++ b/coriolis/db/api.py @@ -727,6 +727,7 @@ def update_transfer_action_info_for_instance( """ Updates the info for the given action with the provided dict. Returns the updated value. Sub-fields of the dict already in the info will get overwritten entirely! + After merging, volumes_info is updated so it stays aligned with export_info """ action = get_action(context, action_id, include_task_info=True) if not new_instance_info: @@ -759,6 +760,7 @@ def update_transfer_action_info_for_instance( instance_info_old_copy = instance_info_old.copy() instance_info_old_copy.update(new_instance_info) + utils.sync_instance_volumes_with_export(instance_info_old_copy) action_info[instance] = instance_info_old_copy action.info = action_info diff --git a/coriolis/db/sqlalchemy/migrate_repo/versions/024_add_clustered_to_base_transfer_action.py b/coriolis/db/sqlalchemy/migrate_repo/versions/024_add_clustered_to_base_transfer_action.py new file mode 100644 index 00000000..9e834e85 --- /dev/null +++ b/coriolis/db/sqlalchemy/migrate_repo/versions/024_add_clustered_to_base_transfer_action.py @@ -0,0 +1,20 @@ +# Copyright 2026 Cloudbase Solutions Srl +# All Rights Reserved. + +import sqlalchemy + + +def upgrade(migrate_engine): + meta = sqlalchemy.MetaData() + meta.bind = migrate_engine + + base_transfer = sqlalchemy.Table( + 'base_transfer_action', meta, autoload=True) + if 'clustered' in base_transfer.c: + return + # server_default so existing rows get a value when the column is added + # (MySQL stores booleans as TINYINT). + clustered = sqlalchemy.Column( + 'clustered', sqlalchemy.Boolean, nullable=False, + server_default=sqlalchemy.text('0')) + base_transfer.create_column(clustered) diff --git a/coriolis/db/sqlalchemy/models.py b/coriolis/db/sqlalchemy/models.py index d4377999..c379689a 100644 --- a/coriolis/db/sqlalchemy/models.py +++ b/coriolis/db/sqlalchemy/models.py @@ -285,6 +285,10 @@ class BaseTransferAction(BASE, models.TimestampMixin, models.ModelBase, sqlalchemy.Boolean, nullable=False, default=True) skip_os_morphing = sqlalchemy.Column( sqlalchemy.Boolean, nullable=False, default=False) + # Multi-instance transfer: enables cross-instance sync barriers and + # shared-disk handling. Must be set on INSERT (MySQL NOT NULL). + clustered = sqlalchemy.Column( + sqlalchemy.Boolean, nullable=False, default=False) __mapper_args__ = { 'polymorphic_identity': 'base_transfer_action', @@ -320,6 +324,7 @@ def to_dict(self, include_task_info=True, include_executions=True): "user_scripts": self.user_scripts, "clone_disks": self.clone_disks, "skip_os_morphing": self.skip_os_morphing, + "clustered": bool(self.clustered), } if include_executions: for ex in self.executions: diff --git a/coriolis/schemas/disk_sync_resources_info_schema.json b/coriolis/schemas/disk_sync_resources_info_schema.json index 4a86c375..ffb66309 100644 --- a/coriolis/schemas/disk_sync_resources_info_schema.json +++ b/coriolis/schemas/disk_sync_resources_info_schema.json @@ -15,7 +15,7 @@ }, "volume_dev": { "type": "string", - "description": "String device path (ex: /dev/sdc) from within the temporary minion VM where the disk was attached." + "description": "Guest minion device path (e.g. /dev/disk/by-id/...) when the volume is attached; use \"\" for rows that do not represent a transferred block dev yet (e.g. shared-disk non-owners)." } }, "required": ["disk_id", "volume_dev"], diff --git a/coriolis/schemas/vm_export_info_schema.json b/coriolis/schemas/vm_export_info_schema.json index 05f3017e..b9506df1 100644 --- a/coriolis/schemas/vm_export_info_schema.json +++ b/coriolis/schemas/vm_export_info_schema.json @@ -118,9 +118,14 @@ "type": "string", "description": "The allocation scheme for the given disk (static = thick; dynamic = thin)", "enum": ["static", "dynamic"] + }, + "shareable": { + "type": "boolean", + "description": "Whether the disk is shared (multi-writer) and can be attached to multiple VMs simultaneously." } }, "required": [ + "id", "size_bytes" ] } diff --git a/coriolis/tasks/minion_pool_tasks.py b/coriolis/tasks/minion_pool_tasks.py index 75e24571..21de2f3f 100644 --- a/coriolis/tasks/minion_pool_tasks.py +++ b/coriolis/tasks/minion_pool_tasks.py @@ -477,6 +477,12 @@ class _BaseAttachVolumesToTransferMinionTask( def _get_volumes_info_from_task_info(cls, task_info): return task_info["volumes_info"] + def _run( + self, ctxt, instance, origin, destination, task_info, + event_handler): + return super(_BaseAttachVolumesToTransferMinionTask, self)._run( + ctxt, instance, origin, destination, task_info, event_handler) + @classmethod def get_required_task_info_properties(cls): fields = super( diff --git a/coriolis/tasks/replica_tasks.py b/coriolis/tasks/replica_tasks.py index 0407e8bc..9ef7dd2b 100644 --- a/coriolis/tasks/replica_tasks.py +++ b/coriolis/tasks/replica_tasks.py @@ -23,6 +23,38 @@ def _get_volumes_info(task_info): return volumes_info +def _preserve_replicate_disk_data_from_prior_volumes( + prior_volumes_info, new_volumes_info): + """Keep replicate_disk_data=False set by the conductor for cluster waiters. + + update_transfer_action_info_for_instance replaces the whole + volumes_info list when a task returns. Destination deploy does not + round-trip that flag, so shared-disk waiters would lose it and replicate + again unless we merge it back per disk_id. + """ + if not prior_volumes_info or not new_volumes_info: + return new_volumes_info + key = constants.VOLUME_INFO_REPLICATE_DISK_DATA + by_disk = {} + for v in prior_volumes_info: + if not isinstance(v, dict): + continue + did = v.get("disk_id") + if did is None: + continue + by_disk[str(did)] = v + for vol in new_volumes_info: + if not isinstance(vol, dict): + continue + did = vol.get("disk_id") + if did is None: + continue + prev = by_disk.get(str(did)) + if prev and prev.get(key) is False: + vol[key] = False + return new_volumes_info + + def _check_ensure_volumes_info_ordering(export_info, volumes_info): """ Returns a new list of volumes_info, ensuring that the order of the disks in 'volumes_info' is consistent with the order that the @@ -244,12 +276,27 @@ def _run(self, ctxt, instance, origin, destination, task_info, source_environment = task_info['source_environment'] source_resources = task_info.get('source_resources', {}) - volumes_info = provider.replicate_disks( - ctxt, connection_info, source_environment, instance, - source_resources, migr_source_conn_info, migr_target_conn_info, - volumes_info, incremental) - schemas.validate_value( - volumes_info, schemas.CORIOLIS_VOLUMES_INFO_SCHEMA) + volumes_to_replicate = [ + vol for vol in volumes_info + if vol.get(constants.VOLUME_INFO_REPLICATE_DISK_DATA, True)] + pre_replicated_volumes = [ + vol for vol in volumes_info + if not vol.get(constants.VOLUME_INFO_REPLICATE_DISK_DATA, True)] + + if volumes_to_replicate: + replicated_volumes = provider.replicate_disks( + ctxt, connection_info, source_environment, instance, + source_resources, migr_source_conn_info, migr_target_conn_info, + volumes_to_replicate, incremental) + schemas.validate_value( + replicated_volumes, schemas.CORIOLIS_VOLUMES_INFO_SCHEMA) + else: + LOG.info( + "No disks marked for replication for instance '%s'. " + "Using pre-provisioned volumes_info.", instance) + replicated_volumes = [] + + volumes_info = pre_replicated_volumes + replicated_volumes volumes_info = _check_ensure_volumes_info_ordering( export_info, volumes_info) @@ -290,15 +337,17 @@ def _run(self, ctxt, instance, origin, destination, task_info, event_handler) connection_info = base.get_connection_info(ctxt, destination) - volumes_info = task_info.get("volumes_info", []) + prior_volumes_info = task_info.get("volumes_info", []) volumes_info = provider.deploy_replica_disks( ctxt, connection_info, target_environment, instance, export_info, - volumes_info) + prior_volumes_info) schemas.validate_value( volumes_info, schemas.CORIOLIS_VOLUMES_INFO_SCHEMA) volumes_info = _check_ensure_volumes_info_ordering( export_info, volumes_info) + volumes_info = _preserve_replicate_disk_data_from_prior_volumes( + prior_volumes_info, volumes_info) return { 'volumes_info': volumes_info} diff --git a/coriolis/tests/conductor/rpc/test_server.py b/coriolis/tests/conductor/rpc/test_server.py index 096cd33a..81de0efb 100644 --- a/coriolis/tests/conductor/rpc/test_server.py +++ b/coriolis/tests/conductor/rpc/test_server.py @@ -1307,6 +1307,7 @@ def call_execute_transfer_tasks(): if has_origin_minion_pool else None, destination_minion_pool_id=mock.sentinel.destination_minion_pool_id if has_target_minion_pool else None, + clustered=False, ) mock_get_transfer.return_value = mock_transfer diff --git a/coriolis/tests/db/sqlalchemy/test_models.py b/coriolis/tests/db/sqlalchemy/test_models.py index 7b0c7610..3bd4c761 100644 --- a/coriolis/tests/db/sqlalchemy/test_models.py +++ b/coriolis/tests/db/sqlalchemy/test_models.py @@ -283,6 +283,7 @@ def test_to_dict(self): transfer.info = mock.sentinel.info transfer.clone_disks = True transfer.skip_os_morphing = False + transfer.clustered = False expected_result = { "base_id": mock.sentinel.base_id, "user_id": mock.sentinel.user_id, @@ -314,6 +315,7 @@ def test_to_dict(self): "info": mock.sentinel.info, "clone_disks": True, "skip_os_morphing": False, + "clustered": False, } result = transfer.to_dict() diff --git a/coriolis/tests/tasks/test_replica_tasks.py b/coriolis/tests/tasks/test_replica_tasks.py index f31e04be..40fa9029 100644 --- a/coriolis/tests/tasks/test_replica_tasks.py +++ b/coriolis/tests/tasks/test_replica_tasks.py @@ -146,6 +146,8 @@ def test__run(self, mock_unmarshal, mock_check_vol_info, mock_get_vol_info, task_info.get.side_effect = [ task_info['incremental'], task_info['source_resources']] prov_fun = mock_get_provider.return_value.replicate_disks + mock_get_vol_info.return_value = [{"disk_id": "disk_id1"}] + prov_fun.return_value = [{"disk_id": "disk_id1"}] expected_result = {"volumes_info": mock_check_vol_info.return_value} expected_validation_calls = [ mock.call.mock_validate_value( diff --git a/coriolis/utils.py b/coriolis/utils.py index 0655e05a..22cd5934 100644 --- a/coriolis/utils.py +++ b/coriolis/utils.py @@ -197,6 +197,90 @@ def _exec_retry(*args, **kwargs): return _retry_on_error +def normalized_volume_disk_path_key(disk_id): + """Lowercase/stripped path/string key for comparing disk identifiers.""" + if disk_id is None: + return None + s = str(disk_id).strip().lower() + return s if s else None + + +def cluster_disk_identity(disk_id_or_obj): + """Return a stable key for matching the same disk across cluster nodes. + + The provider is expected to return a ``disk_id`` (or ``id`` on export + ``devices.disks`` entries) that is identical across all instances that + share the disk. For example, the VMware provider canonicalises the + datastore path of a multi-writer VMDK so every guest reports the same + string. Core just normalizes that value here for case-insensitive + comparison. + """ + if isinstance(disk_id_or_obj, dict): + disk_id = disk_id_or_obj.get("disk_id") + if disk_id is None: + disk_id = disk_id_or_obj.get("id") + return normalized_volume_disk_path_key(disk_id) + return normalized_volume_disk_path_key(disk_id_or_obj) + + +def ensure_volumes_info_volume_dev_default(volumes_info): + """Ensure each volume row has a ``volume_dev`` string for schema validation + + Use ``""`` when the minion has not attached the volume yet (e.g. non-owner + shared-disk waiters) or the provider did not set the field. + """ + if not volumes_info: + return + for vol in volumes_info: + if not isinstance(vol, dict): + continue + if "volume_dev" not in vol: + vol["volume_dev"] = "" + + +def sync_instance_volumes_with_export(instance_info): + """Fill in volume list fields that should follow export_info. + + Sets default ``volume_dev`` when missing, and copies the ``shareable`` + flag from ``export_info['devices']['disks']`` onto each + ``volumes_info`` entry where it applies. Safe to call more than once. + + Invoked from ``update_transfer_action_info_for_instance`` after merging + new instance info, and in the conductor right before it sends + ``task_info`` to a worker, so the DB and RPC payload stay aligned. + """ + if not isinstance(instance_info, dict): + return + vols = instance_info.get("volumes_info") + if vols is None: + return + ensure_volumes_info_volume_dev_default(vols) + export = instance_info.get("export_info") + if not export: + return + apply_export_disk_shareable_metadata_to_volumes_info(export, vols) + + +def apply_export_disk_shareable_metadata_to_volumes_info( + export_info, volumes_info): + """Propagate shareable from export_info disks to volumes_info entries.""" + if not export_info or not volumes_info: + return + disks = export_info.get("devices", {}).get("disks") or [] + share_idents = set() + for d in disks: + if d.get("shareable"): + cid = cluster_disk_identity(d) + if cid: + share_idents.add(cid) + if not share_idents: + return + for vol in volumes_info: + cid = cluster_disk_identity(vol) + if cid and cid in share_idents: + vol["shareable"] = True + + def get_udev_net_rules(net_ifaces_info): content = "" for name, mac_address in net_ifaces_info.items():