Drop RpcProxy usage from VPNaaS code

Drop the usage of the RpcProxy compatibility class from the VPNaaS
code.  The equivalent direct usage of oslo.messaging APIs is now used
instead.

Part of blueprint drop-rpc-compat.

Change-Id: I4ff0bfe0b5e909bfe088f4059d85aa6366526dad
This commit is contained in:
Russell Bryant 2014-11-19 17:32:38 +00:00
parent bdcc5a46d7
commit 2161b1aea1
6 changed files with 57 additions and 44 deletions

View File

@ -67,24 +67,26 @@ class CsrUnknownMappingError(exceptions.NeutronException):
"attribute %(attr)s of %(resource)s")
class CiscoCsrIPsecVpnDriverApi(n_rpc.RpcProxy):
class CiscoCsrIPsecVpnDriverApi(object):
"""RPC API for agent to plugin messaging."""
def __init__(self, topic):
target = messaging.Target(topic=topic, version='1.0')
self.client = n_rpc.get_client(target)
def get_vpn_services_on_host(self, context, host):
"""Get list of vpnservices on this host.
The vpnservices including related ipsec_site_connection,
ikepolicy, ipsecpolicy, and Cisco info on this host.
"""
return self.call(context,
self.make_msg('get_vpn_services_on_host',
host=host))
cctxt = self.client.prepare()
return cctxt.call(context, 'get_vpn_services_on_host', host=host)
def update_status(self, context, status):
"""Update status for all VPN services and connections."""
return self.cast(context,
self.make_msg('update_status',
status=status))
cctxt = self.client.prepare()
return cctxt.call(context, 'update_status', status=status)
@six.add_metaclass(abc.ABCMeta)
@ -117,7 +119,7 @@ class CiscoCsrIPsecDriver(device_drivers.DeviceDriver):
self.conn.create_consumer(node_topic, self.endpoints, fanout=False)
self.conn.consume_in_threads()
self.agent_rpc = (
CiscoCsrIPsecVpnDriverApi(topics.CISCO_IPSEC_DRIVER_TOPIC, '1.0'))
CiscoCsrIPsecVpnDriverApi(topics.CISCO_IPSEC_DRIVER_TOPIC))
self.periodic_report = loopingcall.FixedIntervalLoopingCall(
self.report_status, context)
self.periodic_report.start(

View File

@ -442,9 +442,12 @@ class OpenSwanProcess(BaseSwanProcess):
self.connection_status = {}
class IPsecVpnDriverApi(n_rpc.RpcProxy):
class IPsecVpnDriverApi(object):
"""IPSecVpnDriver RPC api."""
IPSEC_PLUGIN_VERSION = '1.0'
def __init__(self, topic):
target = messaging.Target(topic=topic, version='1.0')
self.client = n_rpc.get_client(target)
def get_vpn_services_on_host(self, context, host):
"""Get list of vpnservices.
@ -452,10 +455,8 @@ class IPsecVpnDriverApi(n_rpc.RpcProxy):
The vpnservices including related ipsec_site_connection,
ikepolicy and ipsecpolicy on this host
"""
return self.call(context,
self.make_msg('get_vpn_services_on_host',
host=host),
version=self.IPSEC_PLUGIN_VERSION)
cctxt = self.client.prepare()
return cctxt.call(context, 'get_vpn_services_on_host', host=host)
def update_status(self, context, status):
"""Update local status.
@ -463,10 +464,8 @@ class IPsecVpnDriverApi(n_rpc.RpcProxy):
This method call updates status attribute of
VPNServices.
"""
return self.cast(context,
self.make_msg('update_status',
status=status),
version=self.IPSEC_PLUGIN_VERSION)
cctxt = self.client.prepare()
return cctxt.call(context, 'update_status', status=status)
@six.add_metaclass(abc.ABCMeta)
@ -504,7 +503,7 @@ class IPsecDriver(device_drivers.DeviceDriver):
self.endpoints = [self]
self.conn.create_consumer(node_topic, self.endpoints, fanout=False)
self.conn.consume_in_threads()
self.agent_rpc = IPsecVpnDriverApi(topics.IPSEC_DRIVER_TOPIC, '1.0')
self.agent_rpc = IPsecVpnDriverApi(topics.IPSEC_DRIVER_TOPIC)
self.process_status_cache_check = loopingcall.FixedIntervalLoopingCall(
self.report_status, self.context)
self.process_status_cache_check.start(

View File

@ -15,6 +15,7 @@
import abc
from oslo import messaging
import six
from neutron.common import rpc as n_rpc
@ -71,13 +72,14 @@ class VpnDriver(object):
pass
class BaseIPsecVpnAgentApi(n_rpc.RpcProxy):
class BaseIPsecVpnAgentApi(object):
"""Base class for IPSec API to agent."""
def __init__(self, topic, default_version, driver):
self.topic = topic
self.driver = driver
super(BaseIPsecVpnAgentApi, self).__init__(topic, default_version)
target = messaging.Target(topic=topic, version=default_version)
self.client = n_rpc.get_client(target)
def _agent_notification(self, context, method, router_id,
version=None, **kwargs):
@ -100,10 +102,8 @@ class BaseIPsecVpnAgentApi(n_rpc.RpcProxy):
'host': l3_agent.host,
'method': method,
'args': kwargs})
self.cast(
context, self.make_msg(method, **kwargs),
version=version,
topic='%s.%s' % (self.topic, l3_agent.host))
cctxt = self.client.prepare(server=l3_agent.host, version=version)
cctxt.cast(context, method, **kwargs)
def vpnservice_updated(self, context, router_id, **kwargs):
"""Send update event of vpnservices."""

View File

@ -107,9 +107,8 @@ class CiscoCsrIPsecVpnAgentApi(service_drivers.BaseIPsecVpnAgentApi,
'method': method,
'args': kwargs,
'router': router_id})
self.cast(context, self.make_msg(method, **kwargs),
version=version,
topic='%s.%s' % (self.topic, host))
cctxt = self.client.prepare(server=host, version=version)
cctxt.cast(context, method, **kwargs)
class CiscoCsrIPsecVPNDriver(service_drivers.VpnDriver):

View File

@ -12,6 +12,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import mock
from oslo.config import cfg
@ -352,15 +354,20 @@ class TestCiscoIPsecDriver(testlib_api.SqlTestCase):
self.context = n_ctx.Context('some_user', 'some_tenant')
def _test_update(self, func, args, additional_info=None):
with mock.patch.object(self.driver.agent_rpc, 'cast') as cast:
with contextlib.nested(
mock.patch.object(self.driver.agent_rpc.client, 'cast'),
mock.patch.object(self.driver.agent_rpc.client, 'prepare'),
) as (
rpc_mock, prepare_mock
):
prepare_mock.return_value = self.driver.agent_rpc.client
func(self.context, *args)
cast.assert_called_once_with(
self.context,
{'args': additional_info,
'namespace': None,
'method': 'vpnservice_updated'},
version='1.0',
topic='cisco_csr_ipsec_agent.fake_host')
prepare_args = {'server': 'fake_host', 'version': '1.0'}
prepare_mock.assert_called_once_with(**prepare_args)
rpc_mock.assert_called_once_with(self.context, 'vpnservice_updated',
reason=mock.ANY)
def test_create_ipsec_site_connection(self):
self._test_update(self.driver.create_ipsec_site_connection,

View File

@ -13,6 +13,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import mock
from oslo.config import cfg
@ -240,15 +242,19 @@ class TestIPsecDriver(base.BaseTestCase):
def _test_update(self, func, args):
ctxt = n_ctx.Context('', 'somebody')
with mock.patch.object(self.driver.agent_rpc, 'cast') as cast:
with contextlib.nested(
mock.patch.object(self.driver.agent_rpc.client, 'cast'),
mock.patch.object(self.driver.agent_rpc.client, 'prepare'),
) as (
rpc_mock, prepare_mock
):
prepare_mock.return_value = self.driver.agent_rpc.client
func(ctxt, *args)
cast.assert_called_once_with(
ctxt,
{'args': {},
'namespace': None,
'method': 'vpnservice_updated'},
version='1.0',
topic='ipsec_agent.fake_host')
prepare_args = {'server': 'fake_host', 'version': '1.0'}
prepare_mock.assert_called_once_with(**prepare_args)
rpc_mock.assert_called_once_with(ctxt, 'vpnservice_updated')
def test_create_ipsec_site_connection(self):
self._test_update(self.driver.create_ipsec_site_connection,