Browse Source

NSXV3: Client certificate private key encryption

When certificate storage is nsx-db and nsx_client_cert_pk_password
is provided in configuration, private key will be stored encrypted.

Change-Id: Id0e6f3b614da9eb2381c80d1a76043e38d2d11ee
changes/92/426992/5
Anna Khmelnitsky 5 years ago
committed by garyk
parent
commit
8bb4df14f1
  1. 1
      devstack/lib/vmware_nsx_v3
  2. 2
      devstack/tools/nsxv3_cleanup.py
  3. 3
      vmware_nsx/common/config.py
  4. 61
      vmware_nsx/plugins/nsx_v3/cert_utils.py
  5. 54
      vmware_nsx/tests/unit/nsx_v3/test_plugin.py

1
devstack/lib/vmware_nsx_v3

@ -186,6 +186,7 @@ function neutron_plugin_configure_service {
_nsxv3_ini_set nsx_use_client_auth "True"
_nsxv3_ini_set nsx_client_cert_file "$CLIENT_CERT_FILE"
_nsxv3_ini_set nsx_client_cert_storage "nsx-db"
_nsxv3_ini_set nsx_client_cert_pk_password "openstack"
fi
}

2
devstack/tools/nsxv3_cleanup.py

@ -130,7 +130,7 @@ class NSXClient(object):
headers['Accept'] = accept_type
# allow admin user to delete entities created
# under openstack principal identity
headers['X-Allow-Overwrite'] = "True"
headers['X-Allow-Overwrite'] = 'true'
self.headers = headers
def get(self, endpoint=None, params=None):

3
vmware_nsx/common/config.py

@ -279,6 +279,9 @@ nsx_v3_opts = [
cfg.StrOpt('nsx_client_cert_file',
default='',
help=_("File to contain client certificate and private key")),
cfg.StrOpt('nsx_client_cert_pk_password',
default="",
help=_("password for private key encryption")),
cfg.StrOpt('nsx_client_cert_storage',
default='nsx-db',
choices=['nsx-db', 'none'],

61
vmware_nsx/plugins/nsx_v3/cert_utils.py

@ -13,24 +13,77 @@
# License for the specific language governing permissions and limitations
# under the License.
from vmware_nsx.db import db as nsx_db
import base64
from cryptography import fernet
import hashlib
import logging
from oslo_config import cfg
from vmware_nsx._i18n import _LE
from vmware_nsx.db import db as nsx_db
LOG = logging.getLogger(__name__)
NSX_OPENSTACK_IDENTITY = "com.vmware.nsx.openstack"
# 32-byte base64-encoded secret for symmetric password encryption
# generated on init based on password provided in configuration
_SECRET = None
def generate_secret_from_password(password):
m = hashlib.md5()
m.update(password.encode('ascii'))
return base64.b64encode(m.hexdigest().encode('ascii'))
def symmetric_encrypt(secret, plaintext):
if not isinstance(plaintext, bytes):
plaintext = plaintext.encode('ascii')
return fernet.Fernet(secret).encrypt(plaintext).decode('ascii')
def symmetric_decrypt(secret, ciphertext):
if not isinstance(ciphertext, bytes):
ciphertext = ciphertext.encode('ascii')
return fernet.Fernet(secret).decrypt(ciphertext).decode('ascii')
class DbCertificateStorageDriver(object):
"""Storage for certificate and private key in neutron DB"""
# TODO(annak): Add private key encryption
def __init__(self, context):
global _SECRET
self._context = context
if cfg.CONF.nsx_v3.nsx_client_cert_pk_password and not _SECRET:
_SECRET = generate_secret_from_password(
cfg.CONF.nsx_v3.nsx_client_cert_pk_password)
def store_cert(self, purpose, certificate, private_key):
# ecrypt private key
if _SECRET:
private_key = symmetric_encrypt(_SECRET, private_key)
nsx_db.save_certificate(self._context.session, purpose,
certificate, private_key)
def get_cert(self, purpose):
return nsx_db.get_certificate(self._context.session, purpose)
cert, private_key = nsx_db.get_certificate(self._context.session,
purpose)
if _SECRET and private_key:
try:
# Encrypted PK is stored in DB as string, while fernet expects
# bytearray.
private_key = symmetric_decrypt(_SECRET, private_key)
except fernet.InvalidToken:
# unable to decrypt - probably due to change of password
# cert and PK are useless, need to delete them
LOG.error(_LE("Unable to decrypt private key, possibly due "
"to change of password. Certificate needs to be "
"regenerated"))
self.delete_cert(purpose)
return None, None
return cert, private_key
def delete_cert(self, purpose):
return nsx_db.delete_certificate(self._context.session, purpose)
@ -47,7 +100,7 @@ class DummyCertificateStorageDriver(object):
pass
def get_cert(self, purpose):
pass
return None, None
def delete_cert(self, purpose):
pass

54
vmware_nsx/tests/unit/nsx_v3/test_plugin.py

@ -48,6 +48,7 @@ from oslo_utils import uuidutils
from vmware_nsx.common import exceptions as nsx_exc
from vmware_nsx.common import utils
from vmware_nsx.plugins.nsx_v3 import cert_utils
from vmware_nsx.plugins.nsx_v3 import plugin as nsx_plugin
from vmware_nsx.tests import unit as vmware
from vmware_nsx.tests.unit.extensions import test_metadata
@ -772,7 +773,7 @@ class NsxV3PluginClientCertTestCase(testlib_api.WebTestCase):
CERTFILE = '/tmp/client_cert.pem'
def _init_config(self):
def _init_config(self, password=None):
cfg.CONF.set_override('default_overlay_tz', NSX_TZ_NAME, 'nsx_v3')
cfg.CONF.set_override('native_dhcp_metadata', False, 'nsx_v3')
cfg.CONF.set_override('dhcp_profile',
@ -784,22 +785,30 @@ class NsxV3PluginClientCertTestCase(testlib_api.WebTestCase):
cfg.CONF.set_override('nsx_client_cert_file', self.CERTFILE, 'nsx_v3')
cfg.CONF.set_override('nsx_client_cert_storage', 'nsx-db', 'nsx_v3')
def _init_plugin(self):
if password:
cfg.CONF.set_override('nsx_client_cert_pk_password',
password, 'nsx_v3')
def _init_plugin(self, password=None):
_mock_nsx_backend_calls()
self._tenant_id = test_plugin.TEST_TENANT_ID
self._init_config()
self._init_config(password)
self.setup_coreplugin(PLUGIN_NAME, load_plugins=True)
def test_init_without_cert(self):
"""Verify init fails if no cert is provided in client cert mode"""
# certificate not generated - exception should be raised
self.assertRaises(nsx_exc.ClientCertificateException,
self._init_plugin)
def test_init_with_cert(self):
"""Verify successful certificate load from storage"""
mock.patch(
"vmware_nsx.db.db.get_certificate",
return_value=(self.CERT, self.PKEY)).start()
_mock_nsx_backend_calls()
self._init_plugin()
# verify cert data was exported to CERTFILE
@ -812,5 +821,42 @@ class NsxV3PluginClientCertTestCase(testlib_api.WebTestCase):
# delete CERTFILE
os.remove(self.CERTFILE)
def test_init_with_cert_encrypted(self):
"""Verify successful encrypted PK load from storage"""
password = 'topsecret'
secret = cert_utils.generate_secret_from_password(password)
encrypted_pkey = cert_utils.symmetric_encrypt(secret, self.PKEY)
# db always returns string
mock.patch(
"vmware_nsx.db.db.get_certificate",
return_value=(self.CERT, encrypted_pkey)).start()
self._init_plugin(password)
# verify cert data was exported to CERTFILE
expected = self.CERT + self.PKEY
with open(self.CERTFILE, 'r') as f:
actual = f.read()
self.assertEqual(expected, actual)
# delete CERTFILE
os.remove(self.CERTFILE)
def test_init_with_cert_decrypt_fails(self):
"""Verify loading plaintext PK from storage fails in encrypt mode"""
mock.patch(
"vmware_nsx.db.db.get_certificate",
return_value=(self.CERT, self.PKEY)).start()
self._tenant_id = test_plugin.TEST_TENANT_ID
self._init_config('topsecret')
# since PK in DB is not encrypted, we should fail to decrypt it on load
self.assertRaises(nsx_exc.ClientCertificateException,
self._init_plugin)
# TODO(annak): add test that verifies bad crypto data raises exception
# when OPENSSL exception wrapper is available from NSXLIB
Loading…
Cancel
Save