l2pop fdb flows for HA router ports

This patch makes L3 HA failover independent of neutron components
during failover.

All HA agents (active and backup) call update_device_up/down after
wiring the ports, but the l2pop driver is invoked only for the active
agent, because the port binding in the DB reflects the active agent
(sketched below). l2pop then creates unicast and multicast flows for
the active agent.
On failover, flows to the new active agent are created. For this to
happen, the database, the messaging server, neutron-server and the
destination L3 agent must all be alive during failover. This creates
two issues:
1) When any of the above resources (e.g. neutron-server) is dead, the
   flows between the new master and the other agents won't be created
   and L3 HA failover does not work. In the same scenario, L3 HA
   failover works if l2pop is disabled.
2) Packet loss during failover is higher, as the above neutron
   resources interact multiple times, so creating the l2 flows takes
   time.
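
For illustration, a minimal sketch (hypothetical names, not neutron
code) of the pre-change behaviour: flows are created only when the
reporting host matches the port binding, so update_device_up from a
backup agent is a no-op for flow creation.

    # Hypothetical sketch, not the actual l2pop driver code.
    def should_create_flows(bound_host, reporting_host):
        # Before this patch, l2pop created flows only for the agent
        # owning the port binding, i.e. the active HA agent.
        return bound_host == reporting_host

    assert should_create_flows('active-host', 'active-host')
    assert not should_create_flows('active-host', 'backup-host')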

With this change, the plugin notifies l2pop when update_device_up/down
is called by backup agents as well. l2pop then creates flood flows to
all HA agents (both active and standby). l2pop won't create a unicast
flow for this port; instead, the unicast flow is created by the learn
action of table 10 when keepalived sends a GARP after assigning the IP
address to the master router's qr-xx port. As the flood flows are
already in place and the unicast flow is added dynamically, L3 HA
failover no longer depends on l2pop.
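
To make the table 10 learning step concrete, here is a toy model
(plain Python, illustrative only; it stands in for br-tun tables
10/20 and is not the agent's actual implementation):

    # Toy model of what the table 10 learn action achieves: a frame
    # arriving from a tunnel installs a unicast entry for its source
    # MAC, so the GARP from the new master is enough to redirect
    # traffic without any l2pop RPC.
    class LearningFdb(object):
        def __init__(self):
            self.unicast = {}  # mac -> tunnel port (like table 20)

        def frame_from_tunnel(self, src_mac, tun_port):
            # e.g. keepalived's GARP from the new master's qr-xx port
            self.unicast[src_mac] = tun_port

        def output(self, dst_mac, flood_ports):
            # Known MAC -> direct tunnel; unknown -> flood to all HA
            # agents via the flood flows l2pop pre-created.
            if dst_mac in self.unicast:
                return [self.unicast[dst_mac]]
            return list(flood_ports)

    fdb = LearningFdb()
    fdb.frame_from_tunnel('fa:16:3e:aa:bb:cc', tun_port=5)  # GARP seen
    assert fdb.output('fa:16:3e:aa:bb:cc', flood_ports=[5, 6]) == [5]
    assert fdb.output('fa:16:3e:00:00:01', flood_ports=[5, 6]) == [5, 6]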

This solves two issues:
1) With L3 HA + l2pop, failover works even if any of the above agents
   or processes is dead.
2) Failover time is reduced, as we no longer depend on neutron to
   create flows during failover.
We use the L3HARouterAgentPortBinding table to get all HA agents of a
router port. An HA router port on a standby agent is also counted by
l2pop in get_distributed_active_network_ports and
get_agent_network_active_port_count.
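
As an illustration (the network id, tunnel IPs and segment id are
made-up values; FLOODING_ENTRY is neutron's all-zero MAC/IP pair),
the add_fdb_entries payload for an HA router port carries flood
entries only, with no unicast entry for the router's MAC/IP:

    # Hedged sketch of an add_fdb_entries payload for an HA router
    # port: one FLOODING_ENTRY per hosting agent's tunnel IP, and no
    # unicast PortInfo. All concrete values here are illustrative.
    fdb_entries = {
        'net-id': {
            'network_type': 'vxlan',
            'segment_id': 1,
            'ports': {
                '20.0.0.1': [('00:00:00:00:00:00', '0.0.0.0')],  # active
                '20.0.0.2': [('00:00:00:00:00:00', '0.0.0.0')],  # standby
            },
        },
    }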

Closes-bug: #1522980
Closes-bug: #1602614
Change-Id: Ie1f5289390b3ff3f7f3ed7ffc8f6a8258ee8662e
venkata anil 2016-08-04 07:14:47 +00:00 committed by Carl Baldwin
parent dc6e83771c
commit 26d8702b9d
8 changed files with 670 additions and 49 deletions


@@ -35,8 +35,10 @@ from neutron.api.v2 import attributes
from neutron.common import constants as n_const
from neutron.common import utils as n_utils
from neutron.db import agents_db
from neutron.db import api as db_api
from neutron.db.availability_zone import router as router_az_db
from neutron.db import common_db_mixin
from neutron.db import l3_attrs_db
from neutron.db import l3_db
from neutron.db import l3_dvr_db
from neutron.db.l3_dvr_db import is_distributed_router
@@ -788,3 +790,17 @@ def is_ha_router(router):
if validators.is_attr_set(requested_router_type):
return requested_router_type
return cfg.CONF.l3_ha
def is_ha_router_port(device_owner, router_id):
session = db_api.get_session()
if device_owner == constants.DEVICE_OWNER_HA_REPLICATED_INT:
return True
elif device_owner == constants.DEVICE_OWNER_ROUTER_SNAT:
query = session.query(l3_attrs_db.RouterExtraAttributes)
query = query.filter_by(ha=True)
query = query.filter(l3_attrs_db.RouterExtraAttributes.router_id ==
router_id)
return bool(query.limit(1).count())
else:
return False


@@ -18,10 +18,15 @@ from oslo_serialization import jsonutils
from oslo_utils import timeutils
from neutron.db import agents_db
from neutron.db import l3_hamode_db
from neutron.db import models_v2
from neutron.plugins.ml2 import models as ml2_models
HA_ROUTER_PORTS = (const.DEVICE_OWNER_HA_REPLICATED_INT,
const.DEVICE_OWNER_ROUTER_SNAT)
def get_agent_ip_by_host(session, agent_host):
agent = get_agent_by_host(session, agent_host)
if agent:
@@ -70,15 +75,33 @@ def _get_active_network_ports(session, network_id):
return query
def _ha_router_interfaces_on_network_query(session, network_id):
query = session.query(models_v2.Port)
query = query.join(l3_hamode_db.L3HARouterAgentPortBinding,
l3_hamode_db.L3HARouterAgentPortBinding.router_id ==
models_v2.Port.device_id)
return query.filter(
models_v2.Port.network_id == network_id,
models_v2.Port.device_owner.in_(HA_ROUTER_PORTS))
def _get_ha_router_interface_ids(session, network_id):
query = _ha_router_interfaces_on_network_query(session, network_id)
return query.from_self(models_v2.Port.id).distinct()
def get_nondistributed_active_network_ports(session, network_id):
query = _get_active_network_ports(session, network_id)
# Exclude DVR and HA router interfaces
query = query.filter(models_v2.Port.device_owner !=
const.DEVICE_OWNER_DVR_INTERFACE)
ha_iface_ids_query = _get_ha_router_interface_ids(session, network_id)
query = query.filter(models_v2.Port.id.notin_(ha_iface_ids_query))
return [(bind, agent) for bind, agent in query.all()
if get_agent_ip(agent)]
def get_distributed_active_network_ports(session, network_id):
def get_dvr_active_network_ports(session, network_id):
with session.begin(subtransactions=True):
query = session.query(ml2_models.DistributedPortBinding,
agents_db.Agent)
@@ -94,6 +117,44 @@ def get_distributed_active_network_ports(session, network_id):
if get_agent_ip(agent)]
def get_distributed_active_network_ports(session, network_id):
return (get_dvr_active_network_ports(session, network_id) +
get_ha_active_network_ports(session, network_id))
def get_ha_active_network_ports(session, network_id):
agents = get_ha_agents(session, network_id=network_id)
return [(None, agent) for agent in agents]
def get_ha_agents(session, network_id=None, router_id=None):
query = session.query(agents_db.Agent.host).distinct()
query = query.join(l3_hamode_db.L3HARouterAgentPortBinding,
l3_hamode_db.L3HARouterAgentPortBinding.l3_agent_id ==
agents_db.Agent.id)
if router_id:
query = query.filter(
l3_hamode_db.L3HARouterAgentPortBinding.router_id == router_id)
elif network_id:
query = query.join(models_v2.Port, models_v2.Port.device_id ==
l3_hamode_db.L3HARouterAgentPortBinding.router_id)
query = query.filter(models_v2.Port.network_id == network_id,
models_v2.Port.status == const.PORT_STATUS_ACTIVE,
models_v2.Port.device_owner.in_(HA_ROUTER_PORTS))
else:
return []
# L3HARouterAgentPortBinding has the l3 agent ids of the hosting
# agents, but we need the l2 agents (for their tunneling IPs) when
# creating FDB entries.
agents_query = session.query(agents_db.Agent)
agents_query = agents_query.filter(agents_db.Agent.host.in_(query))
return [agent for agent in agents_query
if get_agent_ip(agent)]
def get_ha_agents_by_router_id(session, router_id):
return get_ha_agents(session, router_id=router_id)
def get_agent_network_active_port_count(session, agent_host,
network_id):
with session.begin(subtransactions=True):
@@ -105,6 +166,12 @@ def get_agent_network_active_port_count(session, agent_host,
models_v2.Port.device_owner !=
const.DEVICE_OWNER_DVR_INTERFACE,
ml2_models.PortBinding.host == agent_host)
ha_iface_ids_query = _get_ha_router_interface_ids(session, network_id)
query1 = query1.filter(models_v2.Port.id.notin_(ha_iface_ids_query))
ha_port_count = get_ha_router_active_port_count(
session, agent_host, network_id)
query2 = query.join(ml2_models.DistributedPortBinding)
query2 = query2.filter(models_v2.Port.network_id == network_id,
ml2_models.DistributedPortBinding.status ==
@@ -113,4 +180,13 @@ def get_agent_network_active_port_count(session, agent_host,
const.DEVICE_OWNER_DVR_INTERFACE,
ml2_models.DistributedPortBinding.host ==
agent_host)
return (query1.count() + query2.count())
return (query1.count() + query2.count() + ha_port_count)
def get_ha_router_active_port_count(session, agent_host, network_id):
# Return num of HA router interfaces on the given network and host
query = _ha_router_interfaces_on_network_query(session, network_id)
query = query.filter(models_v2.Port.status == const.PORT_STATUS_ACTIVE)
query = query.join(agents_db.Agent)
query = query.filter(agents_db.Agent.host == agent_host)
return query.count()


@@ -21,6 +21,9 @@ from oslo_log import log as logging
from neutron._i18n import _, _LW
from neutron import context as n_context
from neutron.db import api as db_api
from neutron.db import l3_hamode_db
from neutron import manager
from neutron.plugins.common import constants as service_constants
from neutron.plugins.ml2 import driver_api as api
from neutron.plugins.ml2.drivers.l2pop import config # noqa
from neutron.plugins.ml2.drivers.l2pop import db as l2pop_db
@@ -52,11 +55,30 @@ class L2populationMechanismDriver(api.MechanismDriver):
"""L2population driver vlan transparency support."""
return True
def _get_ha_port_agents_fdb(
self, session, network_id, router_id):
other_fdb_ports = {}
for agent in l2pop_db.get_ha_agents_by_router_id(session, router_id):
agent_active_ports = l2pop_db.get_agent_network_active_port_count(
session, agent.host, network_id)
if agent_active_ports == 0:
ip = l2pop_db.get_agent_ip(agent)
other_fdb_ports[ip] = [const.FLOODING_ENTRY]
return other_fdb_ports
def delete_port_postcommit(self, context):
port = context.current
agent_host = context.host
fdb_entries = self._get_agent_fdb(context.bottom_bound_segment,
port, agent_host)
session = db_api.get_session()
if port['device_owner'] in l2pop_db.HA_ROUTER_PORTS:
network_id = port['network_id']
other_fdb_ports = self._get_ha_port_agents_fdb(
session, network_id, port['device_id'])
fdb_entries[network_id]['ports'] = other_fdb_ports
self.L2populationAgentNotify.remove_fdb_entries(self.rpc_ctx,
fdb_entries)
@@ -125,13 +147,15 @@ class L2populationMechanismDriver(api.MechanismDriver):
def update_port_postcommit(self, context):
port = context.current
orig = context.original
if l3_hamode_db.is_ha_router_port(port['device_owner'],
port['device_id']):
return
diff_ips = self._get_diff_ips(orig, port)
if diff_ips:
self._fixed_ips_changed(context, orig, port, diff_ips)
if port['device_owner'] == const.DEVICE_OWNER_DVR_INTERFACE:
if context.status == const.PORT_STATUS_ACTIVE:
self._update_port_up(context)
self.update_port_up(context)
if context.status == const.PORT_STATUS_DOWN:
agent_host = context.host
fdb_entries = self._get_agent_fdb(
@@ -150,7 +174,7 @@ class L2populationMechanismDriver(api.MechanismDriver):
self.rpc_ctx, fdb_entries)
elif context.status != context.original_status:
if context.status == const.PORT_STATUS_ACTIVE:
self._update_port_up(context)
self.update_port_up(context)
elif context.status == const.PORT_STATUS_DOWN:
fdb_entries = self._get_agent_fdb(
context.bottom_bound_segment, port, context.host)
@@ -209,7 +233,24 @@ class L2populationMechanismDriver(api.MechanismDriver):
return agents
def _update_port_up(self, context):
def update_port_down(self, context):
port = context.current
agent_host = context.host
l3plugin = manager.NeutronManager.get_service_plugins().get(
service_constants.L3_ROUTER_NAT)
# when agent transitions to backup, don't remove flood flows
if agent_host and l3plugin and getattr(
l3plugin, "list_router_ids_on_host", None):
admin_context = n_context.get_admin_context()
if l3plugin.list_router_ids_on_host(
admin_context, agent_host, [port['device_id']]):
return
fdb_entries = self._get_agent_fdb(
context.bottom_bound_segment, port, agent_host)
self.L2populationAgentNotify.remove_fdb_entries(
self.rpc_ctx, fdb_entries)
def update_port_up(self, context):
port = context.current
agent_host = context.host
session = db_api.get_session()
@@ -249,7 +290,9 @@ class L2populationMechanismDriver(api.MechanismDriver):
self.rpc_ctx, agent_fdb_entries, agent_host)
# Notify other agents to add fdb rule for current port
if port['device_owner'] != const.DEVICE_OWNER_DVR_INTERFACE:
if (port['device_owner'] != const.DEVICE_OWNER_DVR_INTERFACE and
not l3_hamode_db.is_ha_router_port(port['device_owner'],
port['device_id'])):
other_fdb_ports[agent_ip] += self._get_port_fdb_entries(port)
self.L2populationAgentNotify.add_fdb_entries(self.rpc_ctx,
@@ -278,7 +321,9 @@ class L2populationMechanismDriver(api.MechanismDriver):
other_fdb_entries[network_id]['ports'][agent_ip].append(
const.FLOODING_ENTRY)
# Notify other agents to remove fdb rules for current port
if port['device_owner'] != const.DEVICE_OWNER_DVR_INTERFACE:
if (port['device_owner'] != const.DEVICE_OWNER_DVR_INTERFACE and
not l3_hamode_db.is_ha_router_port(port['device_owner'],
port['device_id'])):
fdb_entries = self._get_port_fdb_entries(port)
other_fdb_entries[network_id]['ports'][agent_ip] += fdb_entries


@@ -25,6 +25,7 @@ from neutron.api.rpc.handlers import securitygroups_rpc as sg_rpc
from neutron.callbacks import resources
from neutron.common import rpc as n_rpc
from neutron.common import topics
from neutron.db import l3_hamode_db
from neutron.db import provisioning_blocks
from neutron.extensions import portbindings
from neutron.extensions import portsecurity as psec
@@ -182,16 +183,18 @@ class RpcCallbacks(type_tunnel.TunnelRpcCallbackMixin):
LOG.debug("Device %(device)s not bound to the"
" agent host %(host)s",
{'device': device, 'host': host})
return {'device': device,
'exists': port_exists}
try:
port_exists = bool(plugin.update_port_status(
rpc_context, port_id, n_const.PORT_STATUS_DOWN, host))
except exc.StaleDataError:
port_exists = False
LOG.debug("delete_port and update_device_down are being executed "
"concurrently. Ignoring StaleDataError.")
else:
try:
port_exists = bool(plugin.update_port_status(
rpc_context, port_id, n_const.PORT_STATUS_DOWN, host))
except exc.StaleDataError:
port_exists = False
LOG.debug("delete_port and update_device_down are being "
"executed concurrently. Ignoring StaleDataError.")
return {'device': device,
'exists': port_exists}
self.notify_ha_port_status(port_id, rpc_context,
n_const.PORT_STATUS_DOWN, host)
return {'device': device,
'exists': port_exists}
@@ -217,11 +220,19 @@ class RpcCallbacks(type_tunnel.TunnelRpcCallbackMixin):
port = plugin._get_port(rpc_context, port_id)
except exceptions.PortNotFound:
LOG.debug("Port %s not found, will not notify nova.", port_id)
return
else:
if port.device_owner.startswith(
n_const.DEVICE_OWNER_COMPUTE_PREFIX):
plugin.nova_notifier.notify_port_active_direct(port)
return
return
else:
self.update_port_status_to_active(port, rpc_context, port_id, host)
self.notify_ha_port_status(port_id, rpc_context,
n_const.PORT_STATUS_ACTIVE, host, port=port)
def update_port_status_to_active(self, port, rpc_context, port_id, host):
plugin = manager.NeutronManager.get_plugin()
if port and port['device_owner'] == n_const.DEVICE_OWNER_DVR_INTERFACE:
# NOTE(kevinbenton): we have to special case DVR ports because of
# the special multi-binding status update logic they have that
@@ -241,6 +252,29 @@ class RpcCallbacks(type_tunnel.TunnelRpcCallbackMixin):
rpc_context, port['id'], resources.PORT,
provisioning_blocks.L2_AGENT_ENTITY)
def notify_ha_port_status(self, port_id, rpc_context,
status, host, port=None):
plugin = manager.NeutronManager.get_plugin()
l2pop_driver = plugin.mechanism_manager.mech_drivers.get(
'l2population')
if not l2pop_driver:
return
if not port:
port = ml2_db.get_port(rpc_context.session, port_id)
if not port:
return
is_ha_port = l3_hamode_db.is_ha_router_port(port['device_owner'],
port['device_id'])
if is_ha_port:
port_context = plugin.get_bound_port_context(
rpc_context, port_id)
port_context.current['status'] = status
port_context.current[portbindings.HOST_ID] = host
if status == n_const.PORT_STATUS_ACTIVE:
l2pop_driver.obj.update_port_up(port_context)
else:
l2pop_driver.obj.update_port_down(port_context)
def update_device_list(self, rpc_context, **kwargs):
devices_up = []
failed_devices_up = []


@@ -1165,6 +1165,39 @@ class L3HAModeDbTestCase(L3HATestFramework):
port = self._get_first_interface(router['id'])
self.assertEqual(self.agent1['host'], port[portbindings.HOST_ID])
def test_is_ha_router_port(self):
network_id = self._create_network(self.core_plugin, self.admin_ctx)
subnet = self._create_subnet(self.core_plugin, self.admin_ctx,
network_id)
interface_info = {'subnet_id': subnet['id']}
router = self._create_router()
self.plugin.add_router_interface(self.admin_ctx,
router['id'],
interface_info)
port = self._get_first_interface(router['id'])
self.assertTrue(l3_hamode_db.is_ha_router_port(
port['device_owner'], port['device_id']))
def test_is_ha_router_port_for_normal_port(self):
network_id = self._create_network(self.core_plugin, self.admin_ctx)
subnet = self._create_subnet(self.core_plugin, self.admin_ctx,
network_id)
interface_info = {'subnet_id': subnet['id']}
router = self._create_router(ha=False)
self.plugin.add_router_interface(self.admin_ctx,
router['id'],
interface_info)
device_filter = {'device_id': [router['id']],
'device_owner':
[constants.DEVICE_OWNER_ROUTER_INTF]}
port = self.core_plugin.get_ports(
self.admin_ctx, filters=device_filter)[0]
self.assertFalse(l3_hamode_db.is_ha_router_port(
port['device_owner'], port['device_id']))
class L3HAUserTestCase(L3HATestFramework):


@@ -13,23 +13,73 @@
# under the License.
from neutron_lib import constants
from oslo_utils import uuidutils
from neutron.common import constants as n_const
from neutron.common import utils
from neutron import context
from neutron.db import l3_attrs_db
from neutron.db import l3_db
from neutron.db import l3_hamode_db
from neutron.db import models_v2
from neutron.extensions import portbindings
from neutron.plugins.ml2.drivers.l2pop import db as l2pop_db
from neutron.plugins.ml2 import models
from neutron.tests.common import helpers
from neutron.tests import tools
from neutron.tests.unit import testlib_api
HOST = helpers.HOST
HOST_2 = 'HOST_2'
HOST_3 = 'HOST_3'
HOST_2_TUNNELING_IP = '20.0.0.2'
HOST_3_TUNNELING_IP = '20.0.0.3'
TEST_ROUTER_ID = 'router_id'
TEST_NETWORK_ID = 'network_id'
TEST_HA_NETWORK_ID = 'ha_network_id'
class TestL2PopulationDBTestCase(testlib_api.SqlTestCase):
def setUp(self):
super(TestL2PopulationDBTestCase, self).setUp()
self.ctx = context.get_admin_context()
self._create_network()
def _create_network(self, network_id=TEST_NETWORK_ID):
with self.ctx.session.begin(subtransactions=True):
self.ctx.session.add(models_v2.Network(id=network_id))
def _create_router(self, distributed=True, ha=False):
with self.ctx.session.begin(subtransactions=True):
self.ctx.session.add(l3_db.Router(id=TEST_ROUTER_ID))
self.ctx.session.add(l3_attrs_db.RouterExtraAttributes(
router_id=TEST_ROUTER_ID, distributed=distributed, ha=ha))
def _create_ha_router(self, distributed=False):
helpers.register_l3_agent(HOST_2)
helpers.register_ovs_agent(HOST_2, tunneling_ip=HOST_2_TUNNELING_IP)
# Register an l3 agent on HOST_3, which doesn't host any HA router.
# Tests should verify that HOST_3 is not an HA agent host.
helpers.register_l3_agent(HOST_3)
helpers.register_ovs_agent(HOST_3, tunneling_ip=HOST_3_TUNNELING_IP)
with self.ctx.session.begin(subtransactions=True):
self.ctx.session.add(models_v2.Network(id=TEST_HA_NETWORK_ID))
self._create_router(distributed=distributed, ha=True)
for state, host in [(n_const.HA_ROUTER_STATE_ACTIVE, HOST),
(n_const.HA_ROUTER_STATE_STANDBY, HOST_2)]:
self._setup_port_binding(
network_id=TEST_HA_NETWORK_ID,
device_owner=constants.DEVICE_OWNER_ROUTER_HA_INTF,
device_id=TEST_ROUTER_ID,
host_state=state,
host=host)
def get_l3_agent_by_host(self, agent_host):
plugin = helpers.FakePlugin()
return plugin._get_agent_by_type_and_host(
self.ctx, constants.AGENT_TYPE_L3, agent_host)
def test_get_agent_by_host(self):
# Register an L2 agent + a bunch of other agents on the same host
helpers.register_l3_agent()
helpers.register_dhcp_agent()
helpers.register_ovs_agent()
@@ -38,58 +88,70 @@ class TestL2PopulationDBTestCase(testlib_api.SqlTestCase):
self.assertEqual(constants.AGENT_TYPE_OVS, agent.agent_type)
def test_get_agent_by_host_no_candidate(self):
# Register a bunch of non-L2 agents on the same host
helpers.register_l3_agent()
helpers.register_dhcp_agent()
agent = l2pop_db.get_agent_by_host(
self.ctx.session, helpers.HOST)
self.assertIsNone(agent)
def _setup_port_binding(self, network_id='network_id', dvr=True):
def _setup_port_binding(self, **kwargs):
with self.ctx.session.begin(subtransactions=True):
self.ctx.session.add(models_v2.Network(id=network_id))
device_owner = constants.DEVICE_OWNER_DVR_INTERFACE if dvr else ''
mac = utils.get_random_mac('fa:16:3e:00:00:00'.split(':'))
port_id = uuidutils.generate_uuid()
network_id = kwargs.get('network_id', TEST_NETWORK_ID)
device_owner = kwargs.get('device_owner', '')
device_id = kwargs.get('device_id', '')
host = kwargs.get('host', helpers.HOST)
self.ctx.session.add(models_v2.Port(
id='port_id',
network_id=network_id,
mac_address='00:11:22:33:44:55',
admin_state_up=True,
status=constants.PORT_STATUS_ACTIVE,
device_id='',
device_owner=device_owner))
port_binding_cls = (models.DistributedPortBinding if dvr
else models.PortBinding)
binding_kwarg = {
'port_id': 'port_id',
'host': helpers.HOST,
'vif_type': portbindings.VIF_TYPE_UNBOUND,
'vnic_type': portbindings.VNIC_NORMAL
}
if dvr:
binding_kwarg['router_id'] = 'router_id'
id=port_id, network_id=network_id, mac_address=mac,
admin_state_up=True, status=constants.PORT_STATUS_ACTIVE,
device_id=device_id, device_owner=device_owner))
port_binding_cls = models.PortBinding
binding_kwarg = {'port_id': port_id,
'host': host,
'vif_type': portbindings.VIF_TYPE_UNBOUND,
'vnic_type': portbindings.VNIC_NORMAL}
if device_owner == constants.DEVICE_OWNER_DVR_INTERFACE:
port_binding_cls = models.DistributedPortBinding
binding_kwarg['router_id'] = TEST_ROUTER_ID
binding_kwarg['status'] = constants.PORT_STATUS_DOWN
self.ctx.session.add(port_binding_cls(**binding_kwarg))
if network_id == TEST_HA_NETWORK_ID:
agent = self.get_l3_agent_by_host(host)
haport_bindings_cls = l3_hamode_db.L3HARouterAgentPortBinding
habinding_kwarg = {'port_id': port_id,
'router_id': device_id,
'l3_agent_id': agent['id'],
'state': kwargs.get('host_state',
n_const.HA_ROUTER_STATE_ACTIVE)}
self.ctx.session.add(haport_bindings_cls(**habinding_kwarg))
def test_get_distributed_active_network_ports(self):
self._setup_port_binding()
self._setup_port_binding(
device_owner=constants.DEVICE_OWNER_DVR_INTERFACE)
# Register an L2 agent + a bunch of other agents on the same host
helpers.register_l3_agent()
helpers.register_dhcp_agent()
helpers.register_ovs_agent()
tunnel_network_ports = l2pop_db.get_distributed_active_network_ports(
self.ctx.session, 'network_id')
self.ctx.session, TEST_NETWORK_ID)
self.assertEqual(1, len(tunnel_network_ports))
_, agent = tunnel_network_ports[0]
self.assertEqual(constants.AGENT_TYPE_OVS, agent.agent_type)
def test_get_distributed_active_network_ports_no_candidate(self):
self._setup_port_binding()
self._setup_port_binding(
device_owner=constants.DEVICE_OWNER_DVR_INTERFACE)
# Register a bunch of non-L2 agents on the same host
helpers.register_l3_agent()
helpers.register_dhcp_agent()
tunnel_network_ports = l2pop_db.get_distributed_active_network_ports(
self.ctx.session, 'network_id')
self.ctx.session, TEST_NETWORK_ID)
self.assertEqual(0, len(tunnel_network_ports))
def test_get_nondistributed_active_network_ports(self):
@@ -99,7 +161,7 @@ class TestL2PopulationDBTestCase(testlib_api.SqlTestCase):
helpers.register_dhcp_agent()
helpers.register_ovs_agent()
fdb_network_ports = l2pop_db.get_nondistributed_active_network_ports(
self.ctx.session, 'network_id')
self.ctx.session, TEST_NETWORK_ID)
self.assertEqual(1, len(fdb_network_ports))
_, agent = fdb_network_ports[0]
self.assertEqual(constants.AGENT_TYPE_OVS, agent.agent_type)
@@ -110,5 +172,116 @@ class TestL2PopulationDBTestCase(testlib_api.SqlTestCase):
helpers.register_l3_agent()
helpers.register_dhcp_agent()
fdb_network_ports = l2pop_db.get_nondistributed_active_network_ports(
self.ctx.session, 'network_id')
self.ctx.session, TEST_NETWORK_ID)
self.assertEqual(0, len(fdb_network_ports))
def test__get_ha_router_interface_ids_with_ha_dvr_snat_port(self):
helpers.register_dhcp_agent()
helpers.register_l3_agent()
helpers.register_ovs_agent()
self._create_ha_router()
self._setup_port_binding(
device_owner=constants.DEVICE_OWNER_ROUTER_SNAT,
device_id=TEST_ROUTER_ID)
ha_iface_ids = l2pop_db._get_ha_router_interface_ids(
self.ctx.session, TEST_NETWORK_ID)
self.assertEqual(1, len(list(ha_iface_ids)))
def test__get_ha_router_interface_ids_with_ha_replicated_port(self):
helpers.register_dhcp_agent()
helpers.register_l3_agent()
helpers.register_ovs_agent()
self._create_ha_router()
self._setup_port_binding(
device_owner=constants.DEVICE_OWNER_HA_REPLICATED_INT,
device_id=TEST_ROUTER_ID)
ha_iface_ids = l2pop_db._get_ha_router_interface_ids(
self.ctx.session, TEST_NETWORK_ID)
self.assertEqual(1, len(list(ha_iface_ids)))
def test__get_ha_router_interface_ids_with_no_ha_port(self):
self._create_router()
self._setup_port_binding(
device_owner=constants.DEVICE_OWNER_ROUTER_SNAT,
device_id=TEST_ROUTER_ID)
ha_iface_ids = l2pop_db._get_ha_router_interface_ids(
self.ctx.session, TEST_NETWORK_ID)
self.assertEqual(0, len(list(ha_iface_ids)))
def test_active_network_ports_with_dvr_snat_port(self):
# Test to get agent hosting dvr snat port
helpers.register_l3_agent()
helpers.register_dhcp_agent()
helpers.register_ovs_agent()
# create DVR router
self._create_router()
# setup DVR snat port
self._setup_port_binding(
device_owner=constants.DEVICE_OWNER_ROUTER_SNAT,
device_id=TEST_ROUTER_ID)
helpers.register_dhcp_agent()
fdb_network_ports = l2pop_db.get_nondistributed_active_network_ports(
self.ctx.session, TEST_NETWORK_ID)
self.assertEqual(1, len(fdb_network_ports))
def test_active_network_ports_with_ha_dvr_snat_port(self):
# test to get HA agents hosting HA+DVR snat port
helpers.register_dhcp_agent()
helpers.register_l3_agent()
helpers.register_ovs_agent()
# create HA+DVR router
self._create_ha_router()
# setup HA snat port
self._setup_port_binding(
device_owner=constants.DEVICE_OWNER_ROUTER_SNAT,
device_id=TEST_ROUTER_ID)
fdb_network_ports = l2pop_db.get_nondistributed_active_network_ports(
self.ctx.session, TEST_NETWORK_ID)
self.assertEqual(0, len(fdb_network_ports))
ha_ports = l2pop_db.get_ha_active_network_ports(
self.ctx.session, TEST_NETWORK_ID)
self.assertEqual(2, len(ha_ports))
def test_active_port_count_with_dvr_snat_port(self):
helpers.register_l3_agent()
helpers.register_dhcp_agent()
helpers.register_ovs_agent()
self._create_router()
self._setup_port_binding(
device_owner=constants.DEVICE_OWNER_ROUTER_SNAT,
device_id=TEST_ROUTER_ID)
helpers.register_dhcp_agent()
port_count = l2pop_db.get_agent_network_active_port_count(
self.ctx.session, HOST, TEST_NETWORK_ID)
self.assertEqual(1, port_count)
port_count = l2pop_db.get_agent_network_active_port_count(
self.ctx.session, HOST_2, TEST_NETWORK_ID)
self.assertEqual(0, port_count)
def test_active_port_count_with_ha_dvr_snat_port(self):
helpers.register_dhcp_agent()
helpers.register_l3_agent()
helpers.register_ovs_agent()
self._create_ha_router()
self._setup_port_binding(
device_owner=constants.DEVICE_OWNER_ROUTER_SNAT,
device_id=TEST_ROUTER_ID)
port_count = l2pop_db.get_agent_network_active_port_count(
self.ctx.session, HOST, TEST_NETWORK_ID)
self.assertEqual(1, port_count)
port_count = l2pop_db.get_agent_network_active_port_count(
self.ctx.session, HOST_2, TEST_NETWORK_ID)
self.assertEqual(1, port_count)
def test_get_ha_agents_by_router_id(self):
helpers.register_dhcp_agent()
helpers.register_l3_agent()
helpers.register_ovs_agent()
self._create_ha_router()
self._setup_port_binding(
device_owner=constants.DEVICE_OWNER_ROUTER_SNAT,
device_id=TEST_ROUTER_ID)
agents = l2pop_db.get_ha_agents_by_router_id(
self.ctx.session, TEST_ROUTER_ID)
ha_agents = [agent.host for agent in agents]
self.assertEqual(tools.UnorderedList([HOST, HOST_2]), ha_agents)


@@ -19,11 +19,18 @@ from neutron_lib import exceptions
from oslo_serialization import jsonutils
import testtools
from neutron.api.v2 import attributes
from neutron.common import constants as n_const
from neutron.common import topics
from neutron import context
from neutron.db import agents_db
from neutron.db import common_db_mixin
from neutron.db import l3_agentschedulers_db
from neutron.db import l3_hamode_db
from neutron.extensions import portbindings
from neutron.extensions import providernet as pnet
from neutron import manager
from neutron.plugins.common import constants as service_constants
from neutron.plugins.ml2 import driver_context
from neutron.plugins.ml2.drivers.l2pop import db as l2pop_db
from neutron.plugins.ml2.drivers.l2pop import mech_driver as l2pop_mech_driver
@@ -31,6 +38,7 @@ from neutron.plugins.ml2.drivers.l2pop import rpc as l2pop_rpc
from neutron.plugins.ml2.drivers.l2pop.rpc_manager import l2population_rpc
from neutron.plugins.ml2 import managers
from neutron.plugins.ml2 import rpc
from neutron.scheduler import l3_agent_scheduler
from neutron.tests import base
from neutron.tests.common import helpers
from neutron.tests.unit.plugins.ml2 import test_plugin
@@ -40,12 +48,20 @@ HOST_2 = HOST + '_2'
HOST_3 = HOST + '_3'
HOST_4 = HOST + '_4'
HOST_5 = HOST + '_5'
TEST_ROUTER_ID = 'router_id'
NOTIFIER = 'neutron.plugins.ml2.rpc.AgentNotifierApi'
DEVICE_OWNER_COMPUTE = constants.DEVICE_OWNER_COMPUTE_PREFIX + 'fake'
class FakeL3PluginWithAgents(common_db_mixin.CommonDbMixin,
l3_hamode_db.L3_HA_NAT_db_mixin,
l3_agentschedulers_db.L3AgentSchedulerDbMixin,
agents_db.AgentDbMixin):
pass
class TestL2PopulationRpcTestCase(test_plugin.Ml2PluginV2TestCase):
_mechanism_drivers = ['openvswitch', 'fake_agent', 'l2population']
@@ -101,6 +117,18 @@ class TestL2PopulationRpcTestCase(test_plugin.Ml2PluginV2TestCase):
uptime_patch = mock.patch(uptime, return_value=190)
uptime_patch.start()
def _setup_l3(self):
notif_p = mock.patch.object(l3_hamode_db.L3_HA_NAT_db_mixin,
'_notify_ha_interfaces_updated')
self.notif_m = notif_p.start()
self.plugin = FakeL3PluginWithAgents()
self._register_ml2_agents()
self._register_l3_agents()
def _register_l3_agents(self):
self.agent1 = helpers.register_l3_agent(host=HOST)
self.agent2 = helpers.register_l3_agent(host=HOST_2)
def _register_ml2_agents(self):
helpers.register_ovs_agent(host=HOST, tunneling_ip='20.0.0.1')
helpers.register_ovs_agent(host=HOST_2, tunneling_ip='20.0.0.2')
@@ -167,6 +195,216 @@ class TestL2PopulationRpcTestCase(test_plugin.Ml2PluginV2TestCase):
result = jsonutils.loads(jsonutils.dumps(payload))
self.assertEqual(entry, result['netuuid']['ports']['1'][0])
def _create_router(self, ha=True, tenant_id='tenant1',
distributed=None, ctx=None):
if ctx is None:
ctx = self.adminContext
ctx.tenant_id = tenant_id
router = {'name': TEST_ROUTER_ID, 'admin_state_up': True,
'tenant_id': ctx.tenant_id}
if ha is not None:
router['ha'] = ha
if distributed is not None:
router['distributed'] = distributed
return self.plugin.create_router(ctx, {'router': router})
def _bind_router(self, router_id):
with self.adminContext.session.begin(subtransactions=True):
scheduler = l3_agent_scheduler.ChanceScheduler()
filters = {'agent_type': [constants.AGENT_TYPE_L3]}
agents_db = self.plugin.get_agents_db(self.adminContext,
filters=filters)
scheduler._bind_ha_router_to_agents(
self.plugin,
self.adminContext,
router_id,
agents_db)
self._bind_ha_network_ports(router_id)
def _bind_ha_network_ports(self, router_id):
port_bindings = self.plugin.get_ha_router_port_bindings(
self.adminContext, [router_id])
plugin = manager.NeutronManager.get_plugin()
for port_binding in port_bindings:
filters = {'id': [port_binding.port_id]}
port = plugin.get_ports(self.adminContext, filters=filters)[0]
if port_binding.l3_agent_id == self.agent1['id']:
port[portbindings.HOST_ID] = self.agent1['host']
else:
port[portbindings.HOST_ID] = self.agent2['host']
plugin.update_port(self.adminContext, port['id'],
{attributes.PORT: port})
def _get_first_interface(self, net_id, router_id):
plugin = manager.NeutronManager.get_plugin()
device_filter = {'device_id': [router_id],
'device_owner':
[constants.DEVICE_OWNER_HA_REPLICATED_INT]}
return plugin.get_ports(self.adminContext, filters=device_filter)[0]
def _add_router_interface(self, subnet, router, host):
interface_info = {'subnet_id': subnet['id']}
self.plugin.add_router_interface(self.adminContext,
router['id'], interface_info)
self.plugin.update_routers_states(
self.adminContext,
{router['id']: n_const.HA_ROUTER_STATE_ACTIVE}, host)
port = self._get_first_interface(subnet['network_id'], router['id'])
self.mock_cast.reset_mock()
self.mock_fanout.reset_mock()
self.callbacks.update_device_up(self.adminContext, agent_id=host,
device=port['id'], host=host)
return port
def _create_ha_router(self):
self._setup_l3()
router = self._create_router()
self._bind_router(router['id'])
return router
def _verify_remove_fdb(self, expected, agent_id, device, host=None):
self.mock_fanout.reset_mock()
self.callbacks.update_device_down(self.adminContext, agent_id=host,
device=device, host=host)
self.mock_fanout.assert_called_with(
mock.ANY, 'remove_fdb_entries', expected)
def test_other_agents_get_flood_entries_for_ha_agents(self):
# First an HA router port is added on HOST and HOST_2, then a network
# port is added on HOST_4.
# HOST_4 should get flood entries for HOST and HOST_2.
router = self._create_ha_router()
service_plugins = manager.NeutronManager.get_service_plugins()
service_plugins[service_constants.L3_ROUTER_NAT] = self.plugin
with self.subnet(network=self._network, enable_dhcp=False) as snet, \
mock.patch('neutron.manager.NeutronManager.get_service_plugins',
return_value=service_plugins):
subnet = snet['subnet']
port = self._add_router_interface(subnet, router, HOST)
host_arg = {portbindings.HOST_ID: HOST_4, 'admin_state_up': True}
with self.port(subnet=snet,
device_owner=DEVICE_OWNER_COMPUTE,
arg_list=(portbindings.HOST_ID,),
**host_arg) as port1:
p1 = port1['port']
device1 = 'tap' + p1['id']
self.mock_cast.reset_mock()
self.mock_fanout.reset_mock()
self.callbacks.update_device_up(
self.adminContext, agent_id=HOST_4, device=device1)
cast_expected = {
port['network_id']: {
'ports': {'20.0.0.1': [constants.FLOODING_ENTRY],
'20.0.0.2': [constants.FLOODING_ENTRY]},
'network_type': 'vxlan', 'segment_id': 1}}
self.assertEqual(1, self.mock_cast.call_count)
self.mock_cast.assert_called_with(
mock.ANY, 'add_fdb_entries', cast_expected, HOST_4)
def test_delete_ha_port(self):
# First a network port is added on HOST, and then an HA router port
# is added on HOST and HOST_2.
# remove_fdb_entries should carry the flood entry of only HOST_2,
# not HOST.
router = self._create_ha_router()
service_plugins = manager.NeutronManager.get_service_plugins()
service_plugins[service_constants.L3_ROUTER_NAT] = self.plugin
with self.subnet(network=self._network, enable_dhcp=False) as snet, \
mock.patch('neutron.manager.NeutronManager.get_service_plugins',
return_value=service_plugins):
host_arg = {portbindings.HOST_ID: HOST, 'admin_state_up': True}
with self.port(subnet=snet,
device_owner=DEVICE_OWNER_COMPUTE,
arg_list=(portbindings.HOST_ID,),
**host_arg) as port1:
p1 = port1['port']
device1 = 'tap' + p1['id']
self.callbacks.update_device_up(self.adminContext,
agent_id=HOST, device=device1)
subnet = snet['subnet']
port = self._add_router_interface(subnet, router, HOST)
expected = {port['network_id']:
{'ports': {'20.0.0.2': [constants.FLOODING_ENTRY]},
'network_type': 'vxlan', 'segment_id': 1}}
self.mock_fanout.reset_mock()
interface_info = {'subnet_id': subnet['id']}
self.plugin.remove_router_interface(self.adminContext,
router['id'], interface_info)
self.mock_fanout.assert_called_with(
mock.ANY, 'remove_fdb_entries', expected)
def test_ha_agents_get_other_fdb(self):
# First a network port is added on HOST_4, then an HA router port is
# added on HOST and HOST_2.
# Both HA agents should create tunnels to HOST_4 and between
# themselves, and both should be announced to the other agents.
router = self._create_ha_router()
service_plugins = manager.NeutronManager.get_service_plugins()
service_plugins[service_constants.L3_ROUTER_NAT] = self.plugin
with self.subnet(network=self._network, enable_dhcp=False) as snet, \
mock.patch('neutron.manager.NeutronManager.get_service_plugins',
return_value=service_plugins):
host_arg = {portbindings.HOST_ID: HOST_4, 'admin_state_up': True}
with self.port(subnet=snet,
device_owner=DEVICE_OWNER_COMPUTE,
arg_list=(portbindings.HOST_ID,),
**host_arg) as port1:
p1 = port1['port']
device1 = 'tap' + p1['id']
self.callbacks.update_device_up(
self.adminContext, agent_id=HOST_4, device=device1)
p1_ips = [p['ip_address'] for p in p1['fixed_ips']]
subnet = snet['subnet']
port = self._add_router_interface(subnet, router, HOST)
fanout_expected = {port['network_id']: {
'ports': {'20.0.0.1': [constants.FLOODING_ENTRY]},
'network_type': 'vxlan', 'segment_id': 1}}
cast_expected_host = {port['network_id']: {
'ports': {
'20.0.0.4': [constants.FLOODING_ENTRY,
l2pop_rpc.PortInfo(p1['mac_address'],
p1_ips[0])],
'20.0.0.2': [constants.FLOODING_ENTRY]},
'network_type': 'vxlan', 'segment_id': 1}}
self.mock_cast.assert_called_with(
mock.ANY, 'add_fdb_entries', cast_expected_host, HOST)
self.mock_fanout.assert_called_with(
mock.ANY, 'add_fdb_entries', fanout_expected)
self.mock_cast.reset_mock()
self.mock_fanout.reset_mock()
self.callbacks.update_device_up(
self.adminContext, agent_id=HOST_2,
device=port['id'], host=HOST_2)
cast_expected_host2 = {port['network_id']: {
'ports': {
'20.0.0.4': [constants.FLOODING_ENTRY,
l2pop_rpc.PortInfo(p1['mac_address'],
p1_ips[0])],
'20.0.0.1': [constants.FLOODING_ENTRY]},
'network_type': 'vxlan', 'segment_id': 1}}
fanout_expected = {port['network_id']: {
'ports': {'20.0.0.2': [constants.FLOODING_ENTRY]},
'network_type': 'vxlan', 'segment_id': 1}}
self.mock_cast.assert_called_with(
mock.ANY, 'add_fdb_entries', cast_expected_host2, HOST_2)
self.mock_fanout.assert_called_with(
mock.ANY, 'add_fdb_entries', fanout_expected)
def test_fdb_add_called(self):
self._register_ml2_agents()
@@ -842,12 +1080,15 @@ class TestL2PopulationRpcTestCase(test_plugin.Ml2PluginV2TestCase):
l2pop_mech = l2pop_mech_driver.L2populationMechanismDriver()
l2pop_mech.L2PopulationAgentNotify = mock.Mock()
l2pop_mech.rpc_ctx = mock.Mock()
port = {'device_owner': ''}
context = mock.Mock()
context.current = port
with mock.patch.object(l2pop_mech,
'_get_agent_fdb',
return_value=None) as upd_port_down,\
mock.patch.object(l2pop_mech.L2PopulationAgentNotify,
'remove_fdb_entries'):
l2pop_mech.delete_port_postcommit(mock.Mock())
l2pop_mech.delete_port_postcommit(context)
self.assertTrue(upd_port_down.called)
def test_delete_unbound_port(self):


@@ -59,7 +59,8 @@ class RpcCallbacksTestCase(base.BaseTestCase):
'host': host
}
with mock.patch('neutron.plugins.ml2.plugin.Ml2Plugin'
'._device_to_port_id'):
'._device_to_port_id'),\
mock.patch.object(self.callbacks, 'notify_ha_port_status'):
with mock.patch('neutron.db.provisioning_blocks.'
'provisioning_complete') as pc:
self.callbacks.update_device_up(mock.Mock(), **kwargs)
@@ -212,6 +213,7 @@ class RpcCallbacksTestCase(base.BaseTestCase):
def _test_update_device_not_bound_to_host(self, func):
self.plugin.port_bound_to_host.return_value = False
self.callbacks.notify_ha_port_status = mock.Mock()
self.plugin._device_to_port_id.return_value = 'fake_port_id'
res = func(mock.Mock(), device='fake_device', host='fake_host')
self.plugin.port_bound_to_host.assert_called_once_with(mock.ANY,
@@ -234,6 +236,7 @@ class RpcCallbacksTestCase(base.BaseTestCase):
def test_update_device_down_call_update_port_status(self):
self.plugin.update_port_status.return_value = False
self.callbacks.notify_ha_port_status = mock.Mock()
self.plugin._device_to_port_id.return_value = 'fake_port_id'
self.assertEqual(
{'device': 'fake_device', 'exists': False},