Client certificate management for NSXV3 authentication

Client certificate authentication will replace basic authentication.
A single client certificate will be generated by admin for the
configuration agent (openstack, container,..).

This commit focuses on certificate generation and coordination of
certificate management on backend, storage and in the agent itself.

Change-Id: Ib00e2c00aecb53cec63a746e9db6829a5594eb3a
This commit is contained in:
Anna Khmelnitsky 2016-12-20 14:47:41 -08:00
parent d7473ecf2f
commit e8ef5db4e9
7 changed files with 573 additions and 7 deletions

View File

@ -15,3 +15,4 @@ oslo.log>=3.11.0 # Apache-2.0
oslo.serialization>=1.10.0 # Apache-2.0 oslo.serialization>=1.10.0 # Apache-2.0
oslo.service>=1.10.0 # Apache-2.0 oslo.service>=1.10.0 # Apache-2.0
oslo.utils>=3.18.0 # Apache-2.0 oslo.utils>=3.18.0 # Apache-2.0
pyOpenSSL>=0.14 # Apache-2.0

View File

@ -215,6 +215,10 @@ class NsxClientTestCase(NsxLibTestCase):
mock_call = getattr(self._record, verb.lower()) mock_call = getattr(self._record, verb.lower())
mock_call.assert_called_once_with(**kwargs) mock_call.assert_called_once_with(**kwargs)
def assert_any_call(self, verb, **kwargs):
mock_call = getattr(self._record, verb.lower())
mock_call.assert_any_call(**kwargs)
@property @property
def recorded_calls(self): def recorded_calls(self):
return self._record return self._record

View File

@ -0,0 +1,239 @@
# Copyright 2015 VMware, Inc.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from neutron_lib import exceptions
from OpenSSL import crypto
from oslo_serialization import jsonutils
from vmware_nsxlib.tests.unit.v3 import mocks
from vmware_nsxlib.tests.unit.v3 import nsxlib_testcase
from vmware_nsxlib.tests.unit.v3 import test_client
from vmware_nsxlib.v3 import client
from vmware_nsxlib.v3 import client_cert
from vmware_nsxlib.v3 import exceptions as nsxlib_exc
from vmware_nsxlib.v3 import trust_management
class DummyStorageDriver(dict):
"""Storage driver simulation - just a dictionary"""
def store_cert(self, project_id, certificate, private_key):
self[project_id] = {}
self[project_id]['cert'] = certificate
self[project_id]['key'] = private_key
def get_cert(self, project_id):
if project_id not in self:
return (None, None)
return (self[project_id]['cert'], self[project_id]['key'])
def delete_cert(self, project_id):
del(self[project_id])
def is_empty(self, project_id):
return project_id not in self
class NsxV3ClientCertificateTestCase(nsxlib_testcase.NsxClientTestCase):
identity = 'drumknott'
cert_id = "00000000-1111-2222-3333-444444444444"
identity_id = "55555555-6666-7777-8888-999999999999"
def _get_mocked_response(self, status_code, results):
return mocks.MockRequestsResponse(
status_code,
jsonutils.dumps({'results': results}))
def _get_mocked_error_response(self, status_code, error_code):
return mocks.MockRequestsResponse(
status_code,
jsonutils.dumps({'httpStatus': 'go away',
'error_code': error_code,
'module_name': 'never mind',
'error message': 'bad luck'}))
def _get_mocked_trust(self, action):
fake_responses = []
if action == 'create':
# import cert and return its id
results = [{'id': self.cert_id}]
fake_responses.append(self._get_mocked_response(201, results))
# and then bind this id to principal identity
fake_responses.append(self._get_mocked_response(201, []))
elif action == 'retry-create':
# simulate "identity already exists" failure
results = [{'id': self.cert_id}]
fake_responses.append(self._get_mocked_response(201, results))
fake_responses.append(self._get_mocked_error_response(400, 2027))
# after error generate code will retry identity deletion:
# first get indentities
results = [{'resource_type': 'Principal Identity',
'id': self.identity_id,
'name': self.identity,
'certificate_id': self.cert_id}]
# then delete identity
fake_responses.append(self._get_mocked_response(200, results))
# then retry identity create
fake_responses.append(self._get_mocked_response(204, []))
elif action == 'delete':
# get principal identities list
results = [{'resource_type': 'Principal Identity',
'id': 'dont care',
'name': 'willikins',
'certificate_id': 'some other id'},
{'resource_type': 'Principal Identity',
'id': self.identity_id,
'name': self.identity,
'certificate_id': self.cert_id}]
fake_responses.append(self._get_mocked_response(200, results))
# delete certificate
fake_responses.append(self._get_mocked_response(204, []))
# delete identity
fake_responses.append(self._get_mocked_response(204, []))
mock_client = self.new_mocked_client(
client.JSONRESTClient,
url_prefix='api/v1', session_response=fake_responses)
return trust_management.NsxLibTrustManagement(mock_client, {})
def test_generate_cert(self):
"""Test startup without certificate + certificate generation"""
storage_driver = DummyStorageDriver()
# Prepare fake trust management for "cert create" requests
mocked_trust = self._get_mocked_trust('create')
cert = client_cert.ClientCertificateManager(self.identity,
mocked_trust,
storage_driver)
self.assertFalse(cert.exists())
cert.generate(subject={}, key_size=2048, valid_for_days=333)
# verify client cert was generated and makes sense
self.assertTrue(cert.exists())
self.assertEqual(332, cert.expires_in_days())
cert_pem, key_pem = cert.get_pem()
# verify cert ans PK were stored in storage
stored_cert, stored_key = storage_driver.get_cert(self.identity)
self.assertEqual(cert_pem, stored_cert)
self.assertEqual(key_pem, stored_key)
# verify API call to import cert on backend
cert_pem = mocked_trust.remove_newlines_from_pem(cert_pem)
base_uri = 'https://1.2.3.4/api/v1/trust-management'
uri = base_uri + '/certificates?action=import'
expected_body = {'pem_encoded': cert_pem}
test_client.assert_json_call('post', mocked_trust.client, uri,
single_call=False,
data=jsonutils.dumps(expected_body))
# verify API call to bind cert to identity on backend
uri = base_uri + '/principal-identities'
expected_body = {'name': self.identity,
'certificate_id': self.cert_id}
test_client.assert_json_call('post', mocked_trust.client, uri,
single_call=False,
data=jsonutils.dumps(expected_body,
sort_keys=True))
# try to generate cert again and fail
self.assertRaises(nsxlib_exc.ObjectAlreadyExists,
cert.generate, {})
def test_generate_cert_with_retry(self):
"""Test startup without certificate + certificate generation"""
storage_driver = DummyStorageDriver()
# Prepare fake trust management for "cert create" requests
mocked_trust = self._get_mocked_trust('retry-create')
cert = client_cert.ClientCertificateManager(self.identity,
mocked_trust,
storage_driver)
self.assertFalse(cert.exists())
cert.generate(subject={}, key_size=4096, valid_for_days=3)
# verify client cert was generated and makes sense
self.assertTrue(cert.exists())
# verify cert ans PK were stored in storage
cert_pem, key_pem = cert.get_pem()
stored_cert, stored_key = storage_driver.get_cert(self.identity)
self.assertEqual(cert_pem, stored_cert)
self.assertEqual(key_pem, stored_key)
def test_load_and_delete_existing_cert(self):
"""Test startup with existing certificate + certificate deletion"""
# prepare storage driver with existing cert and key
# this test simulates system startup
cert, key = client_cert.generate_self_signed_cert_pair(4096, 365,
'sha256', {})
storage_driver = DummyStorageDriver()
cert_pem = crypto.dump_certificate(crypto.FILETYPE_PEM, cert)
key_pem = crypto.dump_privatekey(crypto.FILETYPE_PEM, key)
storage_driver.store_cert(self.identity, cert_pem, key_pem)
# get mocked backend driver for trust management,
# prepared for get request, that preceeds delete operation
mocked_trust = self._get_mocked_trust('delete')
cert = client_cert.ClientCertificateManager(self.identity,
mocked_trust,
storage_driver)
self.assertTrue(cert.exists())
cert.delete()
self.assertFalse(cert.exists())
self.assertTrue(storage_driver.is_empty(self.identity))
# verify API call to query identities in order to get cert id
base_uri = 'https://1.2.3.4/api/v1/trust-management'
uri = base_uri + '/principal-identities'
test_client.assert_json_call('get', mocked_trust.client, uri,
single_call=False)
# verify API call to delete openstack principal identity
uri = uri + '/' + self.identity_id
test_client.assert_json_call('delete', mocked_trust.client, uri,
single_call=False)
# verify API call to delete certificate
uri = base_uri + '/certificates/' + self.cert_id
test_client.assert_json_call('delete', mocked_trust.client, uri,
single_call=False)
def test_bad_certificate_values(self):
bad_cert_values = [{'key_size': 1024,
'valid_for_days': 10,
'signature_alg': 'sha256',
'subject': {}},
{'key_size': 4096,
'valid_for_days': 100,
'signature_alg': 'sha',
'subject': {}}]
for args in bad_cert_values:
self.assertRaises(exceptions.InvalidInput,
client_cert.generate_self_signed_cert_pair,
**args)

View File

@ -43,24 +43,32 @@ def assert_call(verb, client_or_resource,
url, verify=nsxlib_testcase.NSX_CERT, url, verify=nsxlib_testcase.NSX_CERT,
data=None, headers=DFT_ACCEPT_HEADERS, data=None, headers=DFT_ACCEPT_HEADERS,
timeout=(nsxlib_testcase.NSX_HTTP_TIMEOUT, timeout=(nsxlib_testcase.NSX_HTTP_TIMEOUT,
nsxlib_testcase.NSX_HTTP_READ_TIMEOUT)): nsxlib_testcase.NSX_HTTP_READ_TIMEOUT),
single_call=True):
nsx_client = client_or_resource nsx_client = client_or_resource
if getattr(nsx_client, '_client', None) is not None: if getattr(nsx_client, '_client', None) is not None:
nsx_client = nsx_client._client nsx_client = nsx_client._client
cluster = nsx_client._conn cluster = nsx_client._conn
cluster.assert_called_once( if single_call:
verb, cluster.assert_called_once(
**{'url': url, 'verify': verify, 'body': data, verb,
'headers': headers, 'cert': None, 'timeout': timeout}) **{'url': url, 'verify': verify, 'body': data,
'headers': headers, 'cert': None, 'timeout': timeout})
else:
cluster.assert_any_call(
verb,
**{'url': url, 'verify': verify, 'body': data,
'headers': headers, 'cert': None, 'timeout': timeout})
def assert_json_call(verb, client_or_resource, url, def assert_json_call(verb, client_or_resource, url,
verify=nsxlib_testcase.NSX_CERT, verify=nsxlib_testcase.NSX_CERT,
data=None, data=None,
headers=client.JSONRESTClient._DEFAULT_HEADERS): headers=client.JSONRESTClient._DEFAULT_HEADERS,
single_call=True):
return assert_call(verb, client_or_resource, url, return assert_call(verb, client_or_resource, url,
verify=verify, data=data, verify=verify, data=data,
headers=headers) headers=headers, single_call=single_call)
class NsxV3RESTClientTestCase(nsxlib_testcase.NsxClientTestCase): class NsxV3RESTClientTestCase(nsxlib_testcase.NsxClientTestCase):

View File

@ -0,0 +1,240 @@
# Copyright 2016 VMware, Inc.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
from OpenSSL import crypto
from time import time
from neutron_lib import exceptions
from oslo_log import log
from vmware_nsxlib._i18n import _, _LE
from vmware_nsxlib.v3 import exceptions as nsxlib_exceptions
LOG = log.getLogger(__name__)
NSX_ERROR_IDENTITY_EXISTS = 2027
def validate_cert_params(key_size, valid_for_days,
signature_alg, subject):
"""Validate parameters for certificate"""
expected_key_sizes = (2048, 4096)
if key_size not in expected_key_sizes:
raise exceptions.InvalidInput(
error_message=_('Invalid key size %(value)d'
'(must be one of %(list)s)') %
{'value': key_size,
'list': expected_key_sizes})
expected_signature_algs = ('sha224', 'sha256')
if signature_alg not in expected_signature_algs:
raise exceptions.InvalidInput(
error_message=_('Invalid signature algorithm %(value)s'
'(must be one of %(list)s)') %
{'value': signature_alg,
'list': expected_signature_algs})
def generate_self_signed_cert_pair(key_size, valid_for_days,
signature_alg, subject):
"""Generate self signed certificate and key pair"""
validate_cert_params(key_size, valid_for_days,
signature_alg, subject)
# generate key pair
key = crypto.PKey()
key.generate_key(crypto.TYPE_RSA, key_size)
# generate certificate
cert = crypto.X509()
cert.get_subject().C = subject.get('country', 'US')
cert.get_subject().ST = subject.get('state', 'California')
cert.get_subject().O = subject.get('organization', 'MyOrg')
cert.get_subject().OU = subject.get('unit', 'MyUnit')
cert.get_subject().CN = subject.get('hostname', 'myorg.com')
cert.gmtime_adj_notBefore(0)
cert.gmtime_adj_notAfter(valid_for_days * 24 * 60 * 60)
cert.set_issuer(cert.get_subject())
cert.set_pubkey(key)
cert.set_serial_number(int(time()))
cert.sign(key, signature_alg)
return cert, key
class ClientCertificateManager(object):
"""Manage Client Certificate for backend authentication
There should be single client certificate associated
with certain principal identity. Certificate and PK storage
is pluggable. Storage API (similar to neutron-lbaas barbican API):
store_cert(purpose, certificate, private_key)
get_cert(purpose)
delete_cert(purpose)
"""
def __init__(self, identity, nsx_trust_management, storage_driver):
self._cert = None
self._key = None
self._storage_driver = storage_driver
self._identity = identity
self._nsx_trust_management = nsx_trust_management
self._load_cert_and_key()
def generate(self, subject, key_size=2048, valid_for_days=365,
signature_alg='sha256'):
"""Generate new certificate and register it in the system
Generate certificate with RSA key based on arguments provided,
register and associate it to principal identity on backend,
and store it in storage. If certificate already exists, fail.
"""
self._validate_empty()
self._cert, self._key = generate_self_signed_cert_pair(key_size,
valid_for_days,
signature_alg,
subject)
self._register_cert()
self._store_cert_and_key()
LOG.debug("Client certificate generated successfully")
def delete(self):
"""Delete existing certificate from storage and backend"""
if not self.exists():
return
ok = True
try:
# delete certificate and principal identity from backend
details = self._nsx_trust_management.get_identity_details(
self._identity)
# TODO(annak): do not delete the identity once
# NSX supports multiple certificates per identity
# this will be required to support multiple openstack
# installations using same backend NSX
self._nsx_trust_management.delete_identity(details['id'])
if details['certificate_id']:
self._nsx_trust_management.delete_cert(
details['certificate_id'])
except exceptions.ManagerError as e:
LOG.error(_LE("Failed to clear certificate on backend: %s"), e)
ok = False
try:
self._storage_driver.delete_cert(self._identity)
except Exception as e:
LOG.error(_LE("Failed to clear certificate on storage: %s"), e)
ok = False
self._cert = None
self._key = None
if ok:
LOG.debug("Client certificate removed successfully")
def exists(self):
"""Check if certificate was created"""
return self._cert is not None
def get_pem(self):
"""Returns certificate and key pair in PEM format"""
self._validate_exists()
cert_pem = crypto.dump_certificate(crypto.FILETYPE_PEM, self._cert)
key_pem = crypto.dump_privatekey(crypto.FILETYPE_PEM, self._key)
return (cert_pem, key_pem)
def export_pem(self, filename):
"""Exports certificate and key pair to file"""
if not self.exists():
LOG.error(_LE("No certificate present - nothing to export"))
return
cert_pem, key_pem = self.get_pem()
with open(filename, 'w') as f:
f.write(cert_pem)
f.write(key_pem)
def expires_on(self):
"""Returns certificate expiration timestamp"""
self._validate_exists()
converted = datetime.datetime.strptime(
self._cert.get_notAfter().decode(),
"%Y%m%d%H%M%SZ")
return converted
def expires_in_days(self):
"""Returns in how many days the certificate expires"""
delta = self.expires_on() - datetime.datetime.utcnow()
return delta.days
def _validate_empty(self):
if self.exists():
raise nsxlib_exceptions.ObjectAlreadyExists(
object_type='Client Certificate')
def _validate_exists(self):
if not self.exists():
raise nsxlib_exceptions.ObjectNotGenerated(
object_type='Client Certificate')
def _load_cert_and_key(self):
self._validate_empty()
cert_pem, key_pem = self._storage_driver.get_cert(self._identity)
if cert_pem is not None:
self._cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert_pem)
self._key = crypto.load_privatekey(crypto.FILETYPE_PEM, key_pem)
def _store_cert_and_key(self):
self._validate_exists()
cert_pem, key_pem = self.get_pem()
self._storage_driver.store_cert(self._identity, cert_pem, key_pem)
def _register_cert(self):
cert_pem = crypto.dump_certificate(crypto.FILETYPE_PEM, self._cert)
nsx_cert_id = self._nsx_trust_management.create_cert(cert_pem)
try:
self._nsx_trust_management.create_identity(self._identity,
nsx_cert_id)
except nsxlib_exceptions.ManagerError as e:
if e.error_code != NSX_ERROR_IDENTITY_EXISTS:
raise e
# principal identity already exists - this can happen
# due to temporary error on deletion. Worth retrying.
# TODO(annak): remove this code once
# NSX supports multiple certificates per identity
details = self._nsx_trust_management.get_identity_details(
self._identity)
self._nsx_trust_management.delete_identity(details['id'])
self._nsx_trust_management.create_identity(self._identity,
nsx_cert_id)

View File

@ -51,6 +51,14 @@ class NsxLibException(Exception):
return False return False
class ObjectAlreadyExists(NsxLibException):
message = _("%(object_type) already exists")
class ObjectNotGenerated(NsxLibException):
message = _("%(object_type) was not generated")
class ManagerError(NsxLibException): class ManagerError(NsxLibException):
message = _("Unexpected error from backend manager (%(manager)s) " message = _("Unexpected error from backend manager (%(manager)s) "
"for %(operation)s %(details)s") "for %(operation)s %(details)s")

View File

@ -0,0 +1,66 @@
# Copyright 2016 VMware, Inc.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from vmware_nsxlib.v3 import exceptions as nsxlib_exc
from vmware_nsxlib.v3 import utils
BASE_SECTION = 'trust-management'
CERT_SECTION = BASE_SECTION + '/certificates'
ID_SECTION = BASE_SECTION + '/principal-identities'
class NsxLibTrustManagement(utils.NsxLibApiBase):
def remove_newlines_from_pem(self, pem):
"""NSX expects pem without newlines in certificate body
BEGIN and END sections should be separated with newlines
"""
lines = pem.split(b'\n')
result = lines[0] + b'\n'
result += b''.join(lines[1:-1])
result += b'\n' + lines[-1]
return result
def create_cert(self, cert_pem):
resource = CERT_SECTION + '?action=import'
body = {'pem_encoded': self.remove_newlines_from_pem(cert_pem)}
results = self.client.create(resource, body)['results']
if len(results) > 0:
# should be only one result
return results[0]['id']
def delete_cert(self, cert_id):
resource = CERT_SECTION + '/' + cert_id
self.client.delete(resource)
def create_identity(self, identity, cert_id):
body = {'name': identity, 'certificate_id': cert_id}
self.client.create(ID_SECTION, body)
def delete_identity(self, identity):
resource = ID_SECTION + '/' + identity
self.client.delete(resource)
def get_identity_details(self, identity):
results = self.client.get(ID_SECTION)['results']
for result in results:
if result['name'] == identity:
return result
raise nsxlib_exc.ResourceNotFound(
manager=self._client.nsx_api_managers,
operation="Principal identity %s not found" % identity)