Merge "[OVN] Create the SG rules revision number registers" into stable/2024.1
This commit is contained in:
commit
6af97fc6bb
@ -404,6 +404,11 @@ class OVNMechanismDriver(api.MechanismDriver):
|
||||
context, security_group['id'],
|
||||
ovn_const.TYPE_SECURITY_GROUPS,
|
||||
std_attr_id=security_group['standard_attr_id'])
|
||||
for sg_rule in security_group['security_group_rules']:
|
||||
ovn_revision_numbers_db.create_initial_revision(
|
||||
context, sg_rule['id'],
|
||||
ovn_const.TYPE_SECURITY_GROUP_RULES,
|
||||
std_attr_id=sg_rule['standard_attr_id'])
|
||||
|
||||
def _create_security_group(self, resource, event, trigger, payload):
|
||||
context = payload.context
|
||||
|
@ -2510,6 +2510,9 @@ class OVNClient(object):
|
||||
self.is_allow_stateless_supported())
|
||||
db_rev.bump_revision(
|
||||
context, security_group, ovn_const.TYPE_SECURITY_GROUPS)
|
||||
for sg_rule in security_group['security_group_rules']:
|
||||
db_rev.bump_revision(
|
||||
context, sg_rule, ovn_const.TYPE_SECURITY_GROUP_RULES)
|
||||
|
||||
def _add_port_to_drop_port_group(self, port, txn):
|
||||
txn.add(self._nb_idl.pg_add_ports(ovn_const.OVN_DROP_PORT_GROUP_NAME,
|
||||
|
@ -26,6 +26,7 @@ from neutron.common.ovn import constants as ovn_const
|
||||
from neutron.common.ovn import utils
|
||||
from neutron.common import utils as n_utils
|
||||
from neutron.conf.plugins.ml2.drivers.ovn import ovn_conf as ovn_config
|
||||
from neutron.db import ovn_revision_numbers_db as rev_db
|
||||
from neutron.tests.functional import base
|
||||
|
||||
|
||||
@ -922,6 +923,25 @@ class TestPortSecurity(base.TestOVNFunctionalBase):
|
||||
self._verify_port_acls(port_id, expected_acls_with_sg_ps_enabled)
|
||||
|
||||
|
||||
class TestSecurityGroups(base.TestOVNFunctionalBase):
|
||||
|
||||
def test_security_group_creation_and_deletion(self):
|
||||
sg = self._make_security_group(self.fmt)['security_group']
|
||||
rev_num = rev_db.get_revision_row(self.context, sg['id'])
|
||||
self.assertEqual(1, rev_num.revision_number)
|
||||
for sg_rule in sg['security_group_rules']:
|
||||
rev_num = rev_db.get_revision_row(self.context, sg_rule['id'])
|
||||
self.assertEqual(0, rev_num.revision_number)
|
||||
|
||||
self._delete('security-groups', sg['id'])
|
||||
self.assertIsNone(rev_db.get_revision_row(self.context, sg['id']))
|
||||
# NOTE(ralonsoh): the deletion of the revision numbers of the SG rules
|
||||
# will be fixed in a follow-up patch.
|
||||
# for sg_rule in sg['security_group_rules']:
|
||||
# self.assertIsNone(rev_db.get_revision_row(self.context,
|
||||
# sg_rule['id']))
|
||||
|
||||
|
||||
class TestDNSRecords(base.TestOVNFunctionalBase):
|
||||
_extension_drivers = ['port_security', 'dns']
|
||||
|
||||
|
@ -661,6 +661,22 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
|
||||
self._check_http_response(res)
|
||||
return self.deserialize(fmt, res)
|
||||
|
||||
def _make_security_group(self, fmt, name=None, expected_res_status=None,
|
||||
project_id=None, is_admin=False):
|
||||
name = name or 'sg-{}'.format(uuidutils.generate_uuid())
|
||||
project_id = project_id or self._tenant_id
|
||||
data = {'security_group': {'name': name,
|
||||
'description': name,
|
||||
'project_id': project_id}}
|
||||
sg_req = self.new_create_request('security-groups', data, fmt,
|
||||
tenant_id=project_id,
|
||||
as_admin=is_admin)
|
||||
sg_res = sg_req.get_response(self.api)
|
||||
if expected_res_status:
|
||||
self.assertEqual(expected_res_status, sg_res.status_int)
|
||||
self._check_http_response(sg_res)
|
||||
return self.deserialize(fmt, sg_res)
|
||||
|
||||
def _create_qos_rule(self, fmt, qos_policy_id, rule_type, max_kbps=None,
|
||||
max_burst_kbps=None, dscp_mark=None, min_kbps=None,
|
||||
direction=constants.EGRESS_DIRECTION,
|
||||
|
@ -291,8 +291,12 @@ class TestOVNMechanismDriver(TestOVNMechanismDriverBase):
|
||||
for c in self.nb_ovn.pg_acl_add.call_args_list:
|
||||
self.assertEqual(expected, c[1]["action"])
|
||||
|
||||
mock_bump.assert_called_once_with(
|
||||
mock.ANY, self.fake_sg, ovn_const.TYPE_SECURITY_GROUPS)
|
||||
calls = [mock.call(mock.ANY, self.fake_sg,
|
||||
ovn_const.TYPE_SECURITY_GROUPS)]
|
||||
for sg_rule in self.fake_sg['security_group_rules']:
|
||||
calls.append(mock.call(mock.ANY, sg_rule,
|
||||
ovn_const.TYPE_SECURITY_GROUP_RULES))
|
||||
mock_bump.assert_has_calls(calls)
|
||||
|
||||
def test__create_security_group_stateful_supported(self):
|
||||
self._test__create_security_group(True, True)
|
||||
|
Loading…
Reference in New Issue
Block a user