Create/delete network policy in non-default namespace

Create network policy on non-default namespace and make sure that
CRD and SG were created for that policy
Remove network policy in non-default namespace and make sure
the network policy CRD and SG were deleted

Change-Id: I8a0a4b3a8ef18072eef0db05eb98eb1f7ca57553
This commit is contained in:
Genadi Chereshnya 2019-04-14 15:11:31 +03:00
parent 5df10697a4
commit c9ef83d0fb
1 changed files with 49 additions and 0 deletions

View File

@ -181,3 +181,52 @@ class TestNetworkPolicyScenario(base.BaseKuryrScenarioTest):
time.sleep(1)
if time.time() - start >= TIMEOUT_PERIOD:
raise lib_exc.TimeoutException('Sec group ID still exists')
@decorators.idempotent_id('24577a9b-1d46-419b-8b60-da3c49d777c3')
def test_create_delete_network_policy_non_default_ns(self):
ns_name, ns = self.create_namespace()
match_labels = {'role': 'db'}
np = self.create_network_policy(match_labels=match_labels,
namespace=ns_name)
LOG.debug("Creating network policy %s" % np)
network_policy_name = np.metadata.name
kuryr_netpolicy_crd_name = 'np-' + network_policy_name
network_policies = self.list_network_policies(namespace=ns_name)
kuryrnetpolicies = ''
start = time.time()
while time.time() - start < TIMEOUT_PERIOD:
try:
kuryrnetpolicies = self.get_kuryr_netpolicy_crds(
name=kuryr_netpolicy_crd_name, namespace=ns_name)
break
except kubernetes.client.rest.ApiException:
time.sleep(1)
continue
sg_id = kuryrnetpolicies['spec']['securityGroupId']
sgs = self.list_security_groups(fields='id')
sg_ids = [sg['id'] for sg in sgs]
self.assertIn(network_policy_name, network_policies)
self.assertEqual(kuryr_netpolicy_crd_name,
str(kuryrnetpolicies['metadata']['name']))
self.assertIn(sg_id, sg_ids)
self.delete_network_policy(network_policy_name, namespace=ns_name)
start = time.time()
while time.time() - start < TIMEOUT_PERIOD:
time.sleep(1)
if network_policy_name in self.list_network_policies(
namespace=ns_name):
continue
try:
self.get_kuryr_netpolicy_crds(name=kuryr_netpolicy_crd_name,
namespace=ns_name)
except kubernetes.client.rest.ApiException:
break
start = time.time()
while time.time() - start < TIMEOUT_PERIOD:
sgs_after = self.list_security_groups(fields='id')
sg_ids_after = [sg['id'] for sg in sgs_after]
if sg_id not in sg_ids_after:
break
time.sleep(1)
if time.time() - start >= TIMEOUT_PERIOD:
raise lib_exc.TimeoutException('Sec group ID still exists')