Add message queue switching through RequestContext

This adds message queue connection information to the RequestContext
which can be used by nova-api to communicate with a targeted cell
message queue with each query.

A function 'get_cell_client' can be called in rpc functions to enable them
to use message queue transport information from a RequestContext. The
function creates a rpc client object dynamically if message queue
connection information is found in the RequestContext and falls back on
the default rpc client.

Example usage:

    def get_cell_client(self, context):
        return rpc.get_cell_client(context, self.client)

    def build_and_run_instances(self, ctxt, instance, host, image, ...)
        cctxt = self.get_cell_client(ctxt).prepare(...)
        cctxt.cast(...)

Implements blueprint cells-mq-connection-switching

Change-Id: Idef670d5b73c9cef8501a0593eccd785b708bd2b
This commit is contained in:
melanie witt 2016-02-02 00:40:31 +00:00 committed by melanie witt
parent 0458f3e78e
commit bdf984a7ce
7 changed files with 284 additions and 139 deletions

View File

@ -374,6 +374,9 @@ class ComputeAPI(object):
version_cap=version_cap,
serializer=serializer)
def get_cell_client(self, context):
return rpc.get_cell_client(context, self.client)
def add_aggregate_host(self, ctxt, aggregate, host_param, host,
slave_info=None):
'''Add aggregate host.
@ -385,44 +388,46 @@ class ComputeAPI(object):
:param host: This is the host to send the message to.
'''
version = '4.0'
cctxt = self.client.prepare(server=host, version=version)
cctxt = self.get_cell_client(ctxt).prepare(
server=host, version=version)
cctxt.cast(ctxt, 'add_aggregate_host',
aggregate=aggregate, host=host_param,
slave_info=slave_info)
def add_fixed_ip_to_instance(self, ctxt, instance, network_id):
version = '4.0'
cctxt = self.client.prepare(server=_compute_host(None, instance),
version=version)
cctxt = self.get_cell_client(ctxt).prepare(
server=_compute_host(None, instance), version=version)
cctxt.cast(ctxt, 'add_fixed_ip_to_instance',
instance=instance, network_id=network_id)
def attach_interface(self, ctxt, instance, network_id, port_id,
requested_ip):
version = '4.0'
cctxt = self.client.prepare(server=_compute_host(None, instance),
version=version)
cctxt = self.get_cell_client(ctxt).prepare(
server=_compute_host(None, instance), version=version)
return cctxt.call(ctxt, 'attach_interface',
instance=instance, network_id=network_id,
port_id=port_id, requested_ip=requested_ip)
def attach_volume(self, ctxt, instance, bdm):
version = '4.0'
cctxt = self.client.prepare(server=_compute_host(None, instance),
version=version)
cctxt = self.get_cell_client(ctxt).prepare(
server=_compute_host(None, instance), version=version)
cctxt.cast(ctxt, 'attach_volume', instance=instance, bdm=bdm)
def change_instance_metadata(self, ctxt, instance, diff):
version = '4.0'
cctxt = self.client.prepare(server=_compute_host(None, instance),
version=version)
cctxt = self.get_cell_client(ctxt).prepare(
server=_compute_host(None, instance), version=version)
cctxt.cast(ctxt, 'change_instance_metadata',
instance=instance, diff=diff)
def check_can_live_migrate_destination(self, ctxt, instance, destination,
block_migration, disk_over_commit):
version = '4.11'
if not self.client.can_send_version(version):
cell_client = self.get_cell_client(ctxt)
if not cell_client.can_send_version(version):
# NOTE(eliqiao): This is a new feature that is only available
# once all compute nodes support at least version 4.11.
# This means the new REST API that supports this needs to handle
@ -433,7 +438,7 @@ class ComputeAPI(object):
else:
version = '4.0'
cctxt = self.client.prepare(server=destination, version=version)
cctxt = cell_client.prepare(server=destination, version=version)
result = cctxt.call(ctxt, 'check_can_live_migrate_destination',
instance=instance,
block_migration=block_migration,
@ -449,12 +454,13 @@ class ComputeAPI(object):
def check_can_live_migrate_source(self, ctxt, instance, dest_check_data):
dest_check_data_obj = dest_check_data
version = '4.8'
if not self.client.can_send_version(version):
cell_client = self.get_cell_client(ctxt)
if not cell_client.can_send_version(version):
version = '4.0'
if dest_check_data:
dest_check_data = dest_check_data.to_legacy_dict()
source = _compute_host(None, instance)
cctxt = self.client.prepare(server=source, version=version)
cctxt = cell_client.prepare(server=source, version=version)
result = cctxt.call(ctxt, 'check_can_live_migrate_source',
instance=instance,
dest_check_data=dest_check_data)
@ -468,8 +474,8 @@ class ComputeAPI(object):
def check_instance_shared_storage(self, ctxt, instance, data, host=None):
version = '4.0'
cctxt = self.client.prepare(server=_compute_host(host, instance),
version=version)
cctxt = self.get_cell_client(ctxt).prepare(
server=_compute_host(host, instance), version=version)
return cctxt.call(ctxt, 'check_instance_shared_storage',
instance=instance,
data=data)
@ -477,8 +483,8 @@ class ComputeAPI(object):
def confirm_resize(self, ctxt, instance, migration, host,
reservations=None, cast=True):
version = '4.0'
cctxt = self.client.prepare(server=_compute_host(host, instance),
version=version)
cctxt = self.get_cell_client(ctxt).prepare(
server=_compute_host(host, instance), version=version)
rpc_method = cctxt.cast if cast else cctxt.call
return rpc_method(ctxt, 'confirm_resize',
instance=instance, migration=migration,
@ -486,18 +492,19 @@ class ComputeAPI(object):
def detach_interface(self, ctxt, instance, port_id):
version = '4.0'
cctxt = self.client.prepare(server=_compute_host(None, instance),
version=version)
cctxt = self.get_cell_client(ctxt).prepare(
server=_compute_host(None, instance), version=version)
cctxt.cast(ctxt, 'detach_interface',
instance=instance, port_id=port_id)
def detach_volume(self, ctxt, instance, volume_id, attachment_id=None):
extra = {'attachment_id': attachment_id}
version = '4.7'
if not self.client.can_send_version(version):
cell_client = self.get_cell_client(ctxt)
if not cell_client.can_send_version(version):
version = '4.0'
extra.pop('attachment_id')
cctxt = self.client.prepare(server=_compute_host(None, instance),
cctxt = cell_client.prepare(server=_compute_host(None, instance),
version=version)
cctxt.cast(ctxt, 'detach_volume',
instance=instance, volume_id=volume_id, **extra)
@ -505,7 +512,8 @@ class ComputeAPI(object):
def finish_resize(self, ctxt, instance, migration, image, disk_info,
host, reservations=None):
version = '4.0'
cctxt = self.client.prepare(server=host, version=version)
cctxt = self.get_cell_client(ctxt).prepare(
server=host, version=version)
cctxt.cast(ctxt, 'finish_resize',
instance=instance, migration=migration,
image=image, disk_info=disk_info, reservations=reservations)
@ -513,33 +521,36 @@ class ComputeAPI(object):
def finish_revert_resize(self, ctxt, instance, migration, host,
reservations=None):
version = '4.0'
cctxt = self.client.prepare(server=host, version=version)
cctxt = self.get_cell_client(ctxt).prepare(
server=host, version=version)
cctxt.cast(ctxt, 'finish_revert_resize',
instance=instance, migration=migration,
reservations=reservations)
def get_console_output(self, ctxt, instance, tail_length):
version = '4.0'
cctxt = self.client.prepare(server=_compute_host(None, instance),
version=version)
cctxt = self.get_cell_client(ctxt).prepare(
server=_compute_host(None, instance), version=version)
return cctxt.call(ctxt, 'get_console_output',
instance=instance, tail_length=tail_length)
def get_console_pool_info(self, ctxt, console_type, host):
version = '4.0'
cctxt = self.client.prepare(server=host, version=version)
cctxt = self.get_cell_client(ctxt).prepare(
server=host, version=version)
return cctxt.call(ctxt, 'get_console_pool_info',
console_type=console_type)
def get_console_topic(self, ctxt, host):
version = '4.0'
cctxt = self.client.prepare(server=host, version=version)
cctxt = self.get_cell_client(ctxt).prepare(
server=host, version=version)
return cctxt.call(ctxt, 'get_console_topic')
def get_diagnostics(self, ctxt, instance):
version = '4.0'
cctxt = self.client.prepare(server=_compute_host(None, instance),
version=version)
cctxt = self.get_cell_client(ctxt).prepare(
server=_compute_host(None, instance), version=version)
return cctxt.call(ctxt, 'get_diagnostics', instance=instance)
def get_instance_diagnostics(self, ctxt, instance):
@ -547,49 +558,49 @@ class ComputeAPI(object):
instance_p = jsonutils.to_primitive(instance)
kwargs = {'instance': instance_p}
version = '4.0'
cctxt = self.client.prepare(server=_compute_host(None, instance),
version=version)
cctxt = self.get_cell_client(ctxt).prepare(
server=_compute_host(None, instance), version=version)
return cctxt.call(ctxt, 'get_instance_diagnostics', **kwargs)
def get_vnc_console(self, ctxt, instance, console_type):
version = '4.0'
cctxt = self.client.prepare(server=_compute_host(None, instance),
version=version)
cctxt = self.get_cell_client(ctxt).prepare(
server=_compute_host(None, instance), version=version)
return cctxt.call(ctxt, 'get_vnc_console',
instance=instance, console_type=console_type)
def get_spice_console(self, ctxt, instance, console_type):
version = '4.0'
cctxt = self.client.prepare(server=_compute_host(None, instance),
version=version)
cctxt = self.get_cell_client(ctxt).prepare(
server=_compute_host(None, instance), version=version)
return cctxt.call(ctxt, 'get_spice_console',
instance=instance, console_type=console_type)
def get_rdp_console(self, ctxt, instance, console_type):
version = '4.0'
cctxt = self.client.prepare(server=_compute_host(None, instance),
version=version)
cctxt = self.get_cell_client(ctxt).prepare(
server=_compute_host(None, instance), version=version)
return cctxt.call(ctxt, 'get_rdp_console',
instance=instance, console_type=console_type)
def get_mks_console(self, ctxt, instance, console_type):
version = '4.3'
cctxt = self.client.prepare(server=_compute_host(None, instance),
version=version)
cctxt = self.get_cell_client(ctxt).prepare(
server=_compute_host(None, instance), version=version)
return cctxt.call(ctxt, 'get_mks_console',
instance=instance, console_type=console_type)
def get_serial_console(self, ctxt, instance, console_type):
version = '4.0'
cctxt = self.client.prepare(server=_compute_host(None, instance),
version=version)
cctxt = self.get_cell_client(ctxt).prepare(
server=_compute_host(None, instance), version=version)
return cctxt.call(ctxt, 'get_serial_console',
instance=instance, console_type=console_type)
def validate_console_port(self, ctxt, instance, port, console_type):
version = '4.0'
cctxt = self.client.prepare(server=_compute_host(None, instance),
version=version)
cctxt = self.get_cell_client(ctxt).prepare(
server=_compute_host(None, instance), version=version)
return cctxt.call(ctxt, 'validate_console_port',
instance=instance, port=port,
console_type=console_type)
@ -604,34 +615,37 @@ class ComputeAPI(object):
:param host: This is the host to send the message to.
'''
version = '4.0'
cctxt = self.client.prepare(server=host, version=version)
cctxt = self.get_cell_client(ctxt).prepare(
server=host, version=version)
return cctxt.call(ctxt, 'host_maintenance_mode',
host=host_param, mode=mode)
def host_power_action(self, ctxt, action, host):
version = '4.0'
cctxt = self.client.prepare(server=host, version=version)
cctxt = self.get_cell_client(ctxt).prepare(
server=host, version=version)
return cctxt.call(ctxt, 'host_power_action', action=action)
def inject_network_info(self, ctxt, instance):
version = '4.0'
cctxt = self.client.prepare(server=_compute_host(None, instance),
version=version)
cctxt = self.get_cell_client(ctxt).prepare(
server=_compute_host(None, instance), version=version)
cctxt.cast(ctxt, 'inject_network_info', instance=instance)
def live_migration(self, ctxt, instance, dest, block_migration, host,
migration, migrate_data=None):
args = {'migration': migration}
version = '4.8'
if not self.client.can_send_version(version):
cell_client = self.get_cell_client(ctxt)
if not cell_client.can_send_version(version):
version = '4.2'
if migrate_data:
migrate_data = migrate_data.to_legacy_dict(
pre_migration_result=True)
if not self.client.can_send_version(version):
if not cell_client.can_send_version(version):
version = '4.0'
args.pop('migration')
cctxt = self.client.prepare(server=host, version=version)
cctxt = cell_client.prepare(server=host, version=version)
cctxt.cast(ctxt, 'live_migration', instance=instance,
dest=dest, block_migration=block_migration,
migrate_data=migrate_data, **args)
@ -642,28 +656,29 @@ class ComputeAPI(object):
if not self.client.can_send_version(version):
version = '4.9'
kwargs['migration_id'] = migration.id
cctxt = self.client.prepare(server=_compute_host(
cctxt = self.get_cell_client(ctxt).prepare(server=_compute_host(
migration.source_compute, instance), version=version)
cctxt.cast(ctxt, 'live_migration_force_complete', instance=instance,
**kwargs)
def live_migration_abort(self, ctxt, instance, migration_id):
version = '4.10'
cctxt = self.client.prepare(server=_compute_host(None, instance),
version=version)
cctxt = self.get_cell_client(ctxt).prepare(
server=_compute_host(None, instance), version=version)
cctxt.cast(ctxt, 'live_migration_abort', instance=instance,
migration_id=migration_id)
def pause_instance(self, ctxt, instance):
version = '4.0'
cctxt = self.client.prepare(server=_compute_host(None, instance),
version=version)
cctxt = self.get_cell_client(ctxt).prepare(
server=_compute_host(None, instance), version=version)
cctxt.cast(ctxt, 'pause_instance', instance=instance)
def post_live_migration_at_destination(self, ctxt, instance,
block_migration, host):
version = '4.0'
cctxt = self.client.prepare(server=host, version=version)
cctxt = self.get_cell_client(ctxt).prepare(
server=host, version=version)
cctxt.cast(ctxt, 'post_live_migration_at_destination',
instance=instance, block_migration=block_migration)
@ -671,11 +686,12 @@ class ComputeAPI(object):
host, migrate_data=None):
migrate_data_orig = migrate_data
version = '4.8'
if not self.client.can_send_version(version):
cell_client = self.get_cell_client(ctxt)
if not cell_client.can_send_version(version):
version = '4.0'
if migrate_data:
migrate_data = migrate_data.to_legacy_dict()
cctxt = self.client.prepare(server=host, version=version)
cctxt = cell_client.prepare(server=host, version=version)
result = cctxt.call(ctxt, 'pre_live_migration',
instance=instance,
block_migration=block_migration,
@ -703,18 +719,19 @@ class ComputeAPI(object):
'node': node,
'clean_shutdown': clean_shutdown}
version = '4.1'
if not self.client.can_send_version(version):
cell_client = self.get_cell_client(ctxt)
if not cell_client.can_send_version(version):
version = '4.0'
msg_args['instance_type'] = objects_base.obj_to_primitive(
instance_type)
cctxt = self.client.prepare(server=host, version=version)
cctxt = cell_client.prepare(server=host, version=version)
cctxt.cast(ctxt, 'prep_resize', **msg_args)
def reboot_instance(self, ctxt, instance, block_device_info,
reboot_type):
version = '4.0'
cctxt = self.client.prepare(server=_compute_host(None, instance),
version=version)
cctxt = self.get_cell_client(ctxt).prepare(
server=_compute_host(None, instance), version=version)
cctxt.cast(ctxt, 'reboot_instance',
instance=instance,
block_device_info=block_device_info,
@ -732,12 +749,13 @@ class ComputeAPI(object):
'scheduled_node': node,
'limits': limits}
version = '4.5'
if not self.client.can_send_version(version):
cell_client = self.get_cell_client(ctxt)
if not cell_client.can_send_version(version):
version = '4.0'
extra.pop('migration')
extra.pop('scheduled_node')
extra.pop('limits')
cctxt = self.client.prepare(server=_compute_host(host, instance),
cctxt = cell_client.prepare(server=_compute_host(host, instance),
version=version)
cctxt.cast(ctxt, 'rebuild_instance',
instance=instance, new_pass=new_pass,
@ -758,21 +776,23 @@ class ComputeAPI(object):
:param host: This is the host to send the message to.
'''
version = '4.0'
cctxt = self.client.prepare(server=host, version=version)
cctxt = self.get_cell_client(ctxt).prepare(
server=host, version=version)
cctxt.cast(ctxt, 'remove_aggregate_host',
aggregate=aggregate, host=host_param,
slave_info=slave_info)
def remove_fixed_ip_from_instance(self, ctxt, instance, address):
version = '4.0'
cctxt = self.client.prepare(server=_compute_host(None, instance),
version=version)
cctxt = self.get_cell_client(ctxt).prepare(
server=_compute_host(None, instance), version=version)
cctxt.cast(ctxt, 'remove_fixed_ip_from_instance',
instance=instance, address=address)
def remove_volume_connection(self, ctxt, volume_id, instance, host):
version = '4.0'
cctxt = self.client.prepare(server=host, version=version)
cctxt = self.get_cell_client(ctxt).prepare(
server=host, version=version)
return cctxt.call(ctxt, 'remove_volume_connection',
instance=instance, volume_id=volume_id)
@ -784,14 +804,14 @@ class ComputeAPI(object):
'rescue_image_ref': rescue_image_ref,
'instance': instance,
}
cctxt = self.client.prepare(server=_compute_host(None, instance),
version=version)
cctxt = self.get_cell_client(ctxt).prepare(
server=_compute_host(None, instance), version=version)
cctxt.cast(ctxt, 'rescue_instance', **msg_args)
def reset_network(self, ctxt, instance):
version = '4.0'
cctxt = self.client.prepare(server=_compute_host(None, instance),
version=version)
cctxt = self.get_cell_client(ctxt).prepare(
server=_compute_host(None, instance), version=version)
cctxt.cast(ctxt, 'reset_network', instance=instance)
def resize_instance(self, ctxt, instance, migration, image, instance_type,
@ -802,25 +822,26 @@ class ComputeAPI(object):
'clean_shutdown': clean_shutdown,
}
version = '4.1'
if not self.client.can_send_version(version):
cell_client = self.get_cell_client(ctxt)
if not cell_client.can_send_version(version):
msg_args['instance_type'] = objects_base.obj_to_primitive(
instance_type)
version = '4.0'
cctxt = self.client.prepare(server=_compute_host(None, instance),
cctxt = cell_client.prepare(server=_compute_host(None, instance),
version=version)
cctxt.cast(ctxt, 'resize_instance', **msg_args)
def resume_instance(self, ctxt, instance):
version = '4.0'
cctxt = self.client.prepare(server=_compute_host(None, instance),
version=version)
cctxt = self.get_cell_client(ctxt).prepare(
server=_compute_host(None, instance), version=version)
cctxt.cast(ctxt, 'resume_instance', instance=instance)
def revert_resize(self, ctxt, instance, migration, host,
reservations=None):
version = '4.0'
cctxt = self.client.prepare(server=_compute_host(host, instance),
version=version)
cctxt = self.get_cell_client(ctxt).prepare(
server=_compute_host(host, instance), version=version)
cctxt.cast(ctxt, 'revert_resize',
instance=instance, migration=migration,
reservations=reservations)
@ -829,40 +850,43 @@ class ComputeAPI(object):
destroy_disks=True,
migrate_data=None):
version = '4.8'
if not self.client.can_send_version(version):
cell_client = self.get_cell_client(ctxt)
if not cell_client.can_send_version(version):
version = '4.0'
if migrate_data:
migrate_data = migrate_data.to_legacy_dict()
extra = {'destroy_disks': destroy_disks,
'migrate_data': migrate_data,
}
cctxt = self.client.prepare(server=host, version=version)
cctxt = cell_client.prepare(server=host, version=version)
cctxt.cast(ctxt, 'rollback_live_migration_at_destination',
instance=instance, **extra)
def set_admin_password(self, ctxt, instance, new_pass):
version = '4.0'
cctxt = self.client.prepare(server=_compute_host(None, instance),
version=version)
cctxt = self.get_cell_client(ctxt).prepare(
server=_compute_host(None, instance), version=version)
return cctxt.call(ctxt, 'set_admin_password',
instance=instance, new_pass=new_pass)
def set_host_enabled(self, ctxt, enabled, host):
version = '4.0'
cctxt = self.client.prepare(server=host, version=version)
cctxt = self.get_cell_client(ctxt).prepare(
server=host, version=version)
return cctxt.call(ctxt, 'set_host_enabled', enabled=enabled)
def swap_volume(self, ctxt, instance, old_volume_id, new_volume_id):
version = '4.0'
cctxt = self.client.prepare(server=_compute_host(None, instance),
version=version)
cctxt = self.get_cell_client(ctxt).prepare(
server=_compute_host(None, instance), version=version)
cctxt.cast(ctxt, 'swap_volume',
instance=instance, old_volume_id=old_volume_id,
new_volume_id=new_volume_id)
def get_host_uptime(self, ctxt, host):
version = '4.0'
cctxt = self.client.prepare(server=host, version=version)
cctxt = self.get_cell_client(ctxt).prepare(
server=host, version=version)
return cctxt.call(ctxt, 'get_host_uptime')
def reserve_block_device_name(self, ctxt, instance, device, volume_id,
@ -872,15 +896,15 @@ class ComputeAPI(object):
'device_type': device_type}
version = '4.0'
cctxt = self.client.prepare(server=_compute_host(None, instance),
version=version)
cctxt = self.get_cell_client(ctxt).prepare(
server=_compute_host(None, instance), version=version)
return cctxt.call(ctxt, 'reserve_block_device_name', **kw)
def backup_instance(self, ctxt, instance, image_id, backup_type,
rotation):
version = '4.0'
cctxt = self.client.prepare(server=_compute_host(None, instance),
version=version)
cctxt = self.get_cell_client(ctxt).prepare(
server=_compute_host(None, instance), version=version)
cctxt.cast(ctxt, 'backup_instance',
instance=instance,
image_id=image_id,
@ -889,31 +913,31 @@ class ComputeAPI(object):
def snapshot_instance(self, ctxt, instance, image_id):
version = '4.0'
cctxt = self.client.prepare(server=_compute_host(None, instance),
version=version)
cctxt = self.get_cell_client(ctxt).prepare(
server=_compute_host(None, instance), version=version)
cctxt.cast(ctxt, 'snapshot_instance',
instance=instance,
image_id=image_id)
def start_instance(self, ctxt, instance):
version = '4.0'
cctxt = self.client.prepare(server=_compute_host(None, instance),
version=version)
cctxt = self.get_cell_client(ctxt).prepare(
server=_compute_host(None, instance), version=version)
cctxt.cast(ctxt, 'start_instance', instance=instance)
def stop_instance(self, ctxt, instance, do_cast=True, clean_shutdown=True):
msg_args = {'instance': instance,
'clean_shutdown': clean_shutdown}
version = '4.0'
cctxt = self.client.prepare(server=_compute_host(None, instance),
version=version)
cctxt = self.get_cell_client(ctxt).prepare(
server=_compute_host(None, instance), version=version)
rpc_method = cctxt.cast if do_cast else cctxt.call
return rpc_method(ctxt, 'stop_instance', **msg_args)
def suspend_instance(self, ctxt, instance):
version = '4.0'
cctxt = self.client.prepare(server=_compute_host(None, instance),
version=version)
cctxt = self.get_cell_client(ctxt).prepare(
server=_compute_host(None, instance), version=version)
cctxt.cast(ctxt, 'suspend_instance', instance=instance)
def terminate_instance(self, ctxt, instance, bdms, reservations=None,
@ -922,35 +946,35 @@ class ComputeAPI(object):
# the method signature has to match with `terminate_instance()`
# method of cells rpcapi.
version = '4.0'
cctxt = self.client.prepare(server=_compute_host(None, instance),
version=version)
cctxt = self.get_cell_client(ctxt).prepare(
server=_compute_host(None, instance), version=version)
cctxt.cast(ctxt, 'terminate_instance',
instance=instance, bdms=bdms,
reservations=reservations)
def unpause_instance(self, ctxt, instance):
version = '4.0'
cctxt = self.client.prepare(server=_compute_host(None, instance),
version=version)
cctxt = self.get_cell_client(ctxt).prepare(
server=_compute_host(None, instance), version=version)
cctxt.cast(ctxt, 'unpause_instance', instance=instance)
def unrescue_instance(self, ctxt, instance):
version = '4.0'
cctxt = self.client.prepare(server=_compute_host(None, instance),
version=version)
cctxt = self.get_cell_client(ctxt).prepare(
server=_compute_host(None, instance), version=version)
cctxt.cast(ctxt, 'unrescue_instance', instance=instance)
def soft_delete_instance(self, ctxt, instance, reservations=None):
version = '4.0'
cctxt = self.client.prepare(server=_compute_host(None, instance),
version=version)
cctxt = self.get_cell_client(ctxt).prepare(
server=_compute_host(None, instance), version=version)
cctxt.cast(ctxt, 'soft_delete_instance',
instance=instance, reservations=reservations)
def restore_instance(self, ctxt, instance):
version = '4.0'
cctxt = self.client.prepare(server=_compute_host(None, instance),
version=version)
cctxt = self.get_cell_client(ctxt).prepare(
server=_compute_host(None, instance), version=version)
cctxt.cast(ctxt, 'restore_instance', instance=instance)
def shelve_instance(self, ctxt, instance, image_id=None,
@ -958,16 +982,16 @@ class ComputeAPI(object):
msg_args = {'instance': instance, 'image_id': image_id,
'clean_shutdown': clean_shutdown}
version = '4.0'
cctxt = self.client.prepare(server=_compute_host(None, instance),
version=version)
cctxt = self.get_cell_client(ctxt).prepare(
server=_compute_host(None, instance), version=version)
cctxt.cast(ctxt, 'shelve_instance', **msg_args)
def shelve_offload_instance(self, ctxt, instance,
clean_shutdown=True):
msg_args = {'instance': instance, 'clean_shutdown': clean_shutdown}
version = '4.0'
cctxt = self.client.prepare(server=_compute_host(None, instance),
version=version)
cctxt = self.get_cell_client(ctxt).prepare(
server=_compute_host(None, instance), version=version)
cctxt.cast(ctxt, 'shelve_offload_instance', **msg_args)
def unshelve_instance(self, ctxt, instance, host, image=None,
@ -979,28 +1003,29 @@ class ComputeAPI(object):
'filter_properties': filter_properties,
'node': node,
}
cctxt = self.client.prepare(server=host, version=version)
cctxt = self.get_cell_client(ctxt).prepare(
server=host, version=version)
cctxt.cast(ctxt, 'unshelve_instance', **msg_kwargs)
def volume_snapshot_create(self, ctxt, instance, volume_id,
create_info):
version = '4.0'
cctxt = self.client.prepare(server=_compute_host(None, instance),
version=version)
cctxt = self.get_cell_client(ctxt).prepare(
server=_compute_host(None, instance), version=version)
cctxt.cast(ctxt, 'volume_snapshot_create', instance=instance,
volume_id=volume_id, create_info=create_info)
def volume_snapshot_delete(self, ctxt, instance, volume_id, snapshot_id,
delete_info):
version = '4.0'
cctxt = self.client.prepare(server=_compute_host(None, instance),
version=version)
cctxt = self.get_cell_client(ctxt).prepare(
server=_compute_host(None, instance), version=version)
cctxt.cast(ctxt, 'volume_snapshot_delete', instance=instance,
volume_id=volume_id, snapshot_id=snapshot_id,
delete_info=delete_info)
def external_instance_event(self, ctxt, instances, events):
cctxt = self.client.prepare(
cctxt = self.get_cell_client(ctxt).prepare(
server=_compute_host(None, instances[0]),
version='4.0')
cctxt.cast(ctxt, 'external_instance_event', instances=instances,
@ -1012,7 +1037,8 @@ class ComputeAPI(object):
block_device_mapping=None, node=None, limits=None):
version = '4.0'
cctxt = self.client.prepare(server=host, version=version)
cctxt = self.get_cell_client(ctxt).prepare(
server=host, version=version)
cctxt.cast(ctxt, 'build_and_run_instance', instance=instance,
image=image, request_spec=request_spec,
filter_properties=filter_properties,
@ -1025,33 +1051,35 @@ class ComputeAPI(object):
def quiesce_instance(self, ctxt, instance):
version = '4.0'
cctxt = self.client.prepare(server=_compute_host(None, instance),
version=version)
cctxt = self.get_cell_client(ctxt).prepare(
server=_compute_host(None, instance), version=version)
return cctxt.call(ctxt, 'quiesce_instance', instance=instance)
def unquiesce_instance(self, ctxt, instance, mapping=None):
version = '4.0'
cctxt = self.client.prepare(server=_compute_host(None, instance),
version=version)
cctxt = self.get_cell_client(ctxt).prepare(
server=_compute_host(None, instance), version=version)
cctxt.cast(ctxt, 'unquiesce_instance', instance=instance,
mapping=mapping)
def refresh_instance_security_rules(self, ctxt, host, instance):
version = '4.4'
if not self.client.can_send_version(version):
cell_client = self.get_cell_client(ctxt)
if not cell_client.can_send_version(version):
version = '4.0'
instance = objects_base.obj_to_primitive(instance)
cctxt = self.client.prepare(server=_compute_host(None, instance),
cctxt = cell_client.prepare(server=_compute_host(None, instance),
version=version)
cctxt.cast(ctxt, 'refresh_instance_security_rules',
instance=instance)
def trigger_crash_dump(self, ctxt, instance):
version = '4.6'
cell_client = self.get_cell_client(ctxt)
if not self.client.can_send_version(version):
if not cell_client.can_send_version(version):
raise exception.TriggerCrashDumpNotSupported()
cctxt = self.client.prepare(server=_compute_host(None, instance),
cctxt = cell_client.prepare(server=_compute_host(None, instance),
version=version)
return cctxt.cast(ctxt, "trigger_crash_dump", instance=instance)

View File

@ -143,11 +143,13 @@ class RequestContext(context.RequestContext):
self.project_name = project_name
self.is_admin = is_admin
# NOTE(dheeraj): The following attribute is used by cellsv2 to store
# NOTE(dheeraj): The following attributes are used by cellsv2 to store
# connection information for connecting to the target cell.
# It is only manipulated using the target_cell contextmanager
# provided by this module
self.db_connection = None
self.mq_connection = None
self.user_auth_plugin = user_auth_plugin
if self.is_admin is None:
self.is_admin = policy.check_is_admin(self)
@ -282,19 +284,26 @@ def authorize_quota_class_context(context, class_name):
@contextmanager
def target_cell(context, cell_mapping):
"""Adds database connection information to the context for communicating
with the given target cell.
"""Adds database and message queue connection information to the context
for communicating with the given target cell.
:param context: The RequestContext to add database connection information
:param context: The RequestContext to add connection information
:param cell_mapping: A objects.CellMapping object
"""
original_db_connection = context.db_connection
# avoid circular import
original_mq_connection = context.mq_connection
# avoid circular imports
from nova import db
connection_string = cell_mapping.database_connection
context.db_connection = db.create_context_manager(connection_string)
from nova import rpc
db_connection_string = cell_mapping.database_connection
context.db_connection = db.create_context_manager(db_connection_string)
# NOTE(melwitt): none:// url is a special value meaning do not switch
if not cell_mapping.transport_url.startswith('none'):
transport_url = cell_mapping.transport_url
context.mq_connection = rpc.create_transport(transport_url)
try:
yield context
finally:
context.db_connection = original_db_connection
context.mq_connection = original_mq_connection

View File

@ -65,6 +65,20 @@ TRANSPORT_ALIASES = {
}
def get_cell_client(context, default_client):
"""Get a RPCClient object based on a RequestContext.
:param context: The RequestContext that can contain a Transport
:param default_client: The default RPCClient
"""
if context.mq_connection:
return messaging.RPCClient(
context.mq_connection, default_client.target,
version_cap=default_client.version_cap,
serializer=default_client.serializer)
return default_client
def init(conf):
global TRANSPORT, NOTIFICATION_TRANSPORT, LEGACY_NOTIFIER, NOTIFIER
exmods = get_allowed_exmods()
@ -185,6 +199,14 @@ def get_versioned_notifier(publisher_id):
return NOTIFIER.prepare(publisher_id=publisher_id)
def create_transport(url):
exmods = get_allowed_exmods()
return messaging.get_transport(CONF,
url=url,
allowed_remote_exmods=exmods,
aliases=TRANSPORT_ALIASES)
class LegacyValidatingNotifier(object):
"""Wraps an oslo.messaging Notifier and checks for allowed event_types."""

View File

@ -40,9 +40,13 @@ class ConnectionSwitchTestCase(test.TestCase):
pass
def test_connection_switch(self):
# Make a request context with a cell mapping
mapping = objects.CellMapping(database_connection=self.fake_conn)
ctxt = context.RequestContext('fake-user', 'fake-project')
# Make a request context with a cell mapping
mapping = objects.CellMapping(context=ctxt,
uuid=uuidutils.generate_uuid(),
database_connection=self.fake_conn,
transport_url='none:///')
mapping.create()
# Create an instance in the cell database
uuid = uuidutils.generate_uuid()
with context.target_cell(ctxt, mapping):

View File

@ -606,7 +606,7 @@ class ComputeRpcAPITestCase(test.NoDBTestCase):
mock_client.can_send_version.return_value = can_send
call = getattr(mock_client.prepare.return_value, calltype)
call.return_value = callret
ctxt = mock.MagicMock()
ctxt = context.RequestContext()
result = getattr(rpc, method)(ctxt, **inargs)
call.assert_called_once_with(ctxt, method, **callargs)
return result

View File

@ -229,14 +229,37 @@ class ContextTestCase(test.NoDBTestCase):
self.assertEqual(values, values2)
@mock.patch('nova.db.create_context_manager')
def test_target_cell(self, mock_create_ctxt_mgr):
@mock.patch('nova.rpc.create_transport')
def test_target_cell(self, mock_create_transport, mock_create_ctxt_mgr):
mock_create_ctxt_mgr.return_value = mock.sentinel.cm
mock_create_transport.return_value = mock.sentinel.tp
ctxt = context.RequestContext('111',
'222',
roles=['admin', 'weasel'])
# Verify the existing db_connection, if any, is restored
ctxt.db_connection = mock.sentinel.db_conn
mapping = objects.CellMapping(database_connection='fake://')
ctxt.mq_connection = mock.sentinel.mq_conn
mapping = objects.CellMapping(database_connection='fake://',
transport_url='anotherfake://')
with context.target_cell(ctxt, mapping):
self.assertEqual(ctxt.db_connection, mock.sentinel.cm)
self.assertEqual(ctxt.mq_connection, mock.sentinel.tp)
self.assertEqual(mock.sentinel.db_conn, ctxt.db_connection)
self.assertEqual(mock.sentinel.mq_conn, ctxt.mq_connection)
mock_create_transport.assert_called_once_with(mapping.transport_url)
@mock.patch('nova.db.create_context_manager')
@mock.patch('nova.rpc.create_transport')
def test_target_cell_transport_url_sentinel(self, mock_create_transport,
mock_create_ctxt_mgr):
mock_create_ctxt_mgr.return_value = mock.sentinel.cm
mock_create_transport.return_value = mock.sentinel.tp
ctxt = context.RequestContext('111',
'222',
roles=['admin', 'weasel'])
mapping = objects.CellMapping(database_connection='fake://',
transport_url='none://')
with context.target_cell(ctxt, mapping):
self.assertEqual(ctxt.db_connection, mock.sentinel.cm)
self.assertIsNone(ctxt.mq_connection)
self.assertFalse(mock_create_transport.called)

View File

@ -49,6 +49,53 @@ class TestRPC(testtools.TestCase):
super(TestRPC, self).setUp()
self.useFixture(RPCResetFixture())
@mock.patch('oslo_messaging.RPCClient')
def test_cell_client(self, mock_rpcclient):
default_client = mock.Mock()
dynamic_client = mock.Mock()
mock_rpcclient.return_value = dynamic_client
ctxt = mock.Mock()
ctxt.mq_connection = 'fake://'
class FakeAPI(object):
def __init__(self):
self.client = default_client
def rpc_api_function(self, context):
rpc.get_cell_client(context, self.client).do_rpc()
rpcapi = FakeAPI()
rpcapi.rpc_api_function(ctxt)
# verify a dynamic client was created
mock_rpcclient.assert_called_once_with(
ctxt.mq_connection, default_client.target,
version_cap=default_client.version_cap,
serializer=default_client.serializer)
# verify dynamic client handled the rpc
dynamic_client.do_rpc.assert_called_once_with()
@mock.patch('oslo_messaging.RPCClient')
def test_cell_client_no_switch(self, mock_rpcclient):
default_client = mock.Mock()
dynamic_client = mock.Mock()
mock_rpcclient.return_value = dynamic_client
ctxt = mock.Mock()
ctxt.mq_connection = None
class FakeAPI(object):
def __init__(self):
self.client = default_client
def rpc_api_function(self, context):
rpc.get_cell_client(context, self.client).do_rpc()
rpcapi = FakeAPI()
rpcapi.rpc_api_function(ctxt)
# verify a dynamic client was not created
self.assertFalse(mock_rpcclient.called)
# verify default client handled the rpc
default_client.do_rpc.assert_called_once_with()
@mock.patch.object(rpc, 'get_allowed_exmods')
@mock.patch.object(rpc, 'RequestContextSerializer')
@mock.patch.object(messaging, 'get_transport')
@ -249,6 +296,18 @@ class TestRPC(testtools.TestCase):
mock_prep.assert_called_once_with(publisher_id='service.foo')
self.assertEqual('notifier', notifier)
@mock.patch.object(rpc, 'get_allowed_exmods')
@mock.patch.object(messaging, 'get_transport')
def test_create_transport(self, mock_transport, mock_exmods):
exmods = mock_exmods.return_value
transport = rpc.create_transport(mock.sentinel.url)
self.assertEqual(mock_transport.return_value, transport)
mock_exmods.assert_called_once_with()
mock_transport.assert_called_once_with(rpc.CONF,
url=mock.sentinel.url,
allowed_remote_exmods=exmods,
aliases=rpc.TRANSPORT_ALIASES)
def _test_init(self, mock_notif, mock_noti_trans, mock_trans, mock_ser,
mock_exmods, notif_format, expected_driver_topic_kwargs):
legacy_notifier = mock.Mock()