Merge "Drop RpcProxy usage from VPNaaS code"

This commit is contained in:
Jenkins 2014-11-23 20:00:42 +00:00 committed by Gerrit Code Review
commit 4c9b9ec9de
6 changed files with 57 additions and 44 deletions

View File

@ -67,24 +67,26 @@ class CsrUnknownMappingError(exceptions.NeutronException):
"attribute %(attr)s of %(resource)s")
class CiscoCsrIPsecVpnDriverApi(n_rpc.RpcProxy):
class CiscoCsrIPsecVpnDriverApi(object):
"""RPC API for agent to plugin messaging."""
def __init__(self, topic):
target = messaging.Target(topic=topic, version='1.0')
self.client = n_rpc.get_client(target)
def get_vpn_services_on_host(self, context, host):
"""Get list of vpnservices on this host.
The vpnservices including related ipsec_site_connection,
ikepolicy, ipsecpolicy, and Cisco info on this host.
"""
return self.call(context,
self.make_msg('get_vpn_services_on_host',
host=host))
cctxt = self.client.prepare()
return cctxt.call(context, 'get_vpn_services_on_host', host=host)
def update_status(self, context, status):
"""Update status for all VPN services and connections."""
return self.cast(context,
self.make_msg('update_status',
status=status))
cctxt = self.client.prepare()
return cctxt.call(context, 'update_status', status=status)
@six.add_metaclass(abc.ABCMeta)
@ -117,7 +119,7 @@ class CiscoCsrIPsecDriver(device_drivers.DeviceDriver):
self.conn.create_consumer(node_topic, self.endpoints, fanout=False)
self.conn.consume_in_threads()
self.agent_rpc = (
CiscoCsrIPsecVpnDriverApi(topics.CISCO_IPSEC_DRIVER_TOPIC, '1.0'))
CiscoCsrIPsecVpnDriverApi(topics.CISCO_IPSEC_DRIVER_TOPIC))
self.periodic_report = loopingcall.FixedIntervalLoopingCall(
self.report_status, context)
self.periodic_report.start(

View File

@ -442,9 +442,12 @@ class OpenSwanProcess(BaseSwanProcess):
self.connection_status = {}
class IPsecVpnDriverApi(n_rpc.RpcProxy):
class IPsecVpnDriverApi(object):
"""IPSecVpnDriver RPC api."""
IPSEC_PLUGIN_VERSION = '1.0'
def __init__(self, topic):
target = messaging.Target(topic=topic, version='1.0')
self.client = n_rpc.get_client(target)
def get_vpn_services_on_host(self, context, host):
"""Get list of vpnservices.
@ -452,10 +455,8 @@ class IPsecVpnDriverApi(n_rpc.RpcProxy):
The vpnservices including related ipsec_site_connection,
ikepolicy and ipsecpolicy on this host
"""
return self.call(context,
self.make_msg('get_vpn_services_on_host',
host=host),
version=self.IPSEC_PLUGIN_VERSION)
cctxt = self.client.prepare()
return cctxt.call(context, 'get_vpn_services_on_host', host=host)
def update_status(self, context, status):
"""Update local status.
@ -463,10 +464,8 @@ class IPsecVpnDriverApi(n_rpc.RpcProxy):
This method call updates status attribute of
VPNServices.
"""
return self.cast(context,
self.make_msg('update_status',
status=status),
version=self.IPSEC_PLUGIN_VERSION)
cctxt = self.client.prepare()
return cctxt.call(context, 'update_status', status=status)
@six.add_metaclass(abc.ABCMeta)
@ -504,7 +503,7 @@ class IPsecDriver(device_drivers.DeviceDriver):
self.endpoints = [self]
self.conn.create_consumer(node_topic, self.endpoints, fanout=False)
self.conn.consume_in_threads()
self.agent_rpc = IPsecVpnDriverApi(topics.IPSEC_DRIVER_TOPIC, '1.0')
self.agent_rpc = IPsecVpnDriverApi(topics.IPSEC_DRIVER_TOPIC)
self.process_status_cache_check = loopingcall.FixedIntervalLoopingCall(
self.report_status, self.context)
self.process_status_cache_check.start(

View File

@ -15,6 +15,7 @@
import abc
from oslo import messaging
import six
from neutron.common import rpc as n_rpc
@ -71,13 +72,14 @@ class VpnDriver(object):
pass
class BaseIPsecVpnAgentApi(n_rpc.RpcProxy):
class BaseIPsecVpnAgentApi(object):
"""Base class for IPSec API to agent."""
def __init__(self, topic, default_version, driver):
self.topic = topic
self.driver = driver
super(BaseIPsecVpnAgentApi, self).__init__(topic, default_version)
target = messaging.Target(topic=topic, version=default_version)
self.client = n_rpc.get_client(target)
def _agent_notification(self, context, method, router_id,
version=None, **kwargs):
@ -100,10 +102,8 @@ class BaseIPsecVpnAgentApi(n_rpc.RpcProxy):
'host': l3_agent.host,
'method': method,
'args': kwargs})
self.cast(
context, self.make_msg(method, **kwargs),
version=version,
topic='%s.%s' % (self.topic, l3_agent.host))
cctxt = self.client.prepare(server=l3_agent.host, version=version)
cctxt.cast(context, method, **kwargs)
def vpnservice_updated(self, context, router_id, **kwargs):
"""Send update event of vpnservices."""

View File

@ -107,9 +107,8 @@ class CiscoCsrIPsecVpnAgentApi(service_drivers.BaseIPsecVpnAgentApi,
'method': method,
'args': kwargs,
'router': router_id})
self.cast(context, self.make_msg(method, **kwargs),
version=version,
topic='%s.%s' % (self.topic, host))
cctxt = self.client.prepare(server=host, version=version)
cctxt.cast(context, method, **kwargs)
class CiscoCsrIPsecVPNDriver(service_drivers.VpnDriver):

View File

@ -12,6 +12,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import mock
from oslo.config import cfg
@ -352,15 +354,20 @@ class TestCiscoIPsecDriver(testlib_api.SqlTestCase):
self.context = n_ctx.Context('some_user', 'some_tenant')
def _test_update(self, func, args, additional_info=None):
with mock.patch.object(self.driver.agent_rpc, 'cast') as cast:
with contextlib.nested(
mock.patch.object(self.driver.agent_rpc.client, 'cast'),
mock.patch.object(self.driver.agent_rpc.client, 'prepare'),
) as (
rpc_mock, prepare_mock
):
prepare_mock.return_value = self.driver.agent_rpc.client
func(self.context, *args)
cast.assert_called_once_with(
self.context,
{'args': additional_info,
'namespace': None,
'method': 'vpnservice_updated'},
version='1.0',
topic='cisco_csr_ipsec_agent.fake_host')
prepare_args = {'server': 'fake_host', 'version': '1.0'}
prepare_mock.assert_called_once_with(**prepare_args)
rpc_mock.assert_called_once_with(self.context, 'vpnservice_updated',
reason=mock.ANY)
def test_create_ipsec_site_connection(self):
self._test_update(self.driver.create_ipsec_site_connection,

View File

@ -13,6 +13,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import mock
from oslo.config import cfg
@ -240,15 +242,19 @@ class TestIPsecDriver(base.BaseTestCase):
def _test_update(self, func, args):
ctxt = n_ctx.Context('', 'somebody')
with mock.patch.object(self.driver.agent_rpc, 'cast') as cast:
with contextlib.nested(
mock.patch.object(self.driver.agent_rpc.client, 'cast'),
mock.patch.object(self.driver.agent_rpc.client, 'prepare'),
) as (
rpc_mock, prepare_mock
):
prepare_mock.return_value = self.driver.agent_rpc.client
func(ctxt, *args)
cast.assert_called_once_with(
ctxt,
{'args': {},
'namespace': None,
'method': 'vpnservice_updated'},
version='1.0',
topic='ipsec_agent.fake_host')
prepare_args = {'server': 'fake_host', 'version': '1.0'}
prepare_mock.assert_called_once_with(**prepare_args)
rpc_mock.assert_called_once_with(ctxt, 'vpnservice_updated')
def test_create_ipsec_site_connection(self):
self._test_update(self.driver.create_ipsec_site_connection,