Merge "Move cell message queue switching and add caching"
This commit is contained in:
commit
8bd486594f
@ -333,7 +333,8 @@ class ComputeAPI(object):
|
||||
version_cap = self.VERSION_ALIASES.get(upgrade_level,
|
||||
upgrade_level)
|
||||
serializer = objects_base.NovaObjectSerializer()
|
||||
self.client = self.get_client(target, version_cap, serializer)
|
||||
default_client = self.get_client(target, version_cap, serializer)
|
||||
self.router = rpc.ClientRouter(default_client)
|
||||
|
||||
def _determine_version_cap(self, target):
|
||||
global LAST_VERSION
|
||||
@ -374,21 +375,12 @@ class ComputeAPI(object):
|
||||
'service': service_version})
|
||||
return version_cap
|
||||
|
||||
def _compat_ver(self, current, legacy):
|
||||
if self.client.can_send_version(current):
|
||||
return current
|
||||
else:
|
||||
return legacy
|
||||
|
||||
# Cells overrides this
|
||||
def get_client(self, target, version_cap, serializer):
|
||||
return rpc.get_client(target,
|
||||
version_cap=version_cap,
|
||||
serializer=serializer)
|
||||
|
||||
def get_cell_client(self, context):
|
||||
return rpc.get_cell_client(context, self.client)
|
||||
|
||||
def add_aggregate_host(self, ctxt, host, aggregate, host_param,
|
||||
slave_info=None):
|
||||
'''Add aggregate host.
|
||||
@ -400,7 +392,7 @@ class ComputeAPI(object):
|
||||
:param host: This is the host to send the message to.
|
||||
'''
|
||||
version = '4.0'
|
||||
cctxt = self.get_cell_client(ctxt).prepare(
|
||||
cctxt = self.router.by_host(ctxt, host).prepare(
|
||||
server=host, version=version)
|
||||
cctxt.cast(ctxt, 'add_aggregate_host',
|
||||
aggregate=aggregate, host=host_param,
|
||||
@ -408,7 +400,7 @@ class ComputeAPI(object):
|
||||
|
||||
def add_fixed_ip_to_instance(self, ctxt, instance, network_id):
|
||||
version = '4.0'
|
||||
cctxt = self.get_cell_client(ctxt).prepare(
|
||||
cctxt = self.router.by_instance(ctxt, instance).prepare(
|
||||
server=_compute_host(None, instance), version=version)
|
||||
cctxt.cast(ctxt, 'add_fixed_ip_to_instance',
|
||||
instance=instance, network_id=network_id)
|
||||
@ -416,7 +408,7 @@ class ComputeAPI(object):
|
||||
def attach_interface(self, ctxt, instance, network_id, port_id,
|
||||
requested_ip):
|
||||
version = '4.0'
|
||||
cctxt = self.get_cell_client(ctxt).prepare(
|
||||
cctxt = self.router.by_instance(ctxt, instance).prepare(
|
||||
server=_compute_host(None, instance), version=version)
|
||||
return cctxt.call(ctxt, 'attach_interface',
|
||||
instance=instance, network_id=network_id,
|
||||
@ -424,13 +416,13 @@ class ComputeAPI(object):
|
||||
|
||||
def attach_volume(self, ctxt, instance, bdm):
|
||||
version = '4.0'
|
||||
cctxt = self.get_cell_client(ctxt).prepare(
|
||||
cctxt = self.router.by_instance(ctxt, instance).prepare(
|
||||
server=_compute_host(None, instance), version=version)
|
||||
cctxt.cast(ctxt, 'attach_volume', instance=instance, bdm=bdm)
|
||||
|
||||
def change_instance_metadata(self, ctxt, instance, diff):
|
||||
version = '4.0'
|
||||
cctxt = self.get_cell_client(ctxt).prepare(
|
||||
cctxt = self.router.by_instance(ctxt, instance).prepare(
|
||||
server=_compute_host(None, instance), version=version)
|
||||
cctxt.cast(ctxt, 'change_instance_metadata',
|
||||
instance=instance, diff=diff)
|
||||
@ -438,8 +430,8 @@ class ComputeAPI(object):
|
||||
def check_can_live_migrate_destination(self, ctxt, instance, destination,
|
||||
block_migration, disk_over_commit):
|
||||
version = '4.11'
|
||||
cell_client = self.get_cell_client(ctxt)
|
||||
if not cell_client.can_send_version(version):
|
||||
client = self.router.by_host(ctxt, destination)
|
||||
if not client.can_send_version(version):
|
||||
# NOTE(eliqiao): This is a new feature that is only available
|
||||
# once all compute nodes support at least version 4.11.
|
||||
# This means the new REST API that supports this needs to handle
|
||||
@ -450,7 +442,7 @@ class ComputeAPI(object):
|
||||
else:
|
||||
version = '4.0'
|
||||
|
||||
cctxt = cell_client.prepare(server=destination, version=version)
|
||||
cctxt = client.prepare(server=destination, version=version)
|
||||
result = cctxt.call(ctxt, 'check_can_live_migrate_destination',
|
||||
instance=instance,
|
||||
block_migration=block_migration,
|
||||
@ -466,13 +458,13 @@ class ComputeAPI(object):
|
||||
def check_can_live_migrate_source(self, ctxt, instance, dest_check_data):
|
||||
dest_check_data_obj = dest_check_data
|
||||
version = '4.8'
|
||||
cell_client = self.get_cell_client(ctxt)
|
||||
if not cell_client.can_send_version(version):
|
||||
client = self.router.by_instance(ctxt, instance)
|
||||
if not client.can_send_version(version):
|
||||
version = '4.0'
|
||||
if dest_check_data:
|
||||
dest_check_data = dest_check_data.to_legacy_dict()
|
||||
source = _compute_host(None, instance)
|
||||
cctxt = cell_client.prepare(server=source, version=version)
|
||||
cctxt = client.prepare(server=source, version=version)
|
||||
result = cctxt.call(ctxt, 'check_can_live_migrate_source',
|
||||
instance=instance,
|
||||
dest_check_data=dest_check_data)
|
||||
@ -486,7 +478,11 @@ class ComputeAPI(object):
|
||||
|
||||
def check_instance_shared_storage(self, ctxt, instance, data, host=None):
|
||||
version = '4.0'
|
||||
cctxt = self.get_cell_client(ctxt).prepare(
|
||||
if not host:
|
||||
client = self.router.by_instance(ctxt, instance)
|
||||
else:
|
||||
client = self.router.by_host(ctxt, host)
|
||||
cctxt = client.prepare(
|
||||
server=_compute_host(host, instance), version=version)
|
||||
return cctxt.call(ctxt, 'check_instance_shared_storage',
|
||||
instance=instance,
|
||||
@ -495,7 +491,11 @@ class ComputeAPI(object):
|
||||
def confirm_resize(self, ctxt, instance, migration, host,
|
||||
reservations=None, cast=True):
|
||||
version = '4.0'
|
||||
cctxt = self.get_cell_client(ctxt).prepare(
|
||||
if not host:
|
||||
client = self.router.by_instance(ctxt, instance)
|
||||
else:
|
||||
client = self.router.by_host(ctxt, host)
|
||||
cctxt = client.prepare(
|
||||
server=_compute_host(host, instance), version=version)
|
||||
rpc_method = cctxt.cast if cast else cctxt.call
|
||||
return rpc_method(ctxt, 'confirm_resize',
|
||||
@ -504,7 +504,7 @@ class ComputeAPI(object):
|
||||
|
||||
def detach_interface(self, ctxt, instance, port_id):
|
||||
version = '4.0'
|
||||
cctxt = self.get_cell_client(ctxt).prepare(
|
||||
cctxt = self.router.by_instance(ctxt, instance).prepare(
|
||||
server=_compute_host(None, instance), version=version)
|
||||
cctxt.cast(ctxt, 'detach_interface',
|
||||
instance=instance, port_id=port_id)
|
||||
@ -512,11 +512,11 @@ class ComputeAPI(object):
|
||||
def detach_volume(self, ctxt, instance, volume_id, attachment_id=None):
|
||||
extra = {'attachment_id': attachment_id}
|
||||
version = '4.7'
|
||||
cell_client = self.get_cell_client(ctxt)
|
||||
if not cell_client.can_send_version(version):
|
||||
client = self.router.by_instance(ctxt, instance)
|
||||
if not client.can_send_version(version):
|
||||
version = '4.0'
|
||||
extra.pop('attachment_id')
|
||||
cctxt = cell_client.prepare(server=_compute_host(None, instance),
|
||||
cctxt = client.prepare(server=_compute_host(None, instance),
|
||||
version=version)
|
||||
cctxt.cast(ctxt, 'detach_volume',
|
||||
instance=instance, volume_id=volume_id, **extra)
|
||||
@ -524,7 +524,7 @@ class ComputeAPI(object):
|
||||
def finish_resize(self, ctxt, instance, migration, image, disk_info,
|
||||
host, reservations=None):
|
||||
version = '4.0'
|
||||
cctxt = self.get_cell_client(ctxt).prepare(
|
||||
cctxt = self.router.by_host(ctxt, host).prepare(
|
||||
server=host, version=version)
|
||||
cctxt.cast(ctxt, 'finish_resize',
|
||||
instance=instance, migration=migration,
|
||||
@ -533,7 +533,7 @@ class ComputeAPI(object):
|
||||
def finish_revert_resize(self, ctxt, instance, migration, host,
|
||||
reservations=None):
|
||||
version = '4.0'
|
||||
cctxt = self.get_cell_client(ctxt).prepare(
|
||||
cctxt = self.router.by_host(ctxt, host).prepare(
|
||||
server=host, version=version)
|
||||
cctxt.cast(ctxt, 'finish_revert_resize',
|
||||
instance=instance, migration=migration,
|
||||
@ -541,78 +541,78 @@ class ComputeAPI(object):
|
||||
|
||||
def get_console_output(self, ctxt, instance, tail_length):
|
||||
version = '4.0'
|
||||
cctxt = self.get_cell_client(ctxt).prepare(
|
||||
cctxt = self.router.by_instance(ctxt, instance).prepare(
|
||||
server=_compute_host(None, instance), version=version)
|
||||
return cctxt.call(ctxt, 'get_console_output',
|
||||
instance=instance, tail_length=tail_length)
|
||||
|
||||
def get_console_pool_info(self, ctxt, host, console_type):
|
||||
version = '4.0'
|
||||
cctxt = self.get_cell_client(ctxt).prepare(
|
||||
cctxt = self.router.by_host(ctxt, host).prepare(
|
||||
server=host, version=version)
|
||||
return cctxt.call(ctxt, 'get_console_pool_info',
|
||||
console_type=console_type)
|
||||
|
||||
def get_console_topic(self, ctxt, host):
|
||||
version = '4.0'
|
||||
cctxt = self.get_cell_client(ctxt).prepare(
|
||||
cctxt = self.router.by_host(ctxt, host).prepare(
|
||||
server=host, version=version)
|
||||
return cctxt.call(ctxt, 'get_console_topic')
|
||||
|
||||
def get_diagnostics(self, ctxt, instance):
|
||||
version = '4.0'
|
||||
cctxt = self.get_cell_client(ctxt).prepare(
|
||||
cctxt = self.router.by_instance(ctxt, instance).prepare(
|
||||
server=_compute_host(None, instance), version=version)
|
||||
return cctxt.call(ctxt, 'get_diagnostics', instance=instance)
|
||||
|
||||
def get_instance_diagnostics(self, ctxt, instance):
|
||||
version = '4.13'
|
||||
cell_client = self.get_cell_client(ctxt)
|
||||
if not cell_client.can_send_version(version):
|
||||
client = self.router.by_instance(ctxt, instance)
|
||||
if not client.can_send_version(version):
|
||||
version = '4.0'
|
||||
instance = objects_base.obj_to_primitive(instance)
|
||||
cctxt = cell_client.prepare(server=_compute_host(None, instance),
|
||||
cctxt = client.prepare(server=_compute_host(None, instance),
|
||||
version=version)
|
||||
return cctxt.call(ctxt, 'get_instance_diagnostics', instance=instance)
|
||||
|
||||
def get_vnc_console(self, ctxt, instance, console_type):
|
||||
version = '4.0'
|
||||
cctxt = self.get_cell_client(ctxt).prepare(
|
||||
cctxt = self.router.by_instance(ctxt, instance).prepare(
|
||||
server=_compute_host(None, instance), version=version)
|
||||
return cctxt.call(ctxt, 'get_vnc_console',
|
||||
instance=instance, console_type=console_type)
|
||||
|
||||
def get_spice_console(self, ctxt, instance, console_type):
|
||||
version = '4.0'
|
||||
cctxt = self.get_cell_client(ctxt).prepare(
|
||||
cctxt = self.router.by_instance(ctxt, instance).prepare(
|
||||
server=_compute_host(None, instance), version=version)
|
||||
return cctxt.call(ctxt, 'get_spice_console',
|
||||
instance=instance, console_type=console_type)
|
||||
|
||||
def get_rdp_console(self, ctxt, instance, console_type):
|
||||
version = '4.0'
|
||||
cctxt = self.get_cell_client(ctxt).prepare(
|
||||
cctxt = self.router.by_instance(ctxt, instance).prepare(
|
||||
server=_compute_host(None, instance), version=version)
|
||||
return cctxt.call(ctxt, 'get_rdp_console',
|
||||
instance=instance, console_type=console_type)
|
||||
|
||||
def get_mks_console(self, ctxt, instance, console_type):
|
||||
version = '4.3'
|
||||
cctxt = self.get_cell_client(ctxt).prepare(
|
||||
cctxt = self.router.by_instance(ctxt, instance).prepare(
|
||||
server=_compute_host(None, instance), version=version)
|
||||
return cctxt.call(ctxt, 'get_mks_console',
|
||||
instance=instance, console_type=console_type)
|
||||
|
||||
def get_serial_console(self, ctxt, instance, console_type):
|
||||
version = '4.0'
|
||||
cctxt = self.get_cell_client(ctxt).prepare(
|
||||
cctxt = self.router.by_instance(ctxt, instance).prepare(
|
||||
server=_compute_host(None, instance), version=version)
|
||||
return cctxt.call(ctxt, 'get_serial_console',
|
||||
instance=instance, console_type=console_type)
|
||||
|
||||
def validate_console_port(self, ctxt, instance, port, console_type):
|
||||
version = '4.0'
|
||||
cctxt = self.get_cell_client(ctxt).prepare(
|
||||
cctxt = self.router.by_instance(ctxt, instance).prepare(
|
||||
server=_compute_host(None, instance), version=version)
|
||||
return cctxt.call(ctxt, 'validate_console_port',
|
||||
instance=instance, port=port,
|
||||
@ -628,20 +628,20 @@ class ComputeAPI(object):
|
||||
:param host: This is the host to send the message to.
|
||||
'''
|
||||
version = '4.0'
|
||||
cctxt = self.get_cell_client(ctxt).prepare(
|
||||
cctxt = self.router.by_host(ctxt, host).prepare(
|
||||
server=host, version=version)
|
||||
return cctxt.call(ctxt, 'host_maintenance_mode',
|
||||
host=host_param, mode=mode)
|
||||
|
||||
def host_power_action(self, ctxt, host, action):
|
||||
version = '4.0'
|
||||
cctxt = self.get_cell_client(ctxt).prepare(
|
||||
cctxt = self.router.by_host(ctxt, host).prepare(
|
||||
server=host, version=version)
|
||||
return cctxt.call(ctxt, 'host_power_action', action=action)
|
||||
|
||||
def inject_network_info(self, ctxt, instance):
|
||||
version = '4.0'
|
||||
cctxt = self.get_cell_client(ctxt).prepare(
|
||||
cctxt = self.router.by_instance(ctxt, instance).prepare(
|
||||
server=_compute_host(None, instance), version=version)
|
||||
cctxt.cast(ctxt, 'inject_network_info', instance=instance)
|
||||
|
||||
@ -649,16 +649,16 @@ class ComputeAPI(object):
|
||||
migration, migrate_data=None):
|
||||
args = {'migration': migration}
|
||||
version = '4.8'
|
||||
cell_client = self.get_cell_client(ctxt)
|
||||
if not cell_client.can_send_version(version):
|
||||
client = self.router.by_host(ctxt, host)
|
||||
if not client.can_send_version(version):
|
||||
version = '4.2'
|
||||
if migrate_data:
|
||||
migrate_data = migrate_data.to_legacy_dict(
|
||||
pre_migration_result=True)
|
||||
if not cell_client.can_send_version(version):
|
||||
if not client.can_send_version(version):
|
||||
version = '4.0'
|
||||
args.pop('migration')
|
||||
cctxt = cell_client.prepare(server=host, version=version)
|
||||
cctxt = client.prepare(server=host, version=version)
|
||||
cctxt.cast(ctxt, 'live_migration', instance=instance,
|
||||
dest=dest, block_migration=block_migration,
|
||||
migrate_data=migrate_data, **args)
|
||||
@ -666,31 +666,36 @@ class ComputeAPI(object):
|
||||
def live_migration_force_complete(self, ctxt, instance, migration):
|
||||
version = '4.12'
|
||||
kwargs = {}
|
||||
if not self.client.can_send_version(version):
|
||||
if not migration.source_compute:
|
||||
client = self.router.by_instance(ctxt, instance)
|
||||
else:
|
||||
client = self.router.by_host(ctxt, migration.source_compute)
|
||||
if not client.can_send_version(version):
|
||||
version = '4.9'
|
||||
kwargs['migration_id'] = migration.id
|
||||
cctxt = self.get_cell_client(ctxt).prepare(server=_compute_host(
|
||||
migration.source_compute, instance), version=version)
|
||||
cctxt = client.prepare(
|
||||
server=_compute_host(migration.source_compute, instance),
|
||||
version=version)
|
||||
cctxt.cast(ctxt, 'live_migration_force_complete', instance=instance,
|
||||
**kwargs)
|
||||
|
||||
def live_migration_abort(self, ctxt, instance, migration_id):
|
||||
version = '4.10'
|
||||
cctxt = self.get_cell_client(ctxt).prepare(
|
||||
cctxt = self.router.by_instance(ctxt, instance).prepare(
|
||||
server=_compute_host(None, instance), version=version)
|
||||
cctxt.cast(ctxt, 'live_migration_abort', instance=instance,
|
||||
migration_id=migration_id)
|
||||
|
||||
def pause_instance(self, ctxt, instance):
|
||||
version = '4.0'
|
||||
cctxt = self.get_cell_client(ctxt).prepare(
|
||||
cctxt = self.router.by_instance(ctxt, instance).prepare(
|
||||
server=_compute_host(None, instance), version=version)
|
||||
cctxt.cast(ctxt, 'pause_instance', instance=instance)
|
||||
|
||||
def post_live_migration_at_destination(self, ctxt, instance,
|
||||
block_migration, host):
|
||||
version = '4.0'
|
||||
cctxt = self.get_cell_client(ctxt).prepare(
|
||||
cctxt = self.router.by_host(ctxt, host).prepare(
|
||||
server=host, version=version)
|
||||
cctxt.cast(ctxt, 'post_live_migration_at_destination',
|
||||
instance=instance, block_migration=block_migration)
|
||||
@ -699,12 +704,12 @@ class ComputeAPI(object):
|
||||
host, migrate_data=None):
|
||||
migrate_data_orig = migrate_data
|
||||
version = '4.8'
|
||||
cell_client = self.get_cell_client(ctxt)
|
||||
if not cell_client.can_send_version(version):
|
||||
client = self.router.by_host(ctxt, host)
|
||||
if not client.can_send_version(version):
|
||||
version = '4.0'
|
||||
if migrate_data:
|
||||
migrate_data = migrate_data.to_legacy_dict()
|
||||
cctxt = cell_client.prepare(server=host, version=version)
|
||||
cctxt = client.prepare(server=host, version=version)
|
||||
result = cctxt.call(ctxt, 'pre_live_migration',
|
||||
instance=instance,
|
||||
block_migration=block_migration,
|
||||
@ -732,18 +737,18 @@ class ComputeAPI(object):
|
||||
'node': node,
|
||||
'clean_shutdown': clean_shutdown}
|
||||
version = '4.1'
|
||||
cell_client = self.get_cell_client(ctxt)
|
||||
if not cell_client.can_send_version(version):
|
||||
client = self.router.by_host(ctxt, host)
|
||||
if not client.can_send_version(version):
|
||||
version = '4.0'
|
||||
msg_args['instance_type'] = objects_base.obj_to_primitive(
|
||||
instance_type)
|
||||
cctxt = cell_client.prepare(server=host, version=version)
|
||||
cctxt = client.prepare(server=host, version=version)
|
||||
cctxt.cast(ctxt, 'prep_resize', **msg_args)
|
||||
|
||||
def reboot_instance(self, ctxt, instance, block_device_info,
|
||||
reboot_type):
|
||||
version = '4.0'
|
||||
cctxt = self.get_cell_client(ctxt).prepare(
|
||||
cctxt = self.router.by_instance(ctxt, instance).prepare(
|
||||
server=_compute_host(None, instance), version=version)
|
||||
cctxt.cast(ctxt, 'reboot_instance',
|
||||
instance=instance,
|
||||
@ -762,13 +767,16 @@ class ComputeAPI(object):
|
||||
'scheduled_node': node,
|
||||
'limits': limits}
|
||||
version = '4.5'
|
||||
cell_client = self.get_cell_client(ctxt)
|
||||
if not cell_client.can_send_version(version):
|
||||
if not host:
|
||||
client = self.router.by_instance(ctxt, instance)
|
||||
else:
|
||||
client = self.router.by_host(ctxt, host)
|
||||
if not client.can_send_version(version):
|
||||
version = '4.0'
|
||||
extra.pop('migration')
|
||||
extra.pop('scheduled_node')
|
||||
extra.pop('limits')
|
||||
cctxt = cell_client.prepare(server=_compute_host(host, instance),
|
||||
cctxt = client.prepare(server=_compute_host(host, instance),
|
||||
version=version)
|
||||
cctxt.cast(ctxt, 'rebuild_instance',
|
||||
instance=instance, new_pass=new_pass,
|
||||
@ -789,7 +797,7 @@ class ComputeAPI(object):
|
||||
:param host: This is the host to send the message to.
|
||||
'''
|
||||
version = '4.0'
|
||||
cctxt = self.get_cell_client(ctxt).prepare(
|
||||
cctxt = self.router.by_host(ctxt, host).prepare(
|
||||
server=host, version=version)
|
||||
cctxt.cast(ctxt, 'remove_aggregate_host',
|
||||
aggregate=aggregate, host=host_param,
|
||||
@ -797,14 +805,14 @@ class ComputeAPI(object):
|
||||
|
||||
def remove_fixed_ip_from_instance(self, ctxt, instance, address):
|
||||
version = '4.0'
|
||||
cctxt = self.get_cell_client(ctxt).prepare(
|
||||
cctxt = self.router.by_instance(ctxt, instance).prepare(
|
||||
server=_compute_host(None, instance), version=version)
|
||||
cctxt.cast(ctxt, 'remove_fixed_ip_from_instance',
|
||||
instance=instance, address=address)
|
||||
|
||||
def remove_volume_connection(self, ctxt, instance, volume_id, host):
|
||||
version = '4.0'
|
||||
cctxt = self.get_cell_client(ctxt).prepare(
|
||||
cctxt = self.router.by_host(ctxt, host).prepare(
|
||||
server=host, version=version)
|
||||
return cctxt.call(ctxt, 'remove_volume_connection',
|
||||
instance=instance, volume_id=volume_id)
|
||||
@ -817,13 +825,13 @@ class ComputeAPI(object):
|
||||
'rescue_image_ref': rescue_image_ref,
|
||||
'instance': instance,
|
||||
}
|
||||
cctxt = self.get_cell_client(ctxt).prepare(
|
||||
cctxt = self.router.by_instance(ctxt, instance).prepare(
|
||||
server=_compute_host(None, instance), version=version)
|
||||
cctxt.cast(ctxt, 'rescue_instance', **msg_args)
|
||||
|
||||
def reset_network(self, ctxt, instance):
|
||||
version = '4.0'
|
||||
cctxt = self.get_cell_client(ctxt).prepare(
|
||||
cctxt = self.router.by_instance(ctxt, instance).prepare(
|
||||
server=_compute_host(None, instance), version=version)
|
||||
cctxt.cast(ctxt, 'reset_network', instance=instance)
|
||||
|
||||
@ -835,25 +843,29 @@ class ComputeAPI(object):
|
||||
'clean_shutdown': clean_shutdown,
|
||||
}
|
||||
version = '4.1'
|
||||
cell_client = self.get_cell_client(ctxt)
|
||||
if not cell_client.can_send_version(version):
|
||||
client = self.router.by_instance(ctxt, instance)
|
||||
if not client.can_send_version(version):
|
||||
msg_args['instance_type'] = objects_base.obj_to_primitive(
|
||||
instance_type)
|
||||
version = '4.0'
|
||||
cctxt = cell_client.prepare(server=_compute_host(None, instance),
|
||||
cctxt = client.prepare(server=_compute_host(None, instance),
|
||||
version=version)
|
||||
cctxt.cast(ctxt, 'resize_instance', **msg_args)
|
||||
|
||||
def resume_instance(self, ctxt, instance):
|
||||
version = '4.0'
|
||||
cctxt = self.get_cell_client(ctxt).prepare(
|
||||
cctxt = self.router.by_instance(ctxt, instance).prepare(
|
||||
server=_compute_host(None, instance), version=version)
|
||||
cctxt.cast(ctxt, 'resume_instance', instance=instance)
|
||||
|
||||
def revert_resize(self, ctxt, instance, migration, host,
|
||||
reservations=None):
|
||||
version = '4.0'
|
||||
cctxt = self.get_cell_client(ctxt).prepare(
|
||||
if not host:
|
||||
client = self.router.by_instance(ctxt, instance)
|
||||
else:
|
||||
client = self.router.by_host(ctxt, host)
|
||||
cctxt = client.prepare(
|
||||
server=_compute_host(host, instance), version=version)
|
||||
cctxt.cast(ctxt, 'revert_resize',
|
||||
instance=instance, migration=migration,
|
||||
@ -863,34 +875,34 @@ class ComputeAPI(object):
|
||||
destroy_disks=True,
|
||||
migrate_data=None):
|
||||
version = '4.8'
|
||||
cell_client = self.get_cell_client(ctxt)
|
||||
if not cell_client.can_send_version(version):
|
||||
client = self.router.by_host(ctxt, host)
|
||||
if not client.can_send_version(version):
|
||||
version = '4.0'
|
||||
if migrate_data:
|
||||
migrate_data = migrate_data.to_legacy_dict()
|
||||
extra = {'destroy_disks': destroy_disks,
|
||||
'migrate_data': migrate_data,
|
||||
}
|
||||
cctxt = cell_client.prepare(server=host, version=version)
|
||||
cctxt = client.prepare(server=host, version=version)
|
||||
cctxt.cast(ctxt, 'rollback_live_migration_at_destination',
|
||||
instance=instance, **extra)
|
||||
|
||||
def set_admin_password(self, ctxt, instance, new_pass):
|
||||
version = '4.0'
|
||||
cctxt = self.get_cell_client(ctxt).prepare(
|
||||
cctxt = self.router.by_instance(ctxt, instance).prepare(
|
||||
server=_compute_host(None, instance), version=version)
|
||||
return cctxt.call(ctxt, 'set_admin_password',
|
||||
instance=instance, new_pass=new_pass)
|
||||
|
||||
def set_host_enabled(self, ctxt, host, enabled):
|
||||
version = '4.0'
|
||||
cctxt = self.get_cell_client(ctxt).prepare(
|
||||
cctxt = self.router.by_host(ctxt, host).prepare(
|
||||
server=host, version=version)
|
||||
return cctxt.call(ctxt, 'set_host_enabled', enabled=enabled)
|
||||
|
||||
def swap_volume(self, ctxt, instance, old_volume_id, new_volume_id):
|
||||
version = '4.0'
|
||||
cctxt = self.get_cell_client(ctxt).prepare(
|
||||
cctxt = self.router.by_instance(ctxt, instance).prepare(
|
||||
server=_compute_host(None, instance), version=version)
|
||||
cctxt.cast(ctxt, 'swap_volume',
|
||||
instance=instance, old_volume_id=old_volume_id,
|
||||
@ -898,7 +910,7 @@ class ComputeAPI(object):
|
||||
|
||||
def get_host_uptime(self, ctxt, host):
|
||||
version = '4.0'
|
||||
cctxt = self.get_cell_client(ctxt).prepare(
|
||||
cctxt = self.router.by_host(ctxt, host).prepare(
|
||||
server=host, version=version)
|
||||
return cctxt.call(ctxt, 'get_host_uptime')
|
||||
|
||||
@ -909,14 +921,14 @@ class ComputeAPI(object):
|
||||
'device_type': device_type}
|
||||
version = '4.0'
|
||||
|
||||
cctxt = self.get_cell_client(ctxt).prepare(
|
||||
cctxt = self.router.by_instance(ctxt, instance).prepare(
|
||||
server=_compute_host(None, instance), version=version)
|
||||
return cctxt.call(ctxt, 'reserve_block_device_name', **kw)
|
||||
|
||||
def backup_instance(self, ctxt, instance, image_id, backup_type,
|
||||
rotation):
|
||||
version = '4.0'
|
||||
cctxt = self.get_cell_client(ctxt).prepare(
|
||||
cctxt = self.router.by_instance(ctxt, instance).prepare(
|
||||
server=_compute_host(None, instance), version=version)
|
||||
cctxt.cast(ctxt, 'backup_instance',
|
||||
instance=instance,
|
||||
@ -926,7 +938,7 @@ class ComputeAPI(object):
|
||||
|
||||
def snapshot_instance(self, ctxt, instance, image_id):
|
||||
version = '4.0'
|
||||
cctxt = self.get_cell_client(ctxt).prepare(
|
||||
cctxt = self.router.by_instance(ctxt, instance).prepare(
|
||||
server=_compute_host(None, instance), version=version)
|
||||
cctxt.cast(ctxt, 'snapshot_instance',
|
||||
instance=instance,
|
||||
@ -934,7 +946,7 @@ class ComputeAPI(object):
|
||||
|
||||
def start_instance(self, ctxt, instance):
|
||||
version = '4.0'
|
||||
cctxt = self.get_cell_client(ctxt).prepare(
|
||||
cctxt = self.router.by_instance(ctxt, instance).prepare(
|
||||
server=_compute_host(None, instance), version=version)
|
||||
cctxt.cast(ctxt, 'start_instance', instance=instance)
|
||||
|
||||
@ -942,14 +954,14 @@ class ComputeAPI(object):
|
||||
msg_args = {'instance': instance,
|
||||
'clean_shutdown': clean_shutdown}
|
||||
version = '4.0'
|
||||
cctxt = self.get_cell_client(ctxt).prepare(
|
||||
cctxt = self.router.by_instance(ctxt, instance).prepare(
|
||||
server=_compute_host(None, instance), version=version)
|
||||
rpc_method = cctxt.cast if do_cast else cctxt.call
|
||||
return rpc_method(ctxt, 'stop_instance', **msg_args)
|
||||
|
||||
def suspend_instance(self, ctxt, instance):
|
||||
version = '4.0'
|
||||
cctxt = self.get_cell_client(ctxt).prepare(
|
||||
cctxt = self.router.by_instance(ctxt, instance).prepare(
|
||||
server=_compute_host(None, instance), version=version)
|
||||
cctxt.cast(ctxt, 'suspend_instance', instance=instance)
|
||||
|
||||
@ -959,7 +971,7 @@ class ComputeAPI(object):
|
||||
# the method signature has to match with `terminate_instance()`
|
||||
# method of cells rpcapi.
|
||||
version = '4.0'
|
||||
cctxt = self.get_cell_client(ctxt).prepare(
|
||||
cctxt = self.router.by_instance(ctxt, instance).prepare(
|
||||
server=_compute_host(None, instance), version=version)
|
||||
cctxt.cast(ctxt, 'terminate_instance',
|
||||
instance=instance, bdms=bdms,
|
||||
@ -967,26 +979,26 @@ class ComputeAPI(object):
|
||||
|
||||
def unpause_instance(self, ctxt, instance):
|
||||
version = '4.0'
|
||||
cctxt = self.get_cell_client(ctxt).prepare(
|
||||
cctxt = self.router.by_instance(ctxt, instance).prepare(
|
||||
server=_compute_host(None, instance), version=version)
|
||||
cctxt.cast(ctxt, 'unpause_instance', instance=instance)
|
||||
|
||||
def unrescue_instance(self, ctxt, instance):
|
||||
version = '4.0'
|
||||
cctxt = self.get_cell_client(ctxt).prepare(
|
||||
cctxt = self.router.by_instance(ctxt, instance).prepare(
|
||||
server=_compute_host(None, instance), version=version)
|
||||
cctxt.cast(ctxt, 'unrescue_instance', instance=instance)
|
||||
|
||||
def soft_delete_instance(self, ctxt, instance, reservations=None):
|
||||
version = '4.0'
|
||||
cctxt = self.get_cell_client(ctxt).prepare(
|
||||
cctxt = self.router.by_instance(ctxt, instance).prepare(
|
||||
server=_compute_host(None, instance), version=version)
|
||||
cctxt.cast(ctxt, 'soft_delete_instance',
|
||||
instance=instance, reservations=reservations)
|
||||
|
||||
def restore_instance(self, ctxt, instance):
|
||||
version = '4.0'
|
||||
cctxt = self.get_cell_client(ctxt).prepare(
|
||||
cctxt = self.router.by_instance(ctxt, instance).prepare(
|
||||
server=_compute_host(None, instance), version=version)
|
||||
cctxt.cast(ctxt, 'restore_instance', instance=instance)
|
||||
|
||||
@ -995,7 +1007,7 @@ class ComputeAPI(object):
|
||||
msg_args = {'instance': instance, 'image_id': image_id,
|
||||
'clean_shutdown': clean_shutdown}
|
||||
version = '4.0'
|
||||
cctxt = self.get_cell_client(ctxt).prepare(
|
||||
cctxt = self.router.by_instance(ctxt, instance).prepare(
|
||||
server=_compute_host(None, instance), version=version)
|
||||
cctxt.cast(ctxt, 'shelve_instance', **msg_args)
|
||||
|
||||
@ -1003,7 +1015,7 @@ class ComputeAPI(object):
|
||||
clean_shutdown=True):
|
||||
msg_args = {'instance': instance, 'clean_shutdown': clean_shutdown}
|
||||
version = '4.0'
|
||||
cctxt = self.get_cell_client(ctxt).prepare(
|
||||
cctxt = self.router.by_instance(ctxt, instance).prepare(
|
||||
server=_compute_host(None, instance), version=version)
|
||||
cctxt.cast(ctxt, 'shelve_offload_instance', **msg_args)
|
||||
|
||||
@ -1016,14 +1028,14 @@ class ComputeAPI(object):
|
||||
'filter_properties': filter_properties,
|
||||
'node': node,
|
||||
}
|
||||
cctxt = self.get_cell_client(ctxt).prepare(
|
||||
cctxt = self.router.by_host(ctxt, host).prepare(
|
||||
server=host, version=version)
|
||||
cctxt.cast(ctxt, 'unshelve_instance', **msg_kwargs)
|
||||
|
||||
def volume_snapshot_create(self, ctxt, instance, volume_id,
|
||||
create_info):
|
||||
version = '4.0'
|
||||
cctxt = self.get_cell_client(ctxt).prepare(
|
||||
cctxt = self.router.by_instance(ctxt, instance).prepare(
|
||||
server=_compute_host(None, instance), version=version)
|
||||
cctxt.cast(ctxt, 'volume_snapshot_create', instance=instance,
|
||||
volume_id=volume_id, create_info=create_info)
|
||||
@ -1031,15 +1043,16 @@ class ComputeAPI(object):
|
||||
def volume_snapshot_delete(self, ctxt, instance, volume_id, snapshot_id,
|
||||
delete_info):
|
||||
version = '4.0'
|
||||
cctxt = self.get_cell_client(ctxt).prepare(
|
||||
cctxt = self.router.by_instance(ctxt, instance).prepare(
|
||||
server=_compute_host(None, instance), version=version)
|
||||
cctxt.cast(ctxt, 'volume_snapshot_delete', instance=instance,
|
||||
volume_id=volume_id, snapshot_id=snapshot_id,
|
||||
delete_info=delete_info)
|
||||
|
||||
def external_instance_event(self, ctxt, instances, events):
|
||||
cctxt = self.get_cell_client(ctxt).prepare(
|
||||
server=_compute_host(None, instances[0]),
|
||||
instance = instances[0]
|
||||
cctxt = self.router.by_instance(ctxt, instance).prepare(
|
||||
server=_compute_host(None, instance),
|
||||
version='4.0')
|
||||
cctxt.cast(ctxt, 'external_instance_event', instances=instances,
|
||||
events=events)
|
||||
@ -1050,7 +1063,7 @@ class ComputeAPI(object):
|
||||
block_device_mapping=None, node=None, limits=None):
|
||||
|
||||
version = '4.0'
|
||||
cctxt = self.get_cell_client(ctxt).prepare(
|
||||
cctxt = self.router.by_host(ctxt, host).prepare(
|
||||
server=host, version=version)
|
||||
cctxt.cast(ctxt, 'build_and_run_instance', instance=instance,
|
||||
image=image, request_spec=request_spec,
|
||||
@ -1064,35 +1077,35 @@ class ComputeAPI(object):
|
||||
|
||||
def quiesce_instance(self, ctxt, instance):
|
||||
version = '4.0'
|
||||
cctxt = self.get_cell_client(ctxt).prepare(
|
||||
cctxt = self.router.by_instance(ctxt, instance).prepare(
|
||||
server=_compute_host(None, instance), version=version)
|
||||
return cctxt.call(ctxt, 'quiesce_instance', instance=instance)
|
||||
|
||||
def unquiesce_instance(self, ctxt, instance, mapping=None):
|
||||
version = '4.0'
|
||||
cctxt = self.get_cell_client(ctxt).prepare(
|
||||
cctxt = self.router.by_instance(ctxt, instance).prepare(
|
||||
server=_compute_host(None, instance), version=version)
|
||||
cctxt.cast(ctxt, 'unquiesce_instance', instance=instance,
|
||||
mapping=mapping)
|
||||
|
||||
def refresh_instance_security_rules(self, ctxt, instance, host):
|
||||
version = '4.4'
|
||||
cell_client = self.get_cell_client(ctxt)
|
||||
if not cell_client.can_send_version(version):
|
||||
client = self.router.by_instance(ctxt, instance)
|
||||
if not client.can_send_version(version):
|
||||
version = '4.0'
|
||||
instance = objects_base.obj_to_primitive(instance)
|
||||
cctxt = cell_client.prepare(server=_compute_host(None, instance),
|
||||
cctxt = client.prepare(server=_compute_host(None, instance),
|
||||
version=version)
|
||||
cctxt.cast(ctxt, 'refresh_instance_security_rules',
|
||||
instance=instance)
|
||||
|
||||
def trigger_crash_dump(self, ctxt, instance):
|
||||
version = '4.6'
|
||||
cell_client = self.get_cell_client(ctxt)
|
||||
client = self.router.by_instance(ctxt, instance)
|
||||
|
||||
if not cell_client.can_send_version(version):
|
||||
if not client.can_send_version(version):
|
||||
raise exception.TriggerCrashDumpNotSupported()
|
||||
|
||||
cctxt = cell_client.prepare(server=_compute_host(None, instance),
|
||||
cctxt = client.prepare(server=_compute_host(None, instance),
|
||||
version=version)
|
||||
return cctxt.cast(ctxt, "trigger_crash_dump", instance=instance)
|
||||
|
@ -343,26 +343,18 @@ def authorize_quota_class_context(context, class_name):
|
||||
|
||||
@contextmanager
|
||||
def target_cell(context, cell_mapping):
|
||||
"""Adds database and message queue connection information to the context
|
||||
"""Adds database connection information to the context
|
||||
for communicating with the given target cell.
|
||||
|
||||
:param context: The RequestContext to add connection information
|
||||
:param cell_mapping: A objects.CellMapping object
|
||||
"""
|
||||
original_db_connection = context.db_connection
|
||||
original_mq_connection = context.mq_connection
|
||||
# avoid circular imports
|
||||
# avoid circular import
|
||||
from nova import db
|
||||
from nova import rpc
|
||||
db_connection_string = cell_mapping.database_connection
|
||||
context.db_connection = db.create_context_manager(db_connection_string)
|
||||
# NOTE(melwitt): none:// url is a special value meaning do not switch
|
||||
if not cell_mapping.transport_url.startswith('none'):
|
||||
transport_url = cell_mapping.transport_url
|
||||
context.mq_connection = rpc.create_transport(transport_url)
|
||||
|
||||
try:
|
||||
yield context
|
||||
finally:
|
||||
context.db_connection = original_db_connection
|
||||
context.mq_connection = original_mq_connection
|
||||
|
99
nova/rpc.py
99
nova/rpc.py
@ -31,11 +31,14 @@ import functools
|
||||
from oslo_log import log as logging
|
||||
import oslo_messaging as messaging
|
||||
from oslo_serialization import jsonutils
|
||||
from oslo_service import periodic_task
|
||||
from oslo_utils import timeutils
|
||||
|
||||
import nova.conf
|
||||
import nova.context
|
||||
import nova.exception
|
||||
from nova.i18n import _
|
||||
from nova import objects
|
||||
|
||||
|
||||
CONF = nova.conf.CONF
|
||||
@ -65,20 +68,6 @@ TRANSPORT_ALIASES = {
|
||||
}
|
||||
|
||||
|
||||
def get_cell_client(context, default_client):
|
||||
"""Get a RPCClient object based on a RequestContext.
|
||||
|
||||
:param context: The RequestContext that can contain a Transport
|
||||
:param default_client: The default RPCClient
|
||||
"""
|
||||
if context.mq_connection:
|
||||
return messaging.RPCClient(
|
||||
context.mq_connection, default_client.target,
|
||||
version_cap=default_client.version_cap,
|
||||
serializer=default_client.serializer)
|
||||
return default_client
|
||||
|
||||
|
||||
def init(conf):
|
||||
global TRANSPORT, NOTIFICATION_TRANSPORT, LEGACY_NOTIFIER, NOTIFIER
|
||||
exmods = get_allowed_exmods()
|
||||
@ -363,3 +352,85 @@ class LegacyValidatingNotifier(object):
|
||||
LOG.warning(self.message, {'event_type': event_type})
|
||||
|
||||
getattr(self.notifier, priority)(ctxt, event_type, payload)
|
||||
|
||||
|
||||
class ClientWrapper(object):
|
||||
def __init__(self, client):
|
||||
self._client = client
|
||||
self.last_access_time = timeutils.utcnow()
|
||||
|
||||
@property
|
||||
def client(self):
|
||||
self.last_access_time = timeutils.utcnow()
|
||||
return self._client
|
||||
|
||||
|
||||
class ClientRouter(periodic_task.PeriodicTasks):
|
||||
"""Creates and caches RPC clients that route to cells or the default.
|
||||
|
||||
The default client connects to the API cell message queue. The rest of the
|
||||
clients connect to compute cell message queues.
|
||||
"""
|
||||
def __init__(self, default_client):
|
||||
super(ClientRouter, self).__init__(CONF)
|
||||
self.clients = {}
|
||||
self.clients['default'] = ClientWrapper(default_client)
|
||||
self.target = default_client.target
|
||||
self.version_cap = default_client.version_cap
|
||||
# NOTE(melwitt): Cells v1 does its own serialization and won't
|
||||
# have a serializer available on the client object.
|
||||
self.serializer = getattr(default_client, 'serializer', None)
|
||||
self.run_periodic_tasks(nova.context.RequestContext())
|
||||
|
||||
def _client(self, context, cell_mapping=None):
|
||||
if cell_mapping:
|
||||
client_id = cell_mapping.uuid
|
||||
else:
|
||||
client_id = 'default'
|
||||
|
||||
try:
|
||||
client = self.clients[client_id].client
|
||||
except KeyError:
|
||||
transport = create_transport(cell_mapping.transport_url)
|
||||
client = messaging.RPCClient(transport, self.target,
|
||||
version_cap=self.version_cap,
|
||||
serializer=self.serializer)
|
||||
self.clients[client_id] = ClientWrapper(client)
|
||||
|
||||
return client
|
||||
|
||||
@periodic_task.periodic_task
|
||||
def _remove_stale_clients(self, context):
|
||||
timeout = 60
|
||||
|
||||
def stale(client_id, last_access_time):
|
||||
if timeutils.is_older_than(last_access_time, timeout):
|
||||
LOG.debug('Removing stale RPC client: %s as it was last '
|
||||
'accessed at %s', client_id, last_access_time)
|
||||
return True
|
||||
return False
|
||||
|
||||
# Never expire the default client
|
||||
items_copy = list(self.clients.items())
|
||||
for client_id, client_wrapper in items_copy:
|
||||
if (client_id != 'default' and
|
||||
stale(client_id, client_wrapper.last_access_time)):
|
||||
del self.clients[client_id]
|
||||
|
||||
def by_instance(self, context, instance):
|
||||
try:
|
||||
cell_mapping = objects.InstanceMapping.get_by_instance_uuid(
|
||||
context, instance.uuid).cell_mapping
|
||||
except nova.exception.InstanceMappingNotFound:
|
||||
# Not a cells v2 deployment
|
||||
cell_mapping = None
|
||||
return self._client(context, cell_mapping=cell_mapping)
|
||||
|
||||
def by_host(self, context, host):
|
||||
try:
|
||||
cell_mapping = objects.HostMapping.get_by_host(
|
||||
context, host).cell_mapping
|
||||
except nova.exception.HostMappingNotFound:
|
||||
# Not a cells v2 deployment
|
||||
cell_mapping = None
|
||||
return self._client(context, cell_mapping=cell_mapping)
|
||||
|
@ -10847,8 +10847,9 @@ class ComputeAPIAggrCallsSchedulerTestCase(test.NoDBTestCase):
|
||||
self.api.delete_aggregate(self.context, 1)
|
||||
delete_aggregate.assert_called_once_with(self.context, agg)
|
||||
|
||||
@mock.patch('nova.compute.rpcapi.ComputeAPI.add_aggregate_host')
|
||||
@mock.patch.object(scheduler_client.SchedulerClient, 'update_aggregates')
|
||||
def test_add_host_to_aggregate(self, update_aggregates):
|
||||
def test_add_host_to_aggregate(self, update_aggregates, mock_add_agg):
|
||||
self.api.is_safe_to_update_az = mock.Mock()
|
||||
self.api._update_az_cache_for_host = mock.Mock()
|
||||
agg = objects.Aggregate(name='fake', metadata={})
|
||||
@ -10859,9 +10860,14 @@ class ComputeAPIAggrCallsSchedulerTestCase(test.NoDBTestCase):
|
||||
return_value=agg)):
|
||||
self.api.add_host_to_aggregate(self.context, 1, 'fakehost')
|
||||
update_aggregates.assert_called_once_with(self.context, [agg])
|
||||
mock_add_agg.assert_called_once_with(self.context, aggregate=agg,
|
||||
host_param='fakehost',
|
||||
host='fakehost')
|
||||
|
||||
@mock.patch('nova.compute.rpcapi.ComputeAPI.remove_aggregate_host')
|
||||
@mock.patch.object(scheduler_client.SchedulerClient, 'update_aggregates')
|
||||
def test_remove_host_from_aggregate(self, update_aggregates):
|
||||
def test_remove_host_from_aggregate(self, update_aggregates,
|
||||
mock_remove_agg):
|
||||
self.api._update_az_cache_for_host = mock.Mock()
|
||||
agg = objects.Aggregate(name='fake', metadata={})
|
||||
agg.delete_host = mock.Mock()
|
||||
@ -10871,6 +10877,9 @@ class ComputeAPIAggrCallsSchedulerTestCase(test.NoDBTestCase):
|
||||
return_value=agg)):
|
||||
self.api.remove_host_from_aggregate(self.context, 1, 'fakehost')
|
||||
update_aggregates.assert_called_once_with(self.context, [agg])
|
||||
mock_remove_agg.assert_called_once_with(self.context, aggregate=agg,
|
||||
host_param='fakehost',
|
||||
host='fakehost')
|
||||
|
||||
|
||||
class ComputeAggrTestCase(BaseTestCase):
|
||||
|
@ -4528,6 +4528,7 @@ class ComputeManagerMigrationTestCase(test.NoDBTestCase):
|
||||
# revert_resize() and the return value is passed to driver.destroy().
|
||||
# Otherwise we could regress this.
|
||||
|
||||
@mock.patch('nova.compute.rpcapi.ComputeAPI.finish_revert_resize')
|
||||
@mock.patch.object(self.instance, 'revert_migration_context')
|
||||
@mock.patch.object(self.compute.network_api, 'get_instance_nw_info')
|
||||
@mock.patch.object(self.compute, '_is_instance_storage_shared')
|
||||
@ -4552,7 +4553,8 @@ class ComputeManagerMigrationTestCase(test.NoDBTestCase):
|
||||
finish_revert_resize,
|
||||
_is_instance_storage_shared,
|
||||
get_instance_nw_info,
|
||||
revert_migration_context):
|
||||
revert_migration_context,
|
||||
mock_finish_revert):
|
||||
|
||||
self.migration.source_compute = self.instance['host']
|
||||
|
||||
@ -4572,6 +4574,9 @@ class ComputeManagerMigrationTestCase(test.NoDBTestCase):
|
||||
# should not destroy disks otherwise it should destroy disks.
|
||||
destroy.assert_called_once_with(self.context, self.instance,
|
||||
mock.ANY, mock.ANY, not is_shared)
|
||||
mock_finish_revert.assert_called_once_with(
|
||||
self.context, self.instance, self.migration,
|
||||
self.migration.source_compute, mock.ANY)
|
||||
|
||||
do_test()
|
||||
|
||||
|
@ -52,6 +52,21 @@ class ComputeRpcAPITestCase(test.NoDBTestCase):
|
||||
{'source_type': 'volume', 'destination_type': 'volume',
|
||||
'instance_uuid': self.fake_instance_obj.uuid,
|
||||
'volume_id': 'fake-volume-id'}))
|
||||
# FIXME(melwitt): Temporary while things have no mappings
|
||||
self.patcher1 = mock.patch('nova.objects.InstanceMapping.'
|
||||
'get_by_instance_uuid')
|
||||
self.patcher2 = mock.patch('nova.objects.HostMapping.get_by_host')
|
||||
mock_inst_mapping = self.patcher1.start()
|
||||
mock_host_mapping = self.patcher2.start()
|
||||
mock_inst_mapping.side_effect = exception.InstanceMappingNotFound(
|
||||
uuid=self.fake_instance_obj.uuid)
|
||||
mock_host_mapping.side_effect = exception.HostMappingNotFound(
|
||||
name=self.fake_instance_obj.host)
|
||||
|
||||
def tearDown(self):
|
||||
super(ComputeRpcAPITestCase, self).tearDown()
|
||||
self.patcher1.stop()
|
||||
self.patcher2.stop()
|
||||
|
||||
@mock.patch('nova.objects.Service.get_minimum_version')
|
||||
def test_auto_pin(self, mock_get_min):
|
||||
@ -59,7 +74,7 @@ class ComputeRpcAPITestCase(test.NoDBTestCase):
|
||||
self.flags(compute='auto', group='upgrade_levels')
|
||||
compute_rpcapi.LAST_VERSION = None
|
||||
rpcapi = compute_rpcapi.ComputeAPI()
|
||||
self.assertEqual('4.4', rpcapi.client.version_cap)
|
||||
self.assertEqual('4.4', rpcapi.router.version_cap)
|
||||
mock_get_min.assert_called_once_with(mock.ANY, 'nova-compute')
|
||||
|
||||
@mock.patch('nova.objects.Service.get_minimum_version')
|
||||
@ -76,7 +91,7 @@ class ComputeRpcAPITestCase(test.NoDBTestCase):
|
||||
self.flags(compute='auto', group='upgrade_levels')
|
||||
compute_rpcapi.LAST_VERSION = None
|
||||
rpcapi = compute_rpcapi.ComputeAPI()
|
||||
self.assertEqual('4.11', rpcapi.client.version_cap)
|
||||
self.assertEqual('4.11', rpcapi.router.version_cap)
|
||||
mock_get_min.assert_called_once_with(mock.ANY, 'nova-compute')
|
||||
self.assertIsNone(compute_rpcapi.LAST_VERSION)
|
||||
|
||||
@ -95,11 +110,15 @@ class ComputeRpcAPITestCase(test.NoDBTestCase):
|
||||
ctxt = context.RequestContext('fake_user', 'fake_project')
|
||||
|
||||
rpcapi = kwargs.pop('rpcapi_class', compute_rpcapi.ComputeAPI)()
|
||||
self.assertIsNotNone(rpcapi.client)
|
||||
self.assertEqual(rpcapi.client.target.topic, CONF.compute_topic)
|
||||
self.assertIsNotNone(rpcapi.router)
|
||||
self.assertEqual(rpcapi.router.target.topic, CONF.compute_topic)
|
||||
|
||||
orig_prepare = rpcapi.client.prepare
|
||||
base_version = rpcapi.client.target.version
|
||||
# This test wants to run the real prepare function, so must use
|
||||
# a real client object
|
||||
default_client = rpcapi.router.clients['default'].client
|
||||
|
||||
orig_prepare = default_client.prepare
|
||||
base_version = rpcapi.router.target.version
|
||||
expected_version = kwargs.pop('version', base_version)
|
||||
|
||||
expected_kwargs = kwargs.copy()
|
||||
@ -127,13 +146,13 @@ class ComputeRpcAPITestCase(test.NoDBTestCase):
|
||||
expected_kwargs['scheduled_node'] = expected_kwargs.pop('node')
|
||||
|
||||
with test.nested(
|
||||
mock.patch.object(rpcapi.client, rpc_method),
|
||||
mock.patch.object(rpcapi.client, 'prepare'),
|
||||
mock.patch.object(rpcapi.client, 'can_send_version'),
|
||||
mock.patch.object(default_client, rpc_method),
|
||||
mock.patch.object(default_client, 'prepare'),
|
||||
mock.patch.object(default_client, 'can_send_version'),
|
||||
) as (
|
||||
rpc_mock, prepare_mock, csv_mock
|
||||
):
|
||||
prepare_mock.return_value = rpcapi.client
|
||||
prepare_mock.return_value = default_client
|
||||
if '_return_value' in kwargs:
|
||||
rpc_mock.return_value = kwargs.pop('_return_value')
|
||||
del expected_kwargs['_return_value']
|
||||
@ -206,10 +225,13 @@ class ComputeRpcAPITestCase(test.NoDBTestCase):
|
||||
rpcapi = compute_rpcapi.ComputeAPI()
|
||||
cast_mock = mock.Mock()
|
||||
cctxt_mock = mock.Mock(cast=cast_mock)
|
||||
rpcapi.router.by_instance = mock.Mock()
|
||||
mock_client = mock.Mock()
|
||||
rpcapi.router.by_instance.return_value = mock_client
|
||||
with test.nested(
|
||||
mock.patch.object(rpcapi.client, 'can_send_version',
|
||||
mock.patch.object(mock_client, 'can_send_version',
|
||||
return_value=False),
|
||||
mock.patch.object(rpcapi.client, 'prepare',
|
||||
mock.patch.object(mock_client, 'prepare',
|
||||
return_value=cctxt_mock)
|
||||
) as (
|
||||
can_send_mock, prepare_mock
|
||||
@ -313,8 +335,9 @@ class ComputeRpcAPITestCase(test.NoDBTestCase):
|
||||
ctxt = context.RequestContext('fake_user', 'fake_project')
|
||||
version = '4.12'
|
||||
rpcapi = compute_rpcapi.ComputeAPI()
|
||||
rpcapi.router.by_host = mock.Mock()
|
||||
mock_client = mock.MagicMock()
|
||||
rpcapi.client = mock_client
|
||||
rpcapi.router.by_host.return_value = mock_client
|
||||
mock_client.can_send_version.return_value = True
|
||||
mock_cctx = mock.MagicMock()
|
||||
mock_client.prepare.return_value = mock_cctx
|
||||
@ -333,8 +356,9 @@ class ComputeRpcAPITestCase(test.NoDBTestCase):
|
||||
version = '4.9'
|
||||
ctxt = context.RequestContext('fake_user', 'fake_project')
|
||||
rpcapi = compute_rpcapi.ComputeAPI()
|
||||
rpcapi.router.by_host = mock.Mock()
|
||||
mock_client = mock.MagicMock()
|
||||
rpcapi.client = mock_client
|
||||
rpcapi.router.by_host.return_value = mock_client
|
||||
mock_client.can_send_version.return_value = False
|
||||
mock_cctx = mock.MagicMock()
|
||||
mock_client.prepare.return_value = mock_cctx
|
||||
@ -600,33 +624,51 @@ class ComputeRpcAPITestCase(test.NoDBTestCase):
|
||||
def _test_simple_call(self, method, inargs, callargs, callret,
|
||||
calltype='call', can_send=False):
|
||||
rpc = compute_rpcapi.ComputeAPI()
|
||||
mock_client = mock.Mock()
|
||||
rpc.router.by_instance = mock.Mock()
|
||||
rpc.router.by_instance.return_value = mock_client
|
||||
rpc.router.by_host = mock.Mock()
|
||||
rpc.router.by_host.return_value = mock_client
|
||||
|
||||
@mock.patch.object(rpc, 'client')
|
||||
@mock.patch.object(compute_rpcapi, '_compute_host')
|
||||
def _test(mock_ch, mock_client):
|
||||
def _test(mock_ch):
|
||||
mock_client.can_send_version.return_value = can_send
|
||||
call = getattr(mock_client.prepare.return_value, calltype)
|
||||
call.return_value = callret
|
||||
ctxt = context.RequestContext()
|
||||
result = getattr(rpc, method)(ctxt, **inargs)
|
||||
call.assert_called_once_with(ctxt, method, **callargs)
|
||||
# Get the target of the prepare call: prepare(server=<target>, ...)
|
||||
prepare_target = mock_client.prepare.call_args[1]['server']
|
||||
# If _compute_host(None, instance) was called, then by_instance
|
||||
# should have been called with the instance. Otherwise by_host
|
||||
# should have been called with the same host as the prepare target.
|
||||
if mock_ch.called and mock_ch.call_args[0][0] is None:
|
||||
instance = mock_ch.call_args[0][1]
|
||||
rpc.router.by_instance.assert_called_once_with(ctxt, instance)
|
||||
rpc.router.by_host.assert_not_called()
|
||||
else:
|
||||
rpc.router.by_host.assert_called_once_with(ctxt,
|
||||
prepare_target)
|
||||
rpc.router.by_instance.assert_not_called()
|
||||
return result
|
||||
|
||||
return _test()
|
||||
|
||||
def test_check_can_live_migrate_source_converts_objects(self):
|
||||
obj = migrate_data_obj.LiveMigrateData()
|
||||
inst = self.fake_instance_obj
|
||||
result = self._test_simple_call('check_can_live_migrate_source',
|
||||
inargs={'instance': 'foo',
|
||||
inargs={'instance': inst,
|
||||
'dest_check_data': obj},
|
||||
callargs={'instance': 'foo',
|
||||
callargs={'instance': inst,
|
||||
'dest_check_data': {}},
|
||||
callret=obj)
|
||||
self.assertEqual(obj, result)
|
||||
result = self._test_simple_call('check_can_live_migrate_source',
|
||||
inargs={'instance': 'foo',
|
||||
inargs={'instance': inst,
|
||||
'dest_check_data': obj},
|
||||
callargs={'instance': 'foo',
|
||||
callargs={'instance': inst,
|
||||
'dest_check_data': {}},
|
||||
callret={'foo': 'bar'})
|
||||
self.assertIsInstance(result, migrate_data_obj.LiveMigrateData)
|
||||
@ -635,12 +677,13 @@ class ComputeRpcAPITestCase(test.NoDBTestCase):
|
||||
'detect_implementation')
|
||||
def test_check_can_live_migrate_destination_converts_dict(self,
|
||||
mock_det):
|
||||
inst = self.fake_instance_obj
|
||||
result = self._test_simple_call('check_can_live_migrate_destination',
|
||||
inargs={'instance': 'foo',
|
||||
inargs={'instance': inst,
|
||||
'destination': 'bar',
|
||||
'block_migration': False,
|
||||
'disk_over_commit': False},
|
||||
callargs={'instance': 'foo',
|
||||
callargs={'instance': inst,
|
||||
'block_migration': False,
|
||||
'disk_over_commit': False},
|
||||
callret={'foo': 'bar'})
|
||||
@ -648,14 +691,15 @@ class ComputeRpcAPITestCase(test.NoDBTestCase):
|
||||
|
||||
def test_live_migration_converts_objects(self):
|
||||
obj = migrate_data_obj.LiveMigrateData()
|
||||
inst = self.fake_instance_obj
|
||||
self._test_simple_call('live_migration',
|
||||
inargs={'instance': 'foo',
|
||||
inargs={'instance': inst,
|
||||
'dest': 'foo',
|
||||
'block_migration': False,
|
||||
'host': 'foo',
|
||||
'migration': None,
|
||||
'migrate_data': obj},
|
||||
callargs={'instance': 'foo',
|
||||
callargs={'instance': inst,
|
||||
'dest': 'foo',
|
||||
'block_migration': False,
|
||||
'migrate_data': {
|
||||
@ -666,13 +710,14 @@ class ComputeRpcAPITestCase(test.NoDBTestCase):
|
||||
@mock.patch('nova.objects.migrate_data.LiveMigrateData.from_legacy_dict')
|
||||
def test_pre_live_migration_converts_objects(self, mock_fld):
|
||||
obj = migrate_data_obj.LiveMigrateData()
|
||||
inst = self.fake_instance_obj
|
||||
result = self._test_simple_call('pre_live_migration',
|
||||
inargs={'instance': 'foo',
|
||||
inargs={'instance': inst,
|
||||
'block_migration': False,
|
||||
'disk': None,
|
||||
'host': 'foo',
|
||||
'migrate_data': obj},
|
||||
callargs={'instance': 'foo',
|
||||
callargs={'instance': inst,
|
||||
'block_migration': False,
|
||||
'disk': None,
|
||||
'migrate_data': {}},
|
||||
@ -680,12 +725,12 @@ class ComputeRpcAPITestCase(test.NoDBTestCase):
|
||||
self.assertFalse(mock_fld.called)
|
||||
self.assertEqual(obj, result)
|
||||
result = self._test_simple_call('pre_live_migration',
|
||||
inargs={'instance': 'foo',
|
||||
inargs={'instance': inst,
|
||||
'block_migration': False,
|
||||
'disk': None,
|
||||
'host': 'foo',
|
||||
'migrate_data': obj},
|
||||
callargs={'instance': 'foo',
|
||||
callargs={'instance': inst,
|
||||
'block_migration': False,
|
||||
'disk': None,
|
||||
'migrate_data': {}},
|
||||
@ -696,13 +741,14 @@ class ComputeRpcAPITestCase(test.NoDBTestCase):
|
||||
|
||||
def test_rollback_live_migration_at_destination_converts_objects(self):
|
||||
obj = migrate_data_obj.LiveMigrateData()
|
||||
inst = self.fake_instance_obj
|
||||
method = 'rollback_live_migration_at_destination'
|
||||
self._test_simple_call(method,
|
||||
inargs={'instance': 'foo',
|
||||
inargs={'instance': inst,
|
||||
'host': 'foo',
|
||||
'destroy_disks': False,
|
||||
'migrate_data': obj},
|
||||
callargs={'instance': 'foo',
|
||||
callargs={'instance': inst,
|
||||
'destroy_disks': False,
|
||||
'migrate_data': {}},
|
||||
callret=None,
|
||||
|
@ -279,37 +279,14 @@ class ContextTestCase(test.NoDBTestCase):
|
||||
mock.sentinel.target)
|
||||
|
||||
@mock.patch('nova.db.create_context_manager')
|
||||
@mock.patch('nova.rpc.create_transport')
|
||||
def test_target_cell(self, mock_create_transport, mock_create_ctxt_mgr):
|
||||
def test_target_cell(self, mock_create_ctxt_mgr):
|
||||
mock_create_ctxt_mgr.return_value = mock.sentinel.cm
|
||||
mock_create_transport.return_value = mock.sentinel.tp
|
||||
ctxt = context.RequestContext('111',
|
||||
'222',
|
||||
roles=['admin', 'weasel'])
|
||||
# Verify the existing db_connection, if any, is restored
|
||||
ctxt.db_connection = mock.sentinel.db_conn
|
||||
ctxt.mq_connection = mock.sentinel.mq_conn
|
||||
mapping = objects.CellMapping(database_connection='fake://',
|
||||
transport_url='anotherfake://')
|
||||
mapping = objects.CellMapping(database_connection='fake://')
|
||||
with context.target_cell(ctxt, mapping):
|
||||
self.assertEqual(ctxt.db_connection, mock.sentinel.cm)
|
||||
self.assertEqual(ctxt.mq_connection, mock.sentinel.tp)
|
||||
self.assertEqual(mock.sentinel.db_conn, ctxt.db_connection)
|
||||
self.assertEqual(mock.sentinel.mq_conn, ctxt.mq_connection)
|
||||
mock_create_transport.assert_called_once_with(mapping.transport_url)
|
||||
|
||||
@mock.patch('nova.db.create_context_manager')
|
||||
@mock.patch('nova.rpc.create_transport')
|
||||
def test_target_cell_transport_url_sentinel(self, mock_create_transport,
|
||||
mock_create_ctxt_mgr):
|
||||
mock_create_ctxt_mgr.return_value = mock.sentinel.cm
|
||||
mock_create_transport.return_value = mock.sentinel.tp
|
||||
ctxt = context.RequestContext('111',
|
||||
'222',
|
||||
roles=['admin', 'weasel'])
|
||||
mapping = objects.CellMapping(database_connection='fake://',
|
||||
transport_url='none://')
|
||||
with context.target_cell(ctxt, mapping):
|
||||
self.assertEqual(ctxt.db_connection, mock.sentinel.cm)
|
||||
self.assertIsNone(ctxt.mq_connection)
|
||||
self.assertFalse(mock_create_transport.called)
|
||||
|
@ -12,16 +12,21 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import copy
|
||||
import datetime
|
||||
|
||||
import fixtures
|
||||
import mock
|
||||
import oslo_messaging as messaging
|
||||
from oslo_serialization import jsonutils
|
||||
from oslo_utils import fixture as utils_fixture
|
||||
import testtools
|
||||
|
||||
from nova import context
|
||||
from nova import exception
|
||||
from nova import objects
|
||||
from nova import rpc
|
||||
from nova import test
|
||||
from nova.tests import uuidsentinel as uuids
|
||||
|
||||
|
||||
# Make a class that resets all of the global variables in nova.rpc
|
||||
@ -49,53 +54,6 @@ class TestRPC(testtools.TestCase):
|
||||
super(TestRPC, self).setUp()
|
||||
self.useFixture(RPCResetFixture())
|
||||
|
||||
@mock.patch('oslo_messaging.RPCClient')
|
||||
def test_cell_client(self, mock_rpcclient):
|
||||
default_client = mock.Mock()
|
||||
dynamic_client = mock.Mock()
|
||||
mock_rpcclient.return_value = dynamic_client
|
||||
ctxt = mock.Mock()
|
||||
ctxt.mq_connection = 'fake://'
|
||||
|
||||
class FakeAPI(object):
|
||||
def __init__(self):
|
||||
self.client = default_client
|
||||
|
||||
def rpc_api_function(self, context):
|
||||
rpc.get_cell_client(context, self.client).do_rpc()
|
||||
|
||||
rpcapi = FakeAPI()
|
||||
rpcapi.rpc_api_function(ctxt)
|
||||
# verify a dynamic client was created
|
||||
mock_rpcclient.assert_called_once_with(
|
||||
ctxt.mq_connection, default_client.target,
|
||||
version_cap=default_client.version_cap,
|
||||
serializer=default_client.serializer)
|
||||
# verify dynamic client handled the rpc
|
||||
dynamic_client.do_rpc.assert_called_once_with()
|
||||
|
||||
@mock.patch('oslo_messaging.RPCClient')
|
||||
def test_cell_client_no_switch(self, mock_rpcclient):
|
||||
default_client = mock.Mock()
|
||||
dynamic_client = mock.Mock()
|
||||
mock_rpcclient.return_value = dynamic_client
|
||||
ctxt = mock.Mock()
|
||||
ctxt.mq_connection = None
|
||||
|
||||
class FakeAPI(object):
|
||||
def __init__(self):
|
||||
self.client = default_client
|
||||
|
||||
def rpc_api_function(self, context):
|
||||
rpc.get_cell_client(context, self.client).do_rpc()
|
||||
|
||||
rpcapi = FakeAPI()
|
||||
rpcapi.rpc_api_function(ctxt)
|
||||
# verify a dynamic client was not created
|
||||
self.assertFalse(mock_rpcclient.called)
|
||||
# verify default client handled the rpc
|
||||
default_client.do_rpc.assert_called_once_with()
|
||||
|
||||
@mock.patch.object(rpc, 'get_allowed_exmods')
|
||||
@mock.patch.object(rpc, 'RequestContextSerializer')
|
||||
@mock.patch.object(messaging, 'get_transport')
|
||||
@ -402,3 +360,182 @@ class TestRequestContextSerializer(test.NoDBTestCase):
|
||||
self.ser.deserialize_context('context')
|
||||
|
||||
mock_req.from_dict.assert_called_once_with('context')
|
||||
|
||||
|
||||
class TestClientRouter(test.NoDBTestCase):
|
||||
@mock.patch('nova.objects.InstanceMapping.get_by_instance_uuid')
|
||||
@mock.patch('nova.rpc.create_transport')
|
||||
@mock.patch('oslo_messaging.RPCClient')
|
||||
def test_by_instance(self, mock_rpcclient, mock_create, mock_get):
|
||||
default_client = mock.Mock()
|
||||
cell_client = mock.Mock()
|
||||
mock_rpcclient.return_value = cell_client
|
||||
ctxt = mock.Mock()
|
||||
cm = objects.CellMapping(uuid=uuids.cell_mapping,
|
||||
transport_url='fake:///')
|
||||
mock_get.return_value = objects.InstanceMapping(cell_mapping=cm)
|
||||
instance = objects.Instance(uuid=uuids.instance)
|
||||
|
||||
router = rpc.ClientRouter(default_client)
|
||||
client = router.by_instance(ctxt, instance)
|
||||
|
||||
mock_get.assert_called_once_with(ctxt, instance.uuid)
|
||||
# verify a client was created by ClientRouter
|
||||
mock_rpcclient.assert_called_once_with(
|
||||
mock_create.return_value, default_client.target,
|
||||
version_cap=default_client.version_cap,
|
||||
serializer=default_client.serializer)
|
||||
# verify cell client was returned
|
||||
self.assertEqual(cell_client, client)
|
||||
|
||||
# reset and check that cached client is returned the second time
|
||||
mock_rpcclient.reset_mock()
|
||||
mock_create.reset_mock()
|
||||
mock_get.reset_mock()
|
||||
|
||||
client = router.by_instance(ctxt, instance)
|
||||
mock_get.assert_called_once_with(ctxt, instance.uuid)
|
||||
mock_rpcclient.assert_not_called()
|
||||
mock_create.assert_not_called()
|
||||
self.assertEqual(cell_client, client)
|
||||
|
||||
@mock.patch('nova.objects.HostMapping.get_by_host')
|
||||
@mock.patch('nova.rpc.create_transport')
|
||||
@mock.patch('oslo_messaging.RPCClient')
|
||||
def test_by_host(self, mock_rpcclient, mock_create, mock_get):
|
||||
default_client = mock.Mock()
|
||||
cell_client = mock.Mock()
|
||||
mock_rpcclient.return_value = cell_client
|
||||
ctxt = mock.Mock()
|
||||
cm = objects.CellMapping(uuid=uuids.cell_mapping,
|
||||
transport_url='fake:///')
|
||||
mock_get.return_value = objects.HostMapping(cell_mapping=cm)
|
||||
host = 'fake-host'
|
||||
|
||||
router = rpc.ClientRouter(default_client)
|
||||
client = router.by_host(ctxt, host)
|
||||
|
||||
mock_get.assert_called_once_with(ctxt, host)
|
||||
# verify a client was created by ClientRouter
|
||||
mock_rpcclient.assert_called_once_with(
|
||||
mock_create.return_value, default_client.target,
|
||||
version_cap=default_client.version_cap,
|
||||
serializer=default_client.serializer)
|
||||
# verify cell client was returned
|
||||
self.assertEqual(cell_client, client)
|
||||
|
||||
# reset and check that cached client is returned the second time
|
||||
mock_rpcclient.reset_mock()
|
||||
mock_create.reset_mock()
|
||||
mock_get.reset_mock()
|
||||
|
||||
client = router.by_host(ctxt, host)
|
||||
mock_get.assert_called_once_with(ctxt, host)
|
||||
mock_rpcclient.assert_not_called()
|
||||
mock_create.assert_not_called()
|
||||
self.assertEqual(cell_client, client)
|
||||
|
||||
@mock.patch('nova.objects.InstanceMapping.get_by_instance_uuid',
|
||||
side_effect=exception.InstanceMappingNotFound(uuid=uuids.instance))
|
||||
@mock.patch('nova.rpc.create_transport')
|
||||
@mock.patch('oslo_messaging.RPCClient')
|
||||
def test_by_instance_not_found(self, mock_rpcclient, mock_create,
|
||||
mock_get):
|
||||
default_client = mock.Mock()
|
||||
cell_client = mock.Mock()
|
||||
mock_rpcclient.return_value = cell_client
|
||||
ctxt = mock.Mock()
|
||||
instance = objects.Instance(uuid=uuids.instance)
|
||||
|
||||
router = rpc.ClientRouter(default_client)
|
||||
client = router.by_instance(ctxt, instance)
|
||||
|
||||
mock_get.assert_called_once_with(ctxt, instance.uuid)
|
||||
mock_rpcclient.assert_not_called()
|
||||
mock_create.assert_not_called()
|
||||
# verify default client was returned
|
||||
self.assertEqual(default_client, client)
|
||||
|
||||
@mock.patch('nova.objects.HostMapping.get_by_host',
|
||||
side_effect=exception.HostMappingNotFound(name='fake-host'))
|
||||
@mock.patch('nova.rpc.create_transport')
|
||||
@mock.patch('oslo_messaging.RPCClient')
|
||||
def test_by_host_not_found(self, mock_rpcclient, mock_create, mock_get):
|
||||
default_client = mock.Mock()
|
||||
cell_client = mock.Mock()
|
||||
mock_rpcclient.return_value = cell_client
|
||||
ctxt = mock.Mock()
|
||||
host = 'fake-host'
|
||||
|
||||
router = rpc.ClientRouter(default_client)
|
||||
client = router.by_host(ctxt, host)
|
||||
|
||||
mock_get.assert_called_once_with(ctxt, host)
|
||||
mock_rpcclient.assert_not_called()
|
||||
mock_create.assert_not_called()
|
||||
# verify default client was returned
|
||||
self.assertEqual(default_client, client)
|
||||
|
||||
@mock.patch('nova.objects.InstanceMapping.get_by_instance_uuid')
|
||||
@mock.patch('nova.rpc.create_transport')
|
||||
@mock.patch('oslo_messaging.RPCClient')
|
||||
def test_remove_stale_clients(self, mock_rpcclient, mock_create, mock_get):
|
||||
t0 = datetime.datetime(2016, 8, 9, 0, 0, 0)
|
||||
time_fixture = self.useFixture(utils_fixture.TimeFixture(t0))
|
||||
|
||||
default_client = mock.Mock()
|
||||
ctxt = mock.Mock()
|
||||
|
||||
cm1 = objects.CellMapping(uuid=uuids.cell_mapping1,
|
||||
transport_url='fake:///')
|
||||
cm2 = objects.CellMapping(uuid=uuids.cell_mapping2,
|
||||
transport_url='fake:///')
|
||||
cm3 = objects.CellMapping(uuid=uuids.cell_mapping3,
|
||||
transport_url='fake:///')
|
||||
mock_get.side_effect = [objects.InstanceMapping(cell_mapping=cm1),
|
||||
objects.InstanceMapping(cell_mapping=cm2),
|
||||
objects.InstanceMapping(cell_mapping=cm3),
|
||||
objects.InstanceMapping(cell_mapping=cm3)]
|
||||
instance1 = objects.Instance(uuid=uuids.instance1)
|
||||
instance2 = objects.Instance(uuid=uuids.instance2)
|
||||
instance3 = objects.Instance(uuid=uuids.instance3)
|
||||
|
||||
router = rpc.ClientRouter(default_client)
|
||||
cell1_client = router.by_instance(ctxt, instance1)
|
||||
cell2_client = router.by_instance(ctxt, instance2)
|
||||
|
||||
# default client, cell1 client, cell2 client
|
||||
self.assertEqual(3, len(router.clients))
|
||||
expected = {'default': default_client,
|
||||
uuids.cell_mapping1: cell1_client,
|
||||
uuids.cell_mapping2: cell2_client}
|
||||
for client_id, client in expected.items():
|
||||
self.assertEqual(client, router.clients[client_id].client)
|
||||
|
||||
# expire cell1 client and cell2 client
|
||||
time_fixture.advance_time_seconds(80)
|
||||
|
||||
# add cell3 client
|
||||
cell3_client = router.by_instance(ctxt, instance3)
|
||||
|
||||
router._remove_stale_clients(ctxt)
|
||||
|
||||
# default client, cell3 client
|
||||
expected = {'default': default_client,
|
||||
uuids.cell_mapping3: cell3_client}
|
||||
self.assertEqual(2, len(router.clients))
|
||||
for client_id, client in expected.items():
|
||||
self.assertEqual(client, router.clients[client_id].client)
|
||||
|
||||
# expire cell3 client
|
||||
time_fixture.advance_time_seconds(80)
|
||||
|
||||
# access cell3 client to refresh it
|
||||
cell3_client = router.by_instance(ctxt, instance3)
|
||||
|
||||
router._remove_stale_clients(ctxt)
|
||||
|
||||
# default client and cell3 client should be there
|
||||
self.assertEqual(2, len(router.clients))
|
||||
for client_id, client in expected.items():
|
||||
self.assertEqual(client, router.clients[client_id].client)
|
||||
|
Loading…
Reference in New Issue
Block a user