From 26d8702b9d7cc5a4293b97bc435fa85983be9f01 Mon Sep 17 00:00:00 2001
From: venkata anil
Date: Thu, 4 Aug 2016 07:14:47 +0000
Subject: [PATCH] l2pop fdb flows for HA router ports

This patch makes L3 HA failover independent of neutron components
at failover time.

All HA agents (active and backup) call update_device_up/down after
wiring the ports, but the l2pop driver is invoked only for the active
agent, since the port binding in the DB reflects the active agent.
l2pop therefore creates unicast and multicast flows only for the
active agent. On failover, flows to the new active agent are created,
and for this to happen the database, the messaging server,
neutron-server and the destination L3 agent must all be up during
failover. This creates two issues:

1) When any of the above resources (e.g. neutron-server) is dead,
   flows between the new master and the other agents won't be created
   and L3 HA failover fails. In the same scenario, L3 HA failover
   works if l2pop is disabled.
2) Packet loss during failover is higher, because the above neutron
   resources interact multiple times and so take time to create the
   l2 flows.

In this change, we allow the plugin to notify l2pop also when
update_device_up/down is called by backup agents. l2pop then creates
flood flows to all HA agents (both active and slave). l2pop won't
create a unicast flow for this port; instead, the unicast flow is
created by the learn action of table 10 when keepalived sends a GARP
after assigning the IP address to the master router's qr-xx port. As
the flood flows are already in place and the unicast flow is added
dynamically, L3 HA failover no longer depends on l2pop. This solves
two issues:

1) With L3 HA + l2pop, failover works even if any of the above agents
   or processes is dead.
2) Failover time is reduced, since we no longer depend on neutron to
   create flows during failover.

We use the L3HARouterAgentPortBinding table to get all HA agents of
a router port.
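For illustration only (not part of this commit): a minimal,
self-contained Python sketch of the flood-only FDB payload described
above. The names ha_flood_fdb and ha_agent_tunnel_ips are hypothetical,
and FLOODING_ENTRY stands in for neutron's
PortInfo('00:00:00:00:00:00', '0.0.0.0') wildcard entry; the real logic
lives in L2populationMechanismDriver and l2pop_db.get_ha_agents().

    # Hypothetical sketch of the payload shape the patch's tests expect.
    FLOODING_ENTRY = ('00:00:00:00:00:00', '0.0.0.0')  # MAC/IP wildcard

    def ha_flood_fdb(network_id, segment, ha_agent_tunnel_ips):
        """Build an FDB payload with only flood entries, one per HA agent.

        No unicast entry is added for the HA port itself: the unicast
        flow is installed by the table 10 learn action when keepalived
        sends a GARP as a router becomes master.
        """
        return {
            network_id: {
                'segment_id': segment['segmentation_id'],
                'network_type': segment['network_type'],
                'ports': {ip: [FLOODING_ENTRY]
                          for ip in ha_agent_tunnel_ips},
            }
        }

    # Example: a vxlan network whose HA router is hosted on two agents.
    print(ha_flood_fdb('net-uuid',
                       {'segmentation_id': 1, 'network_type': 'vxlan'},
                       ['20.0.0.1', '20.0.0.2']))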
An HA router port on a slave agent is also counted by l2pop's
distributed_active_network_ports and agent_network_active_port_count.

Closes-bug: #1522980
Closes-bug: #1602614
Change-Id: Ie1f5289390b3ff3f7f3ed7ffc8f6a8258ee8662e
---
 neutron/db/l3_hamode_db.py | 16 ++
 neutron/plugins/ml2/drivers/l2pop/db.py | 80 +++++-
 .../plugins/ml2/drivers/l2pop/mech_driver.py | 57 +++-
 neutron/plugins/ml2/rpc.py | 56 +++-
 neutron/tests/unit/db/test_l3_hamode_db.py | 33 +++
 .../unit/plugins/ml2/drivers/l2pop/test_db.py | 229 +++++++++++++++--
 .../ml2/drivers/l2pop/test_mech_driver.py | 243 +++++++++++++++++-
 neutron/tests/unit/plugins/ml2/test_rpc.py | 5 +-
 8 files changed, 670 insertions(+), 49 deletions(-)

diff --git a/neutron/db/l3_hamode_db.py b/neutron/db/l3_hamode_db.py index 35d2298ae6a..9d086e78b90 100644 --- a/neutron/db/l3_hamode_db.py +++ b/neutron/db/l3_hamode_db.py @@ -35,8 +35,10 @@ from neutron.api.v2 import attributes from neutron.common import constants as n_const from neutron.common import utils as n_utils from neutron.db import agents_db +from neutron.db import api as db_api from neutron.db.availability_zone import router as router_az_db from neutron.db import common_db_mixin +from neutron.db import l3_attrs_db from neutron.db import l3_db from neutron.db import l3_dvr_db from neutron.db.l3_dvr_db import is_distributed_router @@ -788,3 +790,17 @@ def is_ha_router(router): if validators.is_attr_set(requested_router_type): return requested_router_type return cfg.CONF.l3_ha + + +def is_ha_router_port(device_owner, router_id): + session = db_api.get_session() + if device_owner == constants.DEVICE_OWNER_HA_REPLICATED_INT: + return True + elif device_owner == constants.DEVICE_OWNER_ROUTER_SNAT: + query = session.query(l3_attrs_db.RouterExtraAttributes) + query = query.filter_by(ha=True) + query = query.filter(l3_attrs_db.RouterExtraAttributes.router_id == + router_id) + return bool(query.limit(1).count()) + else: + return False diff --git a/neutron/plugins/ml2/drivers/l2pop/db.py b/neutron/plugins/ml2/drivers/l2pop/db.py index d1dd530d5c9..80b1a4a2811 100644 --- a/neutron/plugins/ml2/drivers/l2pop/db.py +++ b/neutron/plugins/ml2/drivers/l2pop/db.py @@ -18,10 +18,15 @@ from oslo_serialization import jsonutils from oslo_utils import timeutils from neutron.db import agents_db +from neutron.db import l3_hamode_db from neutron.db import models_v2 from neutron.plugins.ml2 import models as ml2_models +HA_ROUTER_PORTS = (const.DEVICE_OWNER_HA_REPLICATED_INT, + const.DEVICE_OWNER_ROUTER_SNAT) + + def get_agent_ip_by_host(session, agent_host): agent = get_agent_by_host(session, agent_host) if agent: @@ -70,15 +75,33 @@ def _get_active_network_ports(session, network_id): return query +def _ha_router_interfaces_on_network_query(session, network_id): + query = session.query(models_v2.Port) + query = query.join(l3_hamode_db.L3HARouterAgentPortBinding, + l3_hamode_db.L3HARouterAgentPortBinding.router_id == + models_v2.Port.device_id) + return query.filter( + models_v2.Port.network_id == network_id, + models_v2.Port.device_owner.in_(HA_ROUTER_PORTS)) + + +def _get_ha_router_interface_ids(session, network_id): + query = _ha_router_interfaces_on_network_query(session, network_id) + return query.from_self(models_v2.Port.id).distinct() + + def get_nondistributed_active_network_ports(session, network_id): query = _get_active_network_ports(session, network_id) + # Exclude DVR and HA router interfaces query = query.filter(models_v2.Port.device_owner != const.DEVICE_OWNER_DVR_INTERFACE) + ha_iface_ids_query =
_get_ha_router_interface_ids(session, network_id) + query = query.filter(models_v2.Port.id.notin_(ha_iface_ids_query)) return [(bind, agent) for bind, agent in query.all() if get_agent_ip(agent)] -def get_distributed_active_network_ports(session, network_id): +def get_dvr_active_network_ports(session, network_id): with session.begin(subtransactions=True): query = session.query(ml2_models.DistributedPortBinding, agents_db.Agent) @@ -94,6 +117,44 @@ def get_distributed_active_network_ports(session, network_id): if get_agent_ip(agent)] +def get_distributed_active_network_ports(session, network_id): + return (get_dvr_active_network_ports(session, network_id) + + get_ha_active_network_ports(session, network_id)) + + +def get_ha_active_network_ports(session, network_id): + agents = get_ha_agents(session, network_id=network_id) + return [(None, agent) for agent in agents] + + +def get_ha_agents(session, network_id=None, router_id=None): + query = session.query(agents_db.Agent.host).distinct() + query = query.join(l3_hamode_db.L3HARouterAgentPortBinding, + l3_hamode_db.L3HARouterAgentPortBinding.l3_agent_id == + agents_db.Agent.id) + if router_id: + query = query.filter( + l3_hamode_db.L3HARouterAgentPortBinding.router_id == router_id) + elif network_id: + query = query.join(models_v2.Port, models_v2.Port.device_id == + l3_hamode_db.L3HARouterAgentPortBinding.router_id) + query = query.filter(models_v2.Port.network_id == network_id, + models_v2.Port.status == const.PORT_STATUS_ACTIVE, + models_v2.Port.device_owner.in_(HA_ROUTER_PORTS)) + else: + return [] + # L3HARouterAgentPortBinding will have l3 agent ids of hosting agents. + # But we need l2 agent(for tunneling ip) while creating FDB entries. + agents_query = session.query(agents_db.Agent) + agents_query = agents_query.filter(agents_db.Agent.host.in_(query)) + return [agent for agent in agents_query + if get_agent_ip(agent)] + + +def get_ha_agents_by_router_id(session, router_id): + return get_ha_agents(session, router_id=router_id) + + def get_agent_network_active_port_count(session, agent_host, network_id): with session.begin(subtransactions=True): @@ -105,6 +166,12 @@ def get_agent_network_active_port_count(session, agent_host, models_v2.Port.device_owner != const.DEVICE_OWNER_DVR_INTERFACE, ml2_models.PortBinding.host == agent_host) + + ha_iface_ids_query = _get_ha_router_interface_ids(session, network_id) + query1 = query1.filter(models_v2.Port.id.notin_(ha_iface_ids_query)) + ha_port_count = get_ha_router_active_port_count( + session, agent_host, network_id) + query2 = query.join(ml2_models.DistributedPortBinding) query2 = query2.filter(models_v2.Port.network_id == network_id, ml2_models.DistributedPortBinding.status == @@ -113,4 +180,13 @@ def get_agent_network_active_port_count(session, agent_host, const.DEVICE_OWNER_DVR_INTERFACE, ml2_models.DistributedPortBinding.host == agent_host) - return (query1.count() + query2.count()) + return (query1.count() + query2.count() + ha_port_count) + + +def get_ha_router_active_port_count(session, agent_host, network_id): + # Return num of HA router interfaces on the given network and host + query = _ha_router_interfaces_on_network_query(session, network_id) + query = query.filter(models_v2.Port.status == const.PORT_STATUS_ACTIVE) + query = query.join(agents_db.Agent) + query = query.filter(agents_db.Agent.host == agent_host) + return query.count() diff --git a/neutron/plugins/ml2/drivers/l2pop/mech_driver.py b/neutron/plugins/ml2/drivers/l2pop/mech_driver.py index 693df64b713..a556df24f31 100644 
--- a/neutron/plugins/ml2/drivers/l2pop/mech_driver.py +++ b/neutron/plugins/ml2/drivers/l2pop/mech_driver.py @@ -21,6 +21,9 @@ from oslo_log import log as logging from neutron._i18n import _, _LW from neutron import context as n_context from neutron.db import api as db_api +from neutron.db import l3_hamode_db +from neutron import manager +from neutron.plugins.common import constants as service_constants from neutron.plugins.ml2 import driver_api as api from neutron.plugins.ml2.drivers.l2pop import config # noqa from neutron.plugins.ml2.drivers.l2pop import db as l2pop_db @@ -52,11 +55,30 @@ class L2populationMechanismDriver(api.MechanismDriver): """L2population driver vlan transparency support.""" return True + def _get_ha_port_agents_fdb( + self, session, network_id, router_id): + other_fdb_ports = {} + for agent in l2pop_db.get_ha_agents_by_router_id(session, router_id): + agent_active_ports = l2pop_db.get_agent_network_active_port_count( + session, agent.host, network_id) + if agent_active_ports == 0: + ip = l2pop_db.get_agent_ip(agent) + other_fdb_ports[ip] = [const.FLOODING_ENTRY] + + return other_fdb_ports + def delete_port_postcommit(self, context): port = context.current agent_host = context.host fdb_entries = self._get_agent_fdb(context.bottom_bound_segment, port, agent_host) + session = db_api.get_session() + if port['device_owner'] in l2pop_db.HA_ROUTER_PORTS: + network_id = port['network_id'] + other_fdb_ports = self._get_ha_port_agents_fdb( + session, network_id, port['device_id']) + fdb_entries[network_id]['ports'] = other_fdb_ports + self.L2populationAgentNotify.remove_fdb_entries(self.rpc_ctx, fdb_entries) @@ -125,13 +147,15 @@ class L2populationMechanismDriver(api.MechanismDriver): def update_port_postcommit(self, context): port = context.current orig = context.original - + if l3_hamode_db.is_ha_router_port(port['device_owner'], + port['device_id']): + return diff_ips = self._get_diff_ips(orig, port) if diff_ips: self._fixed_ips_changed(context, orig, port, diff_ips) if port['device_owner'] == const.DEVICE_OWNER_DVR_INTERFACE: if context.status == const.PORT_STATUS_ACTIVE: - self._update_port_up(context) + self.update_port_up(context) if context.status == const.PORT_STATUS_DOWN: agent_host = context.host fdb_entries = self._get_agent_fdb( @@ -150,7 +174,7 @@ class L2populationMechanismDriver(api.MechanismDriver): self.rpc_ctx, fdb_entries) elif context.status != context.original_status: if context.status == const.PORT_STATUS_ACTIVE: - self._update_port_up(context) + self.update_port_up(context) elif context.status == const.PORT_STATUS_DOWN: fdb_entries = self._get_agent_fdb( context.bottom_bound_segment, port, context.host) @@ -209,7 +233,24 @@ class L2populationMechanismDriver(api.MechanismDriver): return agents - def _update_port_up(self, context): + def update_port_down(self, context): + port = context.current + agent_host = context.host + l3plugin = manager.NeutronManager.get_service_plugins().get( + service_constants.L3_ROUTER_NAT) + # when agent transitions to backup, don't remove flood flows + if agent_host and l3plugin and getattr( + l3plugin, "list_router_ids_on_host", None): + admin_context = n_context.get_admin_context() + if l3plugin.list_router_ids_on_host( + admin_context, agent_host, [port['device_id']]): + return + fdb_entries = self._get_agent_fdb( + context.bottom_bound_segment, port, agent_host) + self.L2populationAgentNotify.remove_fdb_entries( + self.rpc_ctx, fdb_entries) + + def update_port_up(self, context): port = context.current agent_host = 
context.host session = db_api.get_session() @@ -249,7 +290,9 @@ class L2populationMechanismDriver(api.MechanismDriver): self.rpc_ctx, agent_fdb_entries, agent_host) # Notify other agents to add fdb rule for current port - if port['device_owner'] != const.DEVICE_OWNER_DVR_INTERFACE: + if (port['device_owner'] != const.DEVICE_OWNER_DVR_INTERFACE and + not l3_hamode_db.is_ha_router_port(port['device_owner'], + port['device_id'])): other_fdb_ports[agent_ip] += self._get_port_fdb_entries(port) self.L2populationAgentNotify.add_fdb_entries(self.rpc_ctx, @@ -278,7 +321,9 @@ class L2populationMechanismDriver(api.MechanismDriver): other_fdb_entries[network_id]['ports'][agent_ip].append( const.FLOODING_ENTRY) # Notify other agents to remove fdb rules for current port - if port['device_owner'] != const.DEVICE_OWNER_DVR_INTERFACE: + if (port['device_owner'] != const.DEVICE_OWNER_DVR_INTERFACE and + not l3_hamode_db.is_ha_router_port(port['device_owner'], + port['device_id'])): fdb_entries = self._get_port_fdb_entries(port) other_fdb_entries[network_id]['ports'][agent_ip] += fdb_entries diff --git a/neutron/plugins/ml2/rpc.py b/neutron/plugins/ml2/rpc.py index 3d0ec161327..353870dab8a 100644 --- a/neutron/plugins/ml2/rpc.py +++ b/neutron/plugins/ml2/rpc.py @@ -25,6 +25,7 @@ from neutron.api.rpc.handlers import securitygroups_rpc as sg_rpc from neutron.callbacks import resources from neutron.common import rpc as n_rpc from neutron.common import topics +from neutron.db import l3_hamode_db from neutron.db import provisioning_blocks from neutron.extensions import portbindings from neutron.extensions import portsecurity as psec @@ -182,16 +183,18 @@ class RpcCallbacks(type_tunnel.TunnelRpcCallbackMixin): LOG.debug("Device %(device)s not bound to the" " agent host %(host)s", {'device': device, 'host': host}) - return {'device': device, - 'exists': port_exists} - - try: - port_exists = bool(plugin.update_port_status( - rpc_context, port_id, n_const.PORT_STATUS_DOWN, host)) - except exc.StaleDataError: - port_exists = False - LOG.debug("delete_port and update_device_down are being executed " - "concurrently. Ignoring StaleDataError.") + else: + try: + port_exists = bool(plugin.update_port_status( + rpc_context, port_id, n_const.PORT_STATUS_DOWN, host)) + except exc.StaleDataError: + port_exists = False + LOG.debug("delete_port and update_device_down are being " + "executed concurrently. 
Ignoring StaleDataError.") + return {'device': device, + 'exists': port_exists} + self.notify_ha_port_status(port_id, rpc_context, + n_const.PORT_STATUS_DOWN, host) return {'device': device, 'exists': port_exists} @@ -217,11 +220,19 @@ class RpcCallbacks(type_tunnel.TunnelRpcCallbackMixin): port = plugin._get_port(rpc_context, port_id) except exceptions.PortNotFound: LOG.debug("Port %s not found, will not notify nova.", port_id) + return else: if port.device_owner.startswith( n_const.DEVICE_OWNER_COMPUTE_PREFIX): plugin.nova_notifier.notify_port_active_direct(port) - return + return + else: + self.update_port_status_to_active(port, rpc_context, port_id, host) + self.notify_ha_port_status(port_id, rpc_context, + n_const.PORT_STATUS_ACTIVE, host, port=port) + + def update_port_status_to_active(self, port, rpc_context, port_id, host): + plugin = manager.NeutronManager.get_plugin() if port and port['device_owner'] == n_const.DEVICE_OWNER_DVR_INTERFACE: # NOTE(kevinbenton): we have to special case DVR ports because of # the special multi-binding status update logic they have that @@ -241,6 +252,29 @@ class RpcCallbacks(type_tunnel.TunnelRpcCallbackMixin): rpc_context, port['id'], resources.PORT, provisioning_blocks.L2_AGENT_ENTITY) + def notify_ha_port_status(self, port_id, rpc_context, + status, host, port=None): + plugin = manager.NeutronManager.get_plugin() + l2pop_driver = plugin.mechanism_manager.mech_drivers.get( + 'l2population') + if not l2pop_driver: + return + if not port: + port = ml2_db.get_port(rpc_context.session, port_id) + if not port: + return + is_ha_port = l3_hamode_db.is_ha_router_port(port['device_owner'], + port['device_id']) + if is_ha_port: + port_context = plugin.get_bound_port_context( + rpc_context, port_id) + port_context.current['status'] = status + port_context.current[portbindings.HOST_ID] = host + if status == n_const.PORT_STATUS_ACTIVE: + l2pop_driver.obj.update_port_up(port_context) + else: + l2pop_driver.obj.update_port_down(port_context) + def update_device_list(self, rpc_context, **kwargs): devices_up = [] failed_devices_up = [] diff --git a/neutron/tests/unit/db/test_l3_hamode_db.py b/neutron/tests/unit/db/test_l3_hamode_db.py index ad0e407e761..02f983d5274 100644 --- a/neutron/tests/unit/db/test_l3_hamode_db.py +++ b/neutron/tests/unit/db/test_l3_hamode_db.py @@ -1165,6 +1165,39 @@ class L3HAModeDbTestCase(L3HATestFramework): port = self._get_first_interface(router['id']) self.assertEqual(self.agent1['host'], port[portbindings.HOST_ID]) + def test_is_ha_router_port(self): + network_id = self._create_network(self.core_plugin, self.admin_ctx) + subnet = self._create_subnet(self.core_plugin, self.admin_ctx, + network_id) + interface_info = {'subnet_id': subnet['id']} + + router = self._create_router() + self.plugin.add_router_interface(self.admin_ctx, + router['id'], + interface_info) + port = self._get_first_interface(router['id']) + self.assertTrue(l3_hamode_db.is_ha_router_port( + port['device_owner'], port['device_id'])) + + def test_is_ha_router_port_for_normal_port(self): + network_id = self._create_network(self.core_plugin, self.admin_ctx) + subnet = self._create_subnet(self.core_plugin, self.admin_ctx, + network_id) + interface_info = {'subnet_id': subnet['id']} + + router = self._create_router(ha=False) + self.plugin.add_router_interface(self.admin_ctx, + router['id'], + interface_info) + device_filter = {'device_id': [router['id']], + 'device_owner': + [constants.DEVICE_OWNER_ROUTER_INTF]} + port = self.core_plugin.get_ports( + self.admin_ctx, 
filters=device_filter)[0] + + self.assertFalse(l3_hamode_db.is_ha_router_port( + port['device_owner'], port['device_id'])) + class L3HAUserTestCase(L3HATestFramework): diff --git a/neutron/tests/unit/plugins/ml2/drivers/l2pop/test_db.py b/neutron/tests/unit/plugins/ml2/drivers/l2pop/test_db.py index ee9558fc061..dbd56054902 100644 --- a/neutron/tests/unit/plugins/ml2/drivers/l2pop/test_db.py +++ b/neutron/tests/unit/plugins/ml2/drivers/l2pop/test_db.py @@ -13,23 +13,73 @@ # under the License. from neutron_lib import constants +from oslo_utils import uuidutils +from neutron.common import constants as n_const +from neutron.common import utils from neutron import context +from neutron.db import l3_attrs_db +from neutron.db import l3_db +from neutron.db import l3_hamode_db from neutron.db import models_v2 from neutron.extensions import portbindings from neutron.plugins.ml2.drivers.l2pop import db as l2pop_db from neutron.plugins.ml2 import models from neutron.tests.common import helpers +from neutron.tests import tools from neutron.tests.unit import testlib_api +HOST = helpers.HOST +HOST_2 = 'HOST_2' +HOST_3 = 'HOST_3' +HOST_2_TUNNELING_IP = '20.0.0.2' +HOST_3_TUNNELING_IP = '20.0.0.3' +TEST_ROUTER_ID = 'router_id' +TEST_NETWORK_ID = 'network_id' +TEST_HA_NETWORK_ID = 'ha_network_id' + class TestL2PopulationDBTestCase(testlib_api.SqlTestCase): def setUp(self): super(TestL2PopulationDBTestCase, self).setUp() self.ctx = context.get_admin_context() + self._create_network() + + def _create_network(self, network_id=TEST_NETWORK_ID): + with self.ctx.session.begin(subtransactions=True): + self.ctx.session.add(models_v2.Network(id=network_id)) + + def _create_router(self, distributed=True, ha=False): + with self.ctx.session.begin(subtransactions=True): + self.ctx.session.add(l3_db.Router(id=TEST_ROUTER_ID)) + self.ctx.session.add(l3_attrs_db.RouterExtraAttributes( + router_id=TEST_ROUTER_ID, distributed=distributed, ha=ha)) + + def _create_ha_router(self, distributed=False): + helpers.register_l3_agent(HOST_2) + helpers.register_ovs_agent(HOST_2, tunneling_ip=HOST_2_TUNNELING_IP) + # Register l3 agent on host3, which doesn't host any HA router. + # Tests should test that host3 is not a HA agent host. 
+ helpers.register_l3_agent(HOST_3) + helpers.register_ovs_agent(HOST_3, tunneling_ip=HOST_3_TUNNELING_IP) + with self.ctx.session.begin(subtransactions=True): + self.ctx.session.add(models_v2.Network(id=TEST_HA_NETWORK_ID)) + self._create_router(distributed=distributed, ha=True) + for state, host in [(n_const.HA_ROUTER_STATE_ACTIVE, HOST), + (n_const.HA_ROUTER_STATE_STANDBY, HOST_2)]: + self._setup_port_binding( + network_id=TEST_HA_NETWORK_ID, + device_owner=constants.DEVICE_OWNER_ROUTER_HA_INTF, + device_id=TEST_ROUTER_ID, + host_state=state, + host=host) + + def get_l3_agent_by_host(self, agent_host): + plugin = helpers.FakePlugin() + return plugin._get_agent_by_type_and_host( + self.ctx, constants.AGENT_TYPE_L3, agent_host) def test_get_agent_by_host(self): - # Register a L2 agent + A bunch of other agents on the same host helpers.register_l3_agent() helpers.register_dhcp_agent() helpers.register_ovs_agent() @@ -38,58 +88,70 @@ class TestL2PopulationDBTestCase(testlib_api.SqlTestCase): self.assertEqual(constants.AGENT_TYPE_OVS, agent.agent_type) def test_get_agent_by_host_no_candidate(self): - # Register a bunch of non-L2 agents on the same host helpers.register_l3_agent() helpers.register_dhcp_agent() agent = l2pop_db.get_agent_by_host( self.ctx.session, helpers.HOST) self.assertIsNone(agent) - def _setup_port_binding(self, network_id='network_id', dvr=True): + def _setup_port_binding(self, **kwargs): with self.ctx.session.begin(subtransactions=True): - self.ctx.session.add(models_v2.Network(id=network_id)) - device_owner = constants.DEVICE_OWNER_DVR_INTERFACE if dvr else '' + mac = utils.get_random_mac('fa:16:3e:00:00:00'.split(':')) + port_id = uuidutils.generate_uuid() + network_id = kwargs.get('network_id', TEST_NETWORK_ID) + device_owner = kwargs.get('device_owner', '') + device_id = kwargs.get('device_id', '') + host = kwargs.get('host', helpers.HOST) + self.ctx.session.add(models_v2.Port( - id='port_id', - network_id=network_id, - mac_address='00:11:22:33:44:55', - admin_state_up=True, - status=constants.PORT_STATUS_ACTIVE, - device_id='', - device_owner=device_owner)) - port_binding_cls = (models.DistributedPortBinding if dvr - else models.PortBinding) - binding_kwarg = { - 'port_id': 'port_id', - 'host': helpers.HOST, - 'vif_type': portbindings.VIF_TYPE_UNBOUND, - 'vnic_type': portbindings.VNIC_NORMAL - } - if dvr: - binding_kwarg['router_id'] = 'router_id' + id=port_id, network_id=network_id, mac_address=mac, + admin_state_up=True, status=constants.PORT_STATUS_ACTIVE, + device_id=device_id, device_owner=device_owner)) + + port_binding_cls = models.PortBinding + binding_kwarg = {'port_id': port_id, + 'host': host, + 'vif_type': portbindings.VIF_TYPE_UNBOUND, + 'vnic_type': portbindings.VNIC_NORMAL} + + if device_owner == constants.DEVICE_OWNER_DVR_INTERFACE: + port_binding_cls = models.DistributedPortBinding + binding_kwarg['router_id'] = TEST_ROUTER_ID binding_kwarg['status'] = constants.PORT_STATUS_DOWN self.ctx.session.add(port_binding_cls(**binding_kwarg)) + if network_id == TEST_HA_NETWORK_ID: + agent = self.get_l3_agent_by_host(host) + haport_bindings_cls = l3_hamode_db.L3HARouterAgentPortBinding + habinding_kwarg = {'port_id': port_id, + 'router_id': device_id, + 'l3_agent_id': agent['id'], + 'state': kwargs.get('host_state', + n_const.HA_ROUTER_STATE_ACTIVE)} + self.ctx.session.add(haport_bindings_cls(**habinding_kwarg)) + def test_get_distributed_active_network_ports(self): - self._setup_port_binding() + self._setup_port_binding( + 
device_owner=constants.DEVICE_OWNER_DVR_INTERFACE) # Register a L2 agent + A bunch of other agents on the same host helpers.register_l3_agent() helpers.register_dhcp_agent() helpers.register_ovs_agent() tunnel_network_ports = l2pop_db.get_distributed_active_network_ports( - self.ctx.session, 'network_id') + self.ctx.session, TEST_NETWORK_ID) self.assertEqual(1, len(tunnel_network_ports)) _, agent = tunnel_network_ports[0] self.assertEqual(constants.AGENT_TYPE_OVS, agent.agent_type) def test_get_distributed_active_network_ports_no_candidate(self): - self._setup_port_binding() + self._setup_port_binding( + device_owner=constants.DEVICE_OWNER_DVR_INTERFACE) # Register a bunch of non-L2 agents on the same host helpers.register_l3_agent() helpers.register_dhcp_agent() tunnel_network_ports = l2pop_db.get_distributed_active_network_ports( - self.ctx.session, 'network_id') + self.ctx.session, TEST_NETWORK_ID) self.assertEqual(0, len(tunnel_network_ports)) def test_get_nondistributed_active_network_ports(self): @@ -99,7 +161,7 @@ class TestL2PopulationDBTestCase(testlib_api.SqlTestCase): helpers.register_dhcp_agent() helpers.register_ovs_agent() fdb_network_ports = l2pop_db.get_nondistributed_active_network_ports( - self.ctx.session, 'network_id') + self.ctx.session, TEST_NETWORK_ID) self.assertEqual(1, len(fdb_network_ports)) _, agent = fdb_network_ports[0] self.assertEqual(constants.AGENT_TYPE_OVS, agent.agent_type) @@ -110,5 +172,116 @@ class TestL2PopulationDBTestCase(testlib_api.SqlTestCase): helpers.register_l3_agent() helpers.register_dhcp_agent() fdb_network_ports = l2pop_db.get_nondistributed_active_network_ports( - self.ctx.session, 'network_id') + self.ctx.session, TEST_NETWORK_ID) self.assertEqual(0, len(fdb_network_ports)) + + def test__get_ha_router_interface_ids_with_ha_dvr_snat_port(self): + helpers.register_dhcp_agent() + helpers.register_l3_agent() + helpers.register_ovs_agent() + self._create_ha_router() + self._setup_port_binding( + device_owner=constants.DEVICE_OWNER_ROUTER_SNAT, + device_id=TEST_ROUTER_ID) + ha_iface_ids = l2pop_db._get_ha_router_interface_ids( + self.ctx.session, TEST_NETWORK_ID) + self.assertEqual(1, len(list(ha_iface_ids))) + + def test__get_ha_router_interface_ids_with_ha_replicated_port(self): + helpers.register_dhcp_agent() + helpers.register_l3_agent() + helpers.register_ovs_agent() + self._create_ha_router() + self._setup_port_binding( + device_owner=constants.DEVICE_OWNER_HA_REPLICATED_INT, + device_id=TEST_ROUTER_ID) + ha_iface_ids = l2pop_db._get_ha_router_interface_ids( + self.ctx.session, TEST_NETWORK_ID) + self.assertEqual(1, len(list(ha_iface_ids))) + + def test__get_ha_router_interface_ids_with_no_ha_port(self): + self._create_router() + self._setup_port_binding( + device_owner=constants.DEVICE_OWNER_ROUTER_SNAT, + device_id=TEST_ROUTER_ID) + ha_iface_ids = l2pop_db._get_ha_router_interface_ids( + self.ctx.session, TEST_NETWORK_ID) + self.assertEqual(0, len(list(ha_iface_ids))) + + def test_active_network_ports_with_dvr_snat_port(self): + # Test to get agent hosting dvr snat port + helpers.register_l3_agent() + helpers.register_dhcp_agent() + helpers.register_ovs_agent() + # create DVR router + self._create_router() + # setup DVR snat port + self._setup_port_binding( + device_owner=constants.DEVICE_OWNER_ROUTER_SNAT, + device_id=TEST_ROUTER_ID) + helpers.register_dhcp_agent() + fdb_network_ports = l2pop_db.get_nondistributed_active_network_ports( + self.ctx.session, TEST_NETWORK_ID) + self.assertEqual(1, len(fdb_network_ports)) + + def 
test_active_network_ports_with_ha_dvr_snat_port(self): + # test to get HA agents hosting HA+DVR snat port + helpers.register_dhcp_agent() + helpers.register_l3_agent() + helpers.register_ovs_agent() + # create HA+DVR router + self._create_ha_router() + # setup HA snat port + self._setup_port_binding( + device_owner=constants.DEVICE_OWNER_ROUTER_SNAT, + device_id=TEST_ROUTER_ID) + fdb_network_ports = l2pop_db.get_nondistributed_active_network_ports( + self.ctx.session, TEST_NETWORK_ID) + self.assertEqual(0, len(fdb_network_ports)) + ha_ports = l2pop_db.get_ha_active_network_ports( + self.ctx.session, TEST_NETWORK_ID) + self.assertEqual(2, len(ha_ports)) + + def test_active_port_count_with_dvr_snat_port(self): + helpers.register_l3_agent() + helpers.register_dhcp_agent() + helpers.register_ovs_agent() + self._create_router() + self._setup_port_binding( + device_owner=constants.DEVICE_OWNER_ROUTER_SNAT, + device_id=TEST_ROUTER_ID) + helpers.register_dhcp_agent() + port_count = l2pop_db.get_agent_network_active_port_count( + self.ctx.session, HOST, TEST_NETWORK_ID) + self.assertEqual(1, port_count) + port_count = l2pop_db.get_agent_network_active_port_count( + self.ctx.session, HOST_2, TEST_NETWORK_ID) + self.assertEqual(0, port_count) + + def test_active_port_count_with_ha_dvr_snat_port(self): + helpers.register_dhcp_agent() + helpers.register_l3_agent() + helpers.register_ovs_agent() + self._create_ha_router() + self._setup_port_binding( + device_owner=constants.DEVICE_OWNER_ROUTER_SNAT, + device_id=TEST_ROUTER_ID) + port_count = l2pop_db.get_agent_network_active_port_count( + self.ctx.session, HOST, TEST_NETWORK_ID) + self.assertEqual(1, port_count) + port_count = l2pop_db.get_agent_network_active_port_count( + self.ctx.session, HOST_2, TEST_NETWORK_ID) + self.assertEqual(1, port_count) + + def test_get_ha_agents_by_router_id(self): + helpers.register_dhcp_agent() + helpers.register_l3_agent() + helpers.register_ovs_agent() + self._create_ha_router() + self._setup_port_binding( + device_owner=constants.DEVICE_OWNER_ROUTER_SNAT, + device_id=TEST_ROUTER_ID) + agents = l2pop_db.get_ha_agents_by_router_id( + self.ctx.session, TEST_ROUTER_ID) + ha_agents = [agent.host for agent in agents] + self.assertEqual(tools.UnorderedList([HOST, HOST_2]), ha_agents) diff --git a/neutron/tests/unit/plugins/ml2/drivers/l2pop/test_mech_driver.py b/neutron/tests/unit/plugins/ml2/drivers/l2pop/test_mech_driver.py index f87505548e0..5a784ff3e04 100644 --- a/neutron/tests/unit/plugins/ml2/drivers/l2pop/test_mech_driver.py +++ b/neutron/tests/unit/plugins/ml2/drivers/l2pop/test_mech_driver.py @@ -19,11 +19,18 @@ from neutron_lib import exceptions from oslo_serialization import jsonutils import testtools +from neutron.api.v2 import attributes +from neutron.common import constants as n_const from neutron.common import topics from neutron import context +from neutron.db import agents_db +from neutron.db import common_db_mixin +from neutron.db import l3_agentschedulers_db +from neutron.db import l3_hamode_db from neutron.extensions import portbindings from neutron.extensions import providernet as pnet from neutron import manager +from neutron.plugins.common import constants as service_constants from neutron.plugins.ml2 import driver_context from neutron.plugins.ml2.drivers.l2pop import db as l2pop_db from neutron.plugins.ml2.drivers.l2pop import mech_driver as l2pop_mech_driver @@ -31,6 +38,7 @@ from neutron.plugins.ml2.drivers.l2pop import rpc as l2pop_rpc from neutron.plugins.ml2.drivers.l2pop.rpc_manager import 
l2population_rpc from neutron.plugins.ml2 import managers from neutron.plugins.ml2 import rpc +from neutron.scheduler import l3_agent_scheduler from neutron.tests import base from neutron.tests.common import helpers from neutron.tests.unit.plugins.ml2 import test_plugin @@ -40,12 +48,20 @@ HOST_2 = HOST + '_2' HOST_3 = HOST + '_3' HOST_4 = HOST + '_4' HOST_5 = HOST + '_5' +TEST_ROUTER_ID = 'router_id' NOTIFIER = 'neutron.plugins.ml2.rpc.AgentNotifierApi' DEVICE_OWNER_COMPUTE = constants.DEVICE_OWNER_COMPUTE_PREFIX + 'fake' +class FakeL3PluginWithAgents(common_db_mixin.CommonDbMixin, + l3_hamode_db.L3_HA_NAT_db_mixin, + l3_agentschedulers_db.L3AgentSchedulerDbMixin, + agents_db.AgentDbMixin): + pass + + class TestL2PopulationRpcTestCase(test_plugin.Ml2PluginV2TestCase): _mechanism_drivers = ['openvswitch', 'fake_agent', 'l2population'] @@ -101,6 +117,18 @@ class TestL2PopulationRpcTestCase(test_plugin.Ml2PluginV2TestCase): uptime_patch = mock.patch(uptime, return_value=190) uptime_patch.start() + def _setup_l3(self): + notif_p = mock.patch.object(l3_hamode_db.L3_HA_NAT_db_mixin, + '_notify_ha_interfaces_updated') + self.notif_m = notif_p.start() + self.plugin = FakeL3PluginWithAgents() + self._register_ml2_agents() + self._register_l3_agents() + + def _register_l3_agents(self): + self.agent1 = helpers.register_l3_agent(host=HOST) + self.agent2 = helpers.register_l3_agent(host=HOST_2) + def _register_ml2_agents(self): helpers.register_ovs_agent(host=HOST, tunneling_ip='20.0.0.1') helpers.register_ovs_agent(host=HOST_2, tunneling_ip='20.0.0.2') @@ -167,6 +195,216 @@ class TestL2PopulationRpcTestCase(test_plugin.Ml2PluginV2TestCase): result = jsonutils.loads(jsonutils.dumps(payload)) self.assertEqual(entry, result['netuuid']['ports']['1'][0]) + def _create_router(self, ha=True, tenant_id='tenant1', + distributed=None, ctx=None): + if ctx is None: + ctx = self.adminContext + ctx.tenant_id = tenant_id + router = {'name': TEST_ROUTER_ID, 'admin_state_up': True, + 'tenant_id': ctx.tenant_id} + if ha is not None: + router['ha'] = ha + if distributed is not None: + router['distributed'] = distributed + return self.plugin.create_router(ctx, {'router': router}) + + def _bind_router(self, router_id): + with self.adminContext.session.begin(subtransactions=True): + scheduler = l3_agent_scheduler.ChanceScheduler() + filters = {'agent_type': [constants.AGENT_TYPE_L3]} + agents_db = self.plugin.get_agents_db(self.adminContext, + filters=filters) + scheduler._bind_ha_router_to_agents( + self.plugin, + self.adminContext, + router_id, + agents_db) + self._bind_ha_network_ports(router_id) + + def _bind_ha_network_ports(self, router_id): + port_bindings = self.plugin.get_ha_router_port_bindings( + self.adminContext, [router_id]) + plugin = manager.NeutronManager.get_plugin() + + for port_binding in port_bindings: + filters = {'id': [port_binding.port_id]} + port = plugin.get_ports(self.adminContext, filters=filters)[0] + if port_binding.l3_agent_id == self.agent1['id']: + port[portbindings.HOST_ID] = self.agent1['host'] + else: + port[portbindings.HOST_ID] = self.agent2['host'] + plugin.update_port(self.adminContext, port['id'], + {attributes.PORT: port}) + + def _get_first_interface(self, net_id, router_id): + plugin = manager.NeutronManager.get_plugin() + device_filter = {'device_id': [router_id], + 'device_owner': + [constants.DEVICE_OWNER_HA_REPLICATED_INT]} + return plugin.get_ports(self.adminContext, filters=device_filter)[0] + + def _add_router_interface(self, subnet, router, host): + interface_info = 
{'subnet_id': subnet['id']} + self.plugin.add_router_interface(self.adminContext, + router['id'], interface_info) + self.plugin.update_routers_states( + self.adminContext, + {router['id']: n_const.HA_ROUTER_STATE_ACTIVE}, host) + + port = self._get_first_interface(subnet['network_id'], router['id']) + + self.mock_cast.reset_mock() + self.mock_fanout.reset_mock() + self.callbacks.update_device_up(self.adminContext, agent_id=host, + device=port['id'], host=host) + return port + + def _create_ha_router(self): + self._setup_l3() + router = self._create_router() + self._bind_router(router['id']) + return router + + def _verify_remove_fdb(self, expected, agent_id, device, host=None): + self.mock_fanout.reset_mock() + self.callbacks.update_device_down(self.adminContext, agent_id=host, + device=device, host=host) + self.mock_fanout.assert_called_with( + mock.ANY, 'remove_fdb_entries', expected) + + def test_other_agents_get_flood_entries_for_ha_agents(self): + # First HA router port is added on HOST and HOST2, then network port + # is added on HOST4. + # HOST4 should get flood entries for HOST1 and HOST2 + router = self._create_ha_router() + service_plugins = manager.NeutronManager.get_service_plugins() + service_plugins[service_constants.L3_ROUTER_NAT] = self.plugin + with self.subnet(network=self._network, enable_dhcp=False) as snet, \ + mock.patch('neutron.manager.NeutronManager.get_service_plugins', + return_value=service_plugins): + subnet = snet['subnet'] + port = self._add_router_interface(subnet, router, HOST) + + host_arg = {portbindings.HOST_ID: HOST_4, 'admin_state_up': True} + with self.port(subnet=snet, + device_owner=DEVICE_OWNER_COMPUTE, + arg_list=(portbindings.HOST_ID,), + **host_arg) as port1: + p1 = port1['port'] + device1 = 'tap' + p1['id'] + + self.mock_cast.reset_mock() + self.mock_fanout.reset_mock() + self.callbacks.update_device_up( + self.adminContext, agent_id=HOST_4, device=device1) + + cast_expected = { + port['network_id']: { + 'ports': {'20.0.0.1': [constants.FLOODING_ENTRY], + '20.0.0.2': [constants.FLOODING_ENTRY]}, + 'network_type': 'vxlan', 'segment_id': 1}} + self.assertEqual(1, self.mock_cast.call_count) + self.mock_cast.assert_called_with( + mock.ANY, 'add_fdb_entries', cast_expected, HOST_4) + + def test_delete_ha_port(self): + # First network port is added on HOST, and then HA router port + # is added on HOST and HOST2. 
+ # Remove_fdb should carry flood entry of only HOST2 and not HOST + router = self._create_ha_router() + + service_plugins = manager.NeutronManager.get_service_plugins() + service_plugins[service_constants.L3_ROUTER_NAT] = self.plugin + with self.subnet(network=self._network, enable_dhcp=False) as snet, \ + mock.patch('neutron.manager.NeutronManager.get_service_plugins', + return_value=service_plugins): + host_arg = {portbindings.HOST_ID: HOST, 'admin_state_up': True} + with self.port(subnet=snet, + device_owner=DEVICE_OWNER_COMPUTE, + arg_list=(portbindings.HOST_ID,), + **host_arg) as port1: + p1 = port1['port'] + device1 = 'tap' + p1['id'] + self.callbacks.update_device_up(self.adminContext, + agent_id=HOST, device=device1) + + subnet = snet['subnet'] + port = self._add_router_interface(subnet, router, HOST) + + expected = {port['network_id']: + {'ports': {'20.0.0.2': [constants.FLOODING_ENTRY]}, + 'network_type': 'vxlan', 'segment_id': 1}} + + self.mock_fanout.reset_mock() + interface_info = {'subnet_id': subnet['id']} + self.plugin.remove_router_interface(self.adminContext, + router['id'], interface_info) + self.mock_fanout.assert_called_with( + mock.ANY, 'remove_fdb_entries', expected) + + def test_ha_agents_get_other_fdb(self): + # First network port is added on HOST4, then HA router port is + # added on HOST and HOST2. + # Both HA agents should create tunnels to HOST4 and among themselves. + # Both HA agents should be notified to other agents. + router = self._create_ha_router() + + service_plugins = manager.NeutronManager.get_service_plugins() + service_plugins[service_constants.L3_ROUTER_NAT] = self.plugin + with self.subnet(network=self._network, enable_dhcp=False) as snet, \ + mock.patch('neutron.manager.NeutronManager.get_service_plugins', + return_value=service_plugins): + host_arg = {portbindings.HOST_ID: HOST_4, 'admin_state_up': True} + with self.port(subnet=snet, + device_owner=DEVICE_OWNER_COMPUTE, + arg_list=(portbindings.HOST_ID,), + **host_arg) as port1: + p1 = port1['port'] + device1 = 'tap' + p1['id'] + self.callbacks.update_device_up( + self.adminContext, agent_id=HOST_4, device=device1) + p1_ips = [p['ip_address'] for p in p1['fixed_ips']] + + subnet = snet['subnet'] + port = self._add_router_interface(subnet, router, HOST) + fanout_expected = {port['network_id']: { + 'ports': {'20.0.0.1': [constants.FLOODING_ENTRY]}, + 'network_type': 'vxlan', 'segment_id': 1}} + + cast_expected_host = {port['network_id']: { + 'ports': { + '20.0.0.4': [constants.FLOODING_ENTRY, + l2pop_rpc.PortInfo(p1['mac_address'], + p1_ips[0])], + '20.0.0.2': [constants.FLOODING_ENTRY]}, + 'network_type': 'vxlan', 'segment_id': 1}} + self.mock_cast.assert_called_with( + mock.ANY, 'add_fdb_entries', cast_expected_host, HOST) + self.mock_fanout.assert_called_with( + mock.ANY, 'add_fdb_entries', fanout_expected) + + self.mock_cast.reset_mock() + self.mock_fanout.reset_mock() + + self.callbacks.update_device_up( + self.adminContext, agent_id=HOST_2, + device=port['id'], host=HOST_2) + + cast_expected_host2 = {port['network_id']: { + 'ports': { + '20.0.0.4': [constants.FLOODING_ENTRY, + l2pop_rpc.PortInfo(p1['mac_address'], + p1_ips[0])], + '20.0.0.1': [constants.FLOODING_ENTRY]}, + 'network_type': 'vxlan', 'segment_id': 1}} + fanout_expected = {port['network_id']: { + 'ports': {'20.0.0.2': [constants.FLOODING_ENTRY]}, + 'network_type': 'vxlan', 'segment_id': 1}} + self.mock_cast.assert_called_with( + mock.ANY, 'add_fdb_entries', cast_expected_host2, HOST_2) + self.mock_fanout.assert_called_with( + 
mock.ANY, 'add_fdb_entries', fanout_expected) + def test_fdb_add_called(self): self._register_ml2_agents() @@ -842,12 +1080,15 @@ class TestL2PopulationRpcTestCase(test_plugin.Ml2PluginV2TestCase): l2pop_mech = l2pop_mech_driver.L2populationMechanismDriver() l2pop_mech.L2PopulationAgentNotify = mock.Mock() l2pop_mech.rpc_ctx = mock.Mock() + port = {'device_owner': ''} + context = mock.Mock() + context.current = port with mock.patch.object(l2pop_mech, '_get_agent_fdb', return_value=None) as upd_port_down,\ mock.patch.object(l2pop_mech.L2PopulationAgentNotify, 'remove_fdb_entries'): - l2pop_mech.delete_port_postcommit(mock.Mock()) + l2pop_mech.delete_port_postcommit(context) self.assertTrue(upd_port_down.called) def test_delete_unbound_port(self): diff --git a/neutron/tests/unit/plugins/ml2/test_rpc.py b/neutron/tests/unit/plugins/ml2/test_rpc.py index 266300d8e38..3351f897338 100644 --- a/neutron/tests/unit/plugins/ml2/test_rpc.py +++ b/neutron/tests/unit/plugins/ml2/test_rpc.py @@ -59,7 +59,8 @@ class RpcCallbacksTestCase(base.BaseTestCase): 'host': host } with mock.patch('neutron.plugins.ml2.plugin.Ml2Plugin' - '._device_to_port_id'): + '._device_to_port_id'),\ + mock.patch.object(self.callbacks, 'notify_ha_port_status'): with mock.patch('neutron.db.provisioning_blocks.' 'provisioning_complete') as pc: self.callbacks.update_device_up(mock.Mock(), **kwargs) @@ -212,6 +213,7 @@ class RpcCallbacksTestCase(base.BaseTestCase): def _test_update_device_not_bound_to_host(self, func): self.plugin.port_bound_to_host.return_value = False + self.callbacks.notify_ha_port_status = mock.Mock() self.plugin._device_to_port_id.return_value = 'fake_port_id' res = func(mock.Mock(), device='fake_device', host='fake_host') self.plugin.port_bound_to_host.assert_called_once_with(mock.ANY, @@ -234,6 +236,7 @@ class RpcCallbacksTestCase(base.BaseTestCase): def test_update_device_down_call_update_port_status(self): self.plugin.update_port_status.return_value = False + self.callbacks.notify_ha_port_status = mock.Mock() self.plugin._device_to_port_id.return_value = 'fake_port_id' self.assertEqual( {'device': 'fake_device', 'exists': False},
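For reference (not part of this commit): a self-contained restatement
of the is_ha_router_port() helper added above, with the database lookup
replaced by a router_is_ha flag so the rule is easy to see. The
device-owner strings below match the neutron constants the patch uses;
treating them as literals here is an assumption of this sketch.

    # Hypothetical stand-alone sketch; the patch's version consults
    # RouterExtraAttributes in the DB instead of a router_is_ha flag.
    DEVICE_OWNER_HA_REPLICATED_INT = 'network:ha_router_replicated_interface'
    DEVICE_OWNER_ROUTER_SNAT = 'network:router_centralized_snat'

    def is_ha_router_port(device_owner, router_is_ha):
        # HA-replicated interfaces are HA ports by definition.
        if device_owner == DEVICE_OWNER_HA_REPLICATED_INT:
            return True
        # A centralized SNAT port is an HA port only on an HA router.
        if device_owner == DEVICE_OWNER_ROUTER_SNAT:
            return router_is_ha
        return False

    assert is_ha_router_port(DEVICE_OWNER_HA_REPLICATED_INT, False)
    assert is_ha_router_port(DEVICE_OWNER_ROUTER_SNAT, True)
    assert not is_ha_router_port('compute:nova', True)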