Add support for policy exclude list

The exclude list resource has only 1 instance, which can only be
updated ro retrived.
It contains a list of members (groups) which will be excluded from
the security policies.

Change-Id: I213b01052c49b0bee74c12eb1cd4625b6a75538d
This commit is contained in:
Adit Sarfaty 2019-03-28 14:17:26 +02:00
parent a754908731
commit 74e1525991
4 changed files with 113 additions and 0 deletions

View File

@ -3855,3 +3855,46 @@ class TestPolicyCertificate(NsxPolicyLibTestCase):
key_algo=key_algo
)
self.assert_called_with_def(update_call, expected_def)
class TestPolicyExcludeList(NsxPolicyLibTestCase):
def setUp(self, *args, **kwargs):
super(TestPolicyExcludeList, self).setUp()
self.resourceApi = self.policy_lib.exclude_list
def test_create_or_overwrite(self):
members = ["/infra/domains/default/groups/adit1"]
with mock.patch.object(self.policy_api,
"create_or_update") as api_call:
result = self.resourceApi.create_or_overwrite(
members=members,
tenant=TEST_TENANT)
expected_def = (
core_defs.ExcludeListDef(
policy_id=self.resourceApi.DEFAULT_ENTRY_ID,
name=self.resourceApi.DEFAULT_NAME,
members=members,
tenant=TEST_TENANT))
self.assert_called_with_def(api_call, expected_def)
self.assertIsNotNone(result)
def test_delete(self):
self.skipTest("The action is not supported by this resource")
def test_get(self):
obj_id = self.resourceApi.DEFAULT_ENTRY_ID
with mock.patch.object(self.policy_api, "get",
return_value={'id': obj_id}) as api_call:
result = self.resourceApi.get(obj_id, tenant=TEST_TENANT)
expected_def = core_defs.ExcludeListDef(
policy_id=obj_id,
tenant=TEST_TENANT)
self.assert_called_with_def(api_call, expected_def)
self.assertEqual(obj_id, result['id'])
def test_list(self):
self.skipTest("The action is not supported by this resource")
def test_update(self):
self.skipTest("The action is not supported by this resource")

View File

@ -107,6 +107,7 @@ class NsxPolicyLib(lib.NsxLibBase):
core_resources.NsxIpv6NdraProfileApi(*args))
self.dhcp_relay_config = core_resources.NsxDhcpRelayConfigApi(*args)
self.certificate = core_resources.NsxPolicyCertApi(*args)
self.exclude_list = core_resources.NsxPolicyExcludeList(*args)
self.load_balancer = lb_resources.NsxPolicyLoadBalancerApi(*args)
@property

View File

@ -48,6 +48,8 @@ MAC_DISCOVERY_PROFILES_PATH_PATTERN = (TENANTS_PATH_PATTERN +
IPV6_NDRA_PROFILES_PATH_PATTERN = (TENANTS_PATH_PATTERN +
"ipv6-ndra-profiles/")
CERTIFICATE_PATH_PATTERN = TENANTS_PATH_PATTERN + "certificates/"
EXCLUDE_LIST_PATH_PATTERN = (TENANTS_PATH_PATTERN +
"firewall-configuration/exclude-list/")
REALIZATION_PATH = "infra/realized-state/realized-entities?intent_path=%s"
DHCP_REALY_PATTERN = TENANTS_PATH_PATTERN + "dhcp-relay-configs/"
@ -1645,6 +1647,26 @@ class CertificateDef(ResourceDef):
return body
class ExcludeListDef(ResourceDef):
@property
def path_pattern(self):
return EXCLUDE_LIST_PATH_PATTERN
@property
def path_ids(self):
return ('tenant', 'policy_id')
@staticmethod
def resource_type():
return "PolicyExcludeList"
def get_obj_dict(self):
body = super(ExcludeListDef, self).get_obj_dict()
self._set_attr_if_specified(body, 'members')
return body
class NsxPolicyApi(object):
def __init__(self, client):

View File

@ -3436,3 +3436,50 @@ class NsxPolicyCertApi(NsxPolicyResourceBase):
passphrase=passphrase,
key_algo=key_algo,
tenant=tenant)
class NsxPolicyExcludeList(NsxPolicyResourceBase):
"""NSX Policy Exclude list."""
DEFAULT_ENTRY_ID = 'security-policy'
DEFAULT_NAME = 'security-policy'
@property
def entry_def(self):
return core_defs.ExcludeListDef
def create_or_overwrite(self, name=DEFAULT_NAME,
policy_id=DEFAULT_ENTRY_ID,
members=IGNORE,
tenant=constants.POLICY_INFRA_TENANT):
exclude_list_def = self._init_def(policy_id=policy_id,
name=name,
members=members,
tenant=tenant)
self._create_or_store(exclude_list_def)
return policy_id
def delete(self, policy_id=DEFAULT_ENTRY_ID,
tenant=constants.POLICY_INFRA_TENANT):
err_msg = (_("This action is not supported"))
raise exceptions.ManagerError(details=err_msg)
def get(self, policy_id=DEFAULT_ENTRY_ID,
tenant=constants.POLICY_INFRA_TENANT, silent=False):
exclude_list_def = self.entry_def(policy_id=policy_id,
tenant=tenant)
return self.policy_api.get(exclude_list_def, silent=silent)
def list(self, tenant=constants.POLICY_INFRA_TENANT):
err_msg = (_("This action is not supported"))
raise exceptions.ManagerError(details=err_msg)
def update(self, name=DEFAULT_NAME,
policy_id=DEFAULT_ENTRY_ID,
members=IGNORE,
tenant=constants.POLICY_INFRA_TENANT):
err_msg = (_("This action is not supported"))
raise exceptions.ManagerError(details=err_msg)
# TODO(asarfaty): Add support for add/remove member