Support client certificate import

In addition, add getters for certificate fields,
and ensure certificate object has short lifespan, since
it might change in storage

Change-Id: I2abbec0e48d82d432c9cc18afaca62bae7558d7c
This commit is contained in:
Anna Khmelnitsky 2017-01-18 14:25:55 -08:00
parent 76b47c2bb7
commit 763f024ab8
4 changed files with 160 additions and 41 deletions

View File

@ -181,18 +181,26 @@ class NsxV3ClientCertificateTestCase(nsxlib_testcase.NsxClientTestCase):
self.assertEqual(cert_pem, stored_cert)
self.assertEqual(key_pem, stored_key)
def test_load_and_delete_existing_cert(self):
"""Test startup with existing certificate + certificate deletion"""
def _prepare_storage_with_existing_cert(self, key_size, days, alg, subj):
# prepare storage driver with existing cert and key
# this test simulates system startup
cert, key = client_cert.generate_self_signed_cert_pair(4096, 365,
'sha256', {})
cert, key = client_cert.generate_self_signed_cert_pair(key_size, days,
alg, subj)
storage_driver = DummyStorageDriver()
cert_pem = crypto.dump_certificate(crypto.FILETYPE_PEM, cert)
key_pem = crypto.dump_privatekey(crypto.FILETYPE_PEM, key)
storage_driver.store_cert(self.identity, cert_pem, key_pem)
return storage_driver
def test_load_and_delete_existing_cert(self):
"""Test startup with existing certificate + certificate deletion"""
storage_driver = self._prepare_storage_with_existing_cert(4096,
3650,
'sha256',
{})
# get mocked backend driver for trust management,
# prepared for get request, that preceeds delete operation
mocked_trust = self._get_mocked_trust('delete')
@ -223,6 +231,30 @@ class NsxV3ClientCertificateTestCase(nsxlib_testcase.NsxClientTestCase):
test_client.assert_json_call('delete', mocked_trust.client, uri,
single_call=False)
def test_get_certificate_details(self):
"""Test retrieving cert details for existing cert"""
key_size = 2048
days = 999
alg = 'sha256'
subj = {'country': 'CA',
'organization': 'squirrel rights',
'hostname': 'www.squirrels.ca',
'unit': 'nuts',
'state': 'BC'}
storage_driver = self._prepare_storage_with_existing_cert(key_size,
days, alg,
subj)
with client_cert.ClientCertificateManager(self.identity,
None,
storage_driver) as cert:
self.assertTrue(cert.exists())
self.assertEqual(days - 1, cert.expires_in_days())
self.assertEqual(key_size, cert.get_key_size())
cert_subj = cert.get_subject()
self.assertEqual(subj, cert_subj)
def test_bad_certificate_values(self):
bad_cert_values = [{'key_size': 1024,
'valid_for_days': 10,

View File

@ -96,9 +96,25 @@ class ClientCertificateManager(object):
self._nsx_trust_management = nsx_trust_management
self._load_cert_and_key()
def __enter__(self):
"""Load cert from storage
def generate(self, subject, key_size=2048, valid_for_days=365,
This is an optimization to avoid repeated storage access.
Usage example:
with cert_manager as c:
if c.exists():
date = c.expires_on()
days = c.exires_in_days()
"""
self._cert, self._key = self.get_cert_and_key()
return self
def __exit__(self, type, value, traceback):
self._cert = None
self._key = None
def generate(self, subject, key_size=2048, valid_for_days=3650,
signature_alg='sha256'):
"""Generate new certificate and register it in the system
@ -108,19 +124,23 @@ class ClientCertificateManager(object):
"""
self._validate_empty()
self._cert, self._key = generate_self_signed_cert_pair(key_size,
valid_for_days,
signature_alg,
subject)
cert, key = generate_self_signed_cert_pair(key_size,
valid_for_days,
signature_alg,
subject)
self._register_cert()
self._store_cert_and_key()
# register on backend
self._register_cert(cert)
# save in storage
cert_pem = crypto.dump_certificate(crypto.FILETYPE_PEM, cert)
key_pem = crypto.dump_privatekey(crypto.FILETYPE_PEM, key)
self._storage_driver.store_cert(self._identity, cert_pem, key_pem)
LOG.debug("Client certificate generated successfully")
def delete(self):
"""Delete existing certificate from storage and backend"""
if not self.exists():
return
@ -156,26 +176,58 @@ class ClientCertificateManager(object):
LOG.debug("Client certificate removed successfully")
def exists(self):
"""Check if certificate was created"""
"""Check if certificate was created for given identity"""
if self._cert:
return True
return self._cert is not None
cert_pem, key_pem = self._storage_driver.get_cert(self._identity)
return cert_pem is not None
def get_pem(self):
def import_pem(self, filename):
"""Import and register existing certificate in PEM format"""
# TODO(annak): support PK import as well
self._validate_empty()
with open(filename, 'r') as f:
cert_pem = f.read()
if not cert_pem:
raise nsxlib_exceptions.CertificateError(
msg=_("Failed to read certificate from %s") % filename)
# validate correct crypto
try:
cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert_pem)
except crypto.Error:
raise nsxlib_exceptions.CertificateError(
msg=_("Failed to import client certificate"))
# register on backend
self._register_cert(cert)
self._storage_driver.store_cert(self._identity, cert_pem, None)
LOG.debug("Client certificate imported successfully")
def _load_from_storage(self):
"""Returns certificate and key pair in PEM format"""
self._validate_exists()
cert_pem = crypto.dump_certificate(crypto.FILETYPE_PEM, self._cert)
key_pem = crypto.dump_privatekey(crypto.FILETYPE_PEM, self._key)
cert_pem, key_pem = self._storage_driver.get_cert(self._identity)
if cert_pem is None:
return None, None
return (cert_pem, key_pem)
def get_pem(self):
return self._load_from_storage()
def export_pem(self, filename):
"""Exports certificate and key pair to file"""
if not self.exists():
LOG.error(_LE("No certificate present - nothing to export"))
return
self._validate_exists()
cert_pem, key_pem = self._load_from_storage()
cert_pem, key_pem = self.get_pem()
with open(filename, 'w') as f:
f.write(cert_pem)
f.write(key_pem)
@ -184,8 +236,9 @@ class ClientCertificateManager(object):
"""Returns certificate expiration timestamp"""
self._validate_exists()
cert, key = self.get_cert_and_key()
converted = datetime.datetime.strptime(
self._cert.get_notAfter().decode(),
cert.get_notAfter().decode(),
"%Y%m%d%H%M%SZ")
return converted
@ -194,6 +247,28 @@ class ClientCertificateManager(object):
delta = self.expires_on() - datetime.datetime.utcnow()
return delta.days
def get_subject(self):
self._validate_exists()
cert, key = self.get_cert_and_key()
return {'country': cert.get_subject().C,
'state': cert.get_subject().ST,
'organization': cert.get_subject().O,
'unit': cert.get_subject().OU,
'hostname': cert.get_subject().CN}
def get_signature_alg(self):
self._validate_exists()
cert, key = self.get_cert_and_key()
return cert.get_signature_algorithm()
def get_key_size(self):
self._validate_exists()
cert, key = self.get_cert_and_key()
return key.bits()
def _validate_empty(self):
if self.exists():
raise nsxlib_exceptions.ObjectAlreadyExists(
@ -204,23 +279,27 @@ class ClientCertificateManager(object):
raise nsxlib_exceptions.ObjectNotGenerated(
object_type='Client Certificate')
def _load_cert_and_key(self):
self._validate_empty()
def get_cert_and_key(self):
"""Load cert and key from storage"""
if self._cert and self._key:
return self._cert, self._key
cert_pem, key_pem = self._storage_driver.get_cert(self._identity)
cert_pem, key_pem = self._load_from_storage()
if cert_pem is not None:
self._cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert_pem)
self._key = crypto.load_privatekey(crypto.FILETYPE_PEM, key_pem)
if cert_pem is None:
return None, None
def _store_cert_and_key(self):
self._validate_exists()
try:
cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert_pem)
key = crypto.load_privatekey(crypto.FILETYPE_PEM, key_pem)
except crypto.Error:
raise nsxlib_exceptions.CertificateError(
msg="Failed to load client certificate")
cert_pem, key_pem = self.get_pem()
self._storage_driver.store_cert(self._identity, cert_pem, key_pem)
return cert, key
def _register_cert(self):
cert_pem = crypto.dump_certificate(crypto.FILETYPE_PEM, self._cert)
def _register_cert(self, cert):
cert_pem = crypto.dump_certificate(crypto.FILETYPE_PEM, cert)
nsx_cert_id = self._nsx_trust_management.create_cert(cert_pem)
try:
self._nsx_trust_management.create_identity(self._identity,

View File

@ -52,11 +52,15 @@ class NsxLibException(Exception):
class ObjectAlreadyExists(NsxLibException):
message = _("%(object_type) already exists")
message = _("%(object_type)s already exists")
class ObjectNotGenerated(NsxLibException):
message = _("%(object_type) was not generated")
message = _("%(object_type)s was not generated")
class CertificateError(NsxLibException):
message = _("Certificate error: %(msg)s")
class ManagerError(NsxLibException):

View File

@ -30,8 +30,8 @@ class NsxLibTrustManagement(utils.NsxLibApiBase):
"""
lines = pem.split(b'\n')
result = lines[0] + b'\n'
result += b''.join(lines[1:-1])
result += b'\n' + lines[-1]
result += b''.join(lines[1:-2])
result += b'\n' + lines[-2]
return result
def create_cert(self, cert_pem):
@ -43,6 +43,10 @@ class NsxLibTrustManagement(utils.NsxLibApiBase):
# should be only one result
return results[0]['id']
def get_cert(self, cert_id):
resource = CERT_SECTION + '/' + cert_id
return self.client.get(resource)
def delete_cert(self, cert_id):
resource = CERT_SECTION + '/' + cert_id
self.client.delete(resource)