diff --git a/neutron/services/vpn/device_drivers/cisco_ipsec.py b/neutron/services/vpn/device_drivers/cisco_ipsec.py index 0eb3871d181..7776a34572b 100644 --- a/neutron/services/vpn/device_drivers/cisco_ipsec.py +++ b/neutron/services/vpn/device_drivers/cisco_ipsec.py @@ -67,24 +67,26 @@ class CsrUnknownMappingError(exceptions.NeutronException): "attribute %(attr)s of %(resource)s") -class CiscoCsrIPsecVpnDriverApi(n_rpc.RpcProxy): +class CiscoCsrIPsecVpnDriverApi(object): """RPC API for agent to plugin messaging.""" + def __init__(self, topic): + target = messaging.Target(topic=topic, version='1.0') + self.client = n_rpc.get_client(target) + def get_vpn_services_on_host(self, context, host): """Get list of vpnservices on this host. The vpnservices including related ipsec_site_connection, ikepolicy, ipsecpolicy, and Cisco info on this host. """ - return self.call(context, - self.make_msg('get_vpn_services_on_host', - host=host)) + cctxt = self.client.prepare() + return cctxt.call(context, 'get_vpn_services_on_host', host=host) def update_status(self, context, status): """Update status for all VPN services and connections.""" - return self.cast(context, - self.make_msg('update_status', - status=status)) + cctxt = self.client.prepare() + return cctxt.call(context, 'update_status', status=status) @six.add_metaclass(abc.ABCMeta) @@ -117,7 +119,7 @@ class CiscoCsrIPsecDriver(device_drivers.DeviceDriver): self.conn.create_consumer(node_topic, self.endpoints, fanout=False) self.conn.consume_in_threads() self.agent_rpc = ( - CiscoCsrIPsecVpnDriverApi(topics.CISCO_IPSEC_DRIVER_TOPIC, '1.0')) + CiscoCsrIPsecVpnDriverApi(topics.CISCO_IPSEC_DRIVER_TOPIC)) self.periodic_report = loopingcall.FixedIntervalLoopingCall( self.report_status, context) self.periodic_report.start( diff --git a/neutron/services/vpn/device_drivers/ipsec.py b/neutron/services/vpn/device_drivers/ipsec.py index a7579e27dee..fb393c1b0f2 100644 --- a/neutron/services/vpn/device_drivers/ipsec.py +++ b/neutron/services/vpn/device_drivers/ipsec.py @@ -442,9 +442,12 @@ class OpenSwanProcess(BaseSwanProcess): self.connection_status = {} -class IPsecVpnDriverApi(n_rpc.RpcProxy): +class IPsecVpnDriverApi(object): """IPSecVpnDriver RPC api.""" - IPSEC_PLUGIN_VERSION = '1.0' + + def __init__(self, topic): + target = messaging.Target(topic=topic, version='1.0') + self.client = n_rpc.get_client(target) def get_vpn_services_on_host(self, context, host): """Get list of vpnservices. @@ -452,10 +455,8 @@ class IPsecVpnDriverApi(n_rpc.RpcProxy): The vpnservices including related ipsec_site_connection, ikepolicy and ipsecpolicy on this host """ - return self.call(context, - self.make_msg('get_vpn_services_on_host', - host=host), - version=self.IPSEC_PLUGIN_VERSION) + cctxt = self.client.prepare() + return cctxt.call(context, 'get_vpn_services_on_host', host=host) def update_status(self, context, status): """Update local status. @@ -463,10 +464,8 @@ class IPsecVpnDriverApi(n_rpc.RpcProxy): This method call updates status attribute of VPNServices. """ - return self.cast(context, - self.make_msg('update_status', - status=status), - version=self.IPSEC_PLUGIN_VERSION) + cctxt = self.client.prepare() + return cctxt.call(context, 'update_status', status=status) @six.add_metaclass(abc.ABCMeta) @@ -504,7 +503,7 @@ class IPsecDriver(device_drivers.DeviceDriver): self.endpoints = [self] self.conn.create_consumer(node_topic, self.endpoints, fanout=False) self.conn.consume_in_threads() - self.agent_rpc = IPsecVpnDriverApi(topics.IPSEC_DRIVER_TOPIC, '1.0') + self.agent_rpc = IPsecVpnDriverApi(topics.IPSEC_DRIVER_TOPIC) self.process_status_cache_check = loopingcall.FixedIntervalLoopingCall( self.report_status, self.context) self.process_status_cache_check.start( diff --git a/neutron/services/vpn/service_drivers/__init__.py b/neutron/services/vpn/service_drivers/__init__.py index d66e3785099..b2f0bfdcb02 100644 --- a/neutron/services/vpn/service_drivers/__init__.py +++ b/neutron/services/vpn/service_drivers/__init__.py @@ -15,6 +15,7 @@ import abc +from oslo import messaging import six from neutron.common import rpc as n_rpc @@ -71,13 +72,14 @@ class VpnDriver(object): pass -class BaseIPsecVpnAgentApi(n_rpc.RpcProxy): +class BaseIPsecVpnAgentApi(object): """Base class for IPSec API to agent.""" def __init__(self, topic, default_version, driver): self.topic = topic self.driver = driver - super(BaseIPsecVpnAgentApi, self).__init__(topic, default_version) + target = messaging.Target(topic=topic, version=default_version) + self.client = n_rpc.get_client(target) def _agent_notification(self, context, method, router_id, version=None, **kwargs): @@ -100,10 +102,8 @@ class BaseIPsecVpnAgentApi(n_rpc.RpcProxy): 'host': l3_agent.host, 'method': method, 'args': kwargs}) - self.cast( - context, self.make_msg(method, **kwargs), - version=version, - topic='%s.%s' % (self.topic, l3_agent.host)) + cctxt = self.client.prepare(server=l3_agent.host, version=version) + cctxt.cast(context, method, **kwargs) def vpnservice_updated(self, context, router_id, **kwargs): """Send update event of vpnservices.""" diff --git a/neutron/services/vpn/service_drivers/cisco_ipsec.py b/neutron/services/vpn/service_drivers/cisco_ipsec.py index 2fafc21be83..d88ccb8ec9b 100644 --- a/neutron/services/vpn/service_drivers/cisco_ipsec.py +++ b/neutron/services/vpn/service_drivers/cisco_ipsec.py @@ -107,9 +107,8 @@ class CiscoCsrIPsecVpnAgentApi(service_drivers.BaseIPsecVpnAgentApi, 'method': method, 'args': kwargs, 'router': router_id}) - self.cast(context, self.make_msg(method, **kwargs), - version=version, - topic='%s.%s' % (self.topic, host)) + cctxt = self.client.prepare(server=host, version=version) + cctxt.cast(context, method, **kwargs) class CiscoCsrIPsecVPNDriver(service_drivers.VpnDriver): diff --git a/neutron/tests/unit/services/vpn/service_drivers/test_cisco_ipsec.py b/neutron/tests/unit/services/vpn/service_drivers/test_cisco_ipsec.py index 2c18cc8e1c0..436a458ab77 100644 --- a/neutron/tests/unit/services/vpn/service_drivers/test_cisco_ipsec.py +++ b/neutron/tests/unit/services/vpn/service_drivers/test_cisco_ipsec.py @@ -12,6 +12,8 @@ # License for the specific language governing permissions and limitations # under the License. +import contextlib + import mock from oslo.config import cfg @@ -352,15 +354,20 @@ class TestCiscoIPsecDriver(testlib_api.SqlTestCase): self.context = n_ctx.Context('some_user', 'some_tenant') def _test_update(self, func, args, additional_info=None): - with mock.patch.object(self.driver.agent_rpc, 'cast') as cast: + with contextlib.nested( + mock.patch.object(self.driver.agent_rpc.client, 'cast'), + mock.patch.object(self.driver.agent_rpc.client, 'prepare'), + ) as ( + rpc_mock, prepare_mock + ): + prepare_mock.return_value = self.driver.agent_rpc.client func(self.context, *args) - cast.assert_called_once_with( - self.context, - {'args': additional_info, - 'namespace': None, - 'method': 'vpnservice_updated'}, - version='1.0', - topic='cisco_csr_ipsec_agent.fake_host') + + prepare_args = {'server': 'fake_host', 'version': '1.0'} + prepare_mock.assert_called_once_with(**prepare_args) + + rpc_mock.assert_called_once_with(self.context, 'vpnservice_updated', + reason=mock.ANY) def test_create_ipsec_site_connection(self): self._test_update(self.driver.create_ipsec_site_connection, diff --git a/neutron/tests/unit/services/vpn/service_drivers/test_ipsec.py b/neutron/tests/unit/services/vpn/service_drivers/test_ipsec.py index b2aab3a6b15..9f100fd6067 100644 --- a/neutron/tests/unit/services/vpn/service_drivers/test_ipsec.py +++ b/neutron/tests/unit/services/vpn/service_drivers/test_ipsec.py @@ -13,6 +13,8 @@ # License for the specific language governing permissions and limitations # under the License. +import contextlib + import mock from oslo.config import cfg @@ -240,15 +242,19 @@ class TestIPsecDriver(base.BaseTestCase): def _test_update(self, func, args): ctxt = n_ctx.Context('', 'somebody') - with mock.patch.object(self.driver.agent_rpc, 'cast') as cast: + with contextlib.nested( + mock.patch.object(self.driver.agent_rpc.client, 'cast'), + mock.patch.object(self.driver.agent_rpc.client, 'prepare'), + ) as ( + rpc_mock, prepare_mock + ): + prepare_mock.return_value = self.driver.agent_rpc.client func(ctxt, *args) - cast.assert_called_once_with( - ctxt, - {'args': {}, - 'namespace': None, - 'method': 'vpnservice_updated'}, - version='1.0', - topic='ipsec_agent.fake_host') + + prepare_args = {'server': 'fake_host', 'version': '1.0'} + prepare_mock.assert_called_once_with(**prepare_args) + + rpc_mock.assert_called_once_with(ctxt, 'vpnservice_updated') def test_create_ipsec_site_connection(self): self._test_update(self.driver.create_ipsec_site_connection,