NSXAdmin: add import and nsx-list commands for client cert

In addition, add details in "show" command output.

Import command read certificate from filename provided and
registers it on backend. Only supported with storage type=none.
Syntax:
nsxadmin -r certificate -o import -p filename=<file>

nsx-list command lists certificates associated with openstack
identity on the backend, regardless of what certs are present in
storage. Currently up to one cert is supported per identity, this
will change in future. Syntax:
nsxadmin -r certificate -o nsx-list

Change-Id: I245d239942b0a85b67d263c3a6d51d228832f54b
This commit is contained in:
Anna Khmelnitsky 2017-01-19 09:39:48 -08:00 committed by garyk
parent 8fae2ed90a
commit ef5a7d611a
3 changed files with 132 additions and 31 deletions

View File

@ -309,6 +309,15 @@ Client Certificate
nsxadmin -r certificate -o show
- Import external certificate to NSX::
nsxadmin -r certificate -o import --property username=<username> --property password=<password>
- List certificates associated with openstack principal identity in NSX::
nsxadmin -r certificate -o nsx-list
Upgrade Steps (Version 1.0.0 to Version 1.1.0)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

View File

@ -22,6 +22,7 @@ from vmware_nsx.shell.admin.plugins.common import utils as admin_utils
from vmware_nsx.shell.admin.plugins.nsxv3.resources import utils
from vmware_nsx.shell import resources as shell
from vmware_nsxlib.v3 import client_cert
from vmware_nsxlib.v3 import exceptions as nsxlib_exc
from vmware_nsxlib.v3 import trust_management
from neutron.callbacks import registry
@ -34,13 +35,19 @@ LOG = logging.getLogger(__name__)
DEFAULT_CERT_VALIDITY_PERIOD = 3650
def get_certificate_manager(**kwargs):
def get_nsx_trust_management(**kwargs):
username, password = None, None
if kwargs.get('property'):
properties = admin_utils.parse_multi_keyval_opt(kwargs['property'])
username = properties.get('user')
password = properties.get('password')
nsx_client = utils.get_nsxv3_client(username, password, True)
nsx_trust = trust_management.NsxLibTrustManagement(nsx_client, {})
return nsx_trust
def get_certificate_manager(**kwargs):
storage_driver_type = cfg.CONF.nsx_v3.nsx_client_cert_storage.lower()
LOG.info(_LI("Certificate storage is %s"), storage_driver_type)
if storage_driver_type == 'nsx-db':
@ -50,12 +57,9 @@ def get_certificate_manager(**kwargs):
storage_driver = cert_utils.DummyCertificateStorageDriver()
# TODO(annak) - add support for barbican storage driver
nsx_client = utils.get_nsxv3_client(username, password, True)
nsx_trust = trust_management.NsxLibTrustManagement(nsx_client, {})
return client_cert.ClientCertificateManager(
cert_utils.NSX_OPENSTACK_IDENTITY,
nsx_trust,
get_nsx_trust_management(**kwargs),
storage_driver)
@ -64,47 +68,126 @@ def generate_cert(resource, event, trigger, **kwargs):
"""Generate self signed client certificate and private key
"""
cert_manager = get_certificate_manager(**kwargs)
if cert_manager.exists():
# Need to delete cert first
cert_manager.delete()
if cfg.CONF.nsx_v3.nsx_client_cert_storage.lower() == "none":
LOG.info(_LI("Generate operation is not supported "
"with storage type 'none'"))
return
cert_manager.generate(subject={},
valid_for_days=DEFAULT_CERT_VALIDITY_PERIOD)
with get_certificate_manager(**kwargs) as cert:
if cert.exists():
LOG.info(_LI("Deleting existing certificate"))
# Need to delete cert first
cert.delete()
cert.generate(subject={},
valid_for_days=DEFAULT_CERT_VALIDITY_PERIOD)
LOG.info(_LI("Client certificate generated succesfully"))
@admin_utils.output_header
def delete_cert(resource, event, trigger, **kwargs):
"""Delete client certificate and private key """
cert_manager = get_certificate_manager(**kwargs)
if cert_manager.exists():
cert_manager.delete()
if cfg.CONF.nsx_v3.nsx_client_cert_storage.lower() == "none":
LOG.info(_LI("Clean operation is not supported "
"with storage type 'none'"))
return
with get_certificate_manager(**kwargs) as cert:
if cert.exists():
cert.delete()
LOG.info(_LI("Client certificate deleted succesfully"))
return
LOG.info(_LI("Nothing to clean"))
@admin_utils.output_header
def show_cert(resource, event, trigger, **kwargs):
"""Show client certificate details """
cert_manager = get_certificate_manager(**kwargs)
if cert_manager.exists():
cert_pem, key_pem = cert_manager.get_pem()
expires_on = cert_manager.expires_on()
expires_in_days = cert_manager.expires_in_days()
if expires_in_days > 0:
LOG.info(_LI("Client certificate is valid. "
"Expires on %(date)s (in %(days)d days)"),
{'date': expires_on, 'days': expires_in_days})
with get_certificate_manager(**kwargs) as cert:
if cert.exists():
cert_pem, key_pem = cert.get_pem()
expires_on = cert.expires_on()
expires_in_days = cert.expires_in_days()
cert_data = cert.get_subject()
cert_data['alg'] = cert.get_signature_alg()
cert_data['key_size'] = cert.get_key_size()
if expires_in_days > 0:
LOG.info(_LI("Client certificate is valid. "
"Expires on %(date)s (in %(days)d days)."),
{'date': expires_on,
'days': expires_in_days})
else:
LOG.info(_LI("Client certificate expired on %s."), expires_on)
LOG.info(_LI("Key Size %(key_size)s, "
"Signature Algorithm %(alg)s\n"
"Subject: Country %(country)s, State %(state)s, "
"Organization %(organization)s, Unit %(unit)s, "
"Common Name %(hostname)s"), cert_data)
LOG.info(cert_pem)
else:
LOG.info(_LI("Client certificate expired on %s."), expires_on)
LOG.info(_LI("Client certificate is not registered "
"in storage"))
LOG.info(cert_pem)
# TODO(annak): show certificate details such as subject and crypto
# and add verification same certificate is registered in NSX.
# For imported certificate, fetch from NSX
else:
LOG.info(_LI("Client certificate was not registered in the system"))
@admin_utils.output_header
def import_cert(resource, event, trigger, **kwargs):
"""Import client certificate that was generated externally"""
if cfg.CONF.nsx_v3.nsx_client_cert_storage.lower() != "none":
LOG.info(_LI("Import operation is supported "
"with storage type 'none' only"))
return
filename = None
if kwargs.get('property'):
properties = admin_utils.parse_multi_keyval_opt(kwargs['property'])
filename = properties.get('filename')
if not filename:
LOG.info(_LI("Please specify file containing the certificate "
"using filename property"))
return
with get_certificate_manager(**kwargs) as cert:
if cert.exists():
LOG.info(_LI("Deleting existing certificate"))
cert.delete()
cert.import_pem(filename)
LOG.info(_LI("Client certificate imported succesfully"))
@admin_utils.output_header
def show_nsx_certs(resource, event, trigger, **kwargs):
"""Show client certificates associated with openstack identity in NSX"""
# TODO(annak): show multiple certs when backend supports it
try:
nsx_trust = get_nsx_trust_management(**kwargs)
details = nsx_trust.get_identity_details(
cert_utils.NSX_OPENSTACK_IDENTITY)
if 'certificate_id' in details:
cert = nsx_trust.get_cert(details['certificate_id'])
LOG.info(_LI("The following certificate is associated with "
"principal identity %s\n"),
cert_utils.NSX_OPENSTACK_IDENTITY)
LOG.info(cert['pem_encoded'])
except nsxlib_exc.ResourceNotFound:
LOG.info(_LI("No certificates associated with principal identity %s"),
cert_utils.NSX_OPENSTACK_IDENTITY)
registry.subscribe(generate_cert,
@ -118,3 +201,11 @@ registry.subscribe(show_cert,
registry.subscribe(delete_cert,
constants.CERTIFICATE,
shell.Operations.CLEAN.value)
registry.subscribe(import_cert,
constants.CERTIFICATE,
shell.Operations.IMPORT.value)
registry.subscribe(show_nsx_certs,
constants.CERTIFICATE,
shell.Operations.NSX_LIST.value)

View File

@ -98,7 +98,8 @@ nsxv3_resources = {
[Operations.GENERATE.value,
Operations.SHOW.value,
Operations.CLEAN.value,
Operations.IMPORT.value])
Operations.IMPORT.value,
Operations.NSX_LIST.value])
}
# Add supported NSX-V resources in this dictionary