Merge "Port all rpcapi modules to oslo.messaging interface"
This commit is contained in:
@@ -21,8 +21,7 @@ Base RPC client and server common to all services.
|
||||
from oslo.config import cfg
|
||||
|
||||
from nova.openstack.common import jsonutils
|
||||
from nova.openstack.common import rpc
|
||||
import nova.openstack.common.rpc.proxy as rpc_proxy
|
||||
from nova import rpcclient
|
||||
|
||||
|
||||
CONF = cfg.CONF
|
||||
@@ -34,7 +33,7 @@ CONF.register_opt(rpcapi_cap_opt, 'upgrade_levels')
|
||||
_NAMESPACE = 'baseapi'
|
||||
|
||||
|
||||
class BaseAPI(rpc_proxy.RpcProxy):
|
||||
class BaseAPI(rpcclient.RpcProxy):
|
||||
"""Client side of the base rpc API.
|
||||
|
||||
API version history:
|
||||
@@ -63,18 +62,17 @@ class BaseAPI(rpc_proxy.RpcProxy):
|
||||
super(BaseAPI, self).__init__(topic=topic,
|
||||
default_version=self.BASE_RPC_API_VERSION,
|
||||
version_cap=version_cap)
|
||||
self.namespace = _NAMESPACE
|
||||
|
||||
self.client = self.get_client(namespace=_NAMESPACE)
|
||||
|
||||
def ping(self, context, arg, timeout=None):
|
||||
arg_p = jsonutils.to_primitive(arg)
|
||||
msg = self.make_namespaced_msg('ping', self.namespace, arg=arg_p)
|
||||
return self.call(context, msg, timeout=timeout)
|
||||
cctxt = self.client.prepare(timeout=timeout)
|
||||
return cctxt.call(context, 'ping', arg=arg_p)
|
||||
|
||||
def get_backdoor_port(self, context, host):
|
||||
msg = self.make_namespaced_msg('get_backdoor_port', self.namespace)
|
||||
return self.call(context, msg,
|
||||
topic=rpc.queue_get_for(context, self.topic, host),
|
||||
version='1.1')
|
||||
cctxt = self.client.prepare(server=host, version='1.1')
|
||||
return cctxt.call(context, 'get_backdoor_port')
|
||||
|
||||
|
||||
class BaseRPCAPI(object):
|
||||
|
||||
96
nova/rpcclient.py
Normal file
96
nova/rpcclient.py
Normal file
@@ -0,0 +1,96 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2013 Red Hat, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""
|
||||
A temporary helper which emulates oslo.messaging.rpc.RPCClient.
|
||||
|
||||
The most tedious part of porting to oslo.messaging is porting the code which
|
||||
sub-classes RpcProxy to use RPCClient instead.
|
||||
|
||||
This helper method allows us to do that tedious porting as a standalone commit
|
||||
so that the commit which switches us to oslo.messaging is smaller and easier
|
||||
to review. This file will be removed as part of that commit.
|
||||
"""
|
||||
|
||||
from nova.openstack.common.rpc import proxy
|
||||
|
||||
|
||||
class RPCClient(object):
|
||||
|
||||
def __init__(self, proxy, namespace=None, server_params=None):
|
||||
super(RPCClient, self).__init__()
|
||||
self.proxy = proxy
|
||||
self.namespace = namespace
|
||||
self.server_params = server_params
|
||||
self.kwargs = {}
|
||||
self.fanout = None
|
||||
|
||||
def prepare(self, **kwargs):
|
||||
# Clone ourselves
|
||||
ret = self.__class__(self.proxy, self.namespace, self.server_params)
|
||||
ret.kwargs.update(self.kwargs)
|
||||
ret.fanout = self.fanout
|
||||
|
||||
# Update according to supplied kwargs
|
||||
ret.kwargs.update(kwargs)
|
||||
server = ret.kwargs.pop('server', None)
|
||||
if server:
|
||||
ret.kwargs['topic'] = '%s.%s' % (self.proxy.topic, server)
|
||||
fanout = ret.kwargs.pop('fanout', None)
|
||||
if fanout:
|
||||
ret.fanout = True
|
||||
|
||||
return ret
|
||||
|
||||
def _invoke(self, cast_or_call, ctxt, method, **kwargs):
|
||||
try:
|
||||
msg = self.proxy.make_namespaced_msg(method,
|
||||
self.namespace,
|
||||
**kwargs)
|
||||
return cast_or_call(ctxt, msg, **self.kwargs)
|
||||
finally:
|
||||
self.kwargs = {}
|
||||
self.fanout = None
|
||||
|
||||
def cast(self, ctxt, method, **kwargs):
|
||||
if self.server_params:
|
||||
def cast_to_server(ctxt, msg, **kwargs):
|
||||
if self.fanout:
|
||||
return self.proxy.fanout_cast_to_server(
|
||||
ctxt, self.server_params, msg, **kwargs)
|
||||
else:
|
||||
return self.proxy.cast_to_server(
|
||||
ctxt, self.server_params, msg, **kwargs)
|
||||
|
||||
caster = cast_to_server
|
||||
else:
|
||||
caster = self.proxy.fanout_cast if self.fanout else self.proxy.cast
|
||||
|
||||
self._invoke(caster, ctxt, method, **kwargs)
|
||||
|
||||
def call(self, ctxt, method, **kwargs):
|
||||
return self._invoke(self.proxy.call, ctxt, method, **kwargs)
|
||||
|
||||
def can_send_version(self, version):
|
||||
return self.proxy.can_send_version(version)
|
||||
|
||||
|
||||
class RpcProxy(proxy.RpcProxy):
|
||||
|
||||
def get_client(self, namespace=None, server_params=None):
|
||||
return RPCClient(self,
|
||||
namespace=namespace,
|
||||
server_params=server_params)
|
||||
@@ -21,7 +21,7 @@ Client side of the scheduler manager RPC API.
|
||||
from oslo.config import cfg
|
||||
|
||||
from nova.openstack.common import jsonutils
|
||||
import nova.openstack.common.rpc.proxy
|
||||
from nova import rpcclient
|
||||
|
||||
rpcapi_opts = [
|
||||
cfg.StrOpt('scheduler_topic',
|
||||
@@ -37,7 +37,7 @@ rpcapi_cap_opt = cfg.StrOpt('scheduler',
|
||||
CONF.register_opt(rpcapi_cap_opt, 'upgrade_levels')
|
||||
|
||||
|
||||
class SchedulerAPI(nova.openstack.common.rpc.proxy.RpcProxy):
|
||||
class SchedulerAPI(rpcclient.RpcProxy):
|
||||
'''Client side of the scheduler rpc API.
|
||||
|
||||
API version history:
|
||||
@@ -94,11 +94,12 @@ class SchedulerAPI(nova.openstack.common.rpc.proxy.RpcProxy):
|
||||
super(SchedulerAPI, self).__init__(topic=CONF.scheduler_topic,
|
||||
default_version=self.BASE_RPC_API_VERSION,
|
||||
version_cap=version_cap)
|
||||
self.client = self.get_client()
|
||||
|
||||
def select_destinations(self, ctxt, request_spec, filter_properties):
|
||||
return self.call(ctxt, self.make_msg('select_destinations',
|
||||
request_spec=request_spec, filter_properties=filter_properties),
|
||||
version='2.7')
|
||||
cctxt = self.client.prepare(version='2.7')
|
||||
return cctxt.call(ctxt, 'select_destinations',
|
||||
request_spec=request_spec, filter_properties=filter_properties)
|
||||
|
||||
def run_instance(self, ctxt, request_spec, admin_password,
|
||||
injected_files, requested_networks, is_first_time,
|
||||
@@ -110,11 +111,11 @@ class SchedulerAPI(nova.openstack.common.rpc.proxy.RpcProxy):
|
||||
'requested_networks': requested_networks,
|
||||
'is_first_time': is_first_time,
|
||||
'filter_properties': filter_properties}
|
||||
if self.can_send_version('2.9'):
|
||||
if self.client.can_send_version('2.9'):
|
||||
version = '2.9'
|
||||
msg_kwargs['legacy_bdm_in_spec'] = legacy_bdm_in_spec
|
||||
return self.cast(ctxt, self.make_msg('run_instance', **msg_kwargs),
|
||||
version=version)
|
||||
cctxt = self.client.prepare(version=version)
|
||||
return cctxt.cast(ctxt, 'run_instance', **msg_kwargs)
|
||||
|
||||
def prep_resize(self, ctxt, instance, instance_type, image,
|
||||
request_spec, filter_properties, reservations):
|
||||
@@ -122,24 +123,24 @@ class SchedulerAPI(nova.openstack.common.rpc.proxy.RpcProxy):
|
||||
instance_type_p = jsonutils.to_primitive(instance_type)
|
||||
reservations_p = jsonutils.to_primitive(reservations)
|
||||
image_p = jsonutils.to_primitive(image)
|
||||
self.cast(ctxt, self.make_msg('prep_resize',
|
||||
instance=instance_p, instance_type=instance_type_p,
|
||||
image=image_p, request_spec=request_spec,
|
||||
filter_properties=filter_properties,
|
||||
reservations=reservations_p))
|
||||
self.client.cast(ctxt, 'prep_resize',
|
||||
instance=instance_p, instance_type=instance_type_p,
|
||||
image=image_p, request_spec=request_spec,
|
||||
filter_properties=filter_properties,
|
||||
reservations=reservations_p)
|
||||
|
||||
def update_service_capabilities(self, ctxt, service_name, host,
|
||||
capabilities):
|
||||
#NOTE(jogo) This is deprecated, but is used by the deprecated
|
||||
# publish_service_capabilities call. So this can begin its removal
|
||||
# process once publish_service_capabilities is removed.
|
||||
self.fanout_cast(ctxt, self.make_msg('update_service_capabilities',
|
||||
service_name=service_name, host=host,
|
||||
capabilities=capabilities),
|
||||
version='2.4')
|
||||
cctxt = self.client.prepare(fanout=True, version='2.4')
|
||||
cctxt.cast(ctxt, 'update_service_capabilities',
|
||||
service_name=service_name, host=host,
|
||||
capabilities=capabilities)
|
||||
|
||||
def select_hosts(self, ctxt, request_spec, filter_properties):
|
||||
return self.call(ctxt, self.make_msg('select_hosts',
|
||||
request_spec=request_spec,
|
||||
filter_properties=filter_properties),
|
||||
version='2.6')
|
||||
cctxt = self.client.prepare(version='2.6')
|
||||
return cctxt.call(ctxt, 'select_hosts',
|
||||
request_spec=request_spec,
|
||||
filter_properties=filter_properties)
|
||||
|
||||
Reference in New Issue
Block a user