Merge "Move final remnants of router processing to router classes"

This commit is contained in:
Jenkins 2015-03-30 23:49:29 +00:00 committed by Gerrit Code Review
commit c4f59fe0f5
7 changed files with 112 additions and 90 deletions

View File

@ -32,7 +32,6 @@ from neutron.agent.l3 import namespaces
from neutron.agent.l3 import router_processing_queue as queue
from neutron.agent.linux import external_process
from neutron.agent.linux import ip_lib
from neutron.agent.linux import ra
from neutron.agent.metadata import driver as metadata_driver
from neutron.agent import rpc as agent_rpc
from neutron.common import constants as l3_constants
@ -40,7 +39,6 @@ from neutron.common import exceptions as n_exc
from neutron.common import ipv6_utils
from neutron.common import rpc as n_rpc
from neutron.common import topics
from neutron.common import utils as common_utils
from neutron import context as n_context
from neutron.i18n import _LE, _LI, _LW
from neutron import manager
@ -288,25 +286,22 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback,
return dvr_router.DvrRouter(*args, **kwargs)
if router.get('ha'):
kwargs['state_change_callback'] = self.enqueue_state_change
return ha_router.HaRouter(*args, **kwargs)
return legacy_router.LegacyRouter(*args, **kwargs)
def _router_added(self, router_id, router):
ri = self._create_router(router_id, router)
ri.radvd = ra.DaemonMonitor(router['id'],
ri.ns_name,
self.process_monitor,
ri.get_internal_device_name)
self.event_observers.notify(
adv_svc.AdvancedService.before_router_added, ri)
self.router_info[router_id] = ri
ri.create()
self.process_router_add(ri)
if ri.is_ha:
ri.initialize(self.process_monitor, self.enqueue_state_change)
ri.initialize(self.process_monitor)
# TODO(Carl) This is a hook in to fwaas. It should be cleaned up.
self.process_router_add(ri)
def _router_removed(self, router_id):
ri = self.router_info.get(router_id)
@ -318,15 +313,9 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback,
self.event_observers.notify(
adv_svc.AdvancedService.before_router_removed, ri)
if ri.is_ha:
ri.terminate(self.process_monitor)
ri.router['gw_port'] = None
ri.router[l3_constants.INTERFACE_KEY] = []
ri.router[l3_constants.FLOATINGIP_KEY] = []
self.process_router(ri)
ri.delete(self)
del self.router_info[router_id]
ri.delete()
self.event_observers.notify(
adv_svc.AdvancedService.after_router_removed, ri)
@ -340,29 +329,6 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback,
self.plugin_rpc.update_floatingip_statuses(
self.context, ri.router_id, fip_statuses)
@common_utils.exception_logger()
def process_router(self, ri):
# TODO(mrsmith) - we shouldn't need to check here
if 'distributed' not in ri.router:
ri.router['distributed'] = False
ex_gw_port = ri.get_ex_gw_port()
if ri.router.get('distributed') and ex_gw_port:
ri.fip_ns = self.get_fip_ns(ex_gw_port['network_id'])
ri.fip_ns.scan_fip_ports(ri)
ri._process_internal_ports()
ri.process_external(self)
# Process static routes for router
ri.routes_updated()
# If process_router was called during a create or update
if ri.is_ha and ri.ha_port:
ri.enable_keepalived()
# Update ex_gw_port and enable_snat on the router info cache
ri.ex_gw_port = ex_gw_port
ri.snat_ports = ri.router.get(l3_constants.SNAT_ROUTER_INTF_KEY, [])
ri.enable_snat = ri.router.get('enable_snat')
def router_deleted(self, context, router_id):
"""Deal with router deletion RPC message."""
LOG.debug('Got router deleted notification for %s', router_id)
@ -427,21 +393,19 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback,
self._process_updated_router(router)
def _process_added_router(self, router):
# TODO(pcm): Next refactoring will rework this logic
self._router_added(router['id'], router)
ri = self.router_info[router['id']]
ri.router = router
self.process_router(ri)
ri.process(self)
self.event_observers.notify(
adv_svc.AdvancedService.after_router_added, ri)
def _process_updated_router(self, router):
# TODO(pcm): Next refactoring will rework this logic
ri = self.router_info[router['id']]
ri.router = router
self.event_observers.notify(
adv_svc.AdvancedService.before_router_updated, ri)
self.process_router(ri)
ri.process(self)
self.event_observers.notify(
adv_svc.AdvancedService.after_router_updated, ri)

View File

@ -508,3 +508,11 @@ class DvrRouter(router.RouterInfo):
# kicks the FW Agent to add rules for the IR namespace if
# configured
self.agent.process_router_add(self)
def process(self, agent):
ex_gw_port = self.get_ex_gw_port()
if ex_gw_port:
self.fip_ns = agent.get_fip_ns(ex_gw_port['network_id'])
self.fip_ns.scan_fip_ports(self)
super(DvrRouter, self).process(agent)

View File

@ -32,11 +32,12 @@ IP_MONITOR_PROCESS_SERVICE = 'ip_monitor'
class HaRouter(router.RouterInfo):
def __init__(self, *args, **kwargs):
def __init__(self, state_change_callback, *args, **kwargs):
super(HaRouter, self).__init__(*args, **kwargs)
self.ha_port = None
self.keepalived_manager = None
self.state_change_callback = state_change_callback
@property
def is_ha(self):
@ -73,7 +74,8 @@ class HaRouter(router.RouterInfo):
LOG.error(_LE('Error while writing HA state for %s'),
self.router_id)
def initialize(self, process_monitor, state_change_callback):
def initialize(self, process_monitor):
super(HaRouter, self).initialize(process_monitor)
ha_port = self.router.get(n_consts.HA_INTERFACE_KEY)
if not ha_port:
LOG.error(_LE('Unable to process HA router %s without HA port'),
@ -83,14 +85,9 @@ class HaRouter(router.RouterInfo):
self.ha_port = ha_port
self._init_keepalived_manager(process_monitor)
self.ha_network_added()
self.update_initial_state(state_change_callback)
self.update_initial_state(self.state_change_callback)
self.spawn_state_change_monitor(process_monitor)
def terminate(self, process_monitor):
self.destroy_state_change_monitor(process_monitor)
self.ha_network_removed()
self.disable_keepalived()
def _init_keepalived_manager(self, process_monitor):
self.keepalived_manager = keepalived.KeepalivedManager(
self.router['id'],
@ -344,3 +341,15 @@ class HaRouter(router.RouterInfo):
super(HaRouter, self).external_gateway_removed(ex_gw_port,
interface_name)
def delete(self, agent):
self.destroy_state_change_monitor(self.process_monitor)
self.ha_network_removed()
self.disable_keepalived()
super(HaRouter, self).delete(agent)
def process(self, agent):
super(HaRouter, self).process(agent)
if self.ha_port:
self.enable_keepalived()

View File

@ -19,6 +19,7 @@ from oslo_log import log as logging
from neutron.agent.l3 import namespaces
from neutron.agent.linux import ip_lib
from neutron.agent.linux import iptables_manager
from neutron.agent.linux import ra
from neutron.common import constants as l3_constants
from neutron.common import exceptions as n_exc
from neutron.common import utils as common_utils
@ -62,6 +63,25 @@ class RouterInfo(object):
# radvd is a neutron.agent.linux.ra.DaemonMonitor
self.radvd = None
def initialize(self, process_monitor):
"""Initialize the router on the system.
This differs from __init__ in that this method actually affects the
system creating namespaces, starting processes, etc. The other merely
initializes the python object. This separates in-memory object
initialization from methods that actually go do stuff to the system.
:param process_monitor: The agent's process monitor instance.
"""
self.process_monitor = process_monitor
self.radvd = ra.DaemonMonitor(self.router_id,
self.ns_name,
process_monitor,
self.get_internal_device_name)
if self.router_namespace:
self.router_namespace.create()
@property
def router(self):
return self._router
@ -247,11 +267,11 @@ class RouterInfo(object):
fip_statuses[fip['id']] = l3_constants.FLOATINGIP_STATUS_ERROR
return fip_statuses
def create(self):
if self.router_namespace:
self.router_namespace.create()
def delete(self):
def delete(self, agent):
self.router['gw_port'] = None
self.router[l3_constants.INTERFACE_KEY] = []
self.router[l3_constants.FLOATINGIP_KEY] = []
self.process(agent)
self.radvd.disable()
if self.router_namespace:
self.router_namespace.delete()
@ -512,3 +532,23 @@ class RouterInfo(object):
fip_statuses = self.put_fips_in_error_state()
agent.update_fip_statuses(self, existing_floating_ips, fip_statuses)
@common_utils.exception_logger()
def process(self, agent):
"""Process updates to this router
This method is the point where the agent requests that updates be
applied to this router.
:param agent: Passes the agent in order to send RPC messages.
"""
self._process_internal_ports()
self.process_external(agent)
# Process static routes for router
self.routes_updated()
# Update ex_gw_port and enable_snat on the router info cache
self.ex_gw_port = self.get_ex_gw_port()
self.snat_ports = self.router.get(
l3_constants.SNAT_ROUTER_INTF_KEY, [])
self.enable_snat = self.router.get('enable_snat')

View File

@ -384,7 +384,7 @@ class L3AgentTestCase(L3AgentTestFramework):
clean_fips(router)
self._add_fip(router, client_address, fixed_address=server_address)
self.agent.process_router(router)
router.process(self.agent)
router_ns = ip_lib.IPWrapper(namespace=router.ns_name)
netcat = helpers.NetcatTester(router_ns, router_ns,
@ -407,7 +407,7 @@ class L3AgentTestCase(L3AgentTestFramework):
assert_num_of_conntrack_rules(1)
clean_fips(router)
self.agent.process_router(router)
router.process(self.agent)
assert_num_of_conntrack_rules(0)
with testtools.ExpectedException(RuntimeError):
@ -439,7 +439,7 @@ class L3AgentTestCase(L3AgentTestFramework):
router.router['gw_port']['subnets'] = subnets
router.router['gw_port']['fixed_ips'] = fixed_ips
self.agent.process_router(router)
router.process(self.agent)
# Get the updated configuration and assert that both FIPs are in,
# and that the GW IP address was updated.

View File

@ -32,7 +32,8 @@ class TestBasicRouterOperations(base.BaseTestCase):
# NOTE The use_namespaces config will soon be deprecated
self.agent_conf.use_namespaces = True
self.router_id = _uuid()
return ha_router.HaRouter(self.router_id,
return ha_router.HaRouter(mock.sentinel.enqueue_state,
self.router_id,
router,
self.agent_conf,
mock.sentinel.driver,

View File

@ -315,7 +315,7 @@ class BasicRouterOperationsFramework(base.BaseTestCase):
ri.ns_name,
agent.process_monitor,
ri.get_internal_device_name)
agent.process_router(ri)
ri.process(agent)
class TestBasicRouterOperations(BasicRouterOperationsFramework):
@ -894,7 +894,7 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
'fixed_ip_address': '7.7.7.7',
'port_id': _uuid(),
'host': HOSTNAME}]}
agent.process_router(ri)
ri.process(agent)
ri.process_floating_ip_addresses.assert_called_with(mock.ANY)
ri.process_floating_ip_addresses.reset_mock()
ri.process_floating_ip_nat_rules.assert_called_with()
@ -906,7 +906,7 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
fake_floatingips2['floatingips'][0]['fixed_ip_address'] = '7.7.7.8'
router[l3_constants.FLOATINGIP_KEY] = fake_floatingips2['floatingips']
agent.process_router(ri)
ri.process(agent)
ri.process_floating_ip_addresses.assert_called_with(mock.ANY)
ri.process_floating_ip_addresses.reset_mock()
ri.process_floating_ip_nat_rules.assert_called_with()
@ -923,7 +923,7 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
['fixed_ips'][0]['ip_address']))
ri.router['gw_port']['fixed_ips'][0]['ip_address'] = str(old_ip + 1)
agent.process_router(ri)
ri.process(agent)
ri.process_floating_ip_addresses.reset_mock()
ri.process_floating_ip_nat_rules.reset_mock()
self.assertEqual(ri.external_gateway_added.call_count, 0)
@ -931,7 +931,7 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
# remove just the floating ips
del router[l3_constants.FLOATINGIP_KEY]
agent.process_router(ri)
ri.process(agent)
ri.process_floating_ip_addresses.assert_called_with(mock.ANY)
ri.process_floating_ip_addresses.reset_mock()
ri.process_floating_ip_nat_rules.assert_called_with()
@ -940,7 +940,7 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
# now no ports so state is torn down
del router[l3_constants.INTERFACE_KEY]
del router['gw_port']
agent.process_router(ri)
ri.process(agent)
self.assertEqual(self.send_arp.call_count, 1)
distributed = ri.router.get('distributed', False)
self.assertEqual(ri.process_floating_ip_addresses.called,
@ -1110,13 +1110,13 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
ri = l3router.RouterInfo(router['id'], router, **self.ri_kwargs)
ri.external_gateway_added = mock.Mock()
# Process with NAT
agent.process_router(ri)
ri.process(agent)
orig_nat_rules = ri.iptables_manager.ipv4['nat'].rules[:]
# Reprocess without NAT
router['enable_snat'] = False
# Reassign the router object to RouterInfo
ri.router = router
agent.process_router(ri)
ri.process(agent)
# For some reason set logic does not work well with
# IpTablesRule instances
nat_rules_delta = [r for r in orig_nat_rules
@ -1131,13 +1131,13 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
ri = l3router.RouterInfo(router['id'], router, **self.ri_kwargs)
ri.external_gateway_added = mock.Mock()
# Process without NAT
agent.process_router(ri)
ri.process(agent)
orig_nat_rules = ri.iptables_manager.ipv4['nat'].rules[:]
# Reprocess with NAT
router['enable_snat'] = True
# Reassign the router object to RouterInfo
ri.router = router
agent.process_router(ri)
ri.process(agent)
# For some reason set logic does not work well with
# IpTablesRule instances
nat_rules_delta = [r for r in ri.iptables_manager.ipv4['nat'].rules
@ -1152,13 +1152,13 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
ri = l3router.RouterInfo(router['id'], router, **self.ri_kwargs)
ri.external_gateway_added = mock.Mock()
# Process with NAT
agent.process_router(ri)
ri.process(agent)
# Add an interface and reprocess
router_append_interface(router)
# Reassign the router object to RouterInfo
ri.router = router
agent.process_router(ri)
# send_arp is called both times process_router is called
ri.process(agent)
# send_arp is called both times process is called
self.assertEqual(self.send_arp.call_count, 2)
def _test_process_ipv6_only_or_dual_stack_gw(self, dual_stack=False):
@ -1203,7 +1203,7 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
ri = l3router.RouterInfo(router['id'], router, **self.ri_kwargs)
ri.external_gateway_added = mock.Mock()
# Process with NAT
agent.process_router(ri)
ri.process(agent)
orig_nat_rules = ri.iptables_manager.ipv4['nat'].rules[:]
# Add an IPv6 interface and reprocess
router_append_interface(router, count=1, ip_version=6, ra_mode=ra_mode,
@ -1261,7 +1261,7 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
ri = l3router.RouterInfo(router['id'], router, **self.ri_kwargs)
ri.external_gateway_added = mock.Mock()
# Process with NAT
agent.process_router(ri)
ri.process(agent)
# Add an IPv4 and IPv6 interface and reprocess
router_append_interface(router, count=1, ip_version=4)
router_append_interface(router, count=1, ip_version=6)
@ -1275,13 +1275,13 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
ri = l3router.RouterInfo(router['id'], router, **self.ri_kwargs)
ri.external_gateway_added = mock.Mock()
# Process with NAT
agent.process_router(ri)
ri.process(agent)
# Add an interface and reprocess
del router[l3_constants.INTERFACE_KEY][1]
# Reassign the router object to RouterInfo
ri.router = router
agent.process_router(ri)
# send_arp is called both times process_router is called
ri.process(agent)
# send_arp is called both times process is called
self.assertEqual(self.send_arp.call_count, 2)
def test_process_router_ipv6_interface_removed(self):
@ -1313,7 +1313,7 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
# raise RuntimeError to simulate that an unexpected exception
# occurs
internal_network_added.side_effect = RuntimeError
self.assertRaises(RuntimeError, agent.process_router, ri)
self.assertRaises(RuntimeError, ri.process, agent)
self.assertNotIn(
router[l3_constants.INTERFACE_KEY][0], ri.internal_ports)
@ -1322,7 +1322,7 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
# periodic_sync_routers_task finds out that _rpc_loop failed to
# process the router last time, it will retry in the next run.
agent.process_router(ri)
ri.process(agent)
# We were able to add the port to ri.internal_ports
self.assertIn(
router[l3_constants.INTERFACE_KEY][0], ri.internal_ports)
@ -1333,7 +1333,7 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
ri = l3router.RouterInfo(router['id'], router, **self.ri_kwargs)
ri.external_gateway_added = mock.Mock()
# add an internal port
agent.process_router(ri)
ri.process(agent)
with mock.patch.object(
ri,
@ -1343,7 +1343,7 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
internal_net_removed.side_effect = RuntimeError
ri.internal_ports[0]['admin_state_up'] = False
# The above port is set to down state, remove it.
self.assertRaises(RuntimeError, agent.process_router, ri)
self.assertRaises(RuntimeError, ri.process, agent)
self.assertIn(
router[l3_constants.INTERFACE_KEY][0], ri.internal_ports)
@ -1352,7 +1352,7 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
# periodic_sync_routers_task finds out that _rpc_loop failed to
# process the router last time, it will retry in the next run.
agent.process_router(ri)
ri.process(agent)
# We were able to remove the port from ri.internal_ports
self.assertNotIn(
router[l3_constants.INTERFACE_KEY][0], ri.internal_ports)
@ -1374,7 +1374,7 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
router,
**self.ri_kwargs)
ri.external_gateway_added = mock.Mock()
agent.process_router(ri)
ri.process(agent)
# Assess the call for putting the floating IP up was performed
mock_update_fip_status.assert_called_once_with(
mock.ANY, ri.router_id,
@ -1383,7 +1383,7 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
# Process the router again, this time without floating IPs
router[l3_constants.FLOATINGIP_KEY] = []
ri.router = router
agent.process_router(ri)
ri.process(agent)
# Assess the call for putting the floating IP up was performed
mock_update_fip_status.assert_called_once_with(
mock.ANY, ri.router_id,
@ -1406,7 +1406,7 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
ri.process_floating_ip_addresses = mock.Mock(
side_effect=RuntimeError)
ri.external_gateway_added = mock.Mock()
agent.process_router(ri)
ri.process(agent)
# Assess the call for putting the floating IP into Error
# was performed
mock_update_fip_status.assert_called_once_with(
@ -1520,7 +1520,7 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
external_gateway_removed,
external_gateway_added):
agent.process_router(ri)
ri.process(agent)
self.assertEqual(external_gateway_added.call_count, 1)
self.assertFalse(external_gateway_removed.called)
@ -1545,7 +1545,7 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
self.mock_ip.get_devices.return_value = stale_devlist
agent.process_router(ri)
ri.process(agent)
self.mock_driver.unplug.assert_called_with(
stale_devnames[0],