Add RouteMap and PrefixList policy methods

Change-Id: Ia56a39214c8ac6d7dca91badce3ce17e99e300fd
This commit is contained in:
Danting Liu 2019-11-08 02:48:41 -08:00 committed by Adit Sarfaty
parent 2c247d4c17
commit 6c0cbfe6ae
4 changed files with 489 additions and 0 deletions

View File

@ -5197,3 +5197,227 @@ class TestPolicyExcludeList(NsxPolicyLibTestCase):
def test_update(self):
self.skipTest("The action is not supported by this resource")
class TestPolicyTier0RouteMap(NsxPolicyLibTestCase):
def setUp(self, *args, **kwargs):
super(TestPolicyTier0RouteMap, self).setUp()
self.resourceApi = self.policy_lib.tier0_route_map
def test_create(self):
name = 'route_map_test'
tier0_id = 't0_test'
with mock.patch.object(self.policy_api,
"create_or_update") as api_call:
# test with 'entries'
entry = core_defs.RouteMapEntry('DENY')
result = self.resourceApi.create_or_overwrite(
name, tier0_id, entries=[entry], tenant=TEST_TENANT)
expected_def = core_defs.Tier0RouteMapDef(
tier0_id=tier0_id,
route_map_id=mock.ANY,
name=name,
entries=[entry],
tenant=TEST_TENANT)
self.assert_called_with_def(api_call, expected_def)
self.assertIsNotNone(result)
def test_delete(self):
tier0_id = 't0_test'
route_map_id = 'route_map_test'
with mock.patch.object(self.policy_api, "delete") as api_call:
self.resourceApi.delete(tier0_id, route_map_id, tenant=TEST_TENANT)
expected_def = core_defs.Tier0RouteMapDef(
tier0_id=tier0_id,
route_map_id=route_map_id,
tenant=TEST_TENANT)
self.assert_called_with_def(api_call, expected_def)
def test_get(self):
tier0_id = 't0_test'
route_map_id = 'route_map_test'
entries = []
with mock.patch.object(self.policy_api, "get",
return_value={'id': route_map_id}) as api_call:
result = self.resourceApi.get(tier0_id, route_map_id,
tenant=TEST_TENANT)
expected_def = core_defs.Tier0RouteMapDef(
tier0_id=tier0_id,
route_map_id=route_map_id,
entries=entries,
tenant=TEST_TENANT)
self.assert_called_with_def(api_call, expected_def)
self.assertEqual(route_map_id, result['id'])
def test_list(self):
tier0_id = 't0_test'
with mock.patch.object(self.policy_api, "list",
return_value={'results': []}) as api_call:
result = self.resourceApi.list(tier0_id=tier0_id,
tenant=TEST_TENANT)
expected_def = core_defs.Tier0RouteMapDef(
tier0_id=tier0_id,
tenant=TEST_TENANT)
self.assert_called_with_def(api_call, expected_def)
self.assertEqual([], result)
def test_update(self):
tier0_id = 't0_test'
route_map_id = 'route_map_test'
name = 'new_name'
entries = []
with self.mock_get(tier0_id, name), \
self.mock_create_update() as update_call:
self.resourceApi.update(name, tier0_id, route_map_id, entries,
tenant=TEST_TENANT)
expected_def = core_defs.Tier0RouteMapDef(
tier0_id=tier0_id,
route_map_id=route_map_id,
name=name,
entries=entries,
tenant=TEST_TENANT)
self.assert_called_with_def(update_call, expected_def)
def test_build_route_map_entry(self):
action = constants.ADV_RULE_PERMIT
community_list_matches = mock.ANY
prefix_list_matches = ["prefix_list_matches"]
entry_set = mock.ANY
route_map_entry = self.resourceApi.build_route_map_entry(
action, community_list_matches, prefix_list_matches, entry_set)
self.assertEqual(action, route_map_entry.action)
self.assertEqual(community_list_matches,
route_map_entry.community_list_matches)
self.assertEqual(prefix_list_matches,
route_map_entry.prefix_list_matches)
self.assertEqual(entry_set, route_map_entry.entry_set)
def test_build_route_map_entry_set(self):
local_preference = 100
as_path_prepend = mock.ANY
community = mock.ANY
med = mock.ANY
weight = mock.ANY
entry_set = self.resourceApi.build_route_map_entry_set(
local_preference, as_path_prepend, community, med, weight)
self.assertEqual(local_preference, entry_set.local_preference)
self.assertEqual(as_path_prepend, entry_set.as_path_prepend)
self.assertEqual(community, entry_set.community)
self.assertEqual(med, entry_set.med)
self.assertEqual(weight, entry_set.weight)
def test_build_community_match_criteria(self):
criteria = "test_criteria"
match_operator = mock.ANY
match_criteria = self.resourceApi.build_community_match_criteria(
criteria, match_operator)
self.assertEqual(criteria, match_criteria.criteria)
self.assertEqual(match_operator, match_criteria.match_operator)
class TestPolicyTier0PrefixList(NsxPolicyLibTestCase):
def setUp(self, *args, **kwargs):
super(TestPolicyTier0PrefixList, self).setUp()
self.resourceApi = self.policy_lib.tier0_prefix_list
def test_create(self):
name = 'prefix_list_test'
tier0_id = 't0_test'
with mock.patch.object(self.policy_api,
"create_or_update") as api_call:
# test with 'prefixes'
prefix = core_defs.PrefixEntry('network_test')
result = self.resourceApi.create_or_overwrite(
name, tier0_id, prefixes=[prefix], tenant=TEST_TENANT)
expected_def = core_defs.Tier0PrefixListDef(
tier0_id=tier0_id,
prefix_list_id=mock.ANY,
name=name,
prefixes=[prefix],
tenant=TEST_TENANT)
self.assert_called_with_def(api_call, expected_def)
self.assertIsNotNone(result)
def test_delete(self):
tier0_id = 't0_test'
prefix_list_id = 'prefix_list_test'
with mock.patch.object(self.policy_api, "delete") as api_call:
self.resourceApi.delete(tier0_id, prefix_list_id,
tenant=TEST_TENANT)
expected_def = core_defs.Tier0PrefixListDef(
tier0_id=tier0_id,
prefix_list_id=prefix_list_id,
tenant=TEST_TENANT)
self.assert_called_with_def(api_call, expected_def)
def test_get(self):
tier0_id = 't0_test'
prefix_list_id = 'prefix_list_test'
with mock.patch.object(
self.policy_api, "get",
return_value={'id': prefix_list_id}) as api_call:
result = self.resourceApi.get(tier0_id, prefix_list_id,
tenant=TEST_TENANT)
expected_def = core_defs.Tier0PrefixListDef(
tier0_id=tier0_id,
prefix_list_id=prefix_list_id,
tenant=TEST_TENANT)
self.assert_called_with_def(api_call, expected_def)
self.assertEqual(prefix_list_id, result['id'])
def test_list(self):
tier0_id = 't0_test'
with mock.patch.object(self.policy_api, "list",
return_value={'results': []}) as api_call:
result = self.resourceApi.list(tier0_id=tier0_id,
tenant=TEST_TENANT)
expected_def = core_defs.Tier0PrefixListDef(
tier0_id=tier0_id,
tenant=TEST_TENANT)
self.assert_called_with_def(api_call, expected_def)
self.assertEqual([], result)
def test_update(self):
tier0_id = 't0_test'
prefix_list_id = 'prefix_list_test'
name = 'new_name'
prefixes = []
with self.mock_get(tier0_id, name), \
self.mock_create_update() as update_call:
self.resourceApi.update(name, tier0_id, prefix_list_id, prefixes,
tenant=TEST_TENANT)
expected_def = core_defs.Tier0PrefixListDef(
tier0_id=tier0_id,
prefix_list_id=prefix_list_id,
name=name,
prefixes=prefixes,
tenant=TEST_TENANT)
self.assert_called_with_def(update_call, expected_def)
def test_build_prefix_entry(self):
network = "network_test"
le = mock.ANY
ge = mock.ANY
action = constants.ADV_RULE_DENY
prefix_entry = self.resourceApi.build_prefix_entry(
network, le, ge, action)
self.assertEqual(network, prefix_entry.network)
self.assertEqual(le, prefix_entry.le)
self.assertEqual(ge, prefix_entry.ge)
self.assertEqual(action, prefix_entry.action)

View File

@ -68,6 +68,9 @@ class NsxPolicyLib(lib.NsxLibBase):
self.tier0 = core_resources.NsxPolicyTier0Api(*args)
self.tier0_nat_rule = core_resources.NsxPolicyTier0NatRuleApi(
*args)
self.tier0_route_map = core_resources.NsxPolicyTier0RouteMapApi(*args)
self.tier0_prefix_list = core_resources.NsxPolicyTier0PrefixListApi(
*args)
self.tier1 = core_resources.NsxPolicyTier1Api(*args)
self.tier1_segment = core_resources.NsxPolicyTier1SegmentApi(*args)
self.tier1_nat_rule = core_resources.NsxPolicyTier1NatRuleApi(

View File

@ -2116,3 +2116,134 @@ class NsxPolicyApi(object):
entity = self.get_realized_entity(path, silent=silent)
if entity:
return entity['state']
class RouteMapEntry(object):
def __init__(self, action, community_list_matches=None,
prefix_list_matches=None, entry_set=None):
self.action = action
self.community_list_matches = community_list_matches
self.prefix_list_matches = prefix_list_matches
self.entry_set = entry_set
def get_obj_dict(self):
body = {'action': self.action}
if self.community_list_matches:
body['community_list_matches'] = [community.get_obj_dict()
for community in
self.community_list_matches]
if self.prefix_list_matches:
body['prefix_list_matches'] = (
self.prefix_list_matches
if isinstance(self.prefix_list_matches, list) else
[self.prefix_list_matches])
if self.entry_set:
body['set'] = self.entry_set.get_obj_dict()
return body
class RouteMapEntrySet(object):
def __init__(self, local_preference=100, as_path_prepend=None,
community=None, med=None, weight=None):
self.local_preference = local_preference
self.as_path_prepend = as_path_prepend
self.community = community
self.med = med
self.weight = weight
def get_obj_dict(self):
body = {'local_preference': self.local_preference}
if self.as_path_prepend:
body['as_path_prepend'] = self.as_path_prepend
if self.community:
body['community'] = self.community
if self.med:
body['med'] = self.med
if self.weight:
body['weight'] = self.weight
return body
class CommunityMatchCriteria(object):
def __init__(self, criteria, match_operator=None):
self.criteria = criteria
self.match_operator = match_operator
def get_obj_dict(self):
body = {'criteria': self.criteria}
if self.match_operator:
body['match_operator'] = self.match_operator
return body
class Tier0RouteMapDef(ResourceDef):
@property
def path_pattern(self):
return TIER0S_PATH_PATTERN + "%s/route-maps/"
@property
def path_ids(self):
return ('tenant', 'tier0_id', 'route_map_id')
@staticmethod
def resource_type():
return 'Tier0RouteMap'
def path_defs(self):
return (TenantDef, Tier0Def)
def get_obj_dict(self):
body = super(Tier0RouteMapDef, self).get_obj_dict()
entries = self.get_attr('entries')
if entries:
entries = [entry.get_obj_dict()
if isinstance(entry, RouteMapEntry) else entry
for entry in self.get_attr('entries')]
body['entries'] = entries
return body
class PrefixEntry(object):
def __init__(self, network, le=None, ge=None,
action=constants.ADV_RULE_PERMIT):
self.network = network
self.le = le
self.ge = ge
self.action = action
def get_obj_dict(self):
body = {'network': self.network,
'action': self.action}
if self.le is not None:
body['le'] = self.le
if self.ge is not None:
body['ge'] = self.ge
return body
class Tier0PrefixListDef(ResourceDef):
@property
def path_pattern(self):
return TIER0S_PATH_PATTERN + "%s/prefix-lists/"
@property
def path_ids(self):
return ('tenant', 'tier0_id', 'prefix_list_id')
@staticmethod
def resource_type():
return 'PrefixList'
def path_defs(self):
return (TenantDef, Tier0Def)
def get_obj_dict(self):
body = super(Tier0PrefixListDef, self).get_obj_dict()
prefixes = self.get_attr('prefixes')
if prefixes:
prefixes = [prefix.get_obj_dict() for prefix in prefixes]
body['prefixes'] = prefixes
return body

View File

@ -4141,3 +4141,134 @@ class NsxPolicyExcludeListApi(NsxPolicyResourceBase):
raise exceptions.ManagerError(details=err_msg)
# TODO(asarfaty): Add support for add/remove member
class NsxPolicyTier0RouteMapApi(NsxPolicyResourceBase):
@property
def entry_def(self):
return core_defs.Tier0RouteMapDef
def create_or_overwrite(self, name, tier0_id,
route_map_id=None,
entries=IGNORE,
description=IGNORE,
tags=IGNORE,
tenant=constants.POLICY_INFRA_TENANT):
route_map_id = self._init_obj_uuid(route_map_id)
route_map_def = self._init_def(tier0_id=tier0_id,
route_map_id=route_map_id,
name=name,
entries=entries,
description=description,
tags=tags,
tenant=tenant)
self._create_or_store(route_map_def)
return route_map_id
def delete(self, tier0_id, route_map_id,
tenant=constants.POLICY_INFRA_TENANT):
route_map_def = self.entry_def(tier0_id=tier0_id,
route_map_id=route_map_id,
tenant=tenant)
self.policy_api.delete(route_map_def)
def get(self, tier0_id, route_map_id,
tenant=constants.POLICY_INFRA_TENANT):
route_map_def = self.entry_def(tier0_id=tier0_id,
route_map_id=route_map_id,
tenant=tenant)
return self.policy_api.get(route_map_def)
def list(self, tier0_id, tenant=constants.POLICY_INFRA_TENANT):
route_map_def = self.entry_def(tier0_id=tier0_id, tenant=tenant)
return self._list(route_map_def)
def update(self, name, tier0_id,
route_map_id,
entries,
description=IGNORE,
tags=IGNORE,
tenant=constants.POLICY_INFRA_TENANT):
self._update(tier0_id=tier0_id,
route_map_id=route_map_id,
name=name,
entries=entries,
description=description,
tags=tags,
tenant=tenant)
def build_route_map_entry(self, action, community_list_matches=None,
prefix_list_matches=None, entry_set=None):
return core_defs.RouteMapEntry(action, community_list_matches,
prefix_list_matches, entry_set)
def build_route_map_entry_set(self, local_preference=100,
as_path_prepend=None, community=None,
med=None, weight=None):
return core_defs.RouteMapEntrySet(local_preference, as_path_prepend,
community, med, weight)
def build_community_match_criteria(self, criteria, match_operator=None):
return core_defs.CommunityMatchCriteria(criteria, match_operator)
class NsxPolicyTier0PrefixListApi(NsxPolicyResourceBase):
@property
def entry_def(self):
return core_defs.Tier0PrefixListDef
def create_or_overwrite(self, name, tier0_id,
prefix_list_id=None,
prefixes=IGNORE,
description=IGNORE,
tags=IGNORE,
tenant=constants.POLICY_INFRA_TENANT):
prefix_list_id = self._init_obj_uuid(prefix_list_id)
prefix_list_def = self._init_def(tier0_id=tier0_id,
prefix_list_id=prefix_list_id,
name=name,
prefixes=prefixes,
description=description,
tags=tags,
tenant=tenant)
self._create_or_store(prefix_list_def)
return prefix_list_id
def delete(self, tier0_id, prefix_list_id,
tenant=constants.POLICY_INFRA_TENANT):
prefix_list_def = self.entry_def(tier0_id=tier0_id,
prefix_list_id=prefix_list_id,
tenant=tenant)
self.policy_api.delete(prefix_list_def)
def get(self, tier0_id, prefix_list_id,
tenant=constants.POLICY_INFRA_TENANT):
prefix_list_def = self.entry_def(tier0_id=tier0_id,
prefix_list_id=prefix_list_id,
tenant=tenant)
return self.policy_api.get(prefix_list_def)
def list(self, tier0_id, tenant=constants.POLICY_INFRA_TENANT):
prefix_list_def = self.entry_def(tier0_id=tier0_id, tenant=tenant)
return self._list(prefix_list_def)
def update(self, name, tier0_id,
prefix_list_id,
prefixes,
description=IGNORE,
tags=IGNORE,
tenant=constants.POLICY_INFRA_TENANT):
self._update(tier0_id=tier0_id,
prefix_list_id=prefix_list_id,
name=name,
prefixes=prefixes,
description=description,
tags=tags,
tenant=tenant)
def build_prefix_entry(self, network, le=None, ge=None,
action=constants.ADV_RULE_PERMIT):
return core_defs.PrefixEntry(network, le, ge, action)