Merge "VPNaas: L3 Agent restructure - observer hierarchy"
This commit is contained in:
commit
0b14a3f171
@ -13,11 +13,10 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from neutron.agent.l3 import agent as l3_agent
|
||||
from oslo.config import cfg
|
||||
from oslo.utils import importutils
|
||||
|
||||
from neutron.agent.l3 import agent
|
||||
from neutron.extensions import vpnaas
|
||||
from neutron_vpnaas.services.vpn import vpn_service
|
||||
|
||||
vpn_agent_opts = [
|
||||
cfg.MultiStrOpt(
|
||||
@ -29,119 +28,15 @@ vpn_agent_opts = [
|
||||
cfg.CONF.register_opts(vpn_agent_opts, 'vpnagent')
|
||||
|
||||
|
||||
class VPNAgent(agent.L3NATAgentWithStateReport):
|
||||
class VPNAgent(l3_agent.L3NATAgentWithStateReport):
|
||||
"""VPNAgent class which can handle vpn service drivers."""
|
||||
def __init__(self, host, conf=None):
|
||||
super(VPNAgent, self).__init__(host=host, conf=conf)
|
||||
self.setup_device_drivers(host)
|
||||
|
||||
def setup_device_drivers(self, host):
|
||||
"""Setting up device drivers.
|
||||
|
||||
:param host: hostname. This is needed for rpc
|
||||
Each devices will stays as processes.
|
||||
They will communicate with
|
||||
server side service plugin using rpc with
|
||||
device specific rpc topic.
|
||||
:returns: None
|
||||
"""
|
||||
device_drivers = cfg.CONF.vpnagent.vpn_device_driver
|
||||
self.devices = []
|
||||
for device_driver in device_drivers:
|
||||
try:
|
||||
self.devices.append(
|
||||
importutils.import_object(device_driver, self, host))
|
||||
except ImportError:
|
||||
raise vpnaas.DeviceDriverImportError(
|
||||
device_driver=device_driver)
|
||||
|
||||
def get_namespace(self, router_id):
|
||||
"""Get namespace of router.
|
||||
|
||||
:router_id: router_id
|
||||
:returns: namespace string.
|
||||
Note if the router is not exist, this function
|
||||
returns None
|
||||
"""
|
||||
router_info = self.router_info.get(router_id)
|
||||
if not router_info:
|
||||
return
|
||||
return router_info.ns_name
|
||||
|
||||
def add_nat_rule(self, router_id, chain, rule, top=False):
|
||||
"""Add nat rule in namespace.
|
||||
|
||||
:param router_id: router_id
|
||||
:param chain: a string of chain name
|
||||
:param rule: a string of rule
|
||||
:param top: if top is true, the rule
|
||||
will be placed on the top of chain
|
||||
Note if there is no rotuer, this method do nothing
|
||||
"""
|
||||
router_info = self.router_info.get(router_id)
|
||||
if not router_info:
|
||||
return
|
||||
router_info.iptables_manager.ipv4['nat'].add_rule(
|
||||
chain, rule, top=top)
|
||||
|
||||
def remove_nat_rule(self, router_id, chain, rule, top=False):
|
||||
"""Remove nat rule in namespace.
|
||||
|
||||
:param router_id: router_id
|
||||
:param chain: a string of chain name
|
||||
:param rule: a string of rule
|
||||
:param top: unused
|
||||
needed to have same argument with add_nat_rule
|
||||
"""
|
||||
router_info = self.router_info.get(router_id)
|
||||
if not router_info:
|
||||
return
|
||||
router_info.iptables_manager.ipv4['nat'].remove_rule(
|
||||
chain, rule, top=top)
|
||||
|
||||
def iptables_apply(self, router_id):
|
||||
"""Apply IPtables.
|
||||
|
||||
:param router_id: router_id
|
||||
This method do nothing if there is no router
|
||||
"""
|
||||
router_info = self.router_info.get(router_id)
|
||||
if not router_info:
|
||||
return
|
||||
router_info.iptables_manager.apply()
|
||||
|
||||
def _router_added(self, router_id, router):
|
||||
"""Router added event.
|
||||
|
||||
This method overwrites parent class method.
|
||||
:param router_id: id of added router
|
||||
:param router: dict of rotuer
|
||||
"""
|
||||
super(VPNAgent, self)._router_added(router_id, router)
|
||||
for device in self.devices:
|
||||
device.create_router(router_id)
|
||||
|
||||
def _router_removed(self, router_id):
|
||||
"""Router removed event.
|
||||
|
||||
This method overwrites parent class method.
|
||||
:param router_id: id of removed router
|
||||
"""
|
||||
super(VPNAgent, self)._router_removed(router_id)
|
||||
for device in self.devices:
|
||||
device.destroy_router(router_id)
|
||||
|
||||
def _process_router_if_compatible(self, router):
|
||||
"""Router sync event.
|
||||
|
||||
This method overwrites parent class method.
|
||||
:param router: a router
|
||||
"""
|
||||
super(VPNAgent, self)._process_router_if_compatible(router)
|
||||
for device in self.devices:
|
||||
device.sync(self.context, [router])
|
||||
# NOTE: Temp location for creating service and loading drivers
|
||||
self.service = vpn_service.VPNService.instance(self)
|
||||
self.event_observers.add(self.service)
|
||||
self.devices = self.service.load_device_drivers(host)
|
||||
|
||||
|
||||
def main():
|
||||
agent.main(
|
||||
manager='neutron_vpnaas.services.vpn.agent.VPNAgent')
|
||||
l3_agent.main(manager='neutron_vpnaas.services.vpn.agent.VPNAgent')
|
||||
|
124
neutron_vpnaas/services/vpn/vpn_service.py
Normal file
124
neutron_vpnaas/services/vpn/vpn_service.py
Normal file
@ -0,0 +1,124 @@
|
||||
# Copyright 2014 OpenStack Foundation.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo.config import cfg
|
||||
from oslo.utils import importutils
|
||||
|
||||
from neutron import context as n_context
|
||||
from neutron.extensions import vpnaas
|
||||
from neutron.openstack.common import log as logging
|
||||
from neutron.services import advanced_service
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class VPNService(advanced_service.AdvancedService):
|
||||
"""VPN Service observer."""
|
||||
|
||||
def __init__(self, l3_agent):
|
||||
"""Creates a VPN Service instance with context.
|
||||
|
||||
DO NOT CALL THIS DIRECTLY! Use the instance() class method to Creates
|
||||
a singleton instance of the service.
|
||||
"""
|
||||
self.context = n_context.get_admin_context_without_session()
|
||||
super(VPNService, self).__init__(l3_agent)
|
||||
|
||||
def load_device_drivers(self, host):
|
||||
"""Loads one or more device drivers for VPNaaS."""
|
||||
self.devices = []
|
||||
for device_driver in cfg.CONF.vpnagent.vpn_device_driver:
|
||||
try:
|
||||
self.devices.append(importutils.import_object(device_driver,
|
||||
self,
|
||||
host))
|
||||
LOG.debug('Loaded VPNaaS device driver: %s', device_driver)
|
||||
except ImportError:
|
||||
raise vpnaas.DeviceDriverImportError(
|
||||
device_driver=device_driver)
|
||||
return self.devices
|
||||
|
||||
# Overridden handlers for L3 agent events.
|
||||
def after_router_added(self, ri):
|
||||
"""Create the router and sync for each loaded device driver."""
|
||||
for device in self.devices:
|
||||
device.create_router(ri.router_id)
|
||||
device.sync(self.context, [ri.router])
|
||||
|
||||
def after_router_removed(self, ri):
|
||||
"""Remove the router from each loaded device driver."""
|
||||
for device in self.devices:
|
||||
device.destroy_router(ri.router_id)
|
||||
|
||||
def after_router_updated(self, ri):
|
||||
"""Perform a sync on each loaded device driver."""
|
||||
for device in self.devices:
|
||||
device.sync(self.context, [ri.router])
|
||||
|
||||
# Device driver methods calling back to L3 agent
|
||||
def get_namespace(self, router_id):
|
||||
"""Get namespace of router.
|
||||
|
||||
:router_id: router_id
|
||||
:returns: namespace string.
|
||||
Note if the router is not exist, this function
|
||||
returns None
|
||||
"""
|
||||
router_info = self.l3_agent.router_info.get(router_id)
|
||||
if not router_info:
|
||||
return
|
||||
return router_info.ns_name
|
||||
|
||||
def add_nat_rule(self, router_id, chain, rule, top=False):
|
||||
"""Add nat rule in namespace.
|
||||
|
||||
:param router_id: router_id
|
||||
:param chain: a string of chain name
|
||||
:param rule: a string of rule
|
||||
:param top: if top is true, the rule
|
||||
will be placed on the top of chain
|
||||
Note if there is no rotuer, this method do nothing
|
||||
"""
|
||||
router_info = self.l3_agent.router_info.get(router_id)
|
||||
if not router_info:
|
||||
return
|
||||
router_info.iptables_manager.ipv4['nat'].add_rule(
|
||||
chain, rule, top=top)
|
||||
|
||||
def remove_nat_rule(self, router_id, chain, rule, top=False):
|
||||
"""Remove nat rule in namespace.
|
||||
|
||||
:param router_id: router_id
|
||||
:param chain: a string of chain name
|
||||
:param rule: a string of rule
|
||||
:param top: unused
|
||||
needed to have same argument with add_nat_rule
|
||||
"""
|
||||
router_info = self.l3_agent.router_info.get(router_id)
|
||||
if not router_info:
|
||||
return
|
||||
router_info.iptables_manager.ipv4['nat'].remove_rule(
|
||||
chain, rule, top=top)
|
||||
|
||||
def iptables_apply(self, router_id):
|
||||
"""Apply IPtables.
|
||||
|
||||
:param router_id: router_id
|
||||
This method do nothing if there is no router
|
||||
"""
|
||||
router_info = self.l3_agent.router_info.get(router_id)
|
||||
if not router_info:
|
||||
return
|
||||
router_info.iptables_manager.apply()
|
@ -1,195 +0,0 @@
|
||||
# Copyright 2013, Nachi Ueno, NTT I3, Inc.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import mock
|
||||
from oslo.config import cfg
|
||||
|
||||
from neutron.agent.common import config as agent_config
|
||||
from neutron.agent.l3 import agent as l3_agent
|
||||
from neutron.agent.l3 import ha
|
||||
from neutron.agent.l3 import router_info
|
||||
from neutron.agent.linux import interface
|
||||
from neutron.common import config as base_config
|
||||
from neutron.openstack.common import uuidutils
|
||||
from neutron_vpnaas.services.vpn import agent
|
||||
from neutron_vpnaas.services.vpn import device_drivers
|
||||
from neutron_vpnaas.tests import base
|
||||
|
||||
_uuid = uuidutils.generate_uuid
|
||||
NOOP_DEVICE_CLASS = 'NoopDeviceDriver'
|
||||
NOOP_DEVICE = ('neutron_vpnaas.tests.unit.services.'
|
||||
'vpn.test_vpn_agent.%s' % NOOP_DEVICE_CLASS)
|
||||
|
||||
|
||||
class NoopDeviceDriver(device_drivers.DeviceDriver):
|
||||
def sync(self, context, processes):
|
||||
pass
|
||||
|
||||
def create_router(self, process_id):
|
||||
pass
|
||||
|
||||
def destroy_router(self, process_id):
|
||||
pass
|
||||
|
||||
|
||||
class TestVPNAgent(base.BaseTestCase):
|
||||
def setUp(self):
|
||||
super(TestVPNAgent, self).setUp()
|
||||
self.conf = cfg.CONF
|
||||
self.conf.register_opts(base_config.core_opts)
|
||||
self.conf.register_opts(l3_agent.L3NATAgent.OPTS)
|
||||
self.conf.register_opts(ha.OPTS)
|
||||
self.conf.register_opts(interface.OPTS)
|
||||
agent_config.register_interface_driver_opts_helper(self.conf)
|
||||
agent_config.register_use_namespaces_opts_helper(self.conf)
|
||||
agent_config.register_agent_state_opts_helper(self.conf)
|
||||
agent_config.register_root_helper(self.conf)
|
||||
|
||||
self.conf.set_override('interface_driver',
|
||||
'neutron.agent.linux.interface.NullDriver')
|
||||
self.conf.set_override(
|
||||
'vpn_device_driver',
|
||||
[NOOP_DEVICE],
|
||||
'vpnagent')
|
||||
|
||||
for clazz in [
|
||||
'neutron.agent.linux.ip_lib.device_exists',
|
||||
'neutron.agent.linux.ip_lib.IPWrapper',
|
||||
'neutron.agent.linux.interface.NullDriver',
|
||||
'neutron.agent.linux.utils.execute'
|
||||
]:
|
||||
mock.patch(clazz).start()
|
||||
|
||||
l3pluginApi_cls = mock.patch(
|
||||
'neutron.agent.l3.agent.L3PluginApi').start()
|
||||
self.plugin_api = mock.MagicMock()
|
||||
l3pluginApi_cls.return_value = self.plugin_api
|
||||
|
||||
looping_call_p = mock.patch(
|
||||
'neutron.openstack.common.loopingcall.FixedIntervalLoopingCall')
|
||||
looping_call_p.start()
|
||||
|
||||
self.fake_host = 'fake_host'
|
||||
self.agent = agent.VPNAgent(self.fake_host)
|
||||
|
||||
def test_setup_drivers(self):
|
||||
self.assertEqual(1, len(self.agent.devices))
|
||||
device = self.agent.devices[0]
|
||||
self.assertEqual(
|
||||
NOOP_DEVICE_CLASS,
|
||||
device.__class__.__name__
|
||||
)
|
||||
|
||||
def test_get_namespace(self):
|
||||
router_id = _uuid()
|
||||
ns = "ns-" + router_id
|
||||
ri = router_info.RouterInfo(router_id, self.conf.root_helper, {},
|
||||
ns_name=ns)
|
||||
self.agent.router_info = {router_id: ri}
|
||||
namespace = self.agent.get_namespace(router_id)
|
||||
self.assertTrue(namespace.endswith(router_id))
|
||||
self.assertFalse(self.agent.get_namespace('fake_id'))
|
||||
|
||||
def test_add_nat_rule(self):
|
||||
router_id = _uuid()
|
||||
ri = router_info.RouterInfo(router_id, self.conf.root_helper, {})
|
||||
iptables = mock.Mock()
|
||||
ri.iptables_manager.ipv4['nat'] = iptables
|
||||
self.agent.router_info = {router_id: ri}
|
||||
self.agent.add_nat_rule(router_id, 'fake_chain', 'fake_rule', True)
|
||||
iptables.add_rule.assert_called_once_with(
|
||||
'fake_chain', 'fake_rule', top=True)
|
||||
|
||||
def test_add_nat_rule_with_no_router(self):
|
||||
self.agent.router_info = {}
|
||||
#Should do nothing
|
||||
self.agent.add_nat_rule(
|
||||
'fake_router_id',
|
||||
'fake_chain',
|
||||
'fake_rule',
|
||||
True)
|
||||
|
||||
def test_remove_rule(self):
|
||||
router_id = _uuid()
|
||||
ri = router_info.RouterInfo(router_id, self.conf.root_helper, {})
|
||||
iptables = mock.Mock()
|
||||
ri.iptables_manager.ipv4['nat'] = iptables
|
||||
self.agent.router_info = {router_id: ri}
|
||||
self.agent.remove_nat_rule(router_id, 'fake_chain', 'fake_rule', True)
|
||||
iptables.remove_rule.assert_called_once_with(
|
||||
'fake_chain', 'fake_rule', top=True)
|
||||
|
||||
def test_remove_rule_with_no_router(self):
|
||||
self.agent.router_info = {}
|
||||
#Should do nothing
|
||||
self.agent.remove_nat_rule(
|
||||
'fake_router_id',
|
||||
'fake_chain',
|
||||
'fake_rule')
|
||||
|
||||
def test_iptables_apply(self):
|
||||
router_id = _uuid()
|
||||
ri = router_info.RouterInfo(router_id, self.conf.root_helper, {})
|
||||
iptables = mock.Mock()
|
||||
ri.iptables_manager = iptables
|
||||
self.agent.router_info = {router_id: ri}
|
||||
self.agent.iptables_apply(router_id)
|
||||
iptables.apply.assert_called_once_with()
|
||||
|
||||
def test_iptables_apply_with_no_router(self):
|
||||
#Should do nothing
|
||||
self.agent.router_info = {}
|
||||
self.agent.iptables_apply('fake_router_id')
|
||||
|
||||
def test_router_added(self):
|
||||
mock.patch(
|
||||
'neutron.agent.linux.iptables_manager.IptablesManager').start()
|
||||
router_id = _uuid()
|
||||
router = {'id': router_id}
|
||||
device = mock.Mock()
|
||||
self.agent.devices = [device]
|
||||
self.agent._router_added(router_id, router)
|
||||
device.create_router.assert_called_once_with(router_id)
|
||||
|
||||
def test_router_removed(self):
|
||||
self.plugin_api.get_external_network_id.return_value = None
|
||||
mock.patch(
|
||||
'neutron.agent.linux.iptables_manager.IptablesManager').start()
|
||||
router_id = _uuid()
|
||||
ri = router_info.RouterInfo(router_id, self.conf.root_helper, {},
|
||||
ns_name="qrouter-%s" % router_id)
|
||||
ri.router = {
|
||||
'id': router_id,
|
||||
'admin_state_up': True,
|
||||
'routes': [],
|
||||
'external_gateway_info': {},
|
||||
'distributed': False}
|
||||
device = mock.Mock()
|
||||
self.agent.router_info = {router_id: ri}
|
||||
self.agent.devices = [device]
|
||||
self.agent._router_removed(router_id)
|
||||
device.destroy_router.assert_called_once_with(router_id)
|
||||
|
||||
def test_process_router_if_compatible(self):
|
||||
self.plugin_api.get_external_network_id.return_value = None
|
||||
router = {'id': _uuid(),
|
||||
'admin_state_up': True,
|
||||
'routes': [],
|
||||
'external_gateway_info': {}}
|
||||
|
||||
device = mock.Mock()
|
||||
self.agent.devices = [device]
|
||||
self.agent._process_router_if_compatible(router)
|
||||
device.sync.assert_called_once_with(mock.ANY, [router])
|
183
neutron_vpnaas/tests/unit/services/vpn/test_vpn_service.py
Normal file
183
neutron_vpnaas/tests/unit/services/vpn/test_vpn_service.py
Normal file
@ -0,0 +1,183 @@
|
||||
# Copyright 2014 OpenStack Foundation.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import mock
|
||||
from oslo.config import cfg
|
||||
|
||||
from neutron.agent.common import config as agent_config
|
||||
from neutron.agent.l3 import router_info
|
||||
from neutron.extensions import vpnaas
|
||||
from neutron.openstack.common import uuidutils
|
||||
from neutron_vpnaas.services.vpn import agent as vpn_agent
|
||||
from neutron_vpnaas.services.vpn import device_drivers
|
||||
from neutron_vpnaas.services.vpn import vpn_service
|
||||
from neutron_vpnaas.tests import base
|
||||
|
||||
_uuid = uuidutils.generate_uuid
|
||||
|
||||
VPNAAS_NOP_DEVICE = ('neutron_vpnaas.tests.unit.services.'
|
||||
'vpn.test_vpn_service.NoopDeviceDriver')
|
||||
VPNAAS_DEFAULT_DEVICE = ('neutron_vpnaas.services.vpn.'
|
||||
'device_drivers.ipsec.OpenSwanDriver')
|
||||
FAKE_ROUTER_ID = _uuid()
|
||||
|
||||
|
||||
class NoopDeviceDriver(device_drivers.DeviceDriver):
|
||||
|
||||
def sync(self, context, processes):
|
||||
pass
|
||||
|
||||
def create_router(self, process_id):
|
||||
pass
|
||||
|
||||
def destroy_router(self, process_id):
|
||||
pass
|
||||
|
||||
|
||||
class TestVirtualPrivateNetworkDeviceDriverLoading(base.BaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestVirtualPrivateNetworkDeviceDriverLoading, self).setUp()
|
||||
cfg.CONF.register_opts(vpn_agent.vpn_agent_opts, 'vpnagent')
|
||||
self.agent = mock.Mock()
|
||||
self.service = vpn_service.VPNService.instance(self.agent)
|
||||
|
||||
def test_loading_vpn_device_drivers(self):
|
||||
"""Get two device drivers (in a list) for VPNaaS."""
|
||||
cfg.CONF.set_override('vpn_device_driver',
|
||||
[VPNAAS_NOP_DEVICE, VPNAAS_NOP_DEVICE],
|
||||
'vpnagent')
|
||||
|
||||
drivers = self.service.load_device_drivers('host')
|
||||
self.assertEqual(2, len(drivers))
|
||||
self.assertIn(drivers[0].__class__.__name__, VPNAAS_NOP_DEVICE)
|
||||
self.assertIn(drivers[1].__class__.__name__, VPNAAS_NOP_DEVICE)
|
||||
|
||||
def test_use_default_for_vpn_device_driver(self):
|
||||
"""When no VPNaaS device drivers specified, we get the default."""
|
||||
drivers = self.service.load_device_drivers('host')
|
||||
self.assertEqual(1, len(drivers))
|
||||
self.assertIn(drivers[0].__class__.__name__, VPNAAS_DEFAULT_DEVICE)
|
||||
|
||||
def test_fail_no_such_vpn_device_driver(self):
|
||||
"""Failure test of import error for VPNaaS device driver."""
|
||||
cfg.CONF.set_override('vpn_device_driver',
|
||||
['no.such.class'],
|
||||
'vpnagent')
|
||||
self.assertRaises(vpnaas.DeviceDriverImportError,
|
||||
self.service.load_device_drivers, 'host')
|
||||
|
||||
|
||||
class TestVPNDeviceDriverCallsToService(base.BaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestVPNDeviceDriverCallsToService, self).setUp()
|
||||
self.conf = cfg.CONF
|
||||
agent_config.register_root_helper(self.conf)
|
||||
self.service = vpn_service.VPNService.instance(mock.Mock())
|
||||
self.iptables = mock.Mock()
|
||||
self.apply_mock = mock.Mock()
|
||||
|
||||
def _make_router_info_for_test(self, ns_name=None, iptables=None):
|
||||
ri = router_info.RouterInfo(FAKE_ROUTER_ID, self.conf.root_helper,
|
||||
{}, ns_name=ns_name)
|
||||
if iptables:
|
||||
ri.iptables_manager.ipv4['nat'] = iptables
|
||||
ri.iptables_manager.apply = self.apply_mock
|
||||
self.service.l3_agent.router_info = {FAKE_ROUTER_ID: ri}
|
||||
|
||||
def test_get_namespace_for_router(self):
|
||||
ns = "ns-" + FAKE_ROUTER_ID
|
||||
self._make_router_info_for_test(ns_name=ns)
|
||||
namespace = self.service.get_namespace(FAKE_ROUTER_ID)
|
||||
self.assertTrue(namespace.endswith(FAKE_ROUTER_ID))
|
||||
|
||||
def test_fail_getting_namespace_for_unknown_router(self):
|
||||
self._make_router_info_for_test()
|
||||
self.assertFalse(self.service.get_namespace('bogus_id'))
|
||||
|
||||
def test_add_nat_rule(self):
|
||||
self._make_router_info_for_test(iptables=self.iptables)
|
||||
self.service.add_nat_rule(FAKE_ROUTER_ID, 'fake_chain',
|
||||
'fake_rule', True)
|
||||
self.iptables.add_rule.assert_called_once_with(
|
||||
'fake_chain', 'fake_rule', top=True)
|
||||
|
||||
def test_add_nat_rule_with_no_router(self):
|
||||
self._make_router_info_for_test(iptables=self.iptables)
|
||||
self.service.add_nat_rule(
|
||||
'bogus_router_id',
|
||||
'fake_chain',
|
||||
'fake_rule',
|
||||
True)
|
||||
self.assertFalse(self.iptables.add_rule.called)
|
||||
|
||||
def test_remove_rule(self):
|
||||
self._make_router_info_for_test(iptables=self.iptables)
|
||||
self.service.remove_nat_rule(FAKE_ROUTER_ID, 'fake_chain',
|
||||
'fake_rule', True)
|
||||
self.iptables.remove_rule.assert_called_once_with(
|
||||
'fake_chain', 'fake_rule', top=True)
|
||||
|
||||
def test_remove_rule_with_no_router(self):
|
||||
self._make_router_info_for_test(iptables=self.iptables)
|
||||
self.service.remove_nat_rule(
|
||||
'bogus_router_id',
|
||||
'fake_chain',
|
||||
'fake_rule')
|
||||
self.assertFalse(self.iptables.remove_rule.called)
|
||||
|
||||
def test_iptables_apply(self):
|
||||
self._make_router_info_for_test(iptables=self.iptables)
|
||||
self.service.iptables_apply(FAKE_ROUTER_ID)
|
||||
self.apply_mock.assert_called_once_with()
|
||||
|
||||
def test_iptables_apply_with_no_router(self):
|
||||
self._make_router_info_for_test(iptables=self.iptables)
|
||||
self.service.iptables_apply('bogus_router_id')
|
||||
self.assertFalse(self.apply_mock.called)
|
||||
|
||||
|
||||
class TestVPNServiceEventHandlers(base.BaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestVPNServiceEventHandlers, self).setUp()
|
||||
self.conf = cfg.CONF
|
||||
agent_config.register_root_helper(self.conf)
|
||||
self.service = vpn_service.VPNService.instance(mock.Mock())
|
||||
self.device = mock.Mock()
|
||||
self.service.devices = [self.device]
|
||||
|
||||
def test_actions_after_router_added(self):
|
||||
ri = router_info.RouterInfo(
|
||||
FAKE_ROUTER_ID, self.conf.root_helper, {})
|
||||
self.service.after_router_added(ri)
|
||||
self.device.create_router.assert_called_once_with(FAKE_ROUTER_ID)
|
||||
self.device.sync.assert_called_once_with(self.service.context,
|
||||
[ri.router])
|
||||
|
||||
def test_actions_after_router_removed(self):
|
||||
ri = router_info.RouterInfo(
|
||||
FAKE_ROUTER_ID, self.conf.root_helper, {},
|
||||
ns_name="qrouter-%s" % FAKE_ROUTER_ID)
|
||||
self.service.after_router_removed(ri)
|
||||
self.device.destroy_router.assert_called_once_with(FAKE_ROUTER_ID)
|
||||
|
||||
def test_actions_after_router_updated(self):
|
||||
ri = router_info.RouterInfo(
|
||||
FAKE_ROUTER_ID, self.conf.root_helper, {})
|
||||
self.service.after_router_updated(ri)
|
||||
self.device.sync.assert_called_once_with(self.service.context,
|
||||
[ri.router])
|
Loading…
x
Reference in New Issue
Block a user