Move cell message queue switching and add caching

This moves the cell message queue switching code from the
RequestContext to the RPC layer where it's ultimately used.
Originally, it was thought that a separate DB query for
a CellMapping would occur per compute RPC API call in the
API cell and the context manager would be invoked to inject
the cell message queue transport to use for the RPC call.

Since compute RPC calls are based on the CellMapping of an
instance or a host, we could instead have generic functions
that take an instance or host and look up InstanceMapping or
HostMapping to get the CellMapping and return the
corresponding RPC client.

The RPC client objects are cached by CellMapping uuid and
expired clients are removed using a periodic task.

Co-Authored-By: Brian Elliott <bdelliott@gmail.com>

Depends-On: I6f211e9102f79418f9f94a15784f91c4150ab8a7

Change-Id: I96849888087f4b09433cb683a9eb4719d1c35c4c
This commit is contained in:
melanie witt 2016-06-08 04:46:13 +00:00
parent b88677c2d1
commit 4df0869e00
8 changed files with 488 additions and 238 deletions

View File

@ -333,7 +333,8 @@ class ComputeAPI(object):
version_cap = self.VERSION_ALIASES.get(upgrade_level,
upgrade_level)
serializer = objects_base.NovaObjectSerializer()
self.client = self.get_client(target, version_cap, serializer)
default_client = self.get_client(target, version_cap, serializer)
self.router = rpc.ClientRouter(default_client)
def _determine_version_cap(self, target):
global LAST_VERSION
@ -374,21 +375,12 @@ class ComputeAPI(object):
'service': service_version})
return version_cap
def _compat_ver(self, current, legacy):
if self.client.can_send_version(current):
return current
else:
return legacy
# Cells overrides this
def get_client(self, target, version_cap, serializer):
return rpc.get_client(target,
version_cap=version_cap,
serializer=serializer)
def get_cell_client(self, context):
return rpc.get_cell_client(context, self.client)
def add_aggregate_host(self, ctxt, host, aggregate, host_param,
slave_info=None):
'''Add aggregate host.
@ -400,7 +392,7 @@ class ComputeAPI(object):
:param host: This is the host to send the message to.
'''
version = '4.0'
cctxt = self.get_cell_client(ctxt).prepare(
cctxt = self.router.by_host(ctxt, host).prepare(
server=host, version=version)
cctxt.cast(ctxt, 'add_aggregate_host',
aggregate=aggregate, host=host_param,
@ -408,7 +400,7 @@ class ComputeAPI(object):
def add_fixed_ip_to_instance(self, ctxt, instance, network_id):
version = '4.0'
cctxt = self.get_cell_client(ctxt).prepare(
cctxt = self.router.by_instance(ctxt, instance).prepare(
server=_compute_host(None, instance), version=version)
cctxt.cast(ctxt, 'add_fixed_ip_to_instance',
instance=instance, network_id=network_id)
@ -416,7 +408,7 @@ class ComputeAPI(object):
def attach_interface(self, ctxt, instance, network_id, port_id,
requested_ip):
version = '4.0'
cctxt = self.get_cell_client(ctxt).prepare(
cctxt = self.router.by_instance(ctxt, instance).prepare(
server=_compute_host(None, instance), version=version)
return cctxt.call(ctxt, 'attach_interface',
instance=instance, network_id=network_id,
@ -424,13 +416,13 @@ class ComputeAPI(object):
def attach_volume(self, ctxt, instance, bdm):
version = '4.0'
cctxt = self.get_cell_client(ctxt).prepare(
cctxt = self.router.by_instance(ctxt, instance).prepare(
server=_compute_host(None, instance), version=version)
cctxt.cast(ctxt, 'attach_volume', instance=instance, bdm=bdm)
def change_instance_metadata(self, ctxt, instance, diff):
version = '4.0'
cctxt = self.get_cell_client(ctxt).prepare(
cctxt = self.router.by_instance(ctxt, instance).prepare(
server=_compute_host(None, instance), version=version)
cctxt.cast(ctxt, 'change_instance_metadata',
instance=instance, diff=diff)
@ -438,8 +430,8 @@ class ComputeAPI(object):
def check_can_live_migrate_destination(self, ctxt, instance, destination,
block_migration, disk_over_commit):
version = '4.11'
cell_client = self.get_cell_client(ctxt)
if not cell_client.can_send_version(version):
client = self.router.by_host(ctxt, destination)
if not client.can_send_version(version):
# NOTE(eliqiao): This is a new feature that is only available
# once all compute nodes support at least version 4.11.
# This means the new REST API that supports this needs to handle
@ -450,7 +442,7 @@ class ComputeAPI(object):
else:
version = '4.0'
cctxt = cell_client.prepare(server=destination, version=version)
cctxt = client.prepare(server=destination, version=version)
result = cctxt.call(ctxt, 'check_can_live_migrate_destination',
instance=instance,
block_migration=block_migration,
@ -466,13 +458,13 @@ class ComputeAPI(object):
def check_can_live_migrate_source(self, ctxt, instance, dest_check_data):
dest_check_data_obj = dest_check_data
version = '4.8'
cell_client = self.get_cell_client(ctxt)
if not cell_client.can_send_version(version):
client = self.router.by_instance(ctxt, instance)
if not client.can_send_version(version):
version = '4.0'
if dest_check_data:
dest_check_data = dest_check_data.to_legacy_dict()
source = _compute_host(None, instance)
cctxt = cell_client.prepare(server=source, version=version)
cctxt = client.prepare(server=source, version=version)
result = cctxt.call(ctxt, 'check_can_live_migrate_source',
instance=instance,
dest_check_data=dest_check_data)
@ -486,7 +478,11 @@ class ComputeAPI(object):
def check_instance_shared_storage(self, ctxt, instance, data, host=None):
version = '4.0'
cctxt = self.get_cell_client(ctxt).prepare(
if not host:
client = self.router.by_instance(ctxt, instance)
else:
client = self.router.by_host(ctxt, host)
cctxt = client.prepare(
server=_compute_host(host, instance), version=version)
return cctxt.call(ctxt, 'check_instance_shared_storage',
instance=instance,
@ -495,7 +491,11 @@ class ComputeAPI(object):
def confirm_resize(self, ctxt, instance, migration, host,
reservations=None, cast=True):
version = '4.0'
cctxt = self.get_cell_client(ctxt).prepare(
if not host:
client = self.router.by_instance(ctxt, instance)
else:
client = self.router.by_host(ctxt, host)
cctxt = client.prepare(
server=_compute_host(host, instance), version=version)
rpc_method = cctxt.cast if cast else cctxt.call
return rpc_method(ctxt, 'confirm_resize',
@ -504,7 +504,7 @@ class ComputeAPI(object):
def detach_interface(self, ctxt, instance, port_id):
version = '4.0'
cctxt = self.get_cell_client(ctxt).prepare(
cctxt = self.router.by_instance(ctxt, instance).prepare(
server=_compute_host(None, instance), version=version)
cctxt.cast(ctxt, 'detach_interface',
instance=instance, port_id=port_id)
@ -512,11 +512,11 @@ class ComputeAPI(object):
def detach_volume(self, ctxt, instance, volume_id, attachment_id=None):
extra = {'attachment_id': attachment_id}
version = '4.7'
cell_client = self.get_cell_client(ctxt)
if not cell_client.can_send_version(version):
client = self.router.by_instance(ctxt, instance)
if not client.can_send_version(version):
version = '4.0'
extra.pop('attachment_id')
cctxt = cell_client.prepare(server=_compute_host(None, instance),
cctxt = client.prepare(server=_compute_host(None, instance),
version=version)
cctxt.cast(ctxt, 'detach_volume',
instance=instance, volume_id=volume_id, **extra)
@ -524,7 +524,7 @@ class ComputeAPI(object):
def finish_resize(self, ctxt, instance, migration, image, disk_info,
host, reservations=None):
version = '4.0'
cctxt = self.get_cell_client(ctxt).prepare(
cctxt = self.router.by_host(ctxt, host).prepare(
server=host, version=version)
cctxt.cast(ctxt, 'finish_resize',
instance=instance, migration=migration,
@ -533,7 +533,7 @@ class ComputeAPI(object):
def finish_revert_resize(self, ctxt, instance, migration, host,
reservations=None):
version = '4.0'
cctxt = self.get_cell_client(ctxt).prepare(
cctxt = self.router.by_host(ctxt, host).prepare(
server=host, version=version)
cctxt.cast(ctxt, 'finish_revert_resize',
instance=instance, migration=migration,
@ -541,78 +541,78 @@ class ComputeAPI(object):
def get_console_output(self, ctxt, instance, tail_length):
version = '4.0'
cctxt = self.get_cell_client(ctxt).prepare(
cctxt = self.router.by_instance(ctxt, instance).prepare(
server=_compute_host(None, instance), version=version)
return cctxt.call(ctxt, 'get_console_output',
instance=instance, tail_length=tail_length)
def get_console_pool_info(self, ctxt, host, console_type):
version = '4.0'
cctxt = self.get_cell_client(ctxt).prepare(
cctxt = self.router.by_host(ctxt, host).prepare(
server=host, version=version)
return cctxt.call(ctxt, 'get_console_pool_info',
console_type=console_type)
def get_console_topic(self, ctxt, host):
version = '4.0'
cctxt = self.get_cell_client(ctxt).prepare(
cctxt = self.router.by_host(ctxt, host).prepare(
server=host, version=version)
return cctxt.call(ctxt, 'get_console_topic')
def get_diagnostics(self, ctxt, instance):
version = '4.0'
cctxt = self.get_cell_client(ctxt).prepare(
cctxt = self.router.by_instance(ctxt, instance).prepare(
server=_compute_host(None, instance), version=version)
return cctxt.call(ctxt, 'get_diagnostics', instance=instance)
def get_instance_diagnostics(self, ctxt, instance):
version = '4.13'
cell_client = self.get_cell_client(ctxt)
if not cell_client.can_send_version(version):
client = self.router.by_instance(ctxt, instance)
if not client.can_send_version(version):
version = '4.0'
instance = objects_base.obj_to_primitive(instance)
cctxt = cell_client.prepare(server=_compute_host(None, instance),
cctxt = client.prepare(server=_compute_host(None, instance),
version=version)
return cctxt.call(ctxt, 'get_instance_diagnostics', instance=instance)
def get_vnc_console(self, ctxt, instance, console_type):
version = '4.0'
cctxt = self.get_cell_client(ctxt).prepare(
cctxt = self.router.by_instance(ctxt, instance).prepare(
server=_compute_host(None, instance), version=version)
return cctxt.call(ctxt, 'get_vnc_console',
instance=instance, console_type=console_type)
def get_spice_console(self, ctxt, instance, console_type):
version = '4.0'
cctxt = self.get_cell_client(ctxt).prepare(
cctxt = self.router.by_instance(ctxt, instance).prepare(
server=_compute_host(None, instance), version=version)
return cctxt.call(ctxt, 'get_spice_console',
instance=instance, console_type=console_type)
def get_rdp_console(self, ctxt, instance, console_type):
version = '4.0'
cctxt = self.get_cell_client(ctxt).prepare(
cctxt = self.router.by_instance(ctxt, instance).prepare(
server=_compute_host(None, instance), version=version)
return cctxt.call(ctxt, 'get_rdp_console',
instance=instance, console_type=console_type)
def get_mks_console(self, ctxt, instance, console_type):
version = '4.3'
cctxt = self.get_cell_client(ctxt).prepare(
cctxt = self.router.by_instance(ctxt, instance).prepare(
server=_compute_host(None, instance), version=version)
return cctxt.call(ctxt, 'get_mks_console',
instance=instance, console_type=console_type)
def get_serial_console(self, ctxt, instance, console_type):
version = '4.0'
cctxt = self.get_cell_client(ctxt).prepare(
cctxt = self.router.by_instance(ctxt, instance).prepare(
server=_compute_host(None, instance), version=version)
return cctxt.call(ctxt, 'get_serial_console',
instance=instance, console_type=console_type)
def validate_console_port(self, ctxt, instance, port, console_type):
version = '4.0'
cctxt = self.get_cell_client(ctxt).prepare(
cctxt = self.router.by_instance(ctxt, instance).prepare(
server=_compute_host(None, instance), version=version)
return cctxt.call(ctxt, 'validate_console_port',
instance=instance, port=port,
@ -628,20 +628,20 @@ class ComputeAPI(object):
:param host: This is the host to send the message to.
'''
version = '4.0'
cctxt = self.get_cell_client(ctxt).prepare(
cctxt = self.router.by_host(ctxt, host).prepare(
server=host, version=version)
return cctxt.call(ctxt, 'host_maintenance_mode',
host=host_param, mode=mode)
def host_power_action(self, ctxt, host, action):
version = '4.0'
cctxt = self.get_cell_client(ctxt).prepare(
cctxt = self.router.by_host(ctxt, host).prepare(
server=host, version=version)
return cctxt.call(ctxt, 'host_power_action', action=action)
def inject_network_info(self, ctxt, instance):
version = '4.0'
cctxt = self.get_cell_client(ctxt).prepare(
cctxt = self.router.by_instance(ctxt, instance).prepare(
server=_compute_host(None, instance), version=version)
cctxt.cast(ctxt, 'inject_network_info', instance=instance)
@ -649,16 +649,16 @@ class ComputeAPI(object):
migration, migrate_data=None):
args = {'migration': migration}
version = '4.8'
cell_client = self.get_cell_client(ctxt)
if not cell_client.can_send_version(version):
client = self.router.by_host(ctxt, host)
if not client.can_send_version(version):
version = '4.2'
if migrate_data:
migrate_data = migrate_data.to_legacy_dict(
pre_migration_result=True)
if not cell_client.can_send_version(version):
if not client.can_send_version(version):
version = '4.0'
args.pop('migration')
cctxt = cell_client.prepare(server=host, version=version)
cctxt = client.prepare(server=host, version=version)
cctxt.cast(ctxt, 'live_migration', instance=instance,
dest=dest, block_migration=block_migration,
migrate_data=migrate_data, **args)
@ -666,31 +666,36 @@ class ComputeAPI(object):
def live_migration_force_complete(self, ctxt, instance, migration):
version = '4.12'
kwargs = {}
if not self.client.can_send_version(version):
if not migration.source_compute:
client = self.router.by_instance(ctxt, instance)
else:
client = self.router.by_host(ctxt, migration.source_compute)
if not client.can_send_version(version):
version = '4.9'
kwargs['migration_id'] = migration.id
cctxt = self.get_cell_client(ctxt).prepare(server=_compute_host(
migration.source_compute, instance), version=version)
cctxt = client.prepare(
server=_compute_host(migration.source_compute, instance),
version=version)
cctxt.cast(ctxt, 'live_migration_force_complete', instance=instance,
**kwargs)
def live_migration_abort(self, ctxt, instance, migration_id):
version = '4.10'
cctxt = self.get_cell_client(ctxt).prepare(
cctxt = self.router.by_instance(ctxt, instance).prepare(
server=_compute_host(None, instance), version=version)
cctxt.cast(ctxt, 'live_migration_abort', instance=instance,
migration_id=migration_id)
def pause_instance(self, ctxt, instance):
version = '4.0'
cctxt = self.get_cell_client(ctxt).prepare(
cctxt = self.router.by_instance(ctxt, instance).prepare(
server=_compute_host(None, instance), version=version)
cctxt.cast(ctxt, 'pause_instance', instance=instance)
def post_live_migration_at_destination(self, ctxt, instance,
block_migration, host):
version = '4.0'
cctxt = self.get_cell_client(ctxt).prepare(
cctxt = self.router.by_host(ctxt, host).prepare(
server=host, version=version)
cctxt.cast(ctxt, 'post_live_migration_at_destination',
instance=instance, block_migration=block_migration)
@ -699,12 +704,12 @@ class ComputeAPI(object):
host, migrate_data=None):
migrate_data_orig = migrate_data
version = '4.8'
cell_client = self.get_cell_client(ctxt)
if not cell_client.can_send_version(version):
client = self.router.by_host(ctxt, host)
if not client.can_send_version(version):
version = '4.0'
if migrate_data:
migrate_data = migrate_data.to_legacy_dict()
cctxt = cell_client.prepare(server=host, version=version)
cctxt = client.prepare(server=host, version=version)
result = cctxt.call(ctxt, 'pre_live_migration',
instance=instance,
block_migration=block_migration,
@ -732,18 +737,18 @@ class ComputeAPI(object):
'node': node,
'clean_shutdown': clean_shutdown}
version = '4.1'
cell_client = self.get_cell_client(ctxt)
if not cell_client.can_send_version(version):
client = self.router.by_host(ctxt, host)
if not client.can_send_version(version):
version = '4.0'
msg_args['instance_type'] = objects_base.obj_to_primitive(
instance_type)
cctxt = cell_client.prepare(server=host, version=version)
cctxt = client.prepare(server=host, version=version)
cctxt.cast(ctxt, 'prep_resize', **msg_args)
def reboot_instance(self, ctxt, instance, block_device_info,
reboot_type):
version = '4.0'
cctxt = self.get_cell_client(ctxt).prepare(
cctxt = self.router.by_instance(ctxt, instance).prepare(
server=_compute_host(None, instance), version=version)
cctxt.cast(ctxt, 'reboot_instance',
instance=instance,
@ -762,13 +767,16 @@ class ComputeAPI(object):
'scheduled_node': node,
'limits': limits}
version = '4.5'
cell_client = self.get_cell_client(ctxt)
if not cell_client.can_send_version(version):
if not host:
client = self.router.by_instance(ctxt, instance)
else:
client = self.router.by_host(ctxt, host)
if not client.can_send_version(version):
version = '4.0'
extra.pop('migration')
extra.pop('scheduled_node')
extra.pop('limits')
cctxt = cell_client.prepare(server=_compute_host(host, instance),
cctxt = client.prepare(server=_compute_host(host, instance),
version=version)
cctxt.cast(ctxt, 'rebuild_instance',
instance=instance, new_pass=new_pass,
@ -789,7 +797,7 @@ class ComputeAPI(object):
:param host: This is the host to send the message to.
'''
version = '4.0'
cctxt = self.get_cell_client(ctxt).prepare(
cctxt = self.router.by_host(ctxt, host).prepare(
server=host, version=version)
cctxt.cast(ctxt, 'remove_aggregate_host',
aggregate=aggregate, host=host_param,
@ -797,14 +805,14 @@ class ComputeAPI(object):
def remove_fixed_ip_from_instance(self, ctxt, instance, address):
version = '4.0'
cctxt = self.get_cell_client(ctxt).prepare(
cctxt = self.router.by_instance(ctxt, instance).prepare(
server=_compute_host(None, instance), version=version)
cctxt.cast(ctxt, 'remove_fixed_ip_from_instance',
instance=instance, address=address)
def remove_volume_connection(self, ctxt, instance, volume_id, host):
version = '4.0'
cctxt = self.get_cell_client(ctxt).prepare(
cctxt = self.router.by_host(ctxt, host).prepare(
server=host, version=version)
return cctxt.call(ctxt, 'remove_volume_connection',
instance=instance, volume_id=volume_id)
@ -817,13 +825,13 @@ class ComputeAPI(object):
'rescue_image_ref': rescue_image_ref,
'instance': instance,
}
cctxt = self.get_cell_client(ctxt).prepare(
cctxt = self.router.by_instance(ctxt, instance).prepare(
server=_compute_host(None, instance), version=version)
cctxt.cast(ctxt, 'rescue_instance', **msg_args)
def reset_network(self, ctxt, instance):
version = '4.0'
cctxt = self.get_cell_client(ctxt).prepare(
cctxt = self.router.by_instance(ctxt, instance).prepare(
server=_compute_host(None, instance), version=version)
cctxt.cast(ctxt, 'reset_network', instance=instance)
@ -835,25 +843,29 @@ class ComputeAPI(object):
'clean_shutdown': clean_shutdown,
}
version = '4.1'
cell_client = self.get_cell_client(ctxt)
if not cell_client.can_send_version(version):
client = self.router.by_instance(ctxt, instance)
if not client.can_send_version(version):
msg_args['instance_type'] = objects_base.obj_to_primitive(
instance_type)
version = '4.0'
cctxt = cell_client.prepare(server=_compute_host(None, instance),
cctxt = client.prepare(server=_compute_host(None, instance),
version=version)
cctxt.cast(ctxt, 'resize_instance', **msg_args)
def resume_instance(self, ctxt, instance):
version = '4.0'
cctxt = self.get_cell_client(ctxt).prepare(
cctxt = self.router.by_instance(ctxt, instance).prepare(
server=_compute_host(None, instance), version=version)
cctxt.cast(ctxt, 'resume_instance', instance=instance)
def revert_resize(self, ctxt, instance, migration, host,
reservations=None):
version = '4.0'
cctxt = self.get_cell_client(ctxt).prepare(
if not host:
client = self.router.by_instance(ctxt, instance)
else:
client = self.router.by_host(ctxt, host)
cctxt = client.prepare(
server=_compute_host(host, instance), version=version)
cctxt.cast(ctxt, 'revert_resize',
instance=instance, migration=migration,
@ -863,34 +875,34 @@ class ComputeAPI(object):
destroy_disks=True,
migrate_data=None):
version = '4.8'
cell_client = self.get_cell_client(ctxt)
if not cell_client.can_send_version(version):
client = self.router.by_host(ctxt, host)
if not client.can_send_version(version):
version = '4.0'
if migrate_data:
migrate_data = migrate_data.to_legacy_dict()
extra = {'destroy_disks': destroy_disks,
'migrate_data': migrate_data,
}
cctxt = cell_client.prepare(server=host, version=version)
cctxt = client.prepare(server=host, version=version)
cctxt.cast(ctxt, 'rollback_live_migration_at_destination',
instance=instance, **extra)
def set_admin_password(self, ctxt, instance, new_pass):
version = '4.0'
cctxt = self.get_cell_client(ctxt).prepare(
cctxt = self.router.by_instance(ctxt, instance).prepare(
server=_compute_host(None, instance), version=version)
return cctxt.call(ctxt, 'set_admin_password',
instance=instance, new_pass=new_pass)
def set_host_enabled(self, ctxt, host, enabled):
version = '4.0'
cctxt = self.get_cell_client(ctxt).prepare(
cctxt = self.router.by_host(ctxt, host).prepare(
server=host, version=version)
return cctxt.call(ctxt, 'set_host_enabled', enabled=enabled)
def swap_volume(self, ctxt, instance, old_volume_id, new_volume_id):
version = '4.0'
cctxt = self.get_cell_client(ctxt).prepare(
cctxt = self.router.by_instance(ctxt, instance).prepare(
server=_compute_host(None, instance), version=version)
cctxt.cast(ctxt, 'swap_volume',
instance=instance, old_volume_id=old_volume_id,
@ -898,7 +910,7 @@ class ComputeAPI(object):
def get_host_uptime(self, ctxt, host):
version = '4.0'
cctxt = self.get_cell_client(ctxt).prepare(
cctxt = self.router.by_host(ctxt, host).prepare(
server=host, version=version)
return cctxt.call(ctxt, 'get_host_uptime')
@ -909,14 +921,14 @@ class ComputeAPI(object):
'device_type': device_type}
version = '4.0'
cctxt = self.get_cell_client(ctxt).prepare(
cctxt = self.router.by_instance(ctxt, instance).prepare(
server=_compute_host(None, instance), version=version)
return cctxt.call(ctxt, 'reserve_block_device_name', **kw)
def backup_instance(self, ctxt, instance, image_id, backup_type,
rotation):
version = '4.0'
cctxt = self.get_cell_client(ctxt).prepare(
cctxt = self.router.by_instance(ctxt, instance).prepare(
server=_compute_host(None, instance), version=version)
cctxt.cast(ctxt, 'backup_instance',
instance=instance,
@ -926,7 +938,7 @@ class ComputeAPI(object):
def snapshot_instance(self, ctxt, instance, image_id):
version = '4.0'
cctxt = self.get_cell_client(ctxt).prepare(
cctxt = self.router.by_instance(ctxt, instance).prepare(
server=_compute_host(None, instance), version=version)
cctxt.cast(ctxt, 'snapshot_instance',
instance=instance,
@ -934,7 +946,7 @@ class ComputeAPI(object):
def start_instance(self, ctxt, instance):
version = '4.0'
cctxt = self.get_cell_client(ctxt).prepare(
cctxt = self.router.by_instance(ctxt, instance).prepare(
server=_compute_host(None, instance), version=version)
cctxt.cast(ctxt, 'start_instance', instance=instance)
@ -942,14 +954,14 @@ class ComputeAPI(object):
msg_args = {'instance': instance,
'clean_shutdown': clean_shutdown}
version = '4.0'
cctxt = self.get_cell_client(ctxt).prepare(
cctxt = self.router.by_instance(ctxt, instance).prepare(
server=_compute_host(None, instance), version=version)
rpc_method = cctxt.cast if do_cast else cctxt.call
return rpc_method(ctxt, 'stop_instance', **msg_args)
def suspend_instance(self, ctxt, instance):
version = '4.0'
cctxt = self.get_cell_client(ctxt).prepare(
cctxt = self.router.by_instance(ctxt, instance).prepare(
server=_compute_host(None, instance), version=version)
cctxt.cast(ctxt, 'suspend_instance', instance=instance)
@ -959,7 +971,7 @@ class ComputeAPI(object):
# the method signature has to match with `terminate_instance()`
# method of cells rpcapi.
version = '4.0'
cctxt = self.get_cell_client(ctxt).prepare(
cctxt = self.router.by_instance(ctxt, instance).prepare(
server=_compute_host(None, instance), version=version)
cctxt.cast(ctxt, 'terminate_instance',
instance=instance, bdms=bdms,
@ -967,26 +979,26 @@ class ComputeAPI(object):
def unpause_instance(self, ctxt, instance):
version = '4.0'
cctxt = self.get_cell_client(ctxt).prepare(
cctxt = self.router.by_instance(ctxt, instance).prepare(
server=_compute_host(None, instance), version=version)
cctxt.cast(ctxt, 'unpause_instance', instance=instance)
def unrescue_instance(self, ctxt, instance):
version = '4.0'
cctxt = self.get_cell_client(ctxt).prepare(
cctxt = self.router.by_instance(ctxt, instance).prepare(
server=_compute_host(None, instance), version=version)
cctxt.cast(ctxt, 'unrescue_instance', instance=instance)
def soft_delete_instance(self, ctxt, instance, reservations=None):
version = '4.0'
cctxt = self.get_cell_client(ctxt).prepare(
cctxt = self.router.by_instance(ctxt, instance).prepare(
server=_compute_host(None, instance), version=version)
cctxt.cast(ctxt, 'soft_delete_instance',
instance=instance, reservations=reservations)
def restore_instance(self, ctxt, instance):
version = '4.0'
cctxt = self.get_cell_client(ctxt).prepare(
cctxt = self.router.by_instance(ctxt, instance).prepare(
server=_compute_host(None, instance), version=version)
cctxt.cast(ctxt, 'restore_instance', instance=instance)
@ -995,7 +1007,7 @@ class ComputeAPI(object):
msg_args = {'instance': instance, 'image_id': image_id,
'clean_shutdown': clean_shutdown}
version = '4.0'
cctxt = self.get_cell_client(ctxt).prepare(
cctxt = self.router.by_instance(ctxt, instance).prepare(
server=_compute_host(None, instance), version=version)
cctxt.cast(ctxt, 'shelve_instance', **msg_args)
@ -1003,7 +1015,7 @@ class ComputeAPI(object):
clean_shutdown=True):
msg_args = {'instance': instance, 'clean_shutdown': clean_shutdown}
version = '4.0'
cctxt = self.get_cell_client(ctxt).prepare(
cctxt = self.router.by_instance(ctxt, instance).prepare(
server=_compute_host(None, instance), version=version)
cctxt.cast(ctxt, 'shelve_offload_instance', **msg_args)
@ -1016,14 +1028,14 @@ class ComputeAPI(object):
'filter_properties': filter_properties,
'node': node,
}
cctxt = self.get_cell_client(ctxt).prepare(
cctxt = self.router.by_host(ctxt, host).prepare(
server=host, version=version)
cctxt.cast(ctxt, 'unshelve_instance', **msg_kwargs)
def volume_snapshot_create(self, ctxt, instance, volume_id,
create_info):
version = '4.0'
cctxt = self.get_cell_client(ctxt).prepare(
cctxt = self.router.by_instance(ctxt, instance).prepare(
server=_compute_host(None, instance), version=version)
cctxt.cast(ctxt, 'volume_snapshot_create', instance=instance,
volume_id=volume_id, create_info=create_info)
@ -1031,15 +1043,16 @@ class ComputeAPI(object):
def volume_snapshot_delete(self, ctxt, instance, volume_id, snapshot_id,
delete_info):
version = '4.0'
cctxt = self.get_cell_client(ctxt).prepare(
cctxt = self.router.by_instance(ctxt, instance).prepare(
server=_compute_host(None, instance), version=version)
cctxt.cast(ctxt, 'volume_snapshot_delete', instance=instance,
volume_id=volume_id, snapshot_id=snapshot_id,
delete_info=delete_info)
def external_instance_event(self, ctxt, instances, events):
cctxt = self.get_cell_client(ctxt).prepare(
server=_compute_host(None, instances[0]),
instance = instances[0]
cctxt = self.router.by_instance(ctxt, instance).prepare(
server=_compute_host(None, instance),
version='4.0')
cctxt.cast(ctxt, 'external_instance_event', instances=instances,
events=events)
@ -1050,7 +1063,7 @@ class ComputeAPI(object):
block_device_mapping=None, node=None, limits=None):
version = '4.0'
cctxt = self.get_cell_client(ctxt).prepare(
cctxt = self.router.by_host(ctxt, host).prepare(
server=host, version=version)
cctxt.cast(ctxt, 'build_and_run_instance', instance=instance,
image=image, request_spec=request_spec,
@ -1064,35 +1077,35 @@ class ComputeAPI(object):
def quiesce_instance(self, ctxt, instance):
version = '4.0'
cctxt = self.get_cell_client(ctxt).prepare(
cctxt = self.router.by_instance(ctxt, instance).prepare(
server=_compute_host(None, instance), version=version)
return cctxt.call(ctxt, 'quiesce_instance', instance=instance)
def unquiesce_instance(self, ctxt, instance, mapping=None):
version = '4.0'
cctxt = self.get_cell_client(ctxt).prepare(
cctxt = self.router.by_instance(ctxt, instance).prepare(
server=_compute_host(None, instance), version=version)
cctxt.cast(ctxt, 'unquiesce_instance', instance=instance,
mapping=mapping)
def refresh_instance_security_rules(self, ctxt, instance, host):
version = '4.4'
cell_client = self.get_cell_client(ctxt)
if not cell_client.can_send_version(version):
client = self.router.by_instance(ctxt, instance)
if not client.can_send_version(version):
version = '4.0'
instance = objects_base.obj_to_primitive(instance)
cctxt = cell_client.prepare(server=_compute_host(None, instance),
cctxt = client.prepare(server=_compute_host(None, instance),
version=version)
cctxt.cast(ctxt, 'refresh_instance_security_rules',
instance=instance)
def trigger_crash_dump(self, ctxt, instance):
version = '4.6'
cell_client = self.get_cell_client(ctxt)
client = self.router.by_instance(ctxt, instance)
if not cell_client.can_send_version(version):
if not client.can_send_version(version):
raise exception.TriggerCrashDumpNotSupported()
cctxt = cell_client.prepare(server=_compute_host(None, instance),
cctxt = client.prepare(server=_compute_host(None, instance),
version=version)
return cctxt.cast(ctxt, "trigger_crash_dump", instance=instance)

View File

@ -342,26 +342,18 @@ def authorize_quota_class_context(context, class_name):
@contextmanager
def target_cell(context, cell_mapping):
"""Adds database and message queue connection information to the context
"""Adds database connection information to the context
for communicating with the given target cell.
:param context: The RequestContext to add connection information
:param cell_mapping: A objects.CellMapping object
"""
original_db_connection = context.db_connection
original_mq_connection = context.mq_connection
# avoid circular imports
# avoid circular import
from nova import db
from nova import rpc
db_connection_string = cell_mapping.database_connection
context.db_connection = db.create_context_manager(db_connection_string)
# NOTE(melwitt): none:// url is a special value meaning do not switch
if not cell_mapping.transport_url.startswith('none'):
transport_url = cell_mapping.transport_url
context.mq_connection = rpc.create_transport(transport_url)
try:
yield context
finally:
context.db_connection = original_db_connection
context.mq_connection = original_mq_connection

View File

@ -31,11 +31,14 @@ import functools
from oslo_log import log as logging
import oslo_messaging as messaging
from oslo_serialization import jsonutils
from oslo_service import periodic_task
from oslo_utils import timeutils
import nova.conf
import nova.context
import nova.exception
from nova.i18n import _
from nova import objects
CONF = nova.conf.CONF
@ -65,20 +68,6 @@ TRANSPORT_ALIASES = {
}
def get_cell_client(context, default_client):
"""Get a RPCClient object based on a RequestContext.
:param context: The RequestContext that can contain a Transport
:param default_client: The default RPCClient
"""
if context.mq_connection:
return messaging.RPCClient(
context.mq_connection, default_client.target,
version_cap=default_client.version_cap,
serializer=default_client.serializer)
return default_client
def init(conf):
global TRANSPORT, NOTIFICATION_TRANSPORT, LEGACY_NOTIFIER, NOTIFIER
exmods = get_allowed_exmods()
@ -363,3 +352,85 @@ class LegacyValidatingNotifier(object):
LOG.warning(self.message, {'event_type': event_type})
getattr(self.notifier, priority)(ctxt, event_type, payload)
class ClientWrapper(object):
def __init__(self, client):
self._client = client
self.last_access_time = timeutils.utcnow()
@property
def client(self):
self.last_access_time = timeutils.utcnow()
return self._client
class ClientRouter(periodic_task.PeriodicTasks):
"""Creates and caches RPC clients that route to cells or the default.
The default client connects to the API cell message queue. The rest of the
clients connect to compute cell message queues.
"""
def __init__(self, default_client):
super(ClientRouter, self).__init__(CONF)
self.clients = {}
self.clients['default'] = ClientWrapper(default_client)
self.target = default_client.target
self.version_cap = default_client.version_cap
# NOTE(melwitt): Cells v1 does its own serialization and won't
# have a serializer available on the client object.
self.serializer = getattr(default_client, 'serializer', None)
self.run_periodic_tasks(nova.context.RequestContext())
def _client(self, context, cell_mapping=None):
if cell_mapping:
client_id = cell_mapping.uuid
else:
client_id = 'default'
try:
client = self.clients[client_id].client
except KeyError:
transport = create_transport(cell_mapping.transport_url)
client = messaging.RPCClient(transport, self.target,
version_cap=self.version_cap,
serializer=self.serializer)
self.clients[client_id] = ClientWrapper(client)
return client
@periodic_task.periodic_task
def _remove_stale_clients(self, context):
timeout = 60
def stale(client_id, last_access_time):
if timeutils.is_older_than(last_access_time, timeout):
LOG.debug('Removing stale RPC client: %s as it was last '
'accessed at %s', client_id, last_access_time)
return True
return False
# Never expire the default client
items_copy = list(self.clients.items())
for client_id, client_wrapper in items_copy:
if (client_id != 'default' and
stale(client_id, client_wrapper.last_access_time)):
del self.clients[client_id]
def by_instance(self, context, instance):
try:
cell_mapping = objects.InstanceMapping.get_by_instance_uuid(
context, instance.uuid).cell_mapping
except nova.exception.InstanceMappingNotFound:
# Not a cells v2 deployment
cell_mapping = None
return self._client(context, cell_mapping=cell_mapping)
def by_host(self, context, host):
try:
cell_mapping = objects.HostMapping.get_by_host(
context, host).cell_mapping
except nova.exception.HostMappingNotFound:
# Not a cells v2 deployment
cell_mapping = None
return self._client(context, cell_mapping=cell_mapping)

View File

@ -10841,8 +10841,9 @@ class ComputeAPIAggrCallsSchedulerTestCase(test.NoDBTestCase):
self.api.delete_aggregate(self.context, 1)
delete_aggregate.assert_called_once_with(self.context, agg)
@mock.patch('nova.compute.rpcapi.ComputeAPI.add_aggregate_host')
@mock.patch.object(scheduler_client.SchedulerClient, 'update_aggregates')
def test_add_host_to_aggregate(self, update_aggregates):
def test_add_host_to_aggregate(self, update_aggregates, mock_add_agg):
self.api.is_safe_to_update_az = mock.Mock()
self.api._update_az_cache_for_host = mock.Mock()
agg = objects.Aggregate(name='fake', metadata={})
@ -10853,9 +10854,14 @@ class ComputeAPIAggrCallsSchedulerTestCase(test.NoDBTestCase):
return_value=agg)):
self.api.add_host_to_aggregate(self.context, 1, 'fakehost')
update_aggregates.assert_called_once_with(self.context, [agg])
mock_add_agg.assert_called_once_with(self.context, aggregate=agg,
host_param='fakehost',
host='fakehost')
@mock.patch('nova.compute.rpcapi.ComputeAPI.remove_aggregate_host')
@mock.patch.object(scheduler_client.SchedulerClient, 'update_aggregates')
def test_remove_host_from_aggregate(self, update_aggregates):
def test_remove_host_from_aggregate(self, update_aggregates,
mock_remove_agg):
self.api._update_az_cache_for_host = mock.Mock()
agg = objects.Aggregate(name='fake', metadata={})
agg.delete_host = mock.Mock()
@ -10865,6 +10871,9 @@ class ComputeAPIAggrCallsSchedulerTestCase(test.NoDBTestCase):
return_value=agg)):
self.api.remove_host_from_aggregate(self.context, 1, 'fakehost')
update_aggregates.assert_called_once_with(self.context, [agg])
mock_remove_agg.assert_called_once_with(self.context, aggregate=agg,
host_param='fakehost',
host='fakehost')
class ComputeAggrTestCase(BaseTestCase):

View File

@ -4528,6 +4528,7 @@ class ComputeManagerMigrationTestCase(test.NoDBTestCase):
# revert_resize() and the return value is passed to driver.destroy().
# Otherwise we could regress this.
@mock.patch('nova.compute.rpcapi.ComputeAPI.finish_revert_resize')
@mock.patch.object(self.instance, 'revert_migration_context')
@mock.patch.object(self.compute.network_api, 'get_instance_nw_info')
@mock.patch.object(self.compute, '_is_instance_storage_shared')
@ -4552,7 +4553,8 @@ class ComputeManagerMigrationTestCase(test.NoDBTestCase):
finish_revert_resize,
_is_instance_storage_shared,
get_instance_nw_info,
revert_migration_context):
revert_migration_context,
mock_finish_revert):
self.migration.source_compute = self.instance['host']
@ -4572,6 +4574,9 @@ class ComputeManagerMigrationTestCase(test.NoDBTestCase):
# should not destroy disks otherwise it should destroy disks.
destroy.assert_called_once_with(self.context, self.instance,
mock.ANY, mock.ANY, not is_shared)
mock_finish_revert.assert_called_once_with(
self.context, self.instance, self.migration,
self.migration.source_compute, mock.ANY)
do_test()

View File

@ -52,6 +52,21 @@ class ComputeRpcAPITestCase(test.NoDBTestCase):
{'source_type': 'volume', 'destination_type': 'volume',
'instance_uuid': self.fake_instance_obj.uuid,
'volume_id': 'fake-volume-id'}))
# FIXME(melwitt): Temporary while things have no mappings
self.patcher1 = mock.patch('nova.objects.InstanceMapping.'
'get_by_instance_uuid')
self.patcher2 = mock.patch('nova.objects.HostMapping.get_by_host')
mock_inst_mapping = self.patcher1.start()
mock_host_mapping = self.patcher2.start()
mock_inst_mapping.side_effect = exception.InstanceMappingNotFound(
uuid=self.fake_instance_obj.uuid)
mock_host_mapping.side_effect = exception.HostMappingNotFound(
name=self.fake_instance_obj.host)
def tearDown(self):
super(ComputeRpcAPITestCase, self).tearDown()
self.patcher1.stop()
self.patcher2.stop()
@mock.patch('nova.objects.Service.get_minimum_version')
def test_auto_pin(self, mock_get_min):
@ -59,7 +74,7 @@ class ComputeRpcAPITestCase(test.NoDBTestCase):
self.flags(compute='auto', group='upgrade_levels')
compute_rpcapi.LAST_VERSION = None
rpcapi = compute_rpcapi.ComputeAPI()
self.assertEqual('4.4', rpcapi.client.version_cap)
self.assertEqual('4.4', rpcapi.router.version_cap)
mock_get_min.assert_called_once_with(mock.ANY, 'nova-compute')
@mock.patch('nova.objects.Service.get_minimum_version')
@ -76,7 +91,7 @@ class ComputeRpcAPITestCase(test.NoDBTestCase):
self.flags(compute='auto', group='upgrade_levels')
compute_rpcapi.LAST_VERSION = None
rpcapi = compute_rpcapi.ComputeAPI()
self.assertEqual('4.11', rpcapi.client.version_cap)
self.assertEqual('4.11', rpcapi.router.version_cap)
mock_get_min.assert_called_once_with(mock.ANY, 'nova-compute')
self.assertIsNone(compute_rpcapi.LAST_VERSION)
@ -95,11 +110,15 @@ class ComputeRpcAPITestCase(test.NoDBTestCase):
ctxt = context.RequestContext('fake_user', 'fake_project')
rpcapi = kwargs.pop('rpcapi_class', compute_rpcapi.ComputeAPI)()
self.assertIsNotNone(rpcapi.client)
self.assertEqual(rpcapi.client.target.topic, CONF.compute_topic)
self.assertIsNotNone(rpcapi.router)
self.assertEqual(rpcapi.router.target.topic, CONF.compute_topic)
orig_prepare = rpcapi.client.prepare
base_version = rpcapi.client.target.version
# This test wants to run the real prepare function, so must use
# a real client object
default_client = rpcapi.router.clients['default'].client
orig_prepare = default_client.prepare
base_version = rpcapi.router.target.version
expected_version = kwargs.pop('version', base_version)
expected_kwargs = kwargs.copy()
@ -127,13 +146,13 @@ class ComputeRpcAPITestCase(test.NoDBTestCase):
expected_kwargs['scheduled_node'] = expected_kwargs.pop('node')
with test.nested(
mock.patch.object(rpcapi.client, rpc_method),
mock.patch.object(rpcapi.client, 'prepare'),
mock.patch.object(rpcapi.client, 'can_send_version'),
mock.patch.object(default_client, rpc_method),
mock.patch.object(default_client, 'prepare'),
mock.patch.object(default_client, 'can_send_version'),
) as (
rpc_mock, prepare_mock, csv_mock
):
prepare_mock.return_value = rpcapi.client
prepare_mock.return_value = default_client
if '_return_value' in kwargs:
rpc_mock.return_value = kwargs.pop('_return_value')
del expected_kwargs['_return_value']
@ -206,10 +225,13 @@ class ComputeRpcAPITestCase(test.NoDBTestCase):
rpcapi = compute_rpcapi.ComputeAPI()
cast_mock = mock.Mock()
cctxt_mock = mock.Mock(cast=cast_mock)
rpcapi.router.by_instance = mock.Mock()
mock_client = mock.Mock()
rpcapi.router.by_instance.return_value = mock_client
with test.nested(
mock.patch.object(rpcapi.client, 'can_send_version',
mock.patch.object(mock_client, 'can_send_version',
return_value=False),
mock.patch.object(rpcapi.client, 'prepare',
mock.patch.object(mock_client, 'prepare',
return_value=cctxt_mock)
) as (
can_send_mock, prepare_mock
@ -313,8 +335,9 @@ class ComputeRpcAPITestCase(test.NoDBTestCase):
ctxt = context.RequestContext('fake_user', 'fake_project')
version = '4.12'
rpcapi = compute_rpcapi.ComputeAPI()
rpcapi.router.by_host = mock.Mock()
mock_client = mock.MagicMock()
rpcapi.client = mock_client
rpcapi.router.by_host.return_value = mock_client
mock_client.can_send_version.return_value = True
mock_cctx = mock.MagicMock()
mock_client.prepare.return_value = mock_cctx
@ -333,8 +356,9 @@ class ComputeRpcAPITestCase(test.NoDBTestCase):
version = '4.9'
ctxt = context.RequestContext('fake_user', 'fake_project')
rpcapi = compute_rpcapi.ComputeAPI()
rpcapi.router.by_host = mock.Mock()
mock_client = mock.MagicMock()
rpcapi.client = mock_client
rpcapi.router.by_host.return_value = mock_client
mock_client.can_send_version.return_value = False
mock_cctx = mock.MagicMock()
mock_client.prepare.return_value = mock_cctx
@ -600,33 +624,51 @@ class ComputeRpcAPITestCase(test.NoDBTestCase):
def _test_simple_call(self, method, inargs, callargs, callret,
calltype='call', can_send=False):
rpc = compute_rpcapi.ComputeAPI()
mock_client = mock.Mock()
rpc.router.by_instance = mock.Mock()
rpc.router.by_instance.return_value = mock_client
rpc.router.by_host = mock.Mock()
rpc.router.by_host.return_value = mock_client
@mock.patch.object(rpc, 'client')
@mock.patch.object(compute_rpcapi, '_compute_host')
def _test(mock_ch, mock_client):
def _test(mock_ch):
mock_client.can_send_version.return_value = can_send
call = getattr(mock_client.prepare.return_value, calltype)
call.return_value = callret
ctxt = context.RequestContext()
result = getattr(rpc, method)(ctxt, **inargs)
call.assert_called_once_with(ctxt, method, **callargs)
# Get the target of the prepare call: prepare(server=<target>, ...)
prepare_target = mock_client.prepare.call_args[1]['server']
# If _compute_host(None, instance) was called, then by_instance
# should have been called with the instance. Otherwise by_host
# should have been called with the same host as the prepare target.
if mock_ch.called and mock_ch.call_args[0][0] is None:
instance = mock_ch.call_args[0][1]
rpc.router.by_instance.assert_called_once_with(ctxt, instance)
rpc.router.by_host.assert_not_called()
else:
rpc.router.by_host.assert_called_once_with(ctxt,
prepare_target)
rpc.router.by_instance.assert_not_called()
return result
return _test()
def test_check_can_live_migrate_source_converts_objects(self):
obj = migrate_data_obj.LiveMigrateData()
inst = self.fake_instance_obj
result = self._test_simple_call('check_can_live_migrate_source',
inargs={'instance': 'foo',
inargs={'instance': inst,
'dest_check_data': obj},
callargs={'instance': 'foo',
callargs={'instance': inst,
'dest_check_data': {}},
callret=obj)
self.assertEqual(obj, result)
result = self._test_simple_call('check_can_live_migrate_source',
inargs={'instance': 'foo',
inargs={'instance': inst,
'dest_check_data': obj},
callargs={'instance': 'foo',
callargs={'instance': inst,
'dest_check_data': {}},
callret={'foo': 'bar'})
self.assertIsInstance(result, migrate_data_obj.LiveMigrateData)
@ -635,12 +677,13 @@ class ComputeRpcAPITestCase(test.NoDBTestCase):
'detect_implementation')
def test_check_can_live_migrate_destination_converts_dict(self,
mock_det):
inst = self.fake_instance_obj
result = self._test_simple_call('check_can_live_migrate_destination',
inargs={'instance': 'foo',
inargs={'instance': inst,
'destination': 'bar',
'block_migration': False,
'disk_over_commit': False},
callargs={'instance': 'foo',
callargs={'instance': inst,
'block_migration': False,
'disk_over_commit': False},
callret={'foo': 'bar'})
@ -648,14 +691,15 @@ class ComputeRpcAPITestCase(test.NoDBTestCase):
def test_live_migration_converts_objects(self):
obj = migrate_data_obj.LiveMigrateData()
inst = self.fake_instance_obj
self._test_simple_call('live_migration',
inargs={'instance': 'foo',
inargs={'instance': inst,
'dest': 'foo',
'block_migration': False,
'host': 'foo',
'migration': None,
'migrate_data': obj},
callargs={'instance': 'foo',
callargs={'instance': inst,
'dest': 'foo',
'block_migration': False,
'migrate_data': {
@ -666,13 +710,14 @@ class ComputeRpcAPITestCase(test.NoDBTestCase):
@mock.patch('nova.objects.migrate_data.LiveMigrateData.from_legacy_dict')
def test_pre_live_migration_converts_objects(self, mock_fld):
obj = migrate_data_obj.LiveMigrateData()
inst = self.fake_instance_obj
result = self._test_simple_call('pre_live_migration',
inargs={'instance': 'foo',
inargs={'instance': inst,
'block_migration': False,
'disk': None,
'host': 'foo',
'migrate_data': obj},
callargs={'instance': 'foo',
callargs={'instance': inst,
'block_migration': False,
'disk': None,
'migrate_data': {}},
@ -680,12 +725,12 @@ class ComputeRpcAPITestCase(test.NoDBTestCase):
self.assertFalse(mock_fld.called)
self.assertEqual(obj, result)
result = self._test_simple_call('pre_live_migration',
inargs={'instance': 'foo',
inargs={'instance': inst,
'block_migration': False,
'disk': None,
'host': 'foo',
'migrate_data': obj},
callargs={'instance': 'foo',
callargs={'instance': inst,
'block_migration': False,
'disk': None,
'migrate_data': {}},
@ -696,13 +741,14 @@ class ComputeRpcAPITestCase(test.NoDBTestCase):
def test_rollback_live_migration_at_destination_converts_objects(self):
obj = migrate_data_obj.LiveMigrateData()
inst = self.fake_instance_obj
method = 'rollback_live_migration_at_destination'
self._test_simple_call(method,
inargs={'instance': 'foo',
inargs={'instance': inst,
'host': 'foo',
'destroy_disks': False,
'migrate_data': obj},
callargs={'instance': 'foo',
callargs={'instance': inst,
'destroy_disks': False,
'migrate_data': {}},
callret=None,

View File

@ -279,37 +279,14 @@ class ContextTestCase(test.NoDBTestCase):
mock.sentinel.target)
@mock.patch('nova.db.create_context_manager')
@mock.patch('nova.rpc.create_transport')
def test_target_cell(self, mock_create_transport, mock_create_ctxt_mgr):
def test_target_cell(self, mock_create_ctxt_mgr):
mock_create_ctxt_mgr.return_value = mock.sentinel.cm
mock_create_transport.return_value = mock.sentinel.tp
ctxt = context.RequestContext('111',
'222',
roles=['admin', 'weasel'])
# Verify the existing db_connection, if any, is restored
ctxt.db_connection = mock.sentinel.db_conn
ctxt.mq_connection = mock.sentinel.mq_conn
mapping = objects.CellMapping(database_connection='fake://',
transport_url='anotherfake://')
mapping = objects.CellMapping(database_connection='fake://')
with context.target_cell(ctxt, mapping):
self.assertEqual(ctxt.db_connection, mock.sentinel.cm)
self.assertEqual(ctxt.mq_connection, mock.sentinel.tp)
self.assertEqual(mock.sentinel.db_conn, ctxt.db_connection)
self.assertEqual(mock.sentinel.mq_conn, ctxt.mq_connection)
mock_create_transport.assert_called_once_with(mapping.transport_url)
@mock.patch('nova.db.create_context_manager')
@mock.patch('nova.rpc.create_transport')
def test_target_cell_transport_url_sentinel(self, mock_create_transport,
mock_create_ctxt_mgr):
mock_create_ctxt_mgr.return_value = mock.sentinel.cm
mock_create_transport.return_value = mock.sentinel.tp
ctxt = context.RequestContext('111',
'222',
roles=['admin', 'weasel'])
mapping = objects.CellMapping(database_connection='fake://',
transport_url='none://')
with context.target_cell(ctxt, mapping):
self.assertEqual(ctxt.db_connection, mock.sentinel.cm)
self.assertIsNone(ctxt.mq_connection)
self.assertFalse(mock_create_transport.called)

View File

@ -12,16 +12,21 @@
# License for the specific language governing permissions and limitations
# under the License.
import copy
import datetime
import fixtures
import mock
import oslo_messaging as messaging
from oslo_serialization import jsonutils
from oslo_utils import fixture as utils_fixture
import testtools
from nova import context
from nova import exception
from nova import objects
from nova import rpc
from nova import test
from nova.tests import uuidsentinel as uuids
# Make a class that resets all of the global variables in nova.rpc
@ -49,53 +54,6 @@ class TestRPC(testtools.TestCase):
super(TestRPC, self).setUp()
self.useFixture(RPCResetFixture())
@mock.patch('oslo_messaging.RPCClient')
def test_cell_client(self, mock_rpcclient):
default_client = mock.Mock()
dynamic_client = mock.Mock()
mock_rpcclient.return_value = dynamic_client
ctxt = mock.Mock()
ctxt.mq_connection = 'fake://'
class FakeAPI(object):
def __init__(self):
self.client = default_client
def rpc_api_function(self, context):
rpc.get_cell_client(context, self.client).do_rpc()
rpcapi = FakeAPI()
rpcapi.rpc_api_function(ctxt)
# verify a dynamic client was created
mock_rpcclient.assert_called_once_with(
ctxt.mq_connection, default_client.target,
version_cap=default_client.version_cap,
serializer=default_client.serializer)
# verify dynamic client handled the rpc
dynamic_client.do_rpc.assert_called_once_with()
@mock.patch('oslo_messaging.RPCClient')
def test_cell_client_no_switch(self, mock_rpcclient):
default_client = mock.Mock()
dynamic_client = mock.Mock()
mock_rpcclient.return_value = dynamic_client
ctxt = mock.Mock()
ctxt.mq_connection = None
class FakeAPI(object):
def __init__(self):
self.client = default_client
def rpc_api_function(self, context):
rpc.get_cell_client(context, self.client).do_rpc()
rpcapi = FakeAPI()
rpcapi.rpc_api_function(ctxt)
# verify a dynamic client was not created
self.assertFalse(mock_rpcclient.called)
# verify default client handled the rpc
default_client.do_rpc.assert_called_once_with()
@mock.patch.object(rpc, 'get_allowed_exmods')
@mock.patch.object(rpc, 'RequestContextSerializer')
@mock.patch.object(messaging, 'get_transport')
@ -402,3 +360,182 @@ class TestRequestContextSerializer(test.NoDBTestCase):
self.ser.deserialize_context('context')
mock_req.from_dict.assert_called_once_with('context')
class TestClientRouter(test.NoDBTestCase):
@mock.patch('nova.objects.InstanceMapping.get_by_instance_uuid')
@mock.patch('nova.rpc.create_transport')
@mock.patch('oslo_messaging.RPCClient')
def test_by_instance(self, mock_rpcclient, mock_create, mock_get):
default_client = mock.Mock()
cell_client = mock.Mock()
mock_rpcclient.return_value = cell_client
ctxt = mock.Mock()
cm = objects.CellMapping(uuid=uuids.cell_mapping,
transport_url='fake:///')
mock_get.return_value = objects.InstanceMapping(cell_mapping=cm)
instance = objects.Instance(uuid=uuids.instance)
router = rpc.ClientRouter(default_client)
client = router.by_instance(ctxt, instance)
mock_get.assert_called_once_with(ctxt, instance.uuid)
# verify a client was created by ClientRouter
mock_rpcclient.assert_called_once_with(
mock_create.return_value, default_client.target,
version_cap=default_client.version_cap,
serializer=default_client.serializer)
# verify cell client was returned
self.assertEqual(cell_client, client)
# reset and check that cached client is returned the second time
mock_rpcclient.reset_mock()
mock_create.reset_mock()
mock_get.reset_mock()
client = router.by_instance(ctxt, instance)
mock_get.assert_called_once_with(ctxt, instance.uuid)
mock_rpcclient.assert_not_called()
mock_create.assert_not_called()
self.assertEqual(cell_client, client)
@mock.patch('nova.objects.HostMapping.get_by_host')
@mock.patch('nova.rpc.create_transport')
@mock.patch('oslo_messaging.RPCClient')
def test_by_host(self, mock_rpcclient, mock_create, mock_get):
default_client = mock.Mock()
cell_client = mock.Mock()
mock_rpcclient.return_value = cell_client
ctxt = mock.Mock()
cm = objects.CellMapping(uuid=uuids.cell_mapping,
transport_url='fake:///')
mock_get.return_value = objects.HostMapping(cell_mapping=cm)
host = 'fake-host'
router = rpc.ClientRouter(default_client)
client = router.by_host(ctxt, host)
mock_get.assert_called_once_with(ctxt, host)
# verify a client was created by ClientRouter
mock_rpcclient.assert_called_once_with(
mock_create.return_value, default_client.target,
version_cap=default_client.version_cap,
serializer=default_client.serializer)
# verify cell client was returned
self.assertEqual(cell_client, client)
# reset and check that cached client is returned the second time
mock_rpcclient.reset_mock()
mock_create.reset_mock()
mock_get.reset_mock()
client = router.by_host(ctxt, host)
mock_get.assert_called_once_with(ctxt, host)
mock_rpcclient.assert_not_called()
mock_create.assert_not_called()
self.assertEqual(cell_client, client)
@mock.patch('nova.objects.InstanceMapping.get_by_instance_uuid',
side_effect=exception.InstanceMappingNotFound(uuid=uuids.instance))
@mock.patch('nova.rpc.create_transport')
@mock.patch('oslo_messaging.RPCClient')
def test_by_instance_not_found(self, mock_rpcclient, mock_create,
mock_get):
default_client = mock.Mock()
cell_client = mock.Mock()
mock_rpcclient.return_value = cell_client
ctxt = mock.Mock()
instance = objects.Instance(uuid=uuids.instance)
router = rpc.ClientRouter(default_client)
client = router.by_instance(ctxt, instance)
mock_get.assert_called_once_with(ctxt, instance.uuid)
mock_rpcclient.assert_not_called()
mock_create.assert_not_called()
# verify default client was returned
self.assertEqual(default_client, client)
@mock.patch('nova.objects.HostMapping.get_by_host',
side_effect=exception.HostMappingNotFound(name='fake-host'))
@mock.patch('nova.rpc.create_transport')
@mock.patch('oslo_messaging.RPCClient')
def test_by_host_not_found(self, mock_rpcclient, mock_create, mock_get):
default_client = mock.Mock()
cell_client = mock.Mock()
mock_rpcclient.return_value = cell_client
ctxt = mock.Mock()
host = 'fake-host'
router = rpc.ClientRouter(default_client)
client = router.by_host(ctxt, host)
mock_get.assert_called_once_with(ctxt, host)
mock_rpcclient.assert_not_called()
mock_create.assert_not_called()
# verify default client was returned
self.assertEqual(default_client, client)
@mock.patch('nova.objects.InstanceMapping.get_by_instance_uuid')
@mock.patch('nova.rpc.create_transport')
@mock.patch('oslo_messaging.RPCClient')
def test_remove_stale_clients(self, mock_rpcclient, mock_create, mock_get):
t0 = datetime.datetime(2016, 8, 9, 0, 0, 0)
time_fixture = self.useFixture(utils_fixture.TimeFixture(t0))
default_client = mock.Mock()
ctxt = mock.Mock()
cm1 = objects.CellMapping(uuid=uuids.cell_mapping1,
transport_url='fake:///')
cm2 = objects.CellMapping(uuid=uuids.cell_mapping2,
transport_url='fake:///')
cm3 = objects.CellMapping(uuid=uuids.cell_mapping3,
transport_url='fake:///')
mock_get.side_effect = [objects.InstanceMapping(cell_mapping=cm1),
objects.InstanceMapping(cell_mapping=cm2),
objects.InstanceMapping(cell_mapping=cm3),
objects.InstanceMapping(cell_mapping=cm3)]
instance1 = objects.Instance(uuid=uuids.instance1)
instance2 = objects.Instance(uuid=uuids.instance2)
instance3 = objects.Instance(uuid=uuids.instance3)
router = rpc.ClientRouter(default_client)
cell1_client = router.by_instance(ctxt, instance1)
cell2_client = router.by_instance(ctxt, instance2)
# default client, cell1 client, cell2 client
self.assertEqual(3, len(router.clients))
expected = {'default': default_client,
uuids.cell_mapping1: cell1_client,
uuids.cell_mapping2: cell2_client}
for client_id, client in expected.items():
self.assertEqual(client, router.clients[client_id].client)
# expire cell1 client and cell2 client
time_fixture.advance_time_seconds(80)
# add cell3 client
cell3_client = router.by_instance(ctxt, instance3)
router._remove_stale_clients(ctxt)
# default client, cell3 client
expected = {'default': default_client,
uuids.cell_mapping3: cell3_client}
self.assertEqual(2, len(router.clients))
for client_id, client in expected.items():
self.assertEqual(client, router.clients[client_id].client)
# expire cell3 client
time_fixture.advance_time_seconds(80)
# access cell3 client to refresh it
cell3_client = router.by_instance(ctxt, instance3)
router._remove_stale_clients(ctxt)
# default client and cell3 client should be there
self.assertEqual(2, len(router.clients))
for client_id, client in expected.items():
self.assertEqual(client, router.clients[client_id].client)