Add Certificate to policy API

Add certificate api to vmware-nsxlib policy API

Change-Id: Id108afaddbe2d561a9186b13bf0cb3a8af9f8765
This commit is contained in:
Gordon Zhang 2019-01-07 13:42:09 -08:00
parent 53eddb0f61
commit 64c9112cba
4 changed files with 199 additions and 0 deletions

View File

@ -3589,3 +3589,121 @@ class TestPolicyLBPoolApi(NsxPolicyLibTestCase):
snat_translation=snat_translation,
tenant=TEST_TENANT)
self.assert_called_with_def(update_call, expected_def)
class TestPolicyCertificate(NsxPolicyLibTestCase):
def setUp(self, *args, **kwargs):
super(TestPolicyCertificate, self).setUp()
self.resourceApi = self.policy_lib.certificate
def test_create_with_id(self):
name = 'd1'
description = 'desc'
id = '111'
pem_encoded = 'pem_encoded'
private_key = 'private_key'
passphrase = 'passphrase'
key_algo = 'algo'
with mock.patch.object(self.policy_api,
"create_or_update") as api_call:
self.resourceApi.create_or_overwrite(name,
certificate_id=id,
description=description,
pem_encoded=pem_encoded,
private_key=private_key,
passphrase=passphrase,
key_algo=key_algo,
tenant=TEST_TENANT)
expected_def = (
core_defs.CertificateDef(
certificate_id=id,
name=name,
description=description,
pem_encoded=pem_encoded,
private_key=private_key,
passphrase=passphrase,
key_algo=key_algo,
tenant=TEST_TENANT))
self.assert_called_with_def(api_call, expected_def)
def test_create_without_id(self):
name = 'd1'
description = 'desc'
pem_encoded = 'pem_encoded'
with mock.patch.object(self.policy_api,
"create_or_update") as api_call:
self.resourceApi.create_or_overwrite(name, description=description,
tenant=TEST_TENANT,
pem_encoded=pem_encoded)
expected_def = (
core_defs.CertificateDef(certificate_id=mock.ANY,
name=name,
description=description,
tenant=TEST_TENANT,
pem_encoded=pem_encoded))
self.assert_called_with_def(api_call, expected_def)
def test_delete(self):
id = '111'
with mock.patch.object(self.policy_api, "delete") as api_call:
self.resourceApi.delete(id, tenant=TEST_TENANT)
expected_def = core_defs.CertificateDef(
certificate_id=id,
tenant=TEST_TENANT)
self.assert_called_with_def(api_call, expected_def)
def test_get(self):
id = '111'
with mock.patch.object(self.policy_api, "get") as api_call:
self.resourceApi.get(id, tenant=TEST_TENANT)
expected_def = core_defs.CertificateDef(
certificate_id=id,
tenant=TEST_TENANT)
self.assert_called_with_def(api_call, expected_def)
def test_get_by_name(self):
name = 'd1'
with mock.patch.object(
self.policy_api, "list",
return_value={'results': [{'display_name': name}]}) as api_call:
obj = self.resourceApi.get_by_name(name, tenant=TEST_TENANT)
self.assertIsNotNone(obj)
expected_def = core_defs.CertificateDef(tenant=TEST_TENANT)
self.assert_called_with_def(api_call, expected_def)
def test_list(self):
with mock.patch.object(self.policy_api, "list") as api_call:
self.resourceApi.list(tenant=TEST_TENANT)
expected_def = core_defs.CertificateDef(tenant=TEST_TENANT)
self.assert_called_with_def(api_call, expected_def)
def test_update(self):
id = '111'
name = 'new name'
description = 'new desc'
pem_encoded = 'pem_encoded'
private_key = 'private_key'
passphrase = '12'
key_algo = 'new_algo'
with mock.patch.object(self.policy_api,
"create_or_update") as update_call:
self.resourceApi.update(id,
name=name,
description=description,
tenant=TEST_TENANT,
pem_encoded=pem_encoded,
private_key=private_key,
passphrase=passphrase,
key_algo=key_algo)
expected_def = core_defs.CertificateDef(
certificate_id=id,
name=name,
description=description,
tenant=TEST_TENANT,
pem_encoded=pem_encoded,
private_key=private_key,
passphrase=passphrase,
key_algo=key_algo
)
self.assert_called_with_def(update_call, expected_def)

View File

@ -100,6 +100,7 @@ class NsxPolicyLib(lib.NsxLibBase):
core_resources.SegmentPortQosProfilesBindingMapApi(
*args))
self.dhcp_relay_config = core_resources.NsxDhcpRelayConfigApi(*args)
self.certificate = core_resources.NsxPolicyCertApi(*args)
self.load_balancer = lb_resources.NsxPolicyLoadBalancerApi(*args)
@property

View File

@ -45,6 +45,7 @@ IP_DISCOVERY_PROFILES_PATH_PATTERN = (TENANTS_PATH_PATTERN +
"ip-discovery-profiles/")
MAC_DISCOVERY_PROFILES_PATH_PATTERN = (TENANTS_PATH_PATTERN +
"mac-discovery-profiles/")
CERTIFICATE_PATH_PATTERN = TENANTS_PATH_PATTERN + "certificates/"
REALIZATION_PATH = "infra/realized-state/realized-entities?intent_path=%s"
DHCP_REALY_PATTERN = TENANTS_PATH_PATTERN + "dhcp-relay-configs/"
@ -1424,6 +1425,27 @@ class DhcpRelayConfigDef(ResourceDef):
return body
class CertificateDef(ResourceDef):
@property
def path_pattern(self):
return CERTIFICATE_PATH_PATTERN
@property
def path_ids(self):
return ('tenant', 'certificate_id')
@staticmethod
def resource_type():
return "TlsTrustData"
def get_obj_dict(self):
body = super(CertificateDef, self).get_obj_dict()
self._set_attrs_if_specified(body, ['pem_encoded', 'key_algo',
'private_key', 'passphrase'])
return body
class NsxPolicyApi(object):
def __init__(self, client):

View File

@ -2784,3 +2784,61 @@ class NsxDhcpRelayConfigApi(NsxPolicyResourceBase):
server_addresses=server_addresses,
tags=tags,
tenant=tenant)
class NsxPolicyCertApi(NsxPolicyResourceBase):
"""NSX Policy Certificate API."""
@property
def entry_def(self):
return core_defs.CertificateDef
def create_or_overwrite(self, name, certificate_id=None,
pem_encoded=IGNORE, private_key=IGNORE,
passphrase=IGNORE,
key_algo=IGNORE,
description=IGNORE,
tags=IGNORE,
tenant=constants.POLICY_INFRA_TENANT):
certificate_id = self._init_obj_uuid(certificate_id)
certificate_def = self._init_def(certificate_id=certificate_id,
name=name,
private_key=private_key,
pem_encoded=pem_encoded,
passphrase=passphrase,
key_algo=key_algo,
description=description,
tags=tags,
tenant=tenant)
self._create_or_store(certificate_def)
return certificate_id
def delete(self, certificate_id,
tenant=constants.POLICY_INFRA_TENANT):
certificate_def = self.entry_def(certificate_id=certificate_id,
tenant=tenant)
self.policy_api.delete(certificate_def)
def get(self, certificate_id, tenant=constants.POLICY_INFRA_TENANT,
silent=False):
certificate_def = self.entry_def(certificate_id=certificate_id,
tenant=tenant)
return self.policy_api.get(certificate_def, silent=silent)
def list(self, tenant=constants.POLICY_INFRA_TENANT):
certificate_def = self.entry_def(tenant=tenant)
return self._list(certificate_def)
def update(self, certificate_id, name=IGNORE,
pem_encoded=IGNORE, private_key=IGNORE,
passphrase=IGNORE, key_algo=IGNORE, description=IGNORE,
tags=IGNORE, tenant=constants.POLICY_INFRA_TENANT):
self._update(certificate_id=certificate_id,
name=name,
description=description,
tags=tags,
private_key=private_key,
pem_encoded=pem_encoded,
passphrase=passphrase,
key_algo=key_algo,
tenant=tenant)