Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 84 additions & 7 deletions src/azure-cli/azure/cli/command_modules/vm/_vm_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@
import os
import re
import importlib
from enum import Enum

from urllib.parse import urlparse

from azure.cli.core.commands.arm import ArmTemplateBuilder
from azure.cli.core.commands.client_factory import get_mgmt_service_client
from azure.cli.core.profiles import ResourceType, get_sdk

from knack.log import get_logger
from knack.util import CLIError
Expand All @@ -32,7 +35,7 @@ def get_target_network_api(cli_ctx):
if cli_ctx.cloud.profile == 'latest':
version = '2022-01-01'
else:
from azure.cli.core.profiles import get_api_version, ResourceType
from azure.cli.core.profiles import get_api_version
version = get_api_version(cli_ctx, ResourceType.MGMT_NETWORK)
return version

Expand All @@ -46,8 +49,6 @@ def read_content_if_is_file(string_or_file):


def _resolve_api_version(cli_ctx, provider_namespace, resource_type, parent_path):
from azure.cli.core.commands.client_factory import get_mgmt_service_client
from azure.cli.core.profiles import ResourceType
client = get_mgmt_service_client(cli_ctx, ResourceType.MGMT_RESOURCE_RESOURCES)
provider = client.providers.get(provider_namespace)

Expand Down Expand Up @@ -75,10 +76,8 @@ def log_pprint_template(template):
def check_existence(cli_ctx, value, resource_group, provider_namespace, resource_type,
parent_name=None, parent_type=None, static_version=None):
# check for name or ID and set the type flags
from azure.cli.core.commands.client_factory import get_mgmt_service_client
from azure.core.exceptions import HttpResponseError
from azure.mgmt.core.tools import parse_resource_id
from azure.cli.core.profiles import ResourceType
id_parts = parse_resource_id(value)
resource_client = get_mgmt_service_client(cli_ctx, ResourceType.MGMT_RESOURCE_RESOURCES,
subscription_id=id_parts.get('subscription', None)).resources
Expand Down Expand Up @@ -414,8 +413,6 @@ def _update(model, lun, value):


def get_storage_blob_uri(cli_ctx, storage):
from azure.cli.core.profiles._shared import ResourceType
from azure.cli.core.commands.client_factory import get_mgmt_service_client
if urlparse(storage).scheme:
storage_uri = storage
else:
Expand Down Expand Up @@ -757,3 +754,83 @@ def _open(filename, mode):
f.write(public_bytes)

return public_bytes.decode()


def _gen_guid():
import uuid
return uuid.uuid4()


def assign_identity(cli_ctx, getter, setter, identity_role=None, identity_scope=None):
import time
from azure.core.exceptions import HttpResponseError

# get
resource = getter()
resource = setter(resource)

# create role assignment:
if identity_scope:
principal_id = resource.get('identity', {}).get('principal_id')

identity_role_id = resolve_role_id(cli_ctx, identity_role, identity_scope)
assignments_client = get_mgmt_service_client(cli_ctx, ResourceType.MGMT_AUTHORIZATION).role_assignments
RoleAssignmentCreateParameters = get_sdk(cli_ctx, ResourceType.MGMT_AUTHORIZATION,
'RoleAssignmentCreateParameters', mod='models',
operation_group='role_assignments')
parameters = RoleAssignmentCreateParameters(role_definition_id=identity_role_id, principal_id=principal_id,
principal_type=None)

logger.info("Creating an assignment with a role '%s' on the scope of '%s'", identity_role_id, identity_scope)
retry_times = 36
assignment_name = _gen_guid()
for retry_time in range(0, retry_times):
try:
assignments_client.create(scope=identity_scope, role_assignment_name=assignment_name,
parameters=parameters)
break
except HttpResponseError as ex:
if ex.error.code == 'RoleAssignmentExists':
logger.info('Role assignment already exists')
break
if retry_time < retry_times and ' does not exist in the directory ' in ex.message:
time.sleep(5)
logger.warning('Retrying role assignment creation: %s/%s', retry_time + 1,
retry_times)
continue
raise
return resource


def resolve_role_id(cli_ctx, role, scope):
import uuid
client = get_mgmt_service_client(cli_ctx, ResourceType.MGMT_AUTHORIZATION).role_definitions

role_id = None
if re.match(r'/subscriptions/[^/]+/providers/Microsoft.Authorization/roleDefinitions/',
role, re.I):
role_id = role
else:
try:
uuid.UUID(role)
role_id = '/subscriptions/{}/providers/Microsoft.Authorization/roleDefinitions/{}'.format(
client.config.subscription_id, role)
except ValueError:
pass
if not role_id: # retrieve role id
role_defs = list(client.list(scope, "roleName eq '{}'".format(role)))
if not role_defs:
raise CLIError("Role '{}' doesn't exist.".format(role))
if len(role_defs) > 1:
ids = [r.id for r in role_defs]
err = "More than one role matches the given name '{}'. Please pick an id from '{}'"
raise CLIError(err.format(role, ids))
role_id = role_defs[0].id
return role_id


class IdentityType(Enum):
SYSTEM_ASSIGNED = 'SystemAssigned'
USER_ASSIGNED = 'UserAssigned'
SYSTEM_ASSIGNED_USER_ASSIGNED = 'SystemAssigned, UserAssigned'
NONE = 'None'
183 changes: 141 additions & 42 deletions src/azure-cli/azure/cli/command_modules/vm/custom.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
from knack.log import get_logger
from knack.util import CLIError
from azure.cli.core.azclierror import (
CLIInternalError,
ResourceNotFoundError,
ValidationError,
RequiredArgumentMissingError,
Expand Down Expand Up @@ -826,8 +825,14 @@ def create_snapshot(cmd, resource_group_name, snapshot_name, location=None, size

# region VirtualMachines Identity
def show_vm_identity(cmd, resource_group_name, vm_name):
client = _compute_client_factory(cmd.cli_ctx)
return client.virtual_machines.get(resource_group_name, vm_name).identity
vm = get_vm_by_aaz(cmd, resource_group_name, vm_name)

identity = vm.get("identity", {}) if vm else None

if identity and not identity.get('userAssignedIdentities'):
identity['userAssignedIdentities'] = None

return identity or None


def show_vmss_identity(cmd, resource_group_name, vm_name):
Expand All @@ -837,49 +842,63 @@ def show_vmss_identity(cmd, resource_group_name, vm_name):

def assign_vm_identity(cmd, resource_group_name, vm_name, assign_identity=None, identity_role=None,
identity_role_id=None, identity_scope=None):
VirtualMachineIdentity, ResourceIdentityType, VirtualMachineUpdate = cmd.get_models('VirtualMachineIdentity',
'ResourceIdentityType',
'VirtualMachineUpdate')
UserAssignedIdentitiesValue = cmd.get_models('UserAssignedIdentitiesValue')
from azure.cli.core.commands.arm import assign_identity as assign_identity_helper
client = _compute_client_factory(cmd.cli_ctx)
_, _, external_identities, enable_local_identity = _build_identities_info(assign_identity)
identity, _, external_identities, enable_local_identity = _build_identities_info(assign_identity)

system_assigned = "SystemAssigned"
user_assigned = "UserAssigned"
system_assigned_user_assigned = "SystemAssigned, UserAssigned"
command_args = {'resource_group': resource_group_name, 'vm_name': vm_name}

def getter():
return client.virtual_machines.get(resource_group_name, vm_name)
return get_vm_by_aaz(cmd, resource_group_name, vm_name)

def setter(vm, external_identities=external_identities):
if vm.identity and vm.identity.type == ResourceIdentityType.system_assigned_user_assigned:
identity_types = ResourceIdentityType.system_assigned_user_assigned
elif vm.identity and vm.identity.type == ResourceIdentityType.system_assigned and external_identities:
identity_types = ResourceIdentityType.system_assigned_user_assigned
elif vm.identity and vm.identity.type == ResourceIdentityType.user_assigned and enable_local_identity:
identity_types = ResourceIdentityType.system_assigned_user_assigned
if vm.get('identity') and vm.get('identity').get('type') == system_assigned_user_assigned:
identity_types = system_assigned_user_assigned
elif vm.get('identity') and vm.get('identity').get('type') == system_assigned and external_identities:
identity_types = system_assigned_user_assigned
elif vm.get('identity') and vm.get('identity').get('type') == user_assigned and enable_local_identity:
identity_types = system_assigned_user_assigned
elif external_identities and enable_local_identity:
identity_types = ResourceIdentityType.system_assigned_user_assigned
identity_types = system_assigned_user_assigned
elif external_identities:
identity_types = ResourceIdentityType.user_assigned
identity_types = user_assigned
else:
identity_types = ResourceIdentityType.system_assigned
identity_types = system_assigned

vm.identity = VirtualMachineIdentity(type=identity_types)
if external_identities:
vm.identity.user_assigned_identities = {}
if not cmd.supported_api_version(min_api='2018-06-01', resource_type=ResourceType.MGMT_COMPUTE):
raise CLIInternalError("Usage error: user assigned identity is not available under current profile.",
"You can set the cloud's profile to latest with 'az cloud set --profile latest"
" --name <cloud name>'")
for identity in external_identities:
vm.identity.user_assigned_identities[identity] = UserAssignedIdentitiesValue()

vm_patch = VirtualMachineUpdate()
vm_patch.identity = vm.identity
return patch_vm(cmd, resource_group_name, vm_name, vm_patch)
if identity_types == system_assigned_user_assigned:
command_args['mi_system_assigned'] = "True"
command_args['mi_user_assigned'] = []
elif identity_types == user_assigned:
command_args['mi_user_assigned'] = []
else:
command_args['mi_system_assigned'] = "True"
Copy link

Copilot AI Dec 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The mi_user_assigned list is created and then potentially modified by appending to it at lines 879 and 882. However, when identity_types == system_assigned (the else branch at line 876), mi_user_assigned is never initialized in command_args. This means attempting to append to it at line 879 will fail with a KeyError. The else branch should also initialize command_args['mi_user_assigned'] = [].

Suggested change
command_args['mi_system_assigned'] = "True"
command_args['mi_system_assigned'] = "True"
command_args['mi_user_assigned'] = []

Copilot uses AI. Check for mistakes.
command_args['mi_user_assigned'] = []

if vm.get('identity') and vm.get('identity').get('userAssignedIdentities'):
for key in vm.get('identity').get('userAssignedIdentities').keys():
command_args['mi_user_assigned'].append(key)

if identity.get('userAssignedIdentities'):
for key in identity.get('userAssignedIdentities').keys():
if key not in command_args['mi_user_assigned']:
command_args['mi_user_assigned'].append(key)

from .aaz.latest.vm._patch import Patch
update_vm_identity = Patch(cli_ctx=cmd.cli_ctx)(command_args=command_args)
LongRunningOperation(cmd.cli_ctx)(update_vm_identity)
result = update_vm_identity.result()
return result

from ._vm_utils import assign_identity as assign_identity_helper
assign_identity_helper(cmd.cli_ctx, getter, setter, identity_role=identity_role_id, identity_scope=identity_scope)
vm = client.virtual_machines.get(resource_group_name, vm_name)
return _construct_identity_info(identity_scope, identity_role, vm.identity.principal_id,
vm.identity.user_assigned_identities)

vm = getter()
return _construct_identity_info(
identity_scope,
identity_role,
vm.get('identity').get('principalId') if vm.get('identity') else None,
vm.get('identity').get('userAssignedIdentities') if vm.get('identity') else None)
# endregion


Expand Down Expand Up @@ -1367,6 +1386,19 @@ def get_instance_view(cmd, resource_group_name, vm_name, include_user_data=False
return result


def get_vm_by_aaz(cmd, resource_group_name, vm_name, expand=None):
from .aaz.latest.vm._show import Show
command_args = {
'resource_group': resource_group_name,
'vm_name': vm_name,
}

if expand:
command_args['expand'] = expand

return Show(cli_ctx=cmd.cli_ctx)(command_args=command_args)


def get_vm(cmd, resource_group_name, vm_name, expand=None):
client = _compute_client_factory(cmd.cli_ctx)
return client.virtual_machines.get(resource_group_name, vm_name, expand=expand)
Expand Down Expand Up @@ -2493,7 +2525,7 @@ def _remove_identities(cmd, resource_group_name, name, identities, getter, sette
return None
emsis_to_remove = []
if identities:
existing_emsis = {x.lower() for x in list((resource.identity.user_assigned_identities or {}).keys())}
existing_emsis = {x.lower() for x in (resource.identity.user_assigned_identities or {}).keys()}
emsis_to_remove = {x.lower() for x in identities}
non_existing = emsis_to_remove.difference(existing_emsis)
if non_existing:
Expand All @@ -2520,18 +2552,85 @@ def _remove_identities(cmd, resource_group_name, name, identities, getter, sette
return result.identity


def _remove_identities_by_aaz(cmd, resource_group_name, name, identities, getter, setter):
from ._vm_utils import MSI_LOCAL_ID, IdentityType

remove_system_assigned_identity = False

if MSI_LOCAL_ID in identities:
remove_system_assigned_identity = True
identities.remove(MSI_LOCAL_ID)

resource = getter(cmd, resource_group_name, name)
existing_identity = resource.get('identity')

if existing_identity is None:
return None

existing_emsis = [x.lower() for x in (existing_identity.get('userAssignedIdentities') or {}).keys()]

if identities:
emsis_to_remove = [x.lower() for x in identities]

non_existing = [emsis for emsis in emsis_to_remove if emsis not in existing_emsis]
if non_existing:
raise CLIError("'{}' are not associated with '{}'".format(','.join(non_existing), name))

emsis_to_retain = [emsis for emsis in existing_emsis if emsis not in emsis_to_remove]

if not emsis_to_retain: # if all emsis are gone, we need to update the type
if existing_identity['type'] == IdentityType.USER_ASSIGNED.value:
existing_identity['type'] = IdentityType.NONE.value
elif existing_identity['type'] == IdentityType.SYSTEM_ASSIGNED_USER_ASSIGNED.value:
existing_identity['type'] = IdentityType.SYSTEM_ASSIGNED.value

existing_identity['userAssignedIdentities'] = {}
for emsis in identities:
existing_identity['userAssignedIdentities'][emsis] = {}
else:
existing_identity['userAssignedIdentities'] = None

if remove_system_assigned_identity:
if existing_identity['type'] == IdentityType.SYSTEM_ASSIGNED_USER_ASSIGNED.value \
or existing_identity['type'] == IdentityType.USER_ASSIGNED.value:
existing_identity['type'] = IdentityType.USER_ASSIGNED.value
else:
existing_identity['type'] = IdentityType.NONE.value

result = LongRunningOperation(cmd.cli_ctx)(setter(resource_group_name, name, resource))
return result.get('identity') or None


def remove_vm_identity(cmd, resource_group_name, vm_name, identities=None):
def setter(resource_group_name, vm_name, vm):
client = _compute_client_factory(cmd.cli_ctx)
VirtualMachineUpdate = cmd.get_models('VirtualMachineUpdate', operation_group='virtual_machines')
vm_update = VirtualMachineUpdate(identity=vm.identity)
return client.virtual_machines.begin_update(resource_group_name, vm_name, vm_update)
command_args = {
'resource_group': resource_group_name,
'vm_name': vm_name
}

from ._vm_utils import IdentityType
if vm.get('identity') and vm.get('identity').get('type') == IdentityType.USER_ASSIGNED.value:
command_args['mi_user_assigned'] = \
([key for key in list(vm.get('identity', {}).get('userAssignedIdentities', {}).keys())] +
['UserAssigned'])
elif vm.get('identity') and vm.get('identity').get('type') == IdentityType.SYSTEM_ASSIGNED.value:
command_args['mi_user_assigned'] = []
command_args['mi_system_assigned'] = 'True'
elif vm.get('identity') and vm.get('identity').get('type') == IdentityType.SYSTEM_ASSIGNED_USER_ASSIGNED.value:
command_args['mi_user_assigned'] = \
[key for key in list(vm.get('identity', {}).get('userAssignedIdentities', {}).keys())]
command_args['mi_system_assigned'] = 'True'
else:
command_args['mi_user_assigned'] = []

from .operations.vm import VMIdentityRemove
return VMIdentityRemove(cli_ctx=cmd.cli_ctx)(command_args=command_args)

if identities is None:
from ._vm_utils import MSI_LOCAL_ID
identities = [MSI_LOCAL_ID]

return _remove_identities(cmd, resource_group_name, vm_name, identities, get_vm, setter)
return _remove_identities_by_aaz(cmd, resource_group_name, vm_name, identities, get_vm_by_aaz, setter)


# region VirtualMachines Images
Expand Down
Loading
Loading