Browse Source

Add 'unknown' to the address list if the port's port security is disabled

When a port's port security is disabled then the port should be allowed to use
any mac address. And for this to work, OVN expects 'unknown' address
to be added into the Logical_Switch_Port.addresses column. This patch
adds this 'unknown' address if the port security is disabled.

Change-Id: I5f25f4d4b2acccada3ab2944e9cfd2461149ef3e
Closes-bug: #1815270
Co-Authored-By: Lucas Alvares Gomes <lucasagomes@gmail.com>
changes/11/636011/5
Numan Siddique 3 years ago
committed by Lucas Alvares Gomes
parent
commit
ef5e9d31ec
  1. 23
      networking_ovn/common/maintenance.py
  2. 8
      networking_ovn/common/ovn_client.py
  3. 58
      networking_ovn/tests/functional/test_maintenance.py
  4. 21
      networking_ovn/tests/functional/test_ovn_db_sync.py
  5. 20
      networking_ovn/tests/unit/ml2/test_mech_driver.py

23
networking_ovn/common/maintenance.py

@ -384,3 +384,26 @@ class DBInconsistenciesPeriodics(object):
self._ovn_client.create_metadata_port(admin_context, n)
raise periodics.NeverAgain()
# TODO(lucasagomes): Remove this in the T cycle
# A static spacing value is used here, but this method will only run
# once per lock due to the use of periodics.NeverAgain().
@periodics.periodic(spacing=600, run_immediately=True)
def check_for_port_security_unknown_address(self):
if not self.has_lock:
return
for port in self._nb_idl.lsp_list().execute(check_error=True):
addresses = port.addresses
if not port.port_security and 'unknown' not in addresses:
addresses.append('unknown')
elif port.port_security and 'unknown' in addresses:
addresses.remove('unknown')
else:
continue
self._nb_idl.lsp_set_addresses(
port.name, addresses=addresses).execute(check_error=True)
raise periodics.NeverAgain()

8
networking_ovn/common/ovn_client.py

@ -227,6 +227,14 @@ class OVNClient(object):
addresses = [address]
addresses.extend(new_macs)
if not port_security:
# Port security is disabled for this port.
# So this port can send traffic with any mac address.
# OVN allows any mac address from a port if "unknown"
# is added to the Logical_Switch_Port.addresses column.
# So add it.
addresses.append("unknown")
# Only adjust the OVN type if the port is not owned by Neutron
# DHCP agents.
if (port['device_owner'] == const.DEVICE_OWNER_DHCP and

58
networking_ovn/tests/functional/test_maintenance.py

@ -735,3 +735,61 @@ class TestMaintenance(_TestMaintenanceHelper):
# Assert create_metadata_port() wasn't called since metadata
# is not enabled
self.assertFalse(mock_create_port.called)
def test_check_for_port_security_unknown_address(self):
neutron_net = self._create_network('network1')
neutron_port = self._create_port('port1', neutron_net['id'])
# Let's force disabling port security for the LSP
self.nb_api.lsp_set_port_security(neutron_port['id'], []).execute(
check_error=True)
ovn_port = self.nb_api.db_find(
'Logical_Switch_Port', ('name', '=', neutron_port['id'])).execute(
check_error=True)[0]
# Assert that port security is now disabled but the 'unknown'
# is not set in the addresses column
self.assertFalse(ovn_port['port_security'])
self.assertNotIn('unknown', ovn_port['addresses'])
# Call the maintenance task to fix the problem. Note that
# NeverAgain is raised so it only runs once at start up
self.assertRaises(periodics.NeverAgain,
self.maint.check_for_port_security_unknown_address)
ovn_port = self.nb_api.db_find(
'Logical_Switch_Port', ('name', '=', neutron_port['id'])).execute(
check_error=True)[0]
# Assert that 'unknown' was set in the addresses column for
# the port
self.assertFalse(ovn_port['port_security'])
self.assertIn('unknown', ovn_port['addresses'])
# Now the other way around, let's set port_security in the OVN
# table while the 'unknown' address is set in the addresses column
self.nb_api.lsp_set_port_security(
neutron_port['id'], ovn_port['addresses']).execute(
check_error=True)
ovn_port = self.nb_api.db_find(
'Logical_Switch_Port', ('name', '=', neutron_port['id'])).execute(
check_error=True)[0]
self.assertTrue(ovn_port['port_security'])
self.assertIn('unknown', ovn_port['addresses'])
# Call the maintenance task to fix the problem. Note that
# NeverAgain is raised so it only runs once at start up
self.assertRaises(periodics.NeverAgain,
self.maint.check_for_port_security_unknown_address)
ovn_port = self.nb_api.db_find(
'Logical_Switch_Port', ('name', '=', neutron_port['id'])).execute(
check_error=True)[0]
# Assert that 'unknown' was removed from the addresses column
# for the port
self.assertTrue(ovn_port['port_security'])
self.assertNotIn('unknown', ovn_port['addresses'])

21
networking_ovn/tests/functional/test_ovn_db_sync.py

@ -20,6 +20,7 @@ from neutron.tests.unit.extensions import test_extraroute
from neutron.tests.unit.extensions import test_securitygroup
from neutron_lib.api.definitions import dns as dns_apidef
from neutron_lib.api.definitions import l3
from neutron_lib.api.definitions import port_security as ps
from neutron_lib import constants
from neutron_lib import context
from neutron_lib.plugins import directory
@ -78,6 +79,7 @@ class TestOvnNbSync(base.TestOVNFunctionalBase):
self.lport_dhcp_ignored = []
self.match_old_mac_dhcp_subnets = []
self.expected_dns_records = []
self.expected_ports_with_unknown_addr = []
ovn_config.cfg.CONF.set_override('ovn_metadata_enabled', True,
group='ovn')
@ -139,9 +141,11 @@ class TestOvnNbSync(base.TestOVNFunctionalBase):
n1_port_dict = {}
for p in ['p1', 'p2', 'p3', 'p4', 'p5', 'p6', 'p7']:
if p in ['p1', 'p5']:
port_kwargs = {'arg_list': (dns_apidef.DNSNAME,),
dns_apidef.DNSNAME: 'n1-' + p,
'device_id': 'n1-' + p}
port_kwargs = {
'arg_list': (dns_apidef.DNSNAME, ps.PORTSECURITY),
dns_apidef.DNSNAME: 'n1-' + p,
ps.PORTSECURITY: 'False',
'device_id': 'n1-' + p}
else:
port_kwargs = {}
@ -152,6 +156,7 @@ class TestOvnNbSync(base.TestOVNFunctionalBase):
port = self.deserialize(self.fmt, res)
n1_port_dict[p] = port['port']['id']
lport_name = port['port']['id']
lswitch_name = 'neutron-' + n1['network']['id']
if p in ['p1', 'p5']:
port_ips = " ".join([f['ip_address']
@ -160,6 +165,8 @@ class TestOvnNbSync(base.TestOVNFunctionalBase):
self.expected_dns_records[0]['records'][hname] = port_ips
hname = 'n1-' + p + '.ovn.test'
self.expected_dns_records[0]['records'][hname] = port_ips
self.expected_ports_with_unknown_addr.append(lport_name)
if p == 'p1':
fake_subnet = {'cidr': '11.11.11.11/24'}
dhcp_acls = acl_utils.add_acl_dhcp(port['port'], fake_subnet)
@ -974,6 +981,14 @@ class TestOvnNbSync(base.TestOVNFunctionalBase):
plugin_lport_ids_dhcpv6_enabled)
self.assertItemsEqual(expected_dhcpv6_options_ports_ids,
monitor_lport_ids_dhcpv6_enabled)
# Check if unknow address is set for the expected lports.
for row in (
self.nb_api.tables['Logical_Switch_Port'].
rows.values()):
if row.name in self.expected_ports_with_unknown_addr:
self.assertIn('unknown', row.addresses)
else:
self.assertRaises(
AssertionError, self.assertItemsEqual, db_port_ids,

20
networking_ovn/tests/unit/ml2/test_mech_driver.py

@ -405,6 +405,8 @@ class TestOVNMechanismDriver(test_plugin.Ml2PluginV2TestCase):
self.assertEqual([],
called_args_dict.get('port_security'))
self.assertEqual('unknown',
called_args_dict.get('addresses')[1])
data = {'port': {'mac_address': '00:00:00:00:00:01'}}
req = self.new_update_request(
'ports',
@ -416,6 +418,24 @@ class TestOVNMechanismDriver(test_plugin.Ml2PluginV2TestCase):
).call_args_list[0][1])
self.assertEqual([],
called_args_dict.get('port_security'))
self.assertEqual(2, len(called_args_dict.get('addresses')))
self.assertEqual('unknown',
called_args_dict.get('addresses')[1])
# Enable port security
data = {'port': {'port_security_enabled': 'True'}}
req = self.new_update_request(
'ports',
data, port['port']['id'])
req.get_response(self.api)
called_args_dict = (
(self.nb_ovn.set_lswitch_port
).call_args_list[1][1])
self.assertEqual(2,
self.nb_ovn.set_lswitch_port.call_count)
self.assertEqual(1, len(called_args_dict.get('addresses')))
self.assertNotIn('unknown',
called_args_dict.get('addresses'))
def test_create_port_security_allowed_address_pairs(self):
kwargs = {'allowed_address_pairs':

Loading…
Cancel
Save