|
|
|
@ -77,22 +77,6 @@ class NsxV3ClientCertificateTestCase(nsxlib_testcase.NsxClientTestCase):
|
|
|
|
|
# and then bind this id to principal identity
|
|
|
|
|
fake_responses.append(self._get_mocked_response(201, []))
|
|
|
|
|
|
|
|
|
|
elif action == 'retry-create':
|
|
|
|
|
# simulate "identity already exists" failure
|
|
|
|
|
results = [{'id': self.cert_id}]
|
|
|
|
|
fake_responses.append(self._get_mocked_response(201, results))
|
|
|
|
|
fake_responses.append(self._get_mocked_error_response(400, 2027))
|
|
|
|
|
# after error generate code will retry identity deletion:
|
|
|
|
|
# first get indentities
|
|
|
|
|
results = [{'resource_type': 'Principal Identity',
|
|
|
|
|
'id': self.identity_id,
|
|
|
|
|
'name': self.identity,
|
|
|
|
|
'certificate_id': self.cert_id}]
|
|
|
|
|
# then delete identity
|
|
|
|
|
fake_responses.append(self._get_mocked_response(200, results))
|
|
|
|
|
# then retry identity create
|
|
|
|
|
fake_responses.append(self._get_mocked_response(204, []))
|
|
|
|
|
|
|
|
|
|
elif action == 'delete':
|
|
|
|
|
# get principal identities list
|
|
|
|
|
results = [{'resource_type': 'Principal Identity',
|
|
|
|
@ -126,7 +110,8 @@ class NsxV3ClientCertificateTestCase(nsxlib_testcase.NsxClientTestCase):
|
|
|
|
|
storage_driver)
|
|
|
|
|
self.assertFalse(cert.exists())
|
|
|
|
|
|
|
|
|
|
cert.generate(subject={}, key_size=2048, valid_for_days=333)
|
|
|
|
|
cert.generate(subject={}, key_size=2048, valid_for_days=333,
|
|
|
|
|
node_id='meh')
|
|
|
|
|
|
|
|
|
|
# verify client cert was generated and makes sense
|
|
|
|
|
self.assertTrue(cert.exists())
|
|
|
|
@ -150,7 +135,10 @@ class NsxV3ClientCertificateTestCase(nsxlib_testcase.NsxClientTestCase):
|
|
|
|
|
# verify API call to bind cert to identity on backend
|
|
|
|
|
uri = base_uri + '/principal-identities'
|
|
|
|
|
expected_body = {'name': self.identity,
|
|
|
|
|
'certificate_id': self.cert_id}
|
|
|
|
|
'node_id': 'meh',
|
|
|
|
|
'permission_group': 'read_write_api_users',
|
|
|
|
|
'certificate_id': self.cert_id,
|
|
|
|
|
'is_protected': True}
|
|
|
|
|
test_client.assert_json_call('post', mocked_trust.client, uri,
|
|
|
|
|
single_call=False,
|
|
|
|
|
data=jsonutils.dumps(expected_body,
|
|
|
|
@ -160,27 +148,6 @@ class NsxV3ClientCertificateTestCase(nsxlib_testcase.NsxClientTestCase):
|
|
|
|
|
self.assertRaises(nsxlib_exc.ObjectAlreadyExists,
|
|
|
|
|
cert.generate, {})
|
|
|
|
|
|
|
|
|
|
def test_generate_cert_with_retry(self):
|
|
|
|
|
"""Test startup without certificate + certificate generation"""
|
|
|
|
|
|
|
|
|
|
storage_driver = DummyStorageDriver()
|
|
|
|
|
# Prepare fake trust management for "cert create" requests
|
|
|
|
|
mocked_trust = self._get_mocked_trust('retry-create')
|
|
|
|
|
cert = client_cert.ClientCertificateManager(self.identity,
|
|
|
|
|
mocked_trust,
|
|
|
|
|
storage_driver)
|
|
|
|
|
self.assertFalse(cert.exists())
|
|
|
|
|
cert.generate(subject={}, key_size=4096, valid_for_days=3)
|
|
|
|
|
|
|
|
|
|
# verify client cert was generated and makes sense
|
|
|
|
|
self.assertTrue(cert.exists())
|
|
|
|
|
|
|
|
|
|
# verify cert ans PK were stored in storage
|
|
|
|
|
cert_pem, key_pem = cert.get_pem()
|
|
|
|
|
stored_cert, stored_key = storage_driver.get_cert(self.identity)
|
|
|
|
|
self.assertEqual(cert_pem, stored_cert)
|
|
|
|
|
self.assertEqual(key_pem, stored_key)
|
|
|
|
|
|
|
|
|
|
def _prepare_storage_with_existing_cert(self, key_size, days, alg, subj):
|
|
|
|
|
# prepare storage driver with existing cert and key
|
|
|
|
|
# this test simulates system startup
|
|
|
|
|