Browse Source

Add 'unknown' to the address list if the port's port security is disabled

When a port's port security is disabled then the port should be allowed to use
any mac address. And for this to work, OVN expects 'unknown' address
to be added into the Logical_Switch_Port.addresses column. This patch
adds this 'unknown' address if the port security is disabled.

Change-Id: I5f25f4d4b2acccada3ab2944e9cfd2461149ef3e
Closes-bug: #1815270
Co-Authored-By: Lucas Alvares Gomes <lucasagomes@gmail.com>
(cherry picked from commit ef5e9d31ec)
changes/60/699660/2
Numan Siddique 1 year ago
parent
commit
f695252b24
5 changed files with 125 additions and 3 deletions
  1. +20
    -0
      networking_ovn/common/maintenance.py
  2. +8
    -0
      networking_ovn/common/ovn_client.py
  3. +59
    -0
      networking_ovn/tests/functional/test_maintenance.py
  4. +18
    -3
      networking_ovn/tests/functional/test_ovn_db_sync.py
  5. +20
    -0
      networking_ovn/tests/unit/ml2/test_mech_driver.py

+ 20
- 0
networking_ovn/common/maintenance.py View File

@@ -332,3 +332,23 @@ class DBInconsistenciesPeriodics(object):
router_id = port['device_id']
self._ovn_client._l3_plugin.add_router_interface(
admin_context, router_id, {'port_id': port['id']}, may_exist=True)

@periodics.periodic(spacing=600, run_immediately=True)
def check_for_port_security_unknown_address(self):

if not self.has_lock:
return

for port in self._nb_idl.lsp_list().execute(check_error=True):
addresses = port.addresses
if not port.port_security and 'unknown' not in addresses:
addresses.append('unknown')
elif port.port_security and 'unknown' in addresses:
addresses.remove('unknown')
else:
continue

self._nb_idl.lsp_set_addresses(
port.name, addresses=addresses).execute(check_error=True)

raise periodics.NeverAgain()

+ 8
- 0
networking_ovn/common/ovn_client.py View File

@@ -225,6 +225,14 @@ class OVNClient(object):
port_type = ovn_const.OVN_NEUTRON_OWNER_TO_PORT_TYPE.get(
port['device_owner'], '')

if not port_security:
# Port security is disabled for this port.
# So this port can send traffic with any mac address.
# OVN allows any mac address from a port if "unknown"
# is added to the Logical_Switch_Port.addresses column.
# So add it.
addresses.append("unknown")

dhcpv4_options = self._get_port_dhcp_options(port, const.IP_VERSION_4)
dhcpv6_options = self._get_port_dhcp_options(port, const.IP_VERSION_6)


+ 59
- 0
networking_ovn/tests/functional/test_maintenance.py View File

@@ -15,6 +15,7 @@

import mock

from futurist import periodics
from neutron.tests.unit.api import test_extensions
from neutron.tests.unit.extensions import test_extraroute
from neutron.tests.unit.extensions import test_securitygroup
@@ -566,3 +567,61 @@ class TestMaintenance(_TestMaintenanceHelper):

# Assert the revision number no longer exists
self.assertIsNone(db_rev.get_revision_row(neutron_obj['port_id']))

def test_check_for_port_security_unknown_address(self):
neutron_net = self._create_network('network1')
neutron_port = self._create_port('port1', neutron_net['id'])

# Let's force disabling port security for the LSP
self.nb_api.lsp_set_port_security(neutron_port['id'], []).execute(
check_error=True)

ovn_port = self.nb_api.db_find(
'Logical_Switch_Port', ('name', '=', neutron_port['id'])).execute(
check_error=True)[0]

# Assert that port security is now disabled but the 'unknown'
# is not set in the addresses column
self.assertFalse(ovn_port['port_security'])
self.assertNotIn('unknown', ovn_port['addresses'])

# Call the maintenance task to fix the problem. Note that
# NeverAgain is raised so it only runs once at start up
self.assertRaises(periodics.NeverAgain,
self.maint.check_for_port_security_unknown_address)

ovn_port = self.nb_api.db_find(
'Logical_Switch_Port', ('name', '=', neutron_port['id'])).execute(
check_error=True)[0]

# Assert that 'unknown' was set in the addresses column for
# the port
self.assertFalse(ovn_port['port_security'])
self.assertIn('unknown', ovn_port['addresses'])

# Now the other way around, let's set port_security in the OVN
# table while the 'unknown' address is set in the addresses column
self.nb_api.lsp_set_port_security(
neutron_port['id'], ovn_port['addresses']).execute(
check_error=True)

ovn_port = self.nb_api.db_find(
'Logical_Switch_Port', ('name', '=', neutron_port['id'])).execute(
check_error=True)[0]

self.assertTrue(ovn_port['port_security'])
self.assertIn('unknown', ovn_port['addresses'])

# Call the maintenance task to fix the problem. Note that
# NeverAgain is raised so it only runs once at start up
self.assertRaises(periodics.NeverAgain,
self.maint.check_for_port_security_unknown_address)

ovn_port = self.nb_api.db_find(
'Logical_Switch_Port', ('name', '=', neutron_port['id'])).execute(
check_error=True)[0]

# Assert that 'unknown' was removed from the addresses column
# for the port
self.assertTrue(ovn_port['port_security'])
self.assertNotIn('unknown', ovn_port['addresses'])

+ 18
- 3
networking_ovn/tests/functional/test_ovn_db_sync.py View File

@@ -20,6 +20,7 @@ from neutron.tests.unit.extensions import test_extraroute
from neutron.tests.unit.extensions import test_securitygroup
from neutron_lib.api.definitions import dns as dns_apidef
from neutron_lib.api.definitions import l3
from neutron_lib.api.definitions import port_security as ps
from neutron_lib import constants
from neutron_lib import context
from neutron_lib.plugins import directory
@@ -78,6 +79,7 @@ class TestOvnNbSync(base.TestOVNFunctionalBase):
self.lport_dhcp_ignored = []
self.match_old_mac_dhcp_subnets = []
self.expected_dns_records = []
self.expected_ports_with_unknown_addr = []
ovn_config.cfg.CONF.set_override('ovn_metadata_enabled', True,
group='ovn')

@@ -139,9 +141,11 @@ class TestOvnNbSync(base.TestOVNFunctionalBase):
n1_port_dict = {}
for p in ['p1', 'p2', 'p3', 'p4', 'p5', 'p6', 'p7']:
if p in ['p1', 'p5']:
port_kwargs = {'arg_list': (dns_apidef.DNSNAME,),
dns_apidef.DNSNAME: 'n1-' + p,
'device_id': 'n1-' + p}
port_kwargs = {
'arg_list': (dns_apidef.DNSNAME, ps.PORTSECURITY),
dns_apidef.DNSNAME: 'n1-' + p,
ps.PORTSECURITY: 'False',
'device_id': 'n1-' + p}
else:
port_kwargs = {}

@@ -152,6 +156,7 @@ class TestOvnNbSync(base.TestOVNFunctionalBase):
port = self.deserialize(self.fmt, res)
n1_port_dict[p] = port['port']['id']
lport_name = port['port']['id']

lswitch_name = 'neutron-' + n1['network']['id']
if p in ['p1', 'p5']:
port_ips = " ".join([f['ip_address']
@@ -160,6 +165,8 @@ class TestOvnNbSync(base.TestOVNFunctionalBase):
self.expected_dns_records[0]['records'][hname] = port_ips
hname = 'n1-' + p + '.ovn.test'
self.expected_dns_records[0]['records'][hname] = port_ips
self.expected_ports_with_unknown_addr.append(lport_name)

if p == 'p1':
fake_subnet = {'cidr': '11.11.11.11/24'}
dhcp_acls = acl_utils.add_acl_dhcp(port['port'], fake_subnet)
@@ -974,6 +981,14 @@ class TestOvnNbSync(base.TestOVNFunctionalBase):
plugin_lport_ids_dhcpv6_enabled)
self.assertItemsEqual(expected_dhcpv6_options_ports_ids,
monitor_lport_ids_dhcpv6_enabled)

# Check if unknow address is set for the expected lports.
for row in (
self.nb_api.tables['Logical_Switch_Port'].
rows.values()):
if row.name in self.expected_ports_with_unknown_addr:
self.assertIn('unknown', row.addresses)

else:
self.assertRaises(
AssertionError, self.assertItemsEqual, db_port_ids,

+ 20
- 0
networking_ovn/tests/unit/ml2/test_mech_driver.py View File

@@ -408,6 +408,8 @@ class TestOVNMechanismDriver(test_plugin.Ml2PluginV2TestCase):
self.assertEqual([],
called_args_dict.get('port_security'))

self.assertEqual('unknown',
called_args_dict.get('addresses')[1])
data = {'port': {'mac_address': '00:00:00:00:00:01'}}
req = self.new_update_request(
'ports',
@@ -419,6 +421,24 @@ class TestOVNMechanismDriver(test_plugin.Ml2PluginV2TestCase):
).call_args_list[0][1])
self.assertEqual([],
called_args_dict.get('port_security'))
self.assertEqual(2, len(called_args_dict.get('addresses')))
self.assertEqual('unknown',
called_args_dict.get('addresses')[1])

# Enable port security
data = {'port': {'port_security_enabled': 'True'}}
req = self.new_update_request(
'ports',
data, port['port']['id'])
req.get_response(self.api)
called_args_dict = (
(self.nb_ovn.set_lswitch_port
).call_args_list[1][1])
self.assertEqual(2,
self.nb_ovn.set_lswitch_port.call_count)
self.assertEqual(1, len(called_args_dict.get('addresses')))
self.assertNotIn('unknown',
called_args_dict.get('addresses'))

def test_create_port_security_allowed_address_pairs(self):
# NOTE(mjozefcz): Lets pretend this is nova port to not

Loading…
Cancel
Save