Drop RpcProxy usage from ml2 AgentNotifierApi

Remove usage of the RpcProxy compatibility class from the ml2
AgentNotifierApi.  The equivalent oslo.messaging APIs are now used
instead.  A couple of other mixin APIs had to be converted at the same
time.

Note that there is one very minor functional change here.  The base
rpc version is set to '1.0' now instead of '1.1'.  The right pattern
to use is to always set the base to be N.0.  Any method that needs a
newer version should specify it.

Part of blueprint drop-rpc-compat.

Change-Id: I640568e2d73c9eb7a9505db640dc1427a1ae2abe
This commit is contained in:
Russell Bryant 2014-12-01 18:42:30 +00:00
parent 881344280c
commit 1bc911ca8a
4 changed files with 35 additions and 61 deletions

View File

@ -98,11 +98,9 @@ class DVRAgentRpcApiMixin(object):
"""Notify dvr mac address updates."""
if not dvr_macs:
return
self.fanout_cast(context,
self.make_msg('dvr_mac_address_update',
dvr_macs=dvr_macs),
version=self.DVR_RPC_VERSION,
topic=self._get_dvr_update_topic())
cctxt = self.client.prepare(topic=self._get_dvr_update_topic(),
version=self.DVR_RPC_VERSION, fanout=True)
cctxt.cast(context, 'dvr_mac_address_update', dvr_macs=dvr_macs)
class DVRAgentRpcCallbackMixin(object):

View File

@ -187,8 +187,7 @@ class TunnelAgentRpcApiMixin(object):
topics.UPDATE)
def tunnel_update(self, context, tunnel_ip, tunnel_type):
self.fanout_cast(context,
self.make_msg('tunnel_update',
tunnel_ip=tunnel_ip,
tunnel_type=tunnel_type),
topic=self._get_tunnel_update_topic())
cctxt = self.client.prepare(topic=self._get_tunnel_update_topic(),
fanout=True)
cctxt.cast(context, 'tunnel_update', tunnel_ip=tunnel_ip,
tunnel_type=tunnel_type)

View File

@ -13,6 +13,8 @@
# License for the specific language governing permissions and limitations
# under the License.
from oslo import messaging
from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.api.rpc.handlers import dvr_rpc
from neutron.common import constants as q_const
@ -172,8 +174,7 @@ class RpcCallbacks(n_rpc.RpcCallback,
LOG.debug('Port %s not found during ARP update', port_id)
class AgentNotifierApi(n_rpc.RpcProxy,
dvr_rpc.DVRAgentRpcApiMixin,
class AgentNotifierApi(dvr_rpc.DVRAgentRpcApiMixin,
sg_rpc.SecurityGroupAgentRpcApiMixin,
type_tunnel.TunnelAgentRpcApiMixin):
"""Agent side of the openvswitch rpc API.
@ -185,30 +186,26 @@ class AgentNotifierApi(n_rpc.RpcProxy,
"""
BASE_RPC_API_VERSION = '1.1'
def __init__(self, topic):
super(AgentNotifierApi, self).__init__(
topic=topic, default_version=self.BASE_RPC_API_VERSION)
self.topic = topic
self.topic_network_delete = topics.get_topic_name(topic,
topics.NETWORK,
topics.DELETE)
self.topic_port_update = topics.get_topic_name(topic,
topics.PORT,
topics.UPDATE)
target = messaging.Target(topic=topic, version='1.0')
self.client = n_rpc.get_client(target)
def network_delete(self, context, network_id):
self.fanout_cast(context,
self.make_msg('network_delete',
network_id=network_id),
topic=self.topic_network_delete)
cctxt = self.client.prepare(topic=self.topic_network_delete,
fanout=True)
cctxt.cast(context, 'network_delete', network_id=network_id)
def port_update(self, context, port, network_type, segmentation_id,
physical_network):
self.fanout_cast(context,
self.make_msg('port_update',
port=port,
network_type=network_type,
segmentation_id=segmentation_id,
physical_network=physical_network),
topic=self.topic_port_update)
cctxt = self.client.prepare(topic=self.topic_port_update,
fanout=True)
cctxt.cast(context, 'port_update', port=port,
network_type=network_type, segmentation_id=segmentation_id,
physical_network=physical_network)

View File

@ -25,7 +25,6 @@ import mock
from neutron.agent import rpc as agent_rpc
from neutron.common import constants
from neutron.common import exceptions
from neutron.common import rpc as n_rpc
from neutron.common import topics
from neutron.openstack.common import context
from neutron.plugins.ml2.drivers import type_tunnel
@ -166,35 +165,11 @@ class RpcCallbacksTestCase(base.BaseTestCase):
class RpcApiTestCase(base.BaseTestCase):
def _test_rpc_api_legacy(self, rpcapi, topic, method, rpc_method,
**kwargs):
# NOTE(russellb) This can be removed once AgentNotifierApi has been
# converted over to no longer use the RpcProxy compatibility class.
ctxt = context.RequestContext('fake_user', 'fake_project')
expected_retval = 'foo' if rpc_method == 'call' else None
expected_version = kwargs.pop('version', None)
expected_msg = rpcapi.make_msg(method, **kwargs)
rpc = n_rpc.RpcProxy
with mock.patch.object(rpc, rpc_method) as rpc_method_mock:
rpc_method_mock.return_value = expected_retval
retval = getattr(rpcapi, method)(ctxt, **kwargs)
self.assertEqual(retval, expected_retval)
additional_args = {}
if topic:
additional_args['topic'] = topic
if expected_version:
additional_args['version'] = expected_version
expected = [
mock.call(ctxt, expected_msg, **additional_args)
]
rpc_method_mock.assert_has_calls(expected)
def _test_rpc_api(self, rpcapi, topic, method, rpc_method, **kwargs):
ctxt = context.RequestContext('fake_user', 'fake_project')
expected_retval = 'foo' if rpc_method == 'call' else None
expected_version = kwargs.pop('version', None)
fanout = kwargs.pop('fanout', False)
with contextlib.nested(
mock.patch.object(rpcapi.client, rpc_method),
@ -209,6 +184,10 @@ class RpcApiTestCase(base.BaseTestCase):
prepare_args = {}
if expected_version:
prepare_args['version'] = expected_version
if fanout:
prepare_args['fanout'] = fanout
if topic:
prepare_args['topic'] = topic
prepare_mock.assert_called_once_with(**prepare_args)
self.assertEqual(retval, expected_retval)
@ -216,35 +195,36 @@ class RpcApiTestCase(base.BaseTestCase):
def test_delete_network(self):
rpcapi = plugin_rpc.AgentNotifierApi(topics.AGENT)
self._test_rpc_api_legacy(
self._test_rpc_api(
rpcapi,
topics.get_topic_name(topics.AGENT,
topics.NETWORK,
topics.DELETE),
'network_delete', rpc_method='fanout_cast',
network_id='fake_request_spec')
'network_delete', rpc_method='cast',
fanout=True, network_id='fake_request_spec')
def test_port_update(self):
rpcapi = plugin_rpc.AgentNotifierApi(topics.AGENT)
self._test_rpc_api_legacy(
self._test_rpc_api(
rpcapi,
topics.get_topic_name(topics.AGENT,
topics.PORT,
topics.UPDATE),
'port_update', rpc_method='fanout_cast',
port='fake_port',
'port_update', rpc_method='cast',
fanout=True, port='fake_port',
network_type='fake_network_type',
segmentation_id='fake_segmentation_id',
physical_network='fake_physical_network')
def test_tunnel_update(self):
rpcapi = plugin_rpc.AgentNotifierApi(topics.AGENT)
self._test_rpc_api_legacy(
self._test_rpc_api(
rpcapi,
topics.get_topic_name(topics.AGENT,
type_tunnel.TUNNEL,
topics.UPDATE),
'tunnel_update', rpc_method='fanout_cast',
'tunnel_update', rpc_method='cast',
fanout=True,
tunnel_ip='fake_ip', tunnel_type='gre')
def test_device_details(self):