Port all rpcapi modules to oslo.messaging interface

Add a temporary nova.rpcclient.RPCClient helper class which translates
oslo.messaging.rpc.RPCClient compatible calls into calls on a RpcProxy
object.

Use this new class to port all of the rpcapi modules over to the new
RPCClient so that the final port of Nova over to oslo.messaging will be
smaller and easier to review.

This patch contains no functional changes at all, except that all client
side RPCs go through this temporary helper class.

blueprint: oslo-messaging
Change-Id: Iee86c36bcc474a604993618b8a2255af8c3d2f48
This commit is contained in:
Mark McLoughlin 2013-08-04 07:22:24 +01:00
parent dd5a734c5a
commit 33b9e6f47c
15 changed files with 1005 additions and 946 deletions

View File

@ -21,8 +21,7 @@ Base RPC client and server common to all services.
from oslo.config import cfg
from nova.openstack.common import jsonutils
from nova.openstack.common import rpc
import nova.openstack.common.rpc.proxy as rpc_proxy
from nova import rpcclient
CONF = cfg.CONF
@ -34,7 +33,7 @@ CONF.register_opt(rpcapi_cap_opt, 'upgrade_levels')
_NAMESPACE = 'baseapi'
class BaseAPI(rpc_proxy.RpcProxy):
class BaseAPI(rpcclient.RpcProxy):
"""Client side of the base rpc API.
API version history:
@ -63,18 +62,17 @@ class BaseAPI(rpc_proxy.RpcProxy):
super(BaseAPI, self).__init__(topic=topic,
default_version=self.BASE_RPC_API_VERSION,
version_cap=version_cap)
self.namespace = _NAMESPACE
self.client = self.get_client(namespace=_NAMESPACE)
def ping(self, context, arg, timeout=None):
arg_p = jsonutils.to_primitive(arg)
msg = self.make_namespaced_msg('ping', self.namespace, arg=arg_p)
return self.call(context, msg, timeout=timeout)
cctxt = self.client.prepare(timeout=timeout)
return cctxt.call(context, 'ping', arg=arg_p)
def get_backdoor_port(self, context, host):
msg = self.make_namespaced_msg('get_backdoor_port', self.namespace)
return self.call(context, msg,
topic=rpc.queue_get_for(context, self.topic, host),
version='1.1')
cctxt = self.client.prepare(server=host, version='1.1')
return cctxt.call(context, 'get_backdoor_port')
class BaseRPCAPI(object):

View File

@ -25,7 +25,7 @@ from nova.cells import driver
from nova.openstack.common.gettextutils import _
from nova.openstack.common import rpc
from nova.openstack.common.rpc import dispatcher as rpc_dispatcher
from nova.openstack.common.rpc import proxy as rpc_proxy
from nova import rpcclient
cell_rpc_driver_opts = [
cfg.StrOpt('rpc_driver_queue_base',
@ -108,7 +108,7 @@ class CellsRPCDriver(driver.BaseCellsDriver):
self.intercell_rpcapi.send_message_to_cell(cell_state, message)
class InterCellRPCAPI(rpc_proxy.RpcProxy):
class InterCellRPCAPI(rpcclient.RpcProxy):
"""Client side of the Cell<->Cell RPC API.
The CellsRPCDriver uses this to make calls to another cell.
@ -131,6 +131,11 @@ class InterCellRPCAPI(rpc_proxy.RpcProxy):
super(InterCellRPCAPI, self).__init__(None, default_version,
version_cap=version_cap)
def _get_client(self, next_hop, topic):
server_params = self._get_server_params_for_cell(next_hop)
cctxt = self.get_client(server_params=server_params)
return cctxt.prepare(topic=topic)
@staticmethod
def _get_server_params_for_cell(next_hop):
"""Turn the DB information for a cell into the parameters
@ -146,18 +151,13 @@ class InterCellRPCAPI(rpc_proxy.RpcProxy):
fanout, do it. The topic that is used will be
'CONF.rpc_driver_queue_base.<message_type>'.
"""
ctxt = message.ctxt
json_message = message.to_json()
rpc_message = self.make_msg('process_message', message=json_message)
topic_base = CONF.cells.rpc_driver_queue_base
topic = '%s.%s' % (topic_base, message.message_type)
server_params = self._get_server_params_for_cell(cell_state)
cctxt = self._get_client(cell_state, topic)
if message.fanout:
self.fanout_cast_to_server(ctxt, server_params,
rpc_message, topic=topic)
else:
self.cast_to_server(ctxt, server_params,
rpc_message, topic=topic)
cctxt = cctxt.prepare(fanout=message.fanout)
return cctxt.cast(message.ctxt, 'process_message',
message=message.to_json())
class InterCellRPCDispatcher(object):

View File

@ -29,7 +29,7 @@ from nova.objects import base as objects_base
from nova.openstack.common.gettextutils import _
from nova.openstack.common import jsonutils
from nova.openstack.common import log as logging
from nova.openstack.common.rpc import proxy as rpc_proxy
from nova import rpcclient
LOG = logging.getLogger(__name__)
@ -42,7 +42,7 @@ rpcapi_cap_opt = cfg.StrOpt('cells',
CONF.register_opt(rpcapi_cap_opt, 'upgrade_levels')
class CellsAPI(rpc_proxy.RpcProxy):
class CellsAPI(rpcclient.RpcProxy):
'''Cells client-side RPC API
API version history:
@ -95,6 +95,7 @@ class CellsAPI(rpc_proxy.RpcProxy):
default_version=self.BASE_RPC_API_VERSION,
serializer=objects_base.NovaObjectSerializer(),
version_cap=version_cap)
self.client = self.get_client()
def cast_compute_api_method(self, ctxt, cell_name, method,
*args, **kwargs):
@ -102,10 +103,10 @@ class CellsAPI(rpc_proxy.RpcProxy):
method_info = {'method': method,
'method_args': args,
'method_kwargs': kwargs}
self.cast(ctxt, self.make_msg('run_compute_api_method',
cell_name=cell_name,
method_info=method_info,
call=False))
self.client.cast(ctxt, 'run_compute_api_method',
cell_name=cell_name,
method_info=method_info,
call=False)
def call_compute_api_method(self, ctxt, cell_name, method,
*args, **kwargs):
@ -113,16 +114,16 @@ class CellsAPI(rpc_proxy.RpcProxy):
method_info = {'method': method,
'method_args': args,
'method_kwargs': kwargs}
return self.call(ctxt, self.make_msg('run_compute_api_method',
cell_name=cell_name,
method_info=method_info,
call=True))
return self.client.call(ctxt, 'run_compute_api_method',
cell_name=cell_name,
method_info=method_info,
call=True)
# NOTE(alaski): Deprecated and should be removed later.
def schedule_run_instance(self, ctxt, **kwargs):
"""Schedule a new instance for creation."""
self.cast(ctxt, self.make_msg('schedule_run_instance',
host_sched_kwargs=kwargs))
self.client.cast(ctxt, 'schedule_run_instance',
host_sched_kwargs=kwargs)
def build_instances(self, ctxt, **kwargs):
"""Build instances."""
@ -132,9 +133,9 @@ class CellsAPI(rpc_proxy.RpcProxy):
build_inst_kwargs['instances'] = instances_p
build_inst_kwargs['image'] = jsonutils.to_primitive(
build_inst_kwargs['image'])
self.cast(ctxt, self.make_msg('build_instances',
build_inst_kwargs=build_inst_kwargs),
version='1.8')
cctxt = self.client.prepare(version='1.8')
cctxt.cast(ctxt, 'build_instances',
build_inst_kwargs=build_inst_kwargs)
def instance_update_at_top(self, ctxt, instance):
"""Update instance at API level."""
@ -142,16 +143,14 @@ class CellsAPI(rpc_proxy.RpcProxy):
return
# Make sure we have a dict, not a SQLAlchemy model
instance_p = jsonutils.to_primitive(instance)
self.cast(ctxt, self.make_msg('instance_update_at_top',
instance=instance_p))
self.client.cast(ctxt, 'instance_update_at_top', instance=instance_p)
def instance_destroy_at_top(self, ctxt, instance):
"""Destroy instance at API level."""
if not CONF.cells.enable:
return
instance_p = jsonutils.to_primitive(instance)
self.cast(ctxt, self.make_msg('instance_destroy_at_top',
instance=instance_p))
self.client.cast(ctxt, 'instance_destroy_at_top', instance=instance_p)
def instance_delete_everywhere(self, ctxt, instance, delete_type):
"""Delete instance everywhere. delete_type may be 'soft'
@ -161,17 +160,16 @@ class CellsAPI(rpc_proxy.RpcProxy):
if not CONF.cells.enable:
return
instance_p = jsonutils.to_primitive(instance)
self.cast(ctxt, self.make_msg('instance_delete_everywhere',
instance=instance_p,
delete_type=delete_type))
self.client.cast(ctxt, 'instance_delete_everywhere',
instance=instance_p, delete_type=delete_type)
def instance_fault_create_at_top(self, ctxt, instance_fault):
"""Create an instance fault at the top."""
if not CONF.cells.enable:
return
instance_fault_p = jsonutils.to_primitive(instance_fault)
self.cast(ctxt, self.make_msg('instance_fault_create_at_top',
instance_fault=instance_fault_p))
self.client.cast(ctxt, 'instance_fault_create_at_top',
instance_fault=instance_fault_p)
def bw_usage_update_at_top(self, ctxt, uuid, mac, start_period,
bw_in, bw_out, last_ctr_in, last_ctr_out, last_refreshed=None):
@ -186,8 +184,8 @@ class CellsAPI(rpc_proxy.RpcProxy):
'last_ctr_in': last_ctr_in,
'last_ctr_out': last_ctr_out,
'last_refreshed': last_refreshed}
self.cast(ctxt, self.make_msg('bw_usage_update_at_top',
bw_update_info=bw_update_info))
self.client.cast(ctxt, 'bw_usage_update_at_top',
bw_update_info=bw_update_info)
def instance_info_cache_update_at_top(self, ctxt, instance_info_cache):
"""Broadcast up that an instance's info_cache has changed."""
@ -196,49 +194,45 @@ class CellsAPI(rpc_proxy.RpcProxy):
iicache = jsonutils.to_primitive(instance_info_cache)
instance = {'uuid': iicache['instance_uuid'],
'info_cache': iicache}
self.cast(ctxt, self.make_msg('instance_update_at_top',
instance=instance))
self.client.cast(ctxt, 'instance_update_at_top', instance=instance)
def get_cell_info_for_neighbors(self, ctxt):
"""Get information about our neighbor cells from the manager."""
if not CONF.cells.enable:
return []
return self.call(ctxt, self.make_msg('get_cell_info_for_neighbors'),
version='1.1')
cctxt = self.client.prepare(version='1.1')
return cctxt.call(ctxt, 'get_cell_info_for_neighbors')
def sync_instances(self, ctxt, project_id=None, updated_since=None,
deleted=False):
"""Ask all cells to sync instance data."""
if not CONF.cells.enable:
return
return self.cast(ctxt, self.make_msg('sync_instances',
project_id=project_id,
updated_since=updated_since,
deleted=deleted),
version='1.1')
cctxt = self.client.prepare(version='1.1')
return cctxt.cast(ctxt, 'sync_instances',
project_id=project_id,
updated_since=updated_since,
deleted=deleted)
def service_get_all(self, ctxt, filters=None):
"""Ask all cells for their list of services."""
return self.call(ctxt,
self.make_msg('service_get_all',
filters=filters),
version='1.2')
cctxt = self.client.prepare(version='1.2')
return cctxt.call(ctxt, 'service_get_all', filters=filters)
def service_get_by_compute_host(self, ctxt, host_name):
"""Get the service entry for a host in a particular cell. The
cell name should be encoded within the host_name.
"""
return self.call(ctxt, self.make_msg('service_get_by_compute_host',
host_name=host_name),
version='1.2')
cctxt = self.client.prepare(version='1.2')
return cctxt.call(ctxt, 'service_get_by_compute_host',
host_name=host_name)
def get_host_uptime(self, context, host_name):
"""Gets the host uptime in a particular cell. The cell name should
be encoded within the host_name
"""
return self.call(context, self.make_msg('get_host_uptime',
host_name=host_name),
version='1.17')
cctxt = self.client.prepare(version='1.17')
return cctxt.call(context, 'get_host_uptime', host_name=host_name)
def service_update(self, ctxt, host_name, binary, params_to_update):
"""
@ -250,99 +244,95 @@ class CellsAPI(rpc_proxy.RpcProxy):
:param binary: The name of the executable that the service runs as
:param params_to_update: eg. {'disabled': True}
"""
return self.call(ctxt, self.make_msg(
'service_update', host_name=host_name,
binary=binary, params_to_update=params_to_update),
version='1.7')
cctxt = self.client.prepare(version='1.7')
return cctxt.call(ctxt, 'service_update',
host_name=host_name,
binary=binary,
params_to_update=params_to_update)
def proxy_rpc_to_manager(self, ctxt, rpc_message, topic, call=False,
timeout=None):
"""Proxy RPC to a compute manager. The host in the topic
should be encoded with the target cell name.
"""
return self.call(ctxt, self.make_msg('proxy_rpc_to_manager',
topic=topic,
rpc_message=rpc_message,
call=call,
timeout=timeout),
timeout=timeout,
version='1.2')
cctxt = self.client.prepare(version='1.2', timeout=timeout)
return cctxt.call(ctxt, 'proxy_rpc_to_manager',
topic=topic,
rpc_message=rpc_message,
call=call,
timeout=timeout)
def task_log_get_all(self, ctxt, task_name, period_beginning,
period_ending, host=None, state=None):
"""Get the task logs from the DB in child cells."""
return self.call(ctxt, self.make_msg('task_log_get_all',
task_name=task_name,
period_beginning=period_beginning,
period_ending=period_ending,
host=host, state=state),
version='1.3')
cctxt = self.client.prepare(version='1.3')
return cctxt.call(ctxt, 'task_log_get_all',
task_name=task_name,
period_beginning=period_beginning,
period_ending=period_ending,
host=host, state=state)
def compute_node_get(self, ctxt, compute_id):
"""Get a compute node by ID in a specific cell."""
return self.call(ctxt, self.make_msg('compute_node_get',
compute_id=compute_id),
version='1.4')
cctxt = self.client.prepare(version='1.4')
return cctxt.call(ctxt, 'compute_node_get', compute_id=compute_id)
def compute_node_get_all(self, ctxt, hypervisor_match=None):
"""Return list of compute nodes in all cells, optionally
filtering by hypervisor host.
"""
return self.call(ctxt,
self.make_msg('compute_node_get_all',
hypervisor_match=hypervisor_match),
version='1.4')
cctxt = self.client.prepare(version='1.4')
return cctxt.call(ctxt, 'compute_node_get_all',
hypervisor_match=hypervisor_match)
def compute_node_stats(self, ctxt):
"""Return compute node stats from all cells."""
return self.call(ctxt, self.make_msg('compute_node_stats'),
version='1.4')
cctxt = self.client.prepare(version='1.4')
return cctxt.call(ctxt, 'compute_node_stats')
def actions_get(self, ctxt, instance):
if not instance['cell_name']:
raise exception.InstanceUnknownCell(instance_uuid=instance['uuid'])
return self.call(ctxt, self.make_msg('actions_get',
cell_name=instance['cell_name'],
instance_uuid=instance['uuid']),
version='1.5')
cctxt = self.client.prepare(version='1.5')
return cctxt.call(ctxt, 'actions_get',
cell_name=instance['cell_name'],
instance_uuid=instance['uuid'])
def action_get_by_request_id(self, ctxt, instance, request_id):
if not instance['cell_name']:
raise exception.InstanceUnknownCell(instance_uuid=instance['uuid'])
return self.call(ctxt, self.make_msg('action_get_by_request_id',
cell_name=instance['cell_name'],
instance_uuid=instance['uuid'],
request_id=request_id),
version='1.5')
cctxt = self.client.prepare(version='1.5')
return cctxt.call(ctxt, 'action_get_by_request_id',
cell_name=instance['cell_name'],
instance_uuid=instance['uuid'],
request_id=request_id)
def action_events_get(self, ctxt, instance, action_id):
if not instance['cell_name']:
raise exception.InstanceUnknownCell(instance_uuid=instance['uuid'])
return self.call(ctxt, self.make_msg('action_events_get',
cell_name=instance['cell_name'],
action_id=action_id),
version='1.5')
cctxt = self.client.prepare(version='1.5')
return cctxt.call(ctxt, 'action_events_get',
cell_name=instance['cell_name'],
action_id=action_id)
def consoleauth_delete_tokens(self, ctxt, instance_uuid):
"""Delete consoleauth tokens for an instance in API cells."""
self.cast(ctxt, self.make_msg('consoleauth_delete_tokens',
instance_uuid=instance_uuid),
version='1.6')
cctxt = self.client.prepare(version='1.6')
cctxt.cast(ctxt, 'consoleauth_delete_tokens',
instance_uuid=instance_uuid)
def validate_console_port(self, ctxt, instance_uuid, console_port,
console_type):
"""Validate console port with child cell compute node."""
return self.call(ctxt,
self.make_msg('validate_console_port',
instance_uuid=instance_uuid,
console_port=console_port,
console_type=console_type),
version='1.6')
cctxt = self.client.prepare(version='1.6')
return cctxt.call(ctxt, 'validate_console_port',
instance_uuid=instance_uuid,
console_port=console_port,
console_type=console_type)
def get_capacities(self, ctxt, cell_name=None):
return self.call(ctxt,
self.make_msg('get_capacities', cell_name=cell_name),
version='1.9')
cctxt = self.client.prepare(version='1.9')
return cctxt.call(ctxt, 'get_capacities', cell_name=cell_name)
def bdm_update_or_create_at_top(self, ctxt, bdm, create=None):
"""Create or update a block device mapping in API cells. If
@ -352,10 +342,10 @@ class CellsAPI(rpc_proxy.RpcProxy):
"""
if not CONF.cells.enable:
return
cctxt = self.client.prepare(version='1.10')
try:
self.cast(ctxt, self.make_msg('bdm_update_or_create_at_top',
bdm=bdm, create=create),
version='1.10')
cctxt.cast(ctxt, 'bdm_update_or_create_at_top',
bdm=bdm, create=create)
except Exception:
LOG.exception(_("Failed to notify cells of BDM update/create."))
@ -366,19 +356,19 @@ class CellsAPI(rpc_proxy.RpcProxy):
"""
if not CONF.cells.enable:
return
cctxt = self.client.prepare(version='1.10')
try:
self.cast(ctxt, self.make_msg('bdm_destroy_at_top',
instance_uuid=instance_uuid,
device_name=device_name,
volume_id=volume_id),
version='1.10')
cctxt.cast(ctxt, 'bdm_destroy_at_top',
instance_uuid=instance_uuid,
device_name=device_name,
volume_id=volume_id)
except Exception:
LOG.exception(_("Failed to notify cells of BDM destroy."))
def get_migrations(self, ctxt, filters):
"""Get all migrations applying the filters."""
return self.call(ctxt, self.make_msg('get_migrations',
filters=filters), version='1.11')
cctxt = self.client.prepare(version='1.11')
return cctxt.call(ctxt, 'get_migrations', filters=filters)
def instance_update_from_api(self, ctxt, instance, expected_vm_state,
expected_task_state, admin_state_reset):
@ -388,13 +378,12 @@ class CellsAPI(rpc_proxy.RpcProxy):
"""
if not CONF.cells.enable:
return
self.cast(ctxt,
self.make_msg('instance_update_from_api',
instance=instance,
expected_vm_state=expected_vm_state,
expected_task_state=expected_task_state,
admin_state_reset=admin_state_reset),
version='1.16')
cctxt = self.client.prepare(version='1.16')
cctxt.cast(ctxt, 'instance_update_from_api',
instance=instance,
expected_vm_state=expected_vm_state,
expected_task_state=expected_task_state,
admin_state_reset=admin_state_reset)
def start_instance(self, ctxt, instance):
"""Start an instance in its cell.
@ -403,9 +392,8 @@ class CellsAPI(rpc_proxy.RpcProxy):
"""
if not CONF.cells.enable:
return
self.cast(ctxt,
self.make_msg('start_instance', instance=instance),
version='1.12')
cctxt = self.client.prepare(version='1.12')
cctxt.cast(ctxt, 'start_instance', instance=instance)
def stop_instance(self, ctxt, instance, do_cast=True):
"""Stop an instance in its cell.
@ -414,33 +402,27 @@ class CellsAPI(rpc_proxy.RpcProxy):
"""
if not CONF.cells.enable:
return
method = do_cast and self.cast or self.call
return method(ctxt,
self.make_msg('stop_instance', instance=instance,
do_cast=do_cast),
version='1.12')
cctxt = self.client.prepare(version='1.12')
method = do_cast and cctxt.cast or cctxt.call
return method(ctxt, 'stop_instance',
instance=instance, do_cast=do_cast)
def cell_create(self, ctxt, values):
return self.call(ctxt,
self.make_msg('cell_create', values=values),
version='1.13')
cctxt = self.client.prepare(version='1.13')
return cctxt.call(ctxt, 'cell_create', values=values)
def cell_update(self, ctxt, cell_name, values):
return self.call(ctxt,
self.make_msg('cell_update',
cell_name=cell_name,
values=values),
version='1.13')
cctxt = self.client.prepare(version='1.13')
return cctxt.call(ctxt, 'cell_update',
cell_name=cell_name, values=values)
def cell_delete(self, ctxt, cell_name):
return self.call(ctxt,
self.make_msg('cell_delete', cell_name=cell_name),
version='1.13')
cctxt = self.client.prepare(version='1.13')
return cctxt.call(ctxt, 'cell_delete', cell_name=cell_name)
def cell_get(self, ctxt, cell_name):
return self.call(ctxt,
self.make_msg('cell_get', cell_name=cell_name),
version='1.13')
cctxt = self.client.prepare(version='1.13')
return cctxt.call(ctxt, 'cell_get', cell_name=cell_name)
def reboot_instance(self, ctxt, instance, block_device_info,
reboot_type):
@ -450,10 +432,9 @@ class CellsAPI(rpc_proxy.RpcProxy):
"""
if not CONF.cells.enable:
return
self.cast(ctxt,
self.make_msg('reboot_instance', instance=instance,
reboot_type=reboot_type),
version='1.14')
cctxt = self.client.prepare(version='1.14')
cctxt.cast(ctxt, 'reboot_instance', instance=instance,
reboot_type=reboot_type)
def pause_instance(self, ctxt, instance):
"""Pause an instance in its cell.
@ -462,9 +443,8 @@ class CellsAPI(rpc_proxy.RpcProxy):
"""
if not CONF.cells.enable:
return
self.cast(ctxt,
self.make_msg('pause_instance', instance=instance),
version='1.19')
cctxt = self.client.prepare(version='1.19')
cctxt.cast(ctxt, 'pause_instance', instance=instance)
def unpause_instance(self, ctxt, instance):
"""Unpause an instance in its cell.
@ -473,9 +453,8 @@ class CellsAPI(rpc_proxy.RpcProxy):
"""
if not CONF.cells.enable:
return
self.cast(ctxt,
self.make_msg('unpause_instance', instance=instance),
version='1.19')
cctxt = self.client.prepare(version='1.19')
cctxt.cast(ctxt, 'unpause_instance', instance=instance)
def suspend_instance(self, ctxt, instance):
"""Suspend an instance in its cell.
@ -484,9 +463,8 @@ class CellsAPI(rpc_proxy.RpcProxy):
"""
if not CONF.cells.enable:
return
self.cast(ctxt,
self.make_msg('suspend_instance', instance=instance),
version='1.15')
cctxt = self.client.prepare(version='1.15')
cctxt.cast(ctxt, 'suspend_instance', instance=instance)
def resume_instance(self, ctxt, instance):
"""Resume an instance in its cell.
@ -495,9 +473,8 @@ class CellsAPI(rpc_proxy.RpcProxy):
"""
if not CONF.cells.enable:
return
self.cast(ctxt,
self.make_msg('resume_instance', instance=instance),
version='1.15')
cctxt = self.client.prepare(version='1.15')
cctxt.cast(ctxt, 'resume_instance', instance=instance)
def terminate_instance(self, ctxt, instance, bdms, reservations=None):
"""Delete an instance in its cell.
@ -506,9 +483,8 @@ class CellsAPI(rpc_proxy.RpcProxy):
"""
if not CONF.cells.enable:
return
self.cast(ctxt,
self.make_msg('terminate_instance', instance=instance),
version='1.18')
cctxt = self.client.prepare(version='1.18')
cctxt.cast(ctxt, 'terminate_instance', instance=instance)
def soft_delete_instance(self, ctxt, instance, reservations=None):
"""Soft-delete an instance in its cell.
@ -517,39 +493,36 @@ class CellsAPI(rpc_proxy.RpcProxy):
"""
if not CONF.cells.enable:
return
self.cast(ctxt,
self.make_msg('soft_delete_instance', instance=instance),
version='1.18')
cctxt = self.client.prepare(version='1.18')
cctxt.cast(ctxt, 'soft_delete_instance', instance=instance)
def resize_instance(self, ctxt, instance, extra_instance_updates,
scheduler_hint, flavor, reservations):
if not CONF.cells.enable:
return
flavor_p = jsonutils.to_primitive(flavor)
self.cast(ctxt,
self.make_msg('resize_instance', instance=instance,
flavor=flavor_p,
extra_instance_updates=extra_instance_updates),
version='1.20')
cctxt = self.client.prepare(version='1.20')
cctxt.cast(ctxt, 'resize_instance',
instance=instance, flavor=flavor_p,
extra_instance_updates=extra_instance_updates)
def live_migrate_instance(self, ctxt, instance, host_name,
block_migration, disk_over_commit):
if not CONF.cells.enable:
return
self.cast(ctxt,
self.make_msg('live_migrate_instance', instance=instance,
block_migration=block_migration,
disk_over_commit=disk_over_commit,
host_name=host_name),
version='1.20')
cctxt = self.client.prepare(version='1.20')
cctxt.cast(ctxt, 'live_migrate_instance',
instance=instance,
block_migration=block_migration,
disk_over_commit=disk_over_commit,
host_name=host_name)
def revert_resize(self, ctxt, instance, migration, host,
reservations):
if not CONF.cells.enable:
return
self.cast(ctxt,
self.make_msg('revert_resize', instance=instance),
version='1.21')
cctxt = self.client.prepare(version='1.21')
cctxt.cast(ctxt, 'revert_resize', instance=instance)
def confirm_resize(self, ctxt, instance, migration, host,
reservations, cast=True):
@ -560,40 +533,36 @@ class CellsAPI(rpc_proxy.RpcProxy):
# Also, the compute api method normally takes an optional
# 'migration_ref' argument. But this is only used from the manager
# back to the API... which would happen in the child cell.
self.cast(ctxt,
self.make_msg('confirm_resize', instance=instance),
version='1.21')
cctxt = self.client.prepare(version='1.21')
cctxt.cast(ctxt, 'confirm_resize', instance=instance)
def reset_network(self, ctxt, instance):
"""Reset networking for an instance."""
if not CONF.cells.enable:
return
self.cast(ctxt,
self.make_msg('reset_network', instance=instance),
version='1.22')
cctxt = self.client.prepare(version='1.22')
cctxt.cast(ctxt, 'reset_network', instance=instance)
def inject_network_info(self, ctxt, instance):
"""Inject networking for an instance."""
if not CONF.cells.enable:
return
self.cast(ctxt,
self.make_msg('inject_network_info', instance=instance),
version='1.23')
cctxt = self.client.prepare(version='1.23')
cctxt.cast(ctxt, 'inject_network_info', instance=instance)
def snapshot_instance(self, ctxt, instance, image_id):
if not CONF.cells.enable:
return
self.cast(ctxt,
self.make_msg('snapshot_instance', instance=instance,
image_id=image_id),
version='1.24')
cctxt = self.client.prepare(version='1.24')
cctxt.cast(ctxt, 'snapshot_instance',
instance=instance, image_id=image_id)
def backup_instance(self, ctxt, instance, image_id, backup_type, rotation):
if not CONF.cells.enable:
return
self.cast(ctxt,
self.make_msg('backup_instance', instance=instance,
image_id=image_id,
backup_type=backup_type,
rotation=rotation),
version='1.24')
cctxt = self.client.prepare(version='1.24')
cctxt.cast(ctxt, 'backup_instance',
instance=instance,
image_id=image_id,
backup_type=backup_type,
rotation=rotation)

View File

@ -20,7 +20,7 @@ Client side of the cert manager RPC API.
from oslo.config import cfg
import nova.openstack.common.rpc.proxy
from nova import rpcclient
rpcapi_opts = [
cfg.StrOpt('cert_topic',
@ -36,7 +36,7 @@ rpcapi_cap_opt = cfg.StrOpt('cert',
CONF.register_opt(rpcapi_cap_opt, 'upgrade_levels')
class CertAPI(nova.openstack.common.rpc.proxy.RpcProxy):
class CertAPI(rpcclient.RpcProxy):
'''Client side of the cert rpc API.
API version history:
@ -70,34 +70,31 @@ class CertAPI(nova.openstack.common.rpc.proxy.RpcProxy):
topic=CONF.cert_topic,
default_version=self.BASE_RPC_API_VERSION,
version_cap=version_cap)
self.client = self.get_client()
def revoke_certs_by_user(self, ctxt, user_id):
return self.call(ctxt, self.make_msg('revoke_certs_by_user',
user_id=user_id))
return self.client.call(ctxt, 'revoke_certs_by_user', user_id=user_id)
def revoke_certs_by_project(self, ctxt, project_id):
return self.call(ctxt, self.make_msg('revoke_certs_by_project',
project_id=project_id))
return self.client.call(ctxt, 'revoke_certs_by_project',
project_id=project_id)
def revoke_certs_by_user_and_project(self, ctxt, user_id, project_id):
return self.call(ctxt,
self.make_msg('revoke_certs_by_user_and_project',
user_id=user_id, project_id=project_id))
return self.client.call(ctxt, 'revoke_certs_by_user_and_project',
user_id=user_id, project_id=project_id)
def generate_x509_cert(self, ctxt, user_id, project_id):
return self.call(ctxt, self.make_msg('generate_x509_cert',
user_id=user_id,
project_id=project_id))
return self.client.call(ctxt, 'generate_x509_cert',
user_id=user_id,
project_id=project_id)
def fetch_ca(self, ctxt, project_id):
return self.call(ctxt, self.make_msg('fetch_ca',
project_id=project_id))
return self.client.call(ctxt, 'fetch_ca', project_id=project_id)
def fetch_crl(self, ctxt, project_id):
return self.call(ctxt, self.make_msg('fetch_crl',
project_id=project_id))
return self.client.call(ctxt, 'fetch_crl', project_id=project_id)
def decrypt_text(self, ctxt, project_id, text):
return self.call(ctxt, self.make_msg('decrypt_text',
project_id=project_id,
text=text))
return self.client.call(ctxt, 'decrypt_text',
project_id=project_id,
text=text)

View File

@ -24,6 +24,7 @@ from nova.compute import rpcapi as compute_rpcapi
from nova import exception
from nova.objects import service as service_obj
from nova.openstack.common import excutils
from nova import rpcclient
check_instance_state = compute_api.check_instance_state
@ -89,25 +90,34 @@ class ConductorTaskRPCAPIRedirect(object):
return _noop_rpc_wrapper
class RPCClientCellsProxy(rpcclient.RPCClient):
def __init__(self, proxy, *args, **kwargs):
super(RPCClientCellsProxy, self).__init__(proxy, *args, **kwargs)
self.cells_rpcapi = cells_rpcapi.CellsAPI()
def cast(self, ctxt, method, **kwargs):
msg = self.proxy.make_msg(method, **kwargs)
self.proxy._set_version(msg, self.kwargs.get('version'))
topic = self.proxy._get_topic(self.kwargs.get('topic'))
self.cells_rpcapi.proxy_rpc_to_manager(ctxt, msg, topic)
def call(self, ctxt, method, **kwargs):
msg = self.proxy.make_msg(method, **kwargs)
self.proxy._set_version(msg, self.kwargs.get('version'))
topic = self.proxy._get_topic(self.kwargs.get('topic'))
timeout = self.kwargs.get('timeout')
return self.cells_rpcapi.proxy_rpc_to_manager(ctxt, msg, topic,
call=True,
timeout=timeout)
class ComputeRPCProxyAPI(compute_rpcapi.ComputeAPI):
"""Class used to substitute Compute RPC API that will proxy
via the cells manager to a compute manager in a child cell.
"""
def __init__(self, *args, **kwargs):
super(ComputeRPCProxyAPI, self).__init__(*args, **kwargs)
self.cells_rpcapi = cells_rpcapi.CellsAPI()
def cast(self, ctxt, msg, topic=None, version=None):
self._set_version(msg, version)
topic = self._get_topic(topic)
self.cells_rpcapi.proxy_rpc_to_manager(ctxt, msg, topic)
def call(self, ctxt, msg, topic=None, version=None, timeout=None):
self._set_version(msg, version)
topic = self._get_topic(topic)
return self.cells_rpcapi.proxy_rpc_to_manager(ctxt, msg, topic,
call=True,
timeout=timeout)
def get_client(self):
return RPCClientCellsProxy(self)
class ComputeCellsAPI(compute_api.API):

View File

@ -24,9 +24,7 @@ from nova import exception
from nova.objects import base as objects_base
from nova.openstack.common.gettextutils import _
from nova.openstack.common import jsonutils
from nova.openstack.common import rpc
import nova.openstack.common.rpc
import nova.openstack.common.rpc.proxy
from nova import rpcclient
rpcapi_opts = [
cfg.StrOpt('compute_topic',
@ -42,28 +40,26 @@ rpcapi_cap_opt = cfg.StrOpt('compute',
CONF.register_opt(rpcapi_cap_opt, 'upgrade_levels')
def _compute_topic(topic, ctxt, host, instance):
'''Get the topic to use for a message.
def _compute_host(host, instance):
'''Get the destination host for a message.
:param topic: the base topic
:param ctxt: request context
:param host: explicit host to send the message to.
:param instance: If an explicit host was not specified, use
instance['host']
:returns: A topic string
:returns: A host
'''
if not host:
if not instance:
raise exception.NovaException(_('No compute host specified'))
host = instance['host']
if not host:
raise exception.NovaException(_('Unable to find host for '
'Instance %s') % instance['uuid'])
return rpc.queue_get_for(ctxt, topic, host)
if host:
return host
if not instance:
raise exception.NovaException(_('No compute host specified'))
if not instance['host']:
raise exception.NovaException(_('Unable to find host for '
'Instance %s') % instance['uuid'])
return instance['host']
class ComputeAPI(nova.openstack.common.rpc.proxy.RpcProxy):
class ComputeAPI(rpcclient.RpcProxy):
'''Client side of the compute rpc API.
API version history:
@ -227,6 +223,7 @@ class ComputeAPI(nova.openstack.common.rpc.proxy.RpcProxy):
default_version=self.BASE_RPC_API_VERSION,
serializer=objects_base.NovaObjectSerializer(),
version_cap=version_cap)
self.client = self.get_client()
def add_aggregate_host(self, ctxt, aggregate, host_param, host,
slave_info=None):
@ -240,84 +237,77 @@ class ComputeAPI(nova.openstack.common.rpc.proxy.RpcProxy):
'''
aggregate_p = jsonutils.to_primitive(aggregate)
self.cast(ctxt, self.make_msg('add_aggregate_host',
aggregate=aggregate_p, host=host_param,
slave_info=slave_info),
topic=_compute_topic(self.topic, ctxt, host, None),
version='2.14')
cctxt = self.client.prepare(server=host, version='2.14')
cctxt.cast(ctxt, 'add_aggregate_host',
aggregate=aggregate_p, host=host_param,
slave_info=slave_info)
def add_fixed_ip_to_instance(self, ctxt, instance, network_id):
instance_p = jsonutils.to_primitive(instance)
self.cast(ctxt, self.make_msg('add_fixed_ip_to_instance',
instance=instance_p, network_id=network_id),
topic=_compute_topic(self.topic, ctxt, None, instance))
cctxt = self.client.prepare(server=_compute_host(None, instance))
cctxt.cast(ctxt, 'add_fixed_ip_to_instance',
instance=instance_p, network_id=network_id)
def attach_interface(self, ctxt, instance, network_id, port_id,
requested_ip):
instance_p = jsonutils.to_primitive(instance)
return self.call(ctxt, self.make_msg('attach_interface',
instance=instance_p, network_id=network_id,
port_id=port_id, requested_ip=requested_ip),
topic=_compute_topic(self.topic, ctxt, None, instance),
version='2.25')
cctxt = self.client.prepare(server=_compute_host(None, instance),
version='2.25')
return cctxt.call(ctxt, 'attach_interface',
instance=instance_p, network_id=network_id,
port_id=port_id, requested_ip=requested_ip)
def attach_volume(self, ctxt, instance, volume_id, mountpoint):
instance_p = jsonutils.to_primitive(instance)
self.cast(ctxt, self.make_msg('attach_volume',
instance=instance_p, volume_id=volume_id,
mountpoint=mountpoint),
topic=_compute_topic(self.topic, ctxt, None, instance))
cctxt = self.client.prepare(server=_compute_host(None, instance))
cctxt.cast(ctxt, 'attach_volume',
instance=instance_p, volume_id=volume_id,
mountpoint=mountpoint)
def change_instance_metadata(self, ctxt, instance, diff):
instance_p = jsonutils.to_primitive(instance)
self.cast(ctxt, self.make_msg('change_instance_metadata',
instance=instance_p, diff=diff),
topic=_compute_topic(self.topic, ctxt, None, instance))
cctxt = self.client.prepare(server=_compute_host(None, instance))
cctxt.cast(ctxt, 'change_instance_metadata',
instance=instance_p, diff=diff)
def check_can_live_migrate_destination(self, ctxt, instance, destination,
block_migration, disk_over_commit):
if self.can_send_version('2.38'):
if self.client.can_send_version('2.38'):
version = '2.38'
else:
version = '2.0'
instance = jsonutils.to_primitive(
objects_base.obj_to_primitive(instance))
return self.call(ctxt,
self.make_msg('check_can_live_migrate_destination',
instance=instance,
block_migration=block_migration,
disk_over_commit=disk_over_commit),
topic=_compute_topic(self.topic,
ctxt, destination, None),
version=version)
cctxt = self.client.prepare(server=destination, version=version)
return cctxt.call(ctxt, 'check_can_live_migrate_destination',
instance=instance,
block_migration=block_migration,
disk_over_commit=disk_over_commit)
def check_can_live_migrate_source(self, ctxt, instance, dest_check_data):
if self.can_send_version('2.38'):
if self.client.can_send_version('2.38'):
version = '2.38'
else:
version = '2.0'
instance = jsonutils.to_primitive(
objects_base.obj_to_primitive(instance))
return self.call(ctxt, self.make_msg('check_can_live_migrate_source',
instance=instance,
dest_check_data=dest_check_data),
topic=_compute_topic(self.topic, ctxt, None,
instance),
version=version)
cctxt = self.client.prepare(server=_compute_host(None, instance),
version=version)
return cctxt.call(ctxt, 'check_can_live_migrate_source',
instance=instance,
dest_check_data=dest_check_data)
def check_instance_shared_storage(self, ctxt, instance, data):
instance_p = jsonutils.to_primitive(instance)
return self.call(ctxt, self.make_msg('check_instance_shared_storage',
instance=instance_p,
data=data),
topic=_compute_topic(self.topic, ctxt, None,
instance),
version='2.28')
cctxt = self.client.prepare(server=_compute_host(None, instance),
version='2.28')
return cctxt.call(ctxt, 'check_instance_shared_storage',
instance=instance_p,
data=data)
def confirm_resize(self, ctxt, instance, migration, host,
reservations=None, cast=True):
rpc_method = self.cast if cast else self.call
if self.can_send_version('2.39'):
if self.client.can_send_version('2.39'):
version = '2.39'
else:
instance = jsonutils.to_primitive(
@ -325,85 +315,85 @@ class ComputeAPI(nova.openstack.common.rpc.proxy.RpcProxy):
migration = jsonutils.to_primitive(
objects_base.obj_to_primitive(migration))
version = '2.7'
return rpc_method(ctxt, self.make_msg('confirm_resize',
instance=instance, migration=migration,
reservations=reservations),
topic=_compute_topic(self.topic, ctxt, host, instance),
version=version)
cctxt = self.client.prepare(server=_compute_host(host, instance),
version=version)
rpc_method = cctxt.cast if cast else cctxt.call
return rpc_method(ctxt, 'confirm_resize',
instance=instance, migration=migration,
reservations=reservations)
def detach_interface(self, ctxt, instance, port_id):
instance_p = jsonutils.to_primitive(instance)
self.cast(ctxt, self.make_msg('detach_interface',
instance=instance_p, port_id=port_id),
topic=_compute_topic(self.topic, ctxt, None, instance),
version='2.25')
cctxt = self.client.prepare(server=_compute_host(None, instance),
version='2.25')
cctxt.cast(ctxt, 'detach_interface',
instance=instance_p, port_id=port_id)
def detach_volume(self, ctxt, instance, volume_id):
instance_p = jsonutils.to_primitive(instance)
self.cast(ctxt, self.make_msg('detach_volume',
instance=instance_p, volume_id=volume_id),
topic=_compute_topic(self.topic, ctxt, None, instance))
cctxt = self.client.prepare(server=_compute_host(None, instance))
cctxt.cast(ctxt, 'detach_volume',
instance=instance_p, volume_id=volume_id)
def finish_resize(self, ctxt, instance, migration, image, disk_info,
host, reservations=None):
instance_p = jsonutils.to_primitive(instance)
migration_p = jsonutils.to_primitive(migration)
self.cast(ctxt, self.make_msg('finish_resize',
instance=instance_p, migration=migration_p,
image=image, disk_info=disk_info, reservations=reservations),
topic=_compute_topic(self.topic, ctxt, host, None),
version='2.8')
cctxt = self.client.prepare(server=host, version='2.8')
cctxt.cast(ctxt, 'finish_resize',
instance=instance_p, migration=migration_p,
image=image, disk_info=disk_info, reservations=reservations)
def finish_revert_resize(self, ctxt, instance, migration, host,
reservations=None):
instance_p = jsonutils.to_primitive(instance)
migration_p = jsonutils.to_primitive(migration)
self.cast(ctxt, self.make_msg('finish_revert_resize',
instance=instance_p, migration=migration_p,
reservations=reservations),
topic=_compute_topic(self.topic, ctxt, host, None),
version='2.13')
cctxt = self.client.prepare(server=host, version='2.13')
cctxt.cast(ctxt, 'finish_revert_resize',
instance=instance_p, migration=migration_p,
reservations=reservations)
def get_console_output(self, ctxt, instance, tail_length):
instance_p = jsonutils.to_primitive(instance)
return self.call(ctxt, self.make_msg('get_console_output',
instance=instance_p, tail_length=tail_length),
topic=_compute_topic(self.topic, ctxt, None, instance))
cctxt = self.client.prepare(server=_compute_host(None, instance))
return cctxt.call(ctxt, 'get_console_output',
instance=instance_p, tail_length=tail_length)
def get_console_pool_info(self, ctxt, console_type, host):
return self.call(ctxt, self.make_msg('get_console_pool_info',
console_type=console_type),
topic=_compute_topic(self.topic, ctxt, host, None))
cctxt = self.client.prepare(server=host)
return cctxt.call(ctxt, 'get_console_pool_info',
console_type=console_type)
def get_console_topic(self, ctxt, host):
return self.call(ctxt, self.make_msg('get_console_topic'),
topic=_compute_topic(self.topic, ctxt, host, None))
cctxt = self.client.prepare(server=host)
return cctxt.call(ctxt, 'get_console_topic')
def get_diagnostics(self, ctxt, instance):
instance_p = jsonutils.to_primitive(instance)
return self.call(ctxt, self.make_msg('get_diagnostics',
instance=instance_p),
topic=_compute_topic(self.topic, ctxt, None, instance))
cctxt = self.client.prepare(server=_compute_host(None, instance))
return cctxt.call(ctxt, 'get_diagnostics',
instance=instance_p)
def get_vnc_console(self, ctxt, instance, console_type):
instance_p = jsonutils.to_primitive(instance)
return self.call(ctxt, self.make_msg('get_vnc_console',
instance=instance_p, console_type=console_type),
topic=_compute_topic(self.topic, ctxt, None, instance))
cctxt = self.client.prepare(server=_compute_host(None, instance))
return cctxt.call(ctxt, 'get_vnc_console',
instance=instance_p, console_type=console_type)
def get_spice_console(self, ctxt, instance, console_type):
instance_p = jsonutils.to_primitive(instance)
return self.call(ctxt, self.make_msg('get_spice_console',
instance=instance_p, console_type=console_type),
topic=_compute_topic(self.topic, ctxt, None, instance),
version='2.24')
cctxt = self.client.prepare(server=_compute_host(None, instance),
version='2.24')
return cctxt.call(ctxt, 'get_spice_console',
instance=instance_p, console_type=console_type)
def validate_console_port(self, ctxt, instance, port, console_type):
instance_p = jsonutils.to_primitive(instance)
return self.call(ctxt, self.make_msg('validate_console_port',
instance=instance_p, port=port, console_type=console_type),
topic=_compute_topic(self.topic, ctxt, None, instance),
version='2.26')
cctxt = self.client.prepare(server=_compute_host(None, instance),
version='2.26')
return cctxt.call(ctxt, 'validate_console_port',
instance=instance_p, port=port,
console_type=console_type)
def host_maintenance_mode(self, ctxt, host_param, mode, host):
'''Set host maintenance mode
@ -414,88 +404,82 @@ class ComputeAPI(nova.openstack.common.rpc.proxy.RpcProxy):
:param mode:
:param host: This is the host to send the message to.
'''
return self.call(ctxt, self.make_msg('host_maintenance_mode',
host=host_param, mode=mode),
topic=_compute_topic(self.topic, ctxt, host, None))
cctxt = self.client.prepare(server=host)
return cctxt.call(ctxt, 'host_maintenance_mode',
host=host_param, mode=mode)
def host_power_action(self, ctxt, action, host):
topic = _compute_topic(self.topic, ctxt, host, None)
return self.call(ctxt, self.make_msg('host_power_action',
action=action), topic)
cctxt = self.client.prepare(server=host)
return cctxt.call(ctxt, 'host_power_action', action=action)
def inject_file(self, ctxt, instance, path, file_contents):
instance_p = jsonutils.to_primitive(instance)
self.cast(ctxt, self.make_msg('inject_file',
instance=instance_p, path=path,
file_contents=file_contents),
topic=_compute_topic(self.topic, ctxt, None, instance))
cctxt = self.client.prepare(server=_compute_host(None, instance))
cctxt.cast(ctxt, 'inject_file',
instance=instance_p, path=path,
file_contents=file_contents)
def inject_network_info(self, ctxt, instance):
if self.can_send_version('2.41'):
if self.client.can_send_version('2.41'):
version = '2.41'
else:
instance = jsonutils.to_primitive(
objects_base.obj_to_primitive(instance))
version = '2.0'
self.cast(ctxt, self.make_msg('inject_network_info',
instance=instance),
topic=_compute_topic(self.topic, ctxt, None, instance),
version=version)
cctxt = self.client.prepare(server=_compute_host(None, instance),
version=version)
cctxt.cast(ctxt, 'inject_network_info', instance=instance)
def live_migration(self, ctxt, instance, dest, block_migration, host,
migrate_data=None):
instance_p = jsonutils.to_primitive(instance)
self.cast(ctxt, self.make_msg('live_migration', instance=instance_p,
dest=dest, block_migration=block_migration,
migrate_data=migrate_data),
topic=_compute_topic(self.topic, ctxt, host, None))
cctxt = self.client.prepare(server=host)
cctxt.cast(ctxt, 'live_migration', instance=instance_p,
dest=dest, block_migration=block_migration,
migrate_data=migrate_data)
def pause_instance(self, ctxt, instance):
if self.can_send_version('2.36'):
if self.client.can_send_version('2.36'):
version = '2.36'
else:
version = '2.0'
instance = jsonutils.to_primitive(
objects_base.obj_to_primitive(instance))
self.cast(ctxt, self.make_msg('pause_instance',
instance=instance),
topic=_compute_topic(self.topic, ctxt, None, instance),
version=version)
cctxt = self.client.prepare(server=_compute_host(None, instance),
version=version)
cctxt.cast(ctxt, 'pause_instance', instance=instance)
def post_live_migration_at_destination(self, ctxt, instance,
block_migration, host):
instance_p = jsonutils.to_primitive(instance)
return self.call(ctxt,
self.make_msg('post_live_migration_at_destination',
instance=instance_p, block_migration=block_migration),
_compute_topic(self.topic, ctxt, host, None))
cctxt = self.client.prepare(server=host)
return cctxt.call(ctxt,
'post_live_migration_at_destination',
instance=instance_p, block_migration=block_migration)
def power_off_instance(self, ctxt, instance):
instance_p = jsonutils.to_primitive(instance)
self.cast(ctxt, self.make_msg('power_off_instance',
instance=instance_p),
topic=_compute_topic(self.topic, ctxt, None, instance))
cctxt = self.client.prepare(server=_compute_host(None, instance))
cctxt.cast(ctxt, 'power_off_instance', instance=instance_p)
def power_on_instance(self, ctxt, instance):
instance_p = jsonutils.to_primitive(instance)
self.cast(ctxt, self.make_msg('power_on_instance',
instance=instance_p),
topic=_compute_topic(self.topic, ctxt, None, instance))
cctxt = self.client.prepare(server=_compute_host(None, instance))
cctxt.cast(ctxt, 'power_on_instance', instance=instance_p)
def pre_live_migration(self, ctxt, instance, block_migration, disk,
host, migrate_data=None):
instance_p = jsonutils.to_primitive(instance)
return self.call(ctxt, self.make_msg('pre_live_migration',
instance=instance_p,
block_migration=block_migration,
disk=disk, migrate_data=migrate_data),
_compute_topic(self.topic, ctxt, host, None),
version='2.21')
cctxt = self.client.prepare(server=host, version='2.21')
return cctxt.call(ctxt, 'pre_live_migration',
instance=instance_p,
block_migration=block_migration,
disk=disk, migrate_data=migrate_data)
def prep_resize(self, ctxt, image, instance, instance_type, host,
reservations=None, request_spec=None,
filter_properties=None, node=None):
if self.can_send_version('2.43'):
if self.client.can_send_version('2.43'):
version = '2.43'
else:
instance = jsonutils.to_primitive(
@ -503,49 +487,47 @@ class ComputeAPI(nova.openstack.common.rpc.proxy.RpcProxy):
version = '2.20'
instance_type_p = jsonutils.to_primitive(instance_type)
image_p = jsonutils.to_primitive(image)
self.cast(ctxt,
self.make_msg('prep_resize',
instance=instance,
instance_type=instance_type_p,
image=image_p, reservations=reservations,
request_spec=request_spec,
filter_properties=filter_properties,
node=node),
_compute_topic(self.topic, ctxt, host, None),
version=version)
cctxt = self.client.prepare(server=host, version=version)
cctxt.cast(ctxt, 'prep_resize',
instance=instance,
instance_type=instance_type_p,
image=image_p, reservations=reservations,
request_spec=request_spec,
filter_properties=filter_properties,
node=node)
def reboot_instance(self, ctxt, instance, block_device_info,
reboot_type):
if not self.can_send_version('2.32'):
if not self.client.can_send_version('2.32'):
version = '2.23'
instance = jsonutils.to_primitive(
objects_base.obj_to_primitive(instance))
else:
version = '2.32'
self.cast(ctxt, self.make_msg('reboot_instance',
instance=instance,
block_device_info=block_device_info,
reboot_type=reboot_type),
topic=_compute_topic(self.topic, ctxt, None, instance),
version=version)
cctxt = self.client.prepare(server=_compute_host(None, instance),
version=version)
cctxt.cast(ctxt, 'reboot_instance',
instance=instance,
block_device_info=block_device_info,
reboot_type=reboot_type)
def rebuild_instance(self, ctxt, instance, new_pass, injected_files,
image_ref, orig_image_ref, orig_sys_metadata, bdms,
recreate=False, on_shared_storage=False, host=None):
instance_p = jsonutils.to_primitive(instance)
bdms_p = jsonutils.to_primitive(bdms)
self.cast(ctxt, self.make_msg('rebuild_instance',
instance=instance_p, new_pass=new_pass,
injected_files=injected_files, image_ref=image_ref,
orig_image_ref=orig_image_ref,
orig_sys_metadata=orig_sys_metadata, bdms=bdms_p,
recreate=recreate, on_shared_storage=on_shared_storage),
topic=_compute_topic(self.topic, ctxt, host, instance),
version='2.22')
cctxt = self.client.prepare(server=_compute_host(host, instance),
version='2.22')
cctxt.cast(ctxt, 'rebuild_instance',
instance=instance_p, new_pass=new_pass,
injected_files=injected_files, image_ref=image_ref,
orig_image_ref=orig_image_ref,
orig_sys_metadata=orig_sys_metadata, bdms=bdms_p,
recreate=recreate, on_shared_storage=on_shared_storage)
def refresh_provider_fw_rules(self, ctxt, host):
self.cast(ctxt, self.make_msg('refresh_provider_fw_rules'),
_compute_topic(self.topic, ctxt, host, None))
cctxt = self.client.prepare(server=host)
cctxt.cast(ctxt, 'refresh_provider_fw_rules')
def remove_aggregate_host(self, ctxt, aggregate, host_param, host,
slave_info=None):
@ -559,70 +541,67 @@ class ComputeAPI(nova.openstack.common.rpc.proxy.RpcProxy):
'''
aggregate_p = jsonutils.to_primitive(aggregate)
self.cast(ctxt, self.make_msg('remove_aggregate_host',
aggregate=aggregate_p, host=host_param,
slave_info=slave_info),
topic=_compute_topic(self.topic, ctxt, host, None),
version='2.15')
cctxt = self.client.prepare(server=host, version='2.15')
cctxt.cast(ctxt, 'remove_aggregate_host',
aggregate=aggregate_p, host=host_param,
slave_info=slave_info)
def remove_fixed_ip_from_instance(self, ctxt, instance, address):
instance_p = jsonutils.to_primitive(instance)
self.cast(ctxt, self.make_msg('remove_fixed_ip_from_instance',
instance=instance_p, address=address),
topic=_compute_topic(self.topic, ctxt, None, instance))
cctxt = self.client.prepare(server=_compute_host(None, instance))
cctxt.cast(ctxt, 'remove_fixed_ip_from_instance',
instance=instance_p, address=address)
def remove_volume_connection(self, ctxt, instance, volume_id, host):
instance_p = jsonutils.to_primitive(instance)
return self.call(ctxt, self.make_msg('remove_volume_connection',
instance=instance_p, volume_id=volume_id),
topic=_compute_topic(self.topic, ctxt, host, None))
cctxt = self.client.prepare(server=host)
return cctxt.call(ctxt, 'remove_volume_connection',
instance=instance_p, volume_id=volume_id)
def rescue_instance(self, ctxt, instance, rescue_password):
instance_p = jsonutils.to_primitive(instance)
self.cast(ctxt, self.make_msg('rescue_instance',
instance=instance_p,
rescue_password=rescue_password),
topic=_compute_topic(self.topic, ctxt, None, instance))
cctxt = self.client.prepare(server=_compute_host(None, instance))
cctxt.cast(ctxt, 'rescue_instance',
instance=instance_p,
rescue_password=rescue_password)
def reset_network(self, ctxt, instance):
if self.can_send_version('2.40'):
if self.client.can_send_version('2.40'):
version = '2.40'
else:
instance = jsonutils.to_primitive(
objects_base.obj_to_primitive(instance))
version = '2.0'
self.cast(ctxt, self.make_msg('reset_network',
instance=instance),
topic=_compute_topic(self.topic, ctxt, None, instance),
version=version)
cctxt = self.client.prepare(server=_compute_host(None, instance),
version=version)
cctxt.cast(ctxt, 'reset_network', instance=instance)
def resize_instance(self, ctxt, instance, migration, image, instance_type,
reservations=None):
topic = _compute_topic(self.topic, ctxt, None, instance)
instance_p = jsonutils.to_primitive(instance)
migration_p = jsonutils.to_primitive(migration)
instance_type_p = jsonutils.to_primitive(instance_type)
self.cast(ctxt, self.make_msg('resize_instance',
instance=instance_p, migration=migration_p,
image=image, reservations=reservations,
instance_type=instance_type_p), topic,
version='2.16')
cctxt = self.client.prepare(server=_compute_host(None, instance),
version='2.16')
cctxt.cast(ctxt, 'resize_instance',
instance=instance_p, migration=migration_p,
image=image, reservations=reservations,
instance_type=instance_type_p)
def resume_instance(self, ctxt, instance):
if self.can_send_version('2.33'):
if self.client.can_send_version('2.33'):
version = '2.33'
else:
version = '2.0'
instance = jsonutils.to_primitive(
objects_base.obj_to_primitive(instance))
self.cast(ctxt, self.make_msg('resume_instance',
instance=instance),
topic=_compute_topic(self.topic, ctxt, None, instance),
version=version)
cctxt = self.client.prepare(server=_compute_host(None, instance),
version=version)
cctxt.cast(ctxt, 'resume_instance', instance=instance)
def revert_resize(self, ctxt, instance, migration, host,
reservations=None):
if self.can_send_version('2.39'):
if self.client.can_send_version('2.39'):
version = '2.39'
else:
instance = jsonutils.to_primitive(
@ -630,17 +609,17 @@ class ComputeAPI(nova.openstack.common.rpc.proxy.RpcProxy):
migration = jsonutils.to_primitive(
objects_base.obj_to_primitive(migration))
version = '2.12'
self.cast(ctxt, self.make_msg('revert_resize',
instance=instance, migration=migration,
reservations=reservations),
topic=_compute_topic(self.topic, ctxt, host, instance),
version=version)
cctxt = self.client.prepare(server=_compute_host(host, instance),
version=version)
cctxt.cast(ctxt, 'revert_resize',
instance=instance, migration=migration,
reservations=reservations)
def rollback_live_migration_at_destination(self, ctxt, instance, host):
instance_p = jsonutils.to_primitive(instance)
self.cast(ctxt, self.make_msg('rollback_live_migration_at_destination',
instance=instance_p),
topic=_compute_topic(self.topic, ctxt, host, None))
cctxt = self.client.prepare(server=host)
cctxt.cast(ctxt, 'rollback_live_migration_at_destination',
instance=instance_p)
def run_instance(self, ctxt, instance, host, request_spec,
filter_properties, requested_networks,
@ -654,55 +633,53 @@ class ComputeAPI(nova.openstack.common.rpc.proxy.RpcProxy):
'admin_password': admin_password,
'is_first_time': is_first_time, 'node': node}
if self.can_send_version('2.37'):
if self.client.can_send_version('2.37'):
version = '2.37'
msg_kwargs['legacy_bdm_in_spec'] = legacy_bdm_in_spec
else:
version = '2.19'
msg = self.make_msg('run_instance', **msg_kwargs)
self.cast(ctxt, msg,
topic=_compute_topic(self.topic, ctxt, host, None),
version=version)
cctxt = self.client.prepare(server=host, version=version)
cctxt.cast(ctxt, 'run_instance', **msg_kwargs)
def set_admin_password(self, ctxt, instance, new_pass):
instance_p = jsonutils.to_primitive(instance)
return self.call(ctxt, self.make_msg('set_admin_password',
instance=instance_p, new_pass=new_pass),
topic=_compute_topic(self.topic, ctxt, None, instance))
cctxt = self.client.prepare(server=_compute_host(None, instance))
return cctxt.call(ctxt, 'set_admin_password',
instance=instance_p, new_pass=new_pass)
def set_host_enabled(self, ctxt, enabled, host):
topic = _compute_topic(self.topic, ctxt, host, None)
return self.call(ctxt, self.make_msg('set_host_enabled',
enabled=enabled), topic)
cctxt = self.client.prepare(server=host)
return cctxt.call(ctxt, 'set_host_enabled', enabled=enabled)
def swap_volume(self, ctxt, instance, old_volume_id, new_volume_id):
self.cast(ctxt, self.make_msg('swap_volume',
instance=instance, old_volume_id=old_volume_id,
new_volume_id=new_volume_id),
topic=_compute_topic(self.topic, ctxt, None, instance),
version='2.34')
cctxt = self.client.prepare(server=_compute_host(None, instance),
version='2.34')
cctxt.cast(ctxt, 'swap_volume',
instance=instance, old_volume_id=old_volume_id,
new_volume_id=new_volume_id)
def get_host_uptime(self, ctxt, host):
topic = _compute_topic(self.topic, ctxt, host, None)
return self.call(ctxt, self.make_msg('get_host_uptime'), topic)
cctxt = self.client.prepare(server=host)
return cctxt.call(ctxt, 'get_host_uptime')
def reserve_block_device_name(self, ctxt, instance, device, volume_id):
instance_p = jsonutils.to_primitive(instance)
return self.call(ctxt, self.make_msg('reserve_block_device_name',
instance=instance_p, device=device, volume_id=volume_id),
topic=_compute_topic(self.topic, ctxt, None, instance),
version='2.3')
cctxt = self.client.prepare(server=_compute_host(None, instance),
version='2.3')
return cctxt.call(ctxt, 'reserve_block_device_name',
instance=instance_p, device=device,
volume_id=volume_id)
def live_snapshot_instance(self, ctxt, instance, image_id):
instance_p = jsonutils.to_primitive(instance)
self.cast(ctxt, self.make_msg('live_snapshot_instance',
instance=instance_p, image_id=image_id),
topic=_compute_topic(self.topic, ctxt, None, instance),
version='2.30')
cctxt = self.client.prepare(server=_compute_host(None, instance),
version='2.30')
cctxt.cast(ctxt, 'live_snapshot_instance',
instance=instance_p, image_id=image_id)
def backup_instance(self, ctxt, instance, image_id, backup_type,
rotation):
if self.can_send_version('2.42'):
if self.client.can_send_version('2.42'):
version = '2.42'
method = 'backup_instance'
extra_kwargs = dict()
@ -712,17 +689,17 @@ class ComputeAPI(nova.openstack.common.rpc.proxy.RpcProxy):
method = 'snapshot_instance'
extra_kwargs = dict(image_type='backup')
version = '2.0'
self.cast(ctxt, self.make_msg(method,
instance=instance,
image_id=image_id,
backup_type=backup_type,
rotation=rotation,
**extra_kwargs),
topic=_compute_topic(self.topic, ctxt, None, instance),
version=version)
cctxt = self.client.prepare(server=_compute_host(None, instance),
version=version)
cctxt.cast(ctxt, method,
instance=instance,
image_id=image_id,
backup_type=backup_type,
rotation=rotation,
**extra_kwargs)
def snapshot_instance(self, ctxt, instance, image_id):
if self.can_send_version('2.42'):
if self.client.can_send_version('2.42'):
version = '2.42'
extra_kwargs = dict()
else:
@ -732,135 +709,127 @@ class ComputeAPI(nova.openstack.common.rpc.proxy.RpcProxy):
backup_type=None,
rotation=None)
version = '2.0'
self.cast(ctxt, self.make_msg('snapshot_instance',
instance=instance,
image_id=image_id,
**extra_kwargs),
topic=_compute_topic(self.topic, ctxt, None, instance),
version=version)
cctxt = self.client.prepare(server=_compute_host(None, instance),
version=version)
cctxt.cast(ctxt, 'snapshot_instance',
instance=instance,
image_id=image_id,
**extra_kwargs)
def start_instance(self, ctxt, instance):
if self.can_send_version('2.29'):
if self.client.can_send_version('2.29'):
version = '2.29'
else:
version = '2.0'
instance = jsonutils.to_primitive(
objects_base.obj_to_primitive(instance))
self.cast(ctxt, self.make_msg('start_instance',
instance=instance),
topic=_compute_topic(self.topic, ctxt, None, instance),
version=version)
cctxt = self.client.prepare(server=_compute_host(None, instance),
version=version)
cctxt.cast(ctxt, 'start_instance', instance=instance)
def stop_instance(self, ctxt, instance, do_cast=True):
if self.can_send_version('2.29'):
if self.client.can_send_version('2.29'):
version = '2.29'
else:
version = '2.0'
instance = jsonutils.to_primitive(
objects_base.obj_to_primitive(instance))
rpc_method = self.cast if do_cast else self.call
return rpc_method(ctxt, self.make_msg('stop_instance',
instance=instance),
topic=_compute_topic(self.topic, ctxt, None, instance),
version=version)
cctxt = self.client.prepare(server=_compute_host(None, instance),
version=version)
rpc_method = cctxt.cast if do_cast else cctxt.call
return rpc_method(ctxt, 'stop_instance', instance=instance)
def suspend_instance(self, ctxt, instance):
if self.can_send_version('2.33'):
if self.client.can_send_version('2.33'):
version = '2.33'
else:
version = '2.0'
instance = jsonutils.to_primitive(
objects_base.obj_to_primitive(instance))
self.cast(ctxt, self.make_msg('suspend_instance',
instance=instance),
topic=_compute_topic(self.topic, ctxt, None, instance),
version=version)
cctxt = self.client.prepare(server=_compute_host(None, instance),
version=version)
cctxt.cast(ctxt, 'suspend_instance', instance=instance)
def terminate_instance(self, ctxt, instance, bdms, reservations=None):
if self.can_send_version('2.35'):
if self.client.can_send_version('2.35'):
version = '2.35'
else:
version = '2.27'
instance = jsonutils.to_primitive(instance)
bdms_p = jsonutils.to_primitive(bdms)
self.cast(ctxt, self.make_msg('terminate_instance',
instance=instance, bdms=bdms_p,
reservations=reservations),
topic=_compute_topic(self.topic, ctxt, None, instance),
version=version)
cctxt = self.client.prepare(server=_compute_host(None, instance),
version=version)
cctxt.cast(ctxt, 'terminate_instance',
instance=instance, bdms=bdms_p,
reservations=reservations)
def unpause_instance(self, ctxt, instance):
if self.can_send_version('2.36'):
if self.client.can_send_version('2.36'):
version = '2.36'
else:
version = '2.0'
instance = jsonutils.to_primitive(
objects_base.obj_to_primitive(instance))
self.cast(ctxt, self.make_msg('unpause_instance',
instance=instance),
topic=_compute_topic(self.topic, ctxt, None, instance),
version=version)
cctxt = self.client.prepare(server=_compute_host(None, instance),
version=version)
cctxt.cast(ctxt, 'unpause_instance', instance=instance)
def unrescue_instance(self, ctxt, instance):
instance_p = jsonutils.to_primitive(instance)
self.cast(ctxt, self.make_msg('unrescue_instance',
instance=instance_p),
topic=_compute_topic(self.topic, ctxt, None, instance))
cctxt = self.client.prepare(server=_compute_host(None, instance))
cctxt.cast(ctxt, 'unrescue_instance', instance=instance_p)
def soft_delete_instance(self, ctxt, instance, reservations=None):
if self.can_send_version('2.35'):
if self.client.can_send_version('2.35'):
version = '2.35'
else:
version = '2.27'
instance = jsonutils.to_primitive(instance)
self.cast(ctxt, self.make_msg('soft_delete_instance',
instance=instance, reservations=reservations),
topic=_compute_topic(self.topic, ctxt, None, instance),
version=version)
cctxt = self.client.prepare(server=_compute_host(None, instance),
version=version)
cctxt.cast(ctxt, 'soft_delete_instance',
instance=instance, reservations=reservations)
def restore_instance(self, ctxt, instance):
instance_p = jsonutils.to_primitive(instance)
self.cast(ctxt, self.make_msg('restore_instance',
instance=instance_p),
topic=_compute_topic(self.topic, ctxt, None, instance))
cctxt = self.client.prepare(server=_compute_host(None, instance))
cctxt.cast(ctxt, 'restore_instance', instance=instance_p)
def shelve_instance(self, ctxt, instance, image_id=None):
self.cast(ctxt, self.make_msg('shelve_instance',
instance=instance, image_id=image_id),
topic=_compute_topic(self.topic, ctxt, None, instance),
version='2.31')
cctxt = self.client.prepare(server=_compute_host(None, instance),
version='2.31')
cctxt.cast(ctxt, 'shelve_instance',
instance=instance, image_id=image_id)
def shelve_offload_instance(self, ctxt, instance):
self.cast(ctxt, self.make_msg('shelve_offload_instance',
instance=instance),
topic=_compute_topic(self.topic, ctxt, None, instance),
version='2.31')
cctxt = self.client.prepare(server=_compute_host(None, instance),
version='2.31')
cctxt.cast(ctxt, 'shelve_offload_instance', instance=instance)
def unshelve_instance(self, ctxt, instance, host, image=None):
self.cast(ctxt, self.make_msg('unshelve_instance',
instance=instance, image=image),
topic=_compute_topic(self.topic, ctxt, host, None),
version='2.31')
cctxt = self.client.prepare(server=host, version='2.31')
cctxt.cast(ctxt, 'unshelve_instance',
instance=instance, image=image)
def volume_snapshot_create(self, ctxt, instance, volume_id,
create_info):
instance_p = jsonutils.to_primitive(instance)
msg = self.make_msg('volume_snapshot_create', instance=instance_p,
volume_id=volume_id, create_info=create_info)
topic = _compute_topic(self.topic, ctxt, None, instance)
self.cast(ctxt, msg, topic=topic, version='2.44')
cctxt = self.client.prepare(server=_compute_host(None, instance),
version='2.44')
cctxt.cast(ctxt, 'volume_snapshot_create', instance=instance_p,
volume_id=volume_id, create_info=create_info)
def volume_snapshot_delete(self, ctxt, instance, volume_id, snapshot_id,
delete_info):
instance_p = jsonutils.to_primitive(instance)
msg = self.make_msg('volume_snapshot_delete', instance=instance_p,
volume_id=volume_id, snapshot_id=snapshot_id,
delete_info=delete_info)
topic = _compute_topic(self.topic, ctxt, None, instance)
self.cast(ctxt, msg, topic=topic, version='2.44')
cctxt = self.client.prepare(server=_compute_host(None, instance),
version='2.44')
cctxt.cast(ctxt, 'volume_snapshot_delete', instance=instance_p,
volume_id=volume_id, snapshot_id=snapshot_id,
delete_info=delete_info)
class SecurityGroupAPI(nova.openstack.common.rpc.proxy.RpcProxy):
class SecurityGroupAPI(rpcclient.RpcProxy):
'''Client side of the security group rpc API.
API version history:
@ -885,21 +854,21 @@ class SecurityGroupAPI(nova.openstack.common.rpc.proxy.RpcProxy):
super(SecurityGroupAPI, self).__init__(
topic=CONF.compute_topic,
default_version=self.BASE_RPC_API_VERSION)
self.client = self.get_client()
def refresh_security_group_rules(self, ctxt, security_group_id, host):
self.cast(ctxt, self.make_msg('refresh_security_group_rules',
security_group_id=security_group_id),
topic=_compute_topic(self.topic, ctxt, host, None))
cctxt = self.client.prepare(server=host)
cctxt.cast(ctxt, 'refresh_security_group_rules',
security_group_id=security_group_id)
def refresh_security_group_members(self, ctxt, security_group_id,
host):
self.cast(ctxt, self.make_msg('refresh_security_group_members',
security_group_id=security_group_id),
topic=_compute_topic(self.topic, ctxt, host, None))
cctxt = self.client.prepare(server=host)
cctxt.cast(ctxt, 'refresh_security_group_members',
security_group_id=security_group_id)
def refresh_instance_security_rules(self, ctxt, host, instance):
instance_p = jsonutils.to_primitive(instance)
self.cast(ctxt, self.make_msg('refresh_instance_security_rules',
instance=instance_p),
topic=_compute_topic(self.topic, ctxt, instance['host'],
instance))
cctxt = self.client.prepare(server=_compute_host(None, instance))
cctxt.cast(ctxt, 'refresh_instance_security_rules',
instance=instance_p)

View File

@ -19,7 +19,7 @@ from oslo.config import cfg
from nova.objects import base as objects_base
from nova.openstack.common import jsonutils
from nova.openstack.common.rpc import common as rpc_common
import nova.openstack.common.rpc.proxy
from nova import rpcclient
CONF = cfg.CONF
@ -28,7 +28,7 @@ rpcapi_cap_opt = cfg.StrOpt('conductor',
CONF.register_opt(rpcapi_cap_opt, 'upgrade_levels')
class ConductorAPI(nova.openstack.common.rpc.proxy.RpcProxy):
class ConductorAPI(rpcclient.RpcProxy):
"""Client side of the conductor RPC API
API version history:
@ -125,89 +125,94 @@ class ConductorAPI(nova.openstack.common.rpc.proxy.RpcProxy):
default_version=self.BASE_RPC_API_VERSION,
serializer=objects_base.NovaObjectSerializer(),
version_cap=version_cap)
self.client = self.get_client()
def instance_update(self, context, instance_uuid, updates,
service=None):
updates_p = jsonutils.to_primitive(updates)
return self.call(context,
self.make_msg('instance_update',
instance_uuid=instance_uuid,
updates=updates_p,
service=service),
version='1.38')
cctxt = self.client.prepare(version='1.38')
return cctxt.call(context, 'instance_update',
instance_uuid=instance_uuid,
updates=updates_p,
service=service)
def instance_get(self, context, instance_id):
msg = self.make_msg('instance_get',
instance_id=instance_id)
return self.call(context, msg, version='1.24')
cctxt = self.client.prepare(version='1.24')
return cctxt.call(context, 'instance_get', instance_id=instance_id)
def instance_get_by_uuid(self, context, instance_uuid,
columns_to_join=None):
if self.can_send_version('1.49'):
if self.client.can_send_version('1.49'):
version = '1.49'
msg = self.make_msg('instance_get_by_uuid',
instance_uuid=instance_uuid,
columns_to_join=columns_to_join)
kwargs = {'instance_uuid': instance_uuid,
'columns_to_join': columns_to_join}
else:
version = '1.2'
msg = self.make_msg('instance_get_by_uuid',
instance_uuid=instance_uuid)
return self.call(context, msg, version=version)
kwargs = {'instance_uuid': instance_uuid}
cctxt = self.client.prepare(version=version)
return cctxt.call(context, 'instance_get_by_uuid', **kwargs)
def migration_get(self, context, migration_id):
msg = self.make_msg('migration_get', migration_id=migration_id)
return self.call(context, msg, version='1.4')
cctxt = self.client.prepare(version='1.4')
return cctxt.call(context, 'migration_get', migration_id=migration_id)
def migration_get_in_progress_by_host_and_node(self, context,
host, node):
msg = self.make_msg('migration_get_in_progress_by_host_and_node',
host=host, node=node)
return self.call(context, msg, version='1.31')
cctxt = self.client.prepare(version='1.31')
return cctxt.call(context,
'migration_get_in_progress_by_host_and_node',
host=host, node=node)
def migration_update(self, context, migration, status):
migration_p = jsonutils.to_primitive(migration)
msg = self.make_msg('migration_update', migration=migration_p,
status=status)
return self.call(context, msg, version='1.1')
cctxt = self.client.prepare(version='1.1')
return cctxt.call(context, 'migration_update',
migration=migration_p,
status=status)
def aggregate_host_add(self, context, aggregate, host):
aggregate_p = jsonutils.to_primitive(aggregate)
msg = self.make_msg('aggregate_host_add', aggregate=aggregate_p,
host=host)
return self.call(context, msg, version='1.3')
cctxt = self.client.prepare(version='1.3')
return cctxt.call(context, 'aggregate_host_add',
aggregate=aggregate_p,
host=host)
def aggregate_host_delete(self, context, aggregate, host):
aggregate_p = jsonutils.to_primitive(aggregate)
msg = self.make_msg('aggregate_host_delete', aggregate=aggregate_p,
host=host)
return self.call(context, msg, version='1.3')
cctxt = self.client.prepare(version='1.3')
return cctxt.call(context, 'aggregate_host_delete',
aggregate=aggregate_p,
host=host)
def aggregate_get(self, context, aggregate_id):
msg = self.make_msg('aggregate_get', aggregate_id=aggregate_id)
return self.call(context, msg, version='1.11')
cctxt = self.client.prepare(version='1.11')
return cctxt.call(context, 'aggregate_get', aggregate_id=aggregate_id)
def aggregate_get_by_host(self, context, host, key=None):
msg = self.make_msg('aggregate_get_by_host', host=host, key=key)
return self.call(context, msg, version='1.7')
cctxt = self.client.prepare(version='1.7')
return cctxt.call(context, 'aggregate_get_by_host', host=host, key=key)
def aggregate_metadata_add(self, context, aggregate, metadata,
set_delete=False):
aggregate_p = jsonutils.to_primitive(aggregate)
msg = self.make_msg('aggregate_metadata_add', aggregate=aggregate_p,
metadata=metadata,
set_delete=set_delete)
return self.call(context, msg, version='1.7')
cctxt = self.client.prepare(version='1.7')
return cctxt.call(context, 'aggregate_metadata_add',
aggregate=aggregate_p,
metadata=metadata,
set_delete=set_delete)
def aggregate_metadata_delete(self, context, aggregate, key):
aggregate_p = jsonutils.to_primitive(aggregate)
msg = self.make_msg('aggregate_metadata_delete', aggregate=aggregate_p,
key=key)
return self.call(context, msg, version='1.7')
cctxt = self.client.prepare(version='1.7')
return cctxt.call(context, 'aggregate_metadata_delete',
aggregate=aggregate_p,
key=key)
def aggregate_metadata_get_by_host(self, context, host, key):
msg = self.make_msg('aggregate_metadata_get_by_host', host=host,
key=key)
return self.call(context, msg, version='1.42')
cctxt = self.client.prepare(version='1.42')
return cctxt.call(context, 'aggregate_metadata_get_by_host',
host=host,
key=key)
def bw_usage_update(self, context, uuid, mac, start_period,
bw_in=None, bw_out=None,
@ -218,197 +223,202 @@ class ConductorAPI(nova.openstack.common.rpc.proxy.RpcProxy):
last_ctr_out=last_ctr_out,
last_refreshed=last_refreshed)
if self.can_send_version('1.54'):
if self.client.can_send_version('1.54'):
version = '1.54'
msg_kwargs['update_cells'] = update_cells
else:
version = '1.5'
msg = self.make_msg('bw_usage_update', **msg_kwargs)
return self.call(context, msg, version=version)
cctxt = self.client.prepare(version=version)
return cctxt.call(context, 'bw_usage_update', **msg_kwargs)
def security_group_get_by_instance(self, context, instance):
instance_p = jsonutils.to_primitive(instance)
msg = self.make_msg('security_group_get_by_instance',
instance=instance_p)
return self.call(context, msg, version='1.8')
cctxt = self.client.prepare(version='1.8')
return cctxt.call(context, 'security_group_get_by_instance',
instance=instance_p)
def security_group_rule_get_by_security_group(self, context, secgroup):
secgroup_p = jsonutils.to_primitive(secgroup)
msg = self.make_msg('security_group_rule_get_by_security_group',
secgroup=secgroup_p)
return self.call(context, msg, version='1.8')
cctxt = self.client.prepare(version='1.8')
return cctxt.call(context, 'security_group_rule_get_by_security_group',
secgroup=secgroup_p)
def provider_fw_rule_get_all(self, context):
msg = self.make_msg('provider_fw_rule_get_all')
return self.call(context, msg, version='1.9')
cctxt = self.client.prepare(version='1.9')
return cctxt.call(context, 'provider_fw_rule_get_all')
def agent_build_get_by_triple(self, context, hypervisor, os, architecture):
msg = self.make_msg('agent_build_get_by_triple',
hypervisor=hypervisor, os=os,
architecture=architecture)
return self.call(context, msg, version='1.10')
cctxt = self.client.prepare(version='1.10')
return cctxt.call(context, 'agent_build_get_by_triple',
hypervisor=hypervisor, os=os,
architecture=architecture)
def block_device_mapping_update_or_create(self, context, values,
create=None):
msg = self.make_msg('block_device_mapping_update_or_create',
values=values, create=create)
return self.call(context, msg, version='1.12')
cctxt = self.client.prepare(version='1.12')
return cctxt.call(context, 'block_device_mapping_update_or_create',
values=values, create=create)
def block_device_mapping_get_all_by_instance(self, context, instance,
legacy=True):
instance_p = jsonutils.to_primitive(instance)
if self.can_send_version('1.51'):
if self.client.can_send_version('1.51'):
version = '1.51'
msg = self.make_msg('block_device_mapping_get_all_by_instance',
instance=instance_p, legacy=legacy)
kwargs = {'legacy': legacy}
elif legacy:
# If the remote side is >= 1.51, it defaults to legacy=True.
# If it's older, it only understands the legacy format.
version = '1.13'
msg = self.make_msg('block_device_mapping_get_all_by_instance',
instance=instance_p)
kwargs = {}
else:
# If we require new style data, but can't ask for it, then we must
# fail here.
raise rpc_common.RpcVersionCapError(version_cap=self.version_cap)
return self.call(context, msg, version=version)
cctxt = self.client.prepare(version=version)
return cctxt.call(context, 'block_device_mapping_get_all_by_instance',
instance=instance_p, **kwargs)
def block_device_mapping_destroy(self, context, bdms=None,
instance=None, volume_id=None,
device_name=None):
bdms_p = jsonutils.to_primitive(bdms)
instance_p = jsonutils.to_primitive(instance)
msg = self.make_msg('block_device_mapping_destroy',
bdms=bdms_p,
instance=instance_p, volume_id=volume_id,
device_name=device_name)
return self.call(context, msg, version='1.14')
cctxt = self.client.prepare(version='1.14')
return cctxt.call(context, 'block_device_mapping_destroy',
bdms=bdms_p, instance=instance_p,
volume_id=volume_id, device_name=device_name)
def instance_get_all_by_filters(self, context, filters, sort_key,
sort_dir, columns_to_join=None):
msg = self.make_msg('instance_get_all_by_filters',
filters=filters, sort_key=sort_key,
sort_dir=sort_dir, columns_to_join=columns_to_join)
return self.call(context, msg, version='1.47')
cctxt = self.client.prepare(version='1.47')
return cctxt.call(context, 'instance_get_all_by_filters',
filters=filters, sort_key=sort_key,
sort_dir=sort_dir, columns_to_join=columns_to_join)
def instance_get_active_by_window_joined(self, context, begin, end=None,
project_id=None, host=None):
msg = self.make_msg('instance_get_active_by_window_joined',
begin=begin, end=end, project_id=project_id,
host=host)
return self.call(context, msg, version='1.35')
cctxt = self.client.prepare(version='1.35')
return cctxt.call(context, 'instance_get_active_by_window_joined',
begin=begin, end=end, project_id=project_id,
host=host)
def instance_destroy(self, context, instance):
instance_p = jsonutils.to_primitive(instance)
msg = self.make_msg('instance_destroy', instance=instance_p)
self.call(context, msg, version='1.16')
cctxt = self.client.prepare(version='1.16')
cctxt.call(context, 'instance_destroy', instance=instance_p)
def instance_info_cache_delete(self, context, instance):
instance_p = jsonutils.to_primitive(instance)
msg = self.make_msg('instance_info_cache_delete', instance=instance_p)
self.call(context, msg, version='1.17')
cctxt = self.client.prepare(version='1.17')
cctxt.call(context, 'instance_info_cache_delete', instance=instance_p)
def instance_type_get(self, context, instance_type_id):
msg = self.make_msg('instance_type_get',
instance_type_id=instance_type_id)
return self.call(context, msg, version='1.18')
cctxt = self.client.prepare(version='1.18')
return cctxt.call(context, 'instance_type_get',
instance_type_id=instance_type_id)
def vol_get_usage_by_time(self, context, start_time):
start_time_p = jsonutils.to_primitive(start_time)
msg = self.make_msg('vol_get_usage_by_time', start_time=start_time_p)
return self.call(context, msg, version='1.19')
cctxt = self.client.prepare(version='1.19')
return cctxt.call(context, 'vol_get_usage_by_time',
start_time=start_time_p)
def vol_usage_update(self, context, vol_id, rd_req, rd_bytes, wr_req,
wr_bytes, instance, last_refreshed=None,
update_totals=False):
instance_p = jsonutils.to_primitive(instance)
msg = self.make_msg('vol_usage_update', vol_id=vol_id, rd_req=rd_req,
rd_bytes=rd_bytes, wr_req=wr_req,
wr_bytes=wr_bytes,
instance=instance_p, last_refreshed=last_refreshed,
update_totals=update_totals)
return self.call(context, msg, version='1.19')
cctxt = self.client.prepare(version='1.19')
return cctxt.call(context, 'vol_usage_update',
vol_id=vol_id, rd_req=rd_req,
rd_bytes=rd_bytes, wr_req=wr_req,
wr_bytes=wr_bytes,
instance=instance_p, last_refreshed=last_refreshed,
update_totals=update_totals)
def service_get_all_by(self, context, topic=None, host=None, binary=None):
msg = self.make_msg('service_get_all_by', topic=topic, host=host,
binary=binary)
return self.call(context, msg, version='1.28')
cctxt = self.client.prepare(version='1.28')
return cctxt.call(context, 'service_get_all_by',
topic=topic, host=host, binary=binary)
def instance_get_all_by_host(self, context, host, node=None,
columns_to_join=None):
msg = self.make_msg('instance_get_all_by_host', host=host, node=node,
columns_to_join=columns_to_join)
return self.call(context, msg, version='1.47')
cctxt = self.client.prepare(version='1.47')
return cctxt.call(context, 'instance_get_all_by_host',
host=host, node=node,
columns_to_join=columns_to_join)
def instance_fault_create(self, context, values):
msg = self.make_msg('instance_fault_create', values=values)
return self.call(context, msg, version='1.36')
cctxt = self.client.prepare(version='1.36')
return cctxt.call(context, 'instance_fault_create', values=values)
def action_event_start(self, context, values):
values_p = jsonutils.to_primitive(values)
msg = self.make_msg('action_event_start', values=values_p)
return self.call(context, msg, version='1.25')
cctxt = self.client.prepare(version='1.25')
return cctxt.call(context, 'action_event_start', values=values_p)
def action_event_finish(self, context, values):
values_p = jsonutils.to_primitive(values)
msg = self.make_msg('action_event_finish', values=values_p)
return self.call(context, msg, version='1.25')
cctxt = self.client.prepare(version='1.25')
return cctxt.call(context, 'action_event_finish', values=values_p)
def instance_info_cache_update(self, context, instance, values):
instance_p = jsonutils.to_primitive(instance)
msg = self.make_msg('instance_info_cache_update',
instance=instance_p,
values=values)
return self.call(context, msg, version='1.26')
cctxt = self.client.prepare(version='1.26')
return cctxt.call(context, 'instance_info_cache_update',
instance=instance_p, values=values)
def service_create(self, context, values):
msg = self.make_msg('service_create', values=values)
return self.call(context, msg, version='1.27')
cctxt = self.client.prepare(version='1.27')
return cctxt.call(context, 'service_create', values=values)
def service_destroy(self, context, service_id):
msg = self.make_msg('service_destroy', service_id=service_id)
return self.call(context, msg, version='1.29')
cctxt = self.client.prepare(version='1.29')
return cctxt.call(context, 'service_destroy', service_id=service_id)
def compute_node_create(self, context, values):
msg = self.make_msg('compute_node_create', values=values)
return self.call(context, msg, version='1.33')
cctxt = self.client.prepare(version='1.33')
return cctxt.call(context, 'compute_node_create', values=values)
def compute_node_update(self, context, node, values, prune_stats=False):
node_p = jsonutils.to_primitive(node)
msg = self.make_msg('compute_node_update', node=node_p, values=values,
prune_stats=prune_stats)
return self.call(context, msg, version='1.33')
cctxt = self.client.prepare(version='1.33')
return cctxt.call(context, 'compute_node_update',
node=node_p, values=values,
prune_stats=prune_stats)
def compute_node_delete(self, context, node):
node_p = jsonutils.to_primitive(node)
msg = self.make_msg('compute_node_delete', node=node_p)
return self.call(context, msg, version='1.44')
cctxt = self.client.prepare(version='1.44')
return cctxt.call(context, 'compute_node_delete', node=node_p)
def service_update(self, context, service, values):
service_p = jsonutils.to_primitive(service)
msg = self.make_msg('service_update', service=service_p, values=values)
return self.call(context, msg, version='1.34')
cctxt = self.client.prepare(version='1.34')
return cctxt.call(context, 'service_update',
service=service_p, values=values)
def task_log_get(self, context, task_name, begin, end, host, state=None):
msg = self.make_msg('task_log_get', task_name=task_name,
begin=begin, end=end, host=host, state=state)
return self.call(context, msg, version='1.37')
cctxt = self.client.prepare(version='1.37')
return cctxt.call(context, 'task_log_get',
task_name=task_name, begin=begin, end=end,
host=host, state=state)
def task_log_begin_task(self, context, task_name, begin, end, host,
task_items=None, message=None):
msg = self.make_msg('task_log_begin_task', task_name=task_name,
begin=begin, end=end, host=host,
task_items=task_items, message=message)
return self.call(context, msg, version='1.37')
cctxt = self.client.prepare(version='1.37')
return cctxt.call(context, 'task_log_begin_task',
task_name=task_name,
begin=begin, end=end, host=host,
task_items=task_items, message=message)
def task_log_end_task(self, context, task_name, begin, end, host, errors,
message=None):
msg = self.make_msg('task_log_end_task', task_name=task_name,
begin=begin, end=end, host=host, errors=errors,
message=message)
return self.call(context, msg, version='1.37')
cctxt = self.client.prepare(version='1.37')
return cctxt.call(context, 'task_log_end_task',
task_name=task_name, begin=begin, end=end,
host=host, errors=errors, message=message)
def notify_usage_exists(self, context, instance, current_period=False,
ignore_missing_network_data=True,
@ -416,76 +426,81 @@ class ConductorAPI(nova.openstack.common.rpc.proxy.RpcProxy):
instance_p = jsonutils.to_primitive(instance)
system_metadata_p = jsonutils.to_primitive(system_metadata)
extra_usage_info_p = jsonutils.to_primitive(extra_usage_info)
msg = self.make_msg('notify_usage_exists', instance=instance_p,
current_period=current_period,
ignore_missing_network_data=ignore_missing_network_data,
system_metadata=system_metadata_p,
extra_usage_info=extra_usage_info_p)
return self.call(context, msg, version='1.39')
cctxt = self.client.prepare(version='1.39')
return cctxt.call(
context, 'notify_usage_exists',
instance=instance_p,
current_period=current_period,
ignore_missing_network_data=ignore_missing_network_data,
system_metadata=system_metadata_p,
extra_usage_info=extra_usage_info_p)
def security_groups_trigger_handler(self, context, event, args):
args_p = jsonutils.to_primitive(args)
msg = self.make_msg('security_groups_trigger_handler', event=event,
args=args_p)
return self.call(context, msg, version='1.40')
cctxt = self.client.prepare(version='1.40')
return cctxt.call(context, 'security_groups_trigger_handler',
event=event, args=args_p)
def security_groups_trigger_members_refresh(self, context, group_ids):
msg = self.make_msg('security_groups_trigger_members_refresh',
group_ids=group_ids)
return self.call(context, msg, version='1.40')
cctxt = self.client.prepare(version='1.40')
return cctxt.call(context, 'security_groups_trigger_members_refresh',
group_ids=group_ids)
def network_migrate_instance_start(self, context, instance, migration):
instance_p = jsonutils.to_primitive(instance)
migration_p = jsonutils.to_primitive(migration)
msg = self.make_msg('network_migrate_instance_start',
instance=instance_p, migration=migration_p)
return self.call(context, msg, version='1.41')
cctxt = self.client.prepare(version='1.41')
return cctxt.call(context, 'network_migrate_instance_start',
instance=instance_p, migration=migration_p)
def network_migrate_instance_finish(self, context, instance, migration):
instance_p = jsonutils.to_primitive(instance)
migration_p = jsonutils.to_primitive(migration)
msg = self.make_msg('network_migrate_instance_finish',
instance=instance_p, migration=migration_p)
return self.call(context, msg, version='1.41')
cctxt = self.client.prepare(version='1.41')
return cctxt.call(context, 'network_migrate_instance_finish',
instance=instance_p, migration=migration_p)
def quota_commit(self, context, reservations, project_id=None,
user_id=None):
reservations_p = jsonutils.to_primitive(reservations)
msg = self.make_msg('quota_commit', reservations=reservations_p,
project_id=project_id, user_id=user_id)
return self.call(context, msg, version='1.45')
cctxt = self.client.prepare(version='1.45')
return cctxt.call(context, 'quota_commit',
reservations=reservations_p,
project_id=project_id, user_id=user_id)
def quota_rollback(self, context, reservations, project_id=None,
user_id=None):
reservations_p = jsonutils.to_primitive(reservations)
msg = self.make_msg('quota_rollback', reservations=reservations_p,
project_id=project_id, user_id=user_id)
return self.call(context, msg, version='1.45')
cctxt = self.client.prepare(version='1.45')
return cctxt.call(context, 'quota_rollback',
reservations=reservations_p,
project_id=project_id, user_id=user_id)
def get_ec2_ids(self, context, instance):
instance_p = jsonutils.to_primitive(instance)
msg = self.make_msg('get_ec2_ids', instance=instance_p)
return self.call(context, msg, version='1.42')
cctxt = self.client.prepare(version='1.42')
return cctxt.call(context, 'get_ec2_ids',
instance=instance_p)
def compute_unrescue(self, context, instance):
instance_p = jsonutils.to_primitive(instance)
msg = self.make_msg('compute_unrescue', instance=instance_p)
return self.call(context, msg, version='1.48')
cctxt = self.client.prepare(version='1.48')
return cctxt.call(context, 'compute_unrescue', instance=instance_p)
def object_class_action(self, context, objname, objmethod, objver,
args, kwargs):
msg = self.make_msg('object_class_action', objname=objname,
objmethod=objmethod, objver=objver,
args=args, kwargs=kwargs)
return self.call(context, msg, version='1.50')
cctxt = self.client.prepare(version='1.50')
return cctxt.call(context, 'object_class_action',
objname=objname, objmethod=objmethod,
objver=objver, args=args, kwargs=kwargs)
def object_action(self, context, objinst, objmethod, args, kwargs):
msg = self.make_msg('object_action', objinst=objinst,
objmethod=objmethod, args=args, kwargs=kwargs)
return self.call(context, msg, version='1.50')
cctxt = self.client.prepare(version='1.50')
return cctxt.call(context, 'object_action', objinst=objinst,
objmethod=objmethod, args=args, kwargs=kwargs)
class ComputeTaskAPI(nova.openstack.common.rpc.proxy.RpcProxy):
class ComputeTaskAPI(rpcclient.RpcProxy):
"""Client side of the conductor 'compute' namespaced RPC API
API version history:
@ -507,37 +522,42 @@ class ComputeTaskAPI(nova.openstack.common.rpc.proxy.RpcProxy):
topic=CONF.conductor.topic,
default_version=self.BASE_RPC_API_VERSION,
serializer=objects_base.NovaObjectSerializer())
self.client = self.get_client(namespace=self.RPC_API_NAMESPACE)
def migrate_server(self, context, instance, scheduler_hint, live, rebuild,
flavor, block_migration, disk_over_commit,
reservations=None):
if self.can_send_version('1.6'):
if self.client.can_send_version('1.6'):
version = '1.6'
else:
instance = jsonutils.to_primitive(
objects_base.obj_to_primitive(instance))
version = '1.4'
flavor_p = jsonutils.to_primitive(flavor)
msg = self.make_msg('migrate_server', instance=instance,
scheduler_hint=scheduler_hint, live=live, rebuild=rebuild,
flavor=flavor_p, block_migration=block_migration,
disk_over_commit=disk_over_commit, reservations=reservations)
return self.call(context, msg, version=version)
cctxt = self.client.prepare(version=version)
return cctxt.call(context, 'migrate_server',
instance=instance, scheduler_hint=scheduler_hint,
live=live, rebuild=rebuild, flavor=flavor_p,
block_migration=block_migration,
disk_over_commit=disk_over_commit,
reservations=reservations)
def build_instances(self, context, instances, image, filter_properties,
admin_password, injected_files, requested_networks,
security_groups, block_device_mapping, legacy_bdm=True):
instances_p = [jsonutils.to_primitive(inst) for inst in instances]
image_p = jsonutils.to_primitive(image)
msg = self.make_msg('build_instances', instances=instances_p,
image=image_p, filter_properties=filter_properties,
admin_password=admin_password, injected_files=injected_files,
requested_networks=requested_networks,
security_groups=security_groups,
block_device_mapping=block_device_mapping,
legacy_bdm=legacy_bdm)
self.cast(context, msg, version='1.5')
cctxt = self.client.prepare(version='1.5')
cctxt.cast(context, 'build_instances',
instances=instances_p, image=image_p,
filter_properties=filter_properties,
admin_password=admin_password,
injected_files=injected_files,
requested_networks=requested_networks,
security_groups=security_groups,
block_device_mapping=block_device_mapping,
legacy_bdm=legacy_bdm)
def unshelve_instance(self, context, instance):
msg = self.make_msg('unshelve_instance', instance=instance)
self.cast(context, msg, version='1.3')
cctxt = self.client.prepare(version='1.3')
cctxt.cast(context, 'unshelve_instance', instance=instance)

View File

@ -20,7 +20,7 @@ Client side of the console RPC API.
from oslo.config import cfg
import nova.openstack.common.rpc.proxy
from nova import rpcclient
rpcapi_opts = [
cfg.StrOpt('console_topic',
@ -36,7 +36,7 @@ rpcapi_cap_opt = cfg.StrOpt('console',
CONF.register_opt(rpcapi_cap_opt, 'upgrade_levels')
class ConsoleAPI(nova.openstack.common.rpc.proxy.RpcProxy):
class ConsoleAPI(rpcclient.RpcProxy):
'''Client side of the console rpc API.
API version history:
@ -71,9 +71,10 @@ class ConsoleAPI(nova.openstack.common.rpc.proxy.RpcProxy):
topic=topic,
default_version=self.BASE_RPC_API_VERSION,
version_cap=version_cap)
self.client = self.get_client()
def add_console(self, ctxt, instance_id):
self.cast(ctxt, self.make_msg('add_console', instance_id=instance_id))
self.client.cast(ctxt, 'add_console', instance_id=instance_id)
def remove_console(self, ctxt, console_id):
self.cast(ctxt, self.make_msg('remove_console', console_id=console_id))
self.client.cast(ctxt, 'remove_console', console_id=console_id)

View File

@ -20,7 +20,7 @@ Client side of the consoleauth RPC API.
from oslo.config import cfg
import nova.openstack.common.rpc.proxy
from nova import rpcclient
CONF = cfg.CONF
@ -29,7 +29,7 @@ rpcapi_cap_opt = cfg.StrOpt('consoleauth',
CONF.register_opt(rpcapi_cap_opt, 'upgrade_levels')
class ConsoleAuthAPI(nova.openstack.common.rpc.proxy.RpcProxy):
class ConsoleAuthAPI(rpcclient.RpcProxy):
'''Client side of the consoleauth rpc API.
API version history:
@ -65,24 +65,25 @@ class ConsoleAuthAPI(nova.openstack.common.rpc.proxy.RpcProxy):
topic=CONF.consoleauth_topic,
default_version=self.BASE_RPC_API_VERSION,
version_cap=version_cap)
self.client = self.get_client()
def authorize_console(self, ctxt, token, console_type, host, port,
internal_access_path, instance_uuid=None):
# The remote side doesn't return anything, but we want to block
# until it completes.
return self.call(ctxt,
self.make_msg('authorize_console',
token=token, console_type=console_type,
host=host, port=port,
internal_access_path=internal_access_path,
instance_uuid=instance_uuid),
version="1.2")
cctxt = self.client.prepare(version='1.2')
return cctxt.call(ctxt,
'authorize_console',
token=token, console_type=console_type,
host=host, port=port,
internal_access_path=internal_access_path,
instance_uuid=instance_uuid)
def check_token(self, ctxt, token):
return self.call(ctxt, self.make_msg('check_token', token=token))
return self.client.call(ctxt, 'check_token', token=token)
def delete_tokens_for_instance(self, ctxt, instance_uuid):
return self.cast(ctxt,
self.make_msg('delete_tokens_for_instance',
instance_uuid=instance_uuid),
version="1.2")
cctxt = self.client.prepare(version='1.2')
return cctxt.cast(ctxt,
'delete_tokens_for_instance',
instance_uuid=instance_uuid)

View File

@ -21,8 +21,7 @@ Client side of the network RPC API.
from oslo.config import cfg
from nova.openstack.common import jsonutils
from nova.openstack.common import rpc
from nova.openstack.common.rpc import proxy as rpc_proxy
from nova import rpcclient
rpcapi_opts = [
cfg.StrOpt('network_topic',
@ -42,7 +41,7 @@ rpcapi_cap_opt = cfg.StrOpt('network',
CONF.register_opt(rpcapi_cap_opt, 'upgrade_levels')
class NetworkAPI(rpc_proxy.RpcProxy):
class NetworkAPI(rpcclient.RpcProxy):
'''Client side of the network rpc API.
API version history:
@ -87,197 +86,202 @@ class NetworkAPI(rpc_proxy.RpcProxy):
topic=topic,
default_version=self.BASE_RPC_API_VERSION,
version_cap=version_cap)
self.client = self.get_client()
def get_all_networks(self, ctxt):
return self.call(ctxt, self.make_msg('get_all_networks'))
return self.client.call(ctxt, 'get_all_networks')
def get_network(self, ctxt, network_uuid):
return self.call(ctxt, self.make_msg('get_network',
network_uuid=network_uuid))
return self.client.call(ctxt, 'get_network',
network_uuid=network_uuid)
# TODO(russellb): Convert this to named arguments. It's a pretty large
# list, so unwinding it all is probably best done in its own patch so it's
# easier to review.
def create_networks(self, ctxt, **kwargs):
return self.call(ctxt, self.make_msg('create_networks', **kwargs))
return self.client.call(ctxt, 'create_networks', **kwargs)
def delete_network(self, ctxt, uuid, fixed_range):
return self.call(ctxt, self.make_msg('delete_network',
uuid=uuid, fixed_range=fixed_range))
return self.client.call(ctxt, 'delete_network',
uuid=uuid, fixed_range=fixed_range)
def disassociate_network(self, ctxt, network_uuid):
return self.call(ctxt, self.make_msg('disassociate_network',
network_uuid=network_uuid))
return self.client.call(ctxt, 'disassociate_network',
network_uuid=network_uuid)
def get_fixed_ip(self, ctxt, id):
return self.call(ctxt, self.make_msg('get_fixed_ip', id=id))
return self.client.call(ctxt, 'get_fixed_ip', id=id)
def get_fixed_ip_by_address(self, ctxt, address):
return self.call(ctxt, self.make_msg('get_fixed_ip_by_address',
address=address))
return self.client.call(ctxt, 'get_fixed_ip_by_address',
address=address)
def get_floating_ip(self, ctxt, id):
return self.call(ctxt, self.make_msg('get_floating_ip', id=id))
return self.client.call(ctxt, 'get_floating_ip', id=id)
def get_floating_ip_pools(self, ctxt):
return self.call(ctxt, self.make_msg('get_floating_ip_pools'),
version="1.7")
cctxt = self.client.prepare(version="1.7")
return cctxt.call(ctxt, 'get_floating_ip_pools')
def get_floating_ip_by_address(self, ctxt, address):
return self.call(ctxt, self.make_msg('get_floating_ip_by_address',
address=address))
return self.client.call(ctxt, 'get_floating_ip_by_address',
address=address)
def get_floating_ips_by_project(self, ctxt):
return self.call(ctxt, self.make_msg('get_floating_ips_by_project'))
return self.client.call(ctxt, 'get_floating_ips_by_project')
def get_floating_ips_by_fixed_address(self, ctxt, fixed_address):
return self.call(ctxt, self.make_msg(
'get_floating_ips_by_fixed_address',
fixed_address=fixed_address))
return self.client.call(ctxt, 'get_floating_ips_by_fixed_address',
fixed_address=fixed_address)
def get_instance_id_by_floating_address(self, ctxt, address):
return self.call(ctxt, self.make_msg(
'get_instance_id_by_floating_address',
address=address))
return self.client.call(ctxt, 'get_instance_id_by_floating_address',
address=address)
def get_vifs_by_instance(self, ctxt, instance_id):
# NOTE(vish): When the db calls are converted to store network
# data by instance_uuid, this should pass uuid instead.
return self.call(ctxt, self.make_msg('get_vifs_by_instance',
instance_id=instance_id))
return self.client.call(ctxt, 'get_vifs_by_instance',
instance_id=instance_id)
def get_vif_by_mac_address(self, ctxt, mac_address):
return self.call(ctxt, self.make_msg('get_vif_by_mac_address',
mac_address=mac_address))
return self.client.call(ctxt, 'get_vif_by_mac_address',
mac_address=mac_address)
def allocate_floating_ip(self, ctxt, project_id, pool, auto_assigned):
return self.call(ctxt, self.make_msg('allocate_floating_ip',
project_id=project_id, pool=pool, auto_assigned=auto_assigned))
return self.client.call(ctxt, 'allocate_floating_ip',
project_id=project_id, pool=pool,
auto_assigned=auto_assigned)
def deallocate_floating_ip(self, ctxt, address, affect_auto_assigned):
return self.call(ctxt, self.make_msg('deallocate_floating_ip',
address=address, affect_auto_assigned=affect_auto_assigned))
return self.client.call(ctxt, 'deallocate_floating_ip',
address=address,
affect_auto_assigned=affect_auto_assigned)
def associate_floating_ip(self, ctxt, floating_address, fixed_address,
affect_auto_assigned):
return self.call(ctxt, self.make_msg('associate_floating_ip',
floating_address=floating_address, fixed_address=fixed_address,
affect_auto_assigned=affect_auto_assigned))
return self.client.call(ctxt, 'associate_floating_ip',
floating_address=floating_address,
fixed_address=fixed_address,
affect_auto_assigned=affect_auto_assigned)
def disassociate_floating_ip(self, ctxt, address, affect_auto_assigned):
return self.call(ctxt, self.make_msg('disassociate_floating_ip',
address=address, affect_auto_assigned=affect_auto_assigned))
return self.client.call(ctxt, 'disassociate_floating_ip',
address=address,
affect_auto_assigned=affect_auto_assigned)
def allocate_for_instance(self, ctxt, instance_id, project_id, host,
rxtx_factor, vpn, requested_networks, macs=None,
dhcp_options=None):
if CONF.multi_host:
topic = rpc.queue_get_for(ctxt, self.topic, host)
cctxt = self.client.prepare(version='1.9', server=host)
else:
topic = None
return self.call(ctxt, self.make_msg('allocate_for_instance',
instance_id=instance_id, project_id=project_id, host=host,
rxtx_factor=rxtx_factor, vpn=vpn,
requested_networks=requested_networks,
macs=jsonutils.to_primitive(macs)),
topic=topic, version='1.9')
cctxt = self.client.prepare(version='1.9')
return cctxt.call(ctxt, 'allocate_for_instance',
instance_id=instance_id, project_id=project_id,
host=host, rxtx_factor=rxtx_factor, vpn=vpn,
requested_networks=requested_networks,
macs=jsonutils.to_primitive(macs))
def deallocate_for_instance(self, ctxt, instance_id, project_id,
host, requested_networks=None):
cctxt = self.client
if CONF.multi_host:
topic = rpc.queue_get_for(ctxt, self.topic, host)
else:
topic = None
return self.call(ctxt, self.make_msg('deallocate_for_instance',
instance_id=instance_id, project_id=project_id,
host=host, requested_networks=requested_networks),
topic=topic)
cctxt = cctxt.prepare(server=host)
return cctxt.call(ctxt, 'deallocate_for_instance',
instance_id=instance_id, project_id=project_id,
host=host, requested_networks=requested_networks)
def add_fixed_ip_to_instance(self, ctxt, instance_id, rxtx_factor,
host, network_id):
return self.call(ctxt, self.make_msg('add_fixed_ip_to_instance',
instance_id=instance_id, rxtx_factor=rxtx_factor,
host=host, network_id=network_id), version='1.9')
cctxt = self.client.prepare(version='1.9')
return cctxt.call(ctxt, 'add_fixed_ip_to_instance',
instance_id=instance_id, rxtx_factor=rxtx_factor,
host=host, network_id=network_id)
def remove_fixed_ip_from_instance(self, ctxt, instance_id, rxtx_factor,
host, address):
return self.call(ctxt, self.make_msg('remove_fixed_ip_from_instance',
instance_id=instance_id, rxtx_factor=rxtx_factor,
host=host, address=address), version='1.9')
cctxt = self.client.prepare(version='1.9')
return cctxt.call(ctxt, 'remove_fixed_ip_from_instance',
instance_id=instance_id, rxtx_factor=rxtx_factor,
host=host, address=address)
def add_network_to_project(self, ctxt, project_id, network_uuid):
return self.call(ctxt, self.make_msg('add_network_to_project',
project_id=project_id, network_uuid=network_uuid))
return self.client.call(ctxt, 'add_network_to_project',
project_id=project_id,
network_uuid=network_uuid)
def associate(self, ctxt, network_uuid, associations):
return self.call(ctxt, self.make_msg('associate',
network_uuid=network_uuid, associations=associations),
self.topic, version='1.5')
cctxt = self.client.prepare(version='1.5')
return cctxt.call(ctxt, 'associate',
network_uuid=network_uuid,
associations=associations)
def get_instance_nw_info(self, ctxt, instance_id, rxtx_factor, host,
project_id):
return self.call(ctxt, self.make_msg('get_instance_nw_info',
instance_id=instance_id, rxtx_factor=rxtx_factor, host=host,
project_id=project_id), version='1.9')
cctxt = self.client.prepare(version='1.9')
return cctxt.call(ctxt, 'get_instance_nw_info',
instance_id=instance_id, rxtx_factor=rxtx_factor,
host=host, project_id=project_id)
def validate_networks(self, ctxt, networks):
return self.call(ctxt, self.make_msg('validate_networks',
networks=networks))
return self.client.call(ctxt, 'validate_networks', networks=networks)
def get_instance_uuids_by_ip_filter(self, ctxt, filters):
return self.call(ctxt, self.make_msg('get_instance_uuids_by_ip_filter',
filters=filters))
return self.client.call(ctxt, 'get_instance_uuids_by_ip_filter',
filters=filters)
def get_dns_domains(self, ctxt):
return self.call(ctxt, self.make_msg('get_dns_domains'))
return self.client.call(ctxt, 'get_dns_domains')
def add_dns_entry(self, ctxt, address, name, dns_type, domain):
return self.call(ctxt, self.make_msg('add_dns_entry', address=address,
name=name, dns_type=dns_type, domain=domain))
return self.client.call(ctxt, 'add_dns_entry',
address=address, name=name,
dns_type=dns_type, domain=domain)
def modify_dns_entry(self, ctxt, address, name, domain):
return self.call(ctxt, self.make_msg('modify_dns_entry',
address=address, name=name, domain=domain))
return self.client.call(ctxt, 'modify_dns_entry',
address=address, name=name, domain=domain)
def delete_dns_entry(self, ctxt, name, domain):
return self.call(ctxt, self.make_msg('delete_dns_entry',
name=name, domain=domain))
return self.client.call(ctxt, 'delete_dns_entry',
name=name, domain=domain)
def delete_dns_domain(self, ctxt, domain):
return self.call(ctxt, self.make_msg('delete_dns_domain',
domain=domain))
return self.client.call(ctxt, 'delete_dns_domain', domain=domain)
def get_dns_entries_by_address(self, ctxt, address, domain):
return self.call(ctxt, self.make_msg('get_dns_entries_by_address',
address=address, domain=domain))
return self.client.call(ctxt, 'get_dns_entries_by_address',
address=address, domain=domain)
def get_dns_entries_by_name(self, ctxt, name, domain):
return self.call(ctxt, self.make_msg('get_dns_entries_by_name',
name=name, domain=domain))
return self.client.call(ctxt, 'get_dns_entries_by_name',
name=name, domain=domain)
def create_private_dns_domain(self, ctxt, domain, av_zone):
return self.call(ctxt, self.make_msg('create_private_dns_domain',
domain=domain, av_zone=av_zone))
return self.client.call(ctxt, 'create_private_dns_domain',
domain=domain, av_zone=av_zone)
def create_public_dns_domain(self, ctxt, domain, project):
return self.call(ctxt, self.make_msg('create_public_dns_domain',
domain=domain, project=project))
return self.client.call(ctxt, 'create_public_dns_domain',
domain=domain, project=project)
def setup_networks_on_host(self, ctxt, instance_id, host, teardown):
# NOTE(tr3buchet): the call is just to wait for completion
return self.call(ctxt, self.make_msg('setup_networks_on_host',
instance_id=instance_id, host=host, teardown=teardown))
return self.client.call(ctxt, 'setup_networks_on_host',
instance_id=instance_id, host=host,
teardown=teardown)
def set_network_host(self, ctxt, network_ref):
network_ref_p = jsonutils.to_primitive(network_ref)
return self.call(ctxt, self.make_msg('set_network_host',
network_ref=network_ref_p))
return self.client.call(ctxt, 'set_network_host',
network_ref=network_ref_p)
def rpc_setup_network_on_host(self, ctxt, network_id, teardown, host):
# NOTE(tr3buchet): the call is just to wait for completion
return self.call(ctxt, self.make_msg('rpc_setup_network_on_host',
network_id=network_id, teardown=teardown),
topic=rpc.queue_get_for(ctxt, self.topic, host))
cctxt = self.client.prepare(server=host)
return cctxt.call(ctxt, 'rpc_setup_network_on_host',
network_id=network_id, teardown=teardown)
# NOTE(russellb): Ideally this would not have a prefix of '_' since it is
# a part of the rpc API. However, this is how it was being called when the
@ -285,19 +289,19 @@ class NetworkAPI(rpc_proxy.RpcProxy):
# changed if there was ever a 2.0.
def _rpc_allocate_fixed_ip(self, ctxt, instance_id, network_id, address,
vpn, host):
return self.call(ctxt, self.make_msg('_rpc_allocate_fixed_ip',
instance_id=instance_id, network_id=network_id,
address=address, vpn=vpn),
topic=rpc.queue_get_for(ctxt, self.topic, host))
cctxt = self.client.prepare(server=host)
return cctxt.call(ctxt, '_rpc_allocate_fixed_ip',
instance_id=instance_id, network_id=network_id,
address=address, vpn=vpn)
def deallocate_fixed_ip(self, ctxt, address, host):
return self.call(ctxt, self.make_msg('deallocate_fixed_ip',
address=address, host=host),
topic=rpc.queue_get_for(ctxt, self.topic, host))
cctxt = self.client.prepare(server=host)
return cctxt.call(ctxt, 'deallocate_fixed_ip',
address=address, host=host)
def update_dns(self, ctxt, network_ids):
return self.fanout_cast(ctxt, self.make_msg('update_dns',
network_ids=network_ids), version='1.3')
cctxt = self.client.prepare(fanout=True, version='1.3')
return cctxt.cast(ctxt, 'update_dns', network_ids=network_ids)
# NOTE(russellb): Ideally this would not have a prefix of '_' since it is
# a part of the rpc API. However, this is how it was being called when the
@ -305,11 +309,11 @@ class NetworkAPI(rpc_proxy.RpcProxy):
# changed if there was ever a 2.0.
def _associate_floating_ip(self, ctxt, floating_address, fixed_address,
interface, host, instance_uuid=None):
return self.call(ctxt, self.make_msg('_associate_floating_ip',
floating_address=floating_address, fixed_address=fixed_address,
interface=interface, instance_uuid=instance_uuid),
topic=rpc.queue_get_for(ctxt, self.topic, host),
version='1.6')
cctxt = self.client.prepare(server=host, version='1.6')
return cctxt.call(ctxt, '_associate_floating_ip',
floating_address=floating_address,
fixed_address=fixed_address,
interface=interface, instance_uuid=instance_uuid)
# NOTE(russellb): Ideally this would not have a prefix of '_' since it is
# a part of the rpc API. However, this is how it was being called when the
@ -317,46 +321,39 @@ class NetworkAPI(rpc_proxy.RpcProxy):
# changed if there was ever a 2.0.
def _disassociate_floating_ip(self, ctxt, address, interface, host,
instance_uuid=None):
return self.call(ctxt, self.make_msg('_disassociate_floating_ip',
address=address, interface=interface,
instance_uuid=instance_uuid),
topic=rpc.queue_get_for(ctxt, self.topic, host),
version='1.6')
cctxt = self.client.prepare(server=host, version='1.6')
return cctxt.call(ctxt, '_disassociate_floating_ip',
address=address, interface=interface,
instance_uuid=instance_uuid)
def lease_fixed_ip(self, ctxt, address, host):
self.cast(ctxt, self.make_msg('lease_fixed_ip', address=address),
topic=rpc.queue_get_for(ctxt, self.topic, host))
cctxt = self.client.prepare(server=host)
cctxt.cast(ctxt, 'lease_fixed_ip', address=address)
def release_fixed_ip(self, ctxt, address, host):
self.cast(ctxt, self.make_msg('release_fixed_ip', address=address),
topic=rpc.queue_get_for(ctxt, self.topic, host))
cctxt = self.client.prepare(server=host)
cctxt.cast(ctxt, 'release_fixed_ip', address=address)
def migrate_instance_start(self, ctxt, instance_uuid, rxtx_factor,
project_id, source_compute, dest_compute,
floating_addresses, host=None):
topic = rpc.queue_get_for(ctxt, self.topic, host)
return self.call(ctxt, self.make_msg(
'migrate_instance_start',
instance_uuid=instance_uuid,
rxtx_factor=rxtx_factor,
project_id=project_id,
source=source_compute,
dest=dest_compute,
floating_addresses=floating_addresses),
topic=topic,
version='1.2')
cctxt = self.client.prepare(server=host, version='1.2')
return cctxt.call(ctxt, 'migrate_instance_start',
instance_uuid=instance_uuid,
rxtx_factor=rxtx_factor,
project_id=project_id,
source=source_compute,
dest=dest_compute,
floating_addresses=floating_addresses)
def migrate_instance_finish(self, ctxt, instance_uuid, rxtx_factor,
project_id, source_compute, dest_compute,
floating_addresses, host=None):
topic = rpc.queue_get_for(ctxt, self.topic, host)
return self.call(ctxt, self.make_msg(
'migrate_instance_finish',
instance_uuid=instance_uuid,
rxtx_factor=rxtx_factor,
project_id=project_id,
source=source_compute,
dest=dest_compute,
floating_addresses=floating_addresses),
topic=topic,
version='1.2')
cctxt = self.client.prepare(server=host, version='1.2')
return cctxt.call(ctxt, 'migrate_instance_finish',
instance_uuid=instance_uuid,
rxtx_factor=rxtx_factor,
project_id=project_id,
source=source_compute,
dest=dest_compute,
floating_addresses=floating_addresses)

View File

@ -23,8 +23,6 @@ from nova.objects import utils as obj_utils
from nova.openstack.common.gettextutils import _
from nova.openstack.common import log as logging
from nova.openstack.common.rpc import common as rpc_common
import nova.openstack.common.rpc.dispatcher
import nova.openstack.common.rpc.proxy
import nova.openstack.common.rpc.serializer

96
nova/rpcclient.py Normal file
View File

@ -0,0 +1,96 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
A temporary helper which emulates oslo.messaging.rpc.RPCClient.
The most tedious part of porting to oslo.messaging is porting the code which
sub-classes RpcProxy to use RPCClient instead.
This helper method allows us to do that tedious porting as a standalone commit
so that the commit which switches us to oslo.messaging is smaller and easier
to review. This file will be removed as part of that commit.
"""
from nova.openstack.common.rpc import proxy
class RPCClient(object):
def __init__(self, proxy, namespace=None, server_params=None):
super(RPCClient, self).__init__()
self.proxy = proxy
self.namespace = namespace
self.server_params = server_params
self.kwargs = {}
self.fanout = None
def prepare(self, **kwargs):
# Clone ourselves
ret = self.__class__(self.proxy, self.namespace, self.server_params)
ret.kwargs.update(self.kwargs)
ret.fanout = self.fanout
# Update according to supplied kwargs
ret.kwargs.update(kwargs)
server = ret.kwargs.pop('server', None)
if server:
ret.kwargs['topic'] = '%s.%s' % (self.proxy.topic, server)
fanout = ret.kwargs.pop('fanout', None)
if fanout:
ret.fanout = True
return ret
def _invoke(self, cast_or_call, ctxt, method, **kwargs):
try:
msg = self.proxy.make_namespaced_msg(method,
self.namespace,
**kwargs)
return cast_or_call(ctxt, msg, **self.kwargs)
finally:
self.kwargs = {}
self.fanout = None
def cast(self, ctxt, method, **kwargs):
if self.server_params:
def cast_to_server(ctxt, msg, **kwargs):
if self.fanout:
return self.proxy.fanout_cast_to_server(
ctxt, self.server_params, msg, **kwargs)
else:
return self.proxy.cast_to_server(
ctxt, self.server_params, msg, **kwargs)
caster = cast_to_server
else:
caster = self.proxy.fanout_cast if self.fanout else self.proxy.cast
self._invoke(caster, ctxt, method, **kwargs)
def call(self, ctxt, method, **kwargs):
return self._invoke(self.proxy.call, ctxt, method, **kwargs)
def can_send_version(self, version):
return self.proxy.can_send_version(version)
class RpcProxy(proxy.RpcProxy):
def get_client(self, namespace=None, server_params=None):
return RPCClient(self,
namespace=namespace,
server_params=server_params)

View File

@ -21,7 +21,7 @@ Client side of the scheduler manager RPC API.
from oslo.config import cfg
from nova.openstack.common import jsonutils
import nova.openstack.common.rpc.proxy
from nova import rpcclient
rpcapi_opts = [
cfg.StrOpt('scheduler_topic',
@ -37,7 +37,7 @@ rpcapi_cap_opt = cfg.StrOpt('scheduler',
CONF.register_opt(rpcapi_cap_opt, 'upgrade_levels')
class SchedulerAPI(nova.openstack.common.rpc.proxy.RpcProxy):
class SchedulerAPI(rpcclient.RpcProxy):
'''Client side of the scheduler rpc API.
API version history:
@ -94,11 +94,12 @@ class SchedulerAPI(nova.openstack.common.rpc.proxy.RpcProxy):
super(SchedulerAPI, self).__init__(topic=CONF.scheduler_topic,
default_version=self.BASE_RPC_API_VERSION,
version_cap=version_cap)
self.client = self.get_client()
def select_destinations(self, ctxt, request_spec, filter_properties):
return self.call(ctxt, self.make_msg('select_destinations',
request_spec=request_spec, filter_properties=filter_properties),
version='2.7')
cctxt = self.client.prepare(version='2.7')
return cctxt.call(ctxt, 'select_destinations',
request_spec=request_spec, filter_properties=filter_properties)
def run_instance(self, ctxt, request_spec, admin_password,
injected_files, requested_networks, is_first_time,
@ -110,11 +111,11 @@ class SchedulerAPI(nova.openstack.common.rpc.proxy.RpcProxy):
'requested_networks': requested_networks,
'is_first_time': is_first_time,
'filter_properties': filter_properties}
if self.can_send_version('2.9'):
if self.client.can_send_version('2.9'):
version = '2.9'
msg_kwargs['legacy_bdm_in_spec'] = legacy_bdm_in_spec
return self.cast(ctxt, self.make_msg('run_instance', **msg_kwargs),
version=version)
cctxt = self.client.prepare(version=version)
return cctxt.cast(ctxt, 'run_instance', **msg_kwargs)
def prep_resize(self, ctxt, instance, instance_type, image,
request_spec, filter_properties, reservations):
@ -122,24 +123,24 @@ class SchedulerAPI(nova.openstack.common.rpc.proxy.RpcProxy):
instance_type_p = jsonutils.to_primitive(instance_type)
reservations_p = jsonutils.to_primitive(reservations)
image_p = jsonutils.to_primitive(image)
self.cast(ctxt, self.make_msg('prep_resize',
instance=instance_p, instance_type=instance_type_p,
image=image_p, request_spec=request_spec,
filter_properties=filter_properties,
reservations=reservations_p))
self.client.cast(ctxt, 'prep_resize',
instance=instance_p, instance_type=instance_type_p,
image=image_p, request_spec=request_spec,
filter_properties=filter_properties,
reservations=reservations_p)
def update_service_capabilities(self, ctxt, service_name, host,
capabilities):
#NOTE(jogo) This is deprecated, but is used by the deprecated
# publish_service_capabilities call. So this can begin its removal
# process once publish_service_capabilities is removed.
self.fanout_cast(ctxt, self.make_msg('update_service_capabilities',
service_name=service_name, host=host,
capabilities=capabilities),
version='2.4')
cctxt = self.client.prepare(fanout=True, version='2.4')
cctxt.cast(ctxt, 'update_service_capabilities',
service_name=service_name, host=host,
capabilities=capabilities)
def select_hosts(self, ctxt, request_spec, filter_properties):
return self.call(ctxt, self.make_msg('select_hosts',
request_spec=request_spec,
filter_properties=filter_properties),
version='2.6')
cctxt = self.client.prepare(version='2.6')
return cctxt.call(ctxt, 'select_hosts',
request_spec=request_spec,
filter_properties=filter_properties)

View File

@ -107,7 +107,7 @@ class CellsRPCDriverTestCase(test.TestCase):
call_info = {}
def _fake_make_msg(method, **kwargs):
def _fake_make_msg(method, namespace, **kwargs):
call_info['rpc_method'] = method
call_info['rpc_kwargs'] = kwargs
return 'fake-message'
@ -117,7 +117,7 @@ class CellsRPCDriverTestCase(test.TestCase):
call_info['cast_kwargs'] = kwargs
self.stubs.Set(rpc, 'cast_to_server', _fake_cast_to_server)
self.stubs.Set(self.driver.intercell_rpcapi, 'make_msg',
self.stubs.Set(self.driver.intercell_rpcapi, 'make_namespaced_msg',
_fake_make_msg)
self.stubs.Set(self.driver.intercell_rpcapi, 'cast_to_server',
_fake_cast_to_server)
@ -145,7 +145,7 @@ class CellsRPCDriverTestCase(test.TestCase):
call_info = {}
def _fake_make_msg(method, **kwargs):
def _fake_make_msg(method, namespace, **kwargs):
call_info['rpc_method'] = method
call_info['rpc_kwargs'] = kwargs
return 'fake-message'
@ -156,7 +156,7 @@ class CellsRPCDriverTestCase(test.TestCase):
self.stubs.Set(rpc, 'fanout_cast_to_server',
_fake_fanout_cast_to_server)
self.stubs.Set(self.driver.intercell_rpcapi, 'make_msg',
self.stubs.Set(self.driver.intercell_rpcapi, 'make_namespaced_msg',
_fake_make_msg)
self.stubs.Set(self.driver.intercell_rpcapi,
'fanout_cast_to_server', _fake_fanout_cast_to_server)

View File

@ -79,6 +79,8 @@ class NetworkRpcAPITestCase(test.TestCase):
retval = getattr(rpcapi, method)(ctxt, **kwargs)
self.assertEqual(retval, expected_retval)
self.assertIsNotNone(self.fake_args)
self.assertIsNotNone(self.fake_kwargs)
expected_args = [ctxt, expected_topic, expected_msg]
for arg, expected_arg in zip(self.fake_args, expected_args):
try: