NSXV3: Initial client certificate auth support

Client certificate authentication is disabled by default.
To enable client auth, define the following in nsx.ini:
nsx_use_client_auth = True
nsx_client_cert_storage = nsx-db
nsx_client_cert_file = <file to store certificate and private key>

To enable client auth in devstack, define the following in local.conf:
NSX_USE_CLIENT_CERT_AUTH=True

This commit covers only DB type of cert storage. Barbican storage
and imported cert will be added later. Also planned for near future:

    reload cert from DB if NSX connection failes due to bad cert
    show warning when cert nears expiration
    delete cert file from file system on neutron exit

Change-Id: Ic70a949b740d9149d71187b02640d3071a3e0159
This commit is contained in:
Anna Khmelnitsky 2017-01-12 12:47:18 -08:00 committed by garyk
parent 846ec955ab
commit 6e1a21881e
17 changed files with 468 additions and 8 deletions

View File

@ -40,6 +40,9 @@ METADATA_PROXY_SHARED_SECRET=${METADATA_PROXY_SHARED_SECRET:-}
NSX_XTRACE=$(set +o | grep xtrace)
set +o xtrace
# File to store client certificate and PK
CLIENT_CERT_FILE=${DEST}/data/neutron/client.pem
source $TOP_DIR/lib/neutron_plugins/ovs_base
@ -179,6 +182,11 @@ function neutron_plugin_configure_service {
_nsxv3_ini_set metadata_proxy $METADATA_PROXY_UUID
iniset $NEUTRON_CONF DEFAULT dhcp_agent_notification False
fi
if [[ "$NSX_USE_CLIENT_CERT_AUTH" == "True" ]]; then
_nsxv3_ini_set nsx_use_client_auth "True"
_nsxv3_ini_set nsx_client_cert_file "$CLIENT_CERT_FILE"
_nsxv3_ini_set nsx_client_cert_storage "nsx-db"
fi
}
function neutron_plugin_setup_interface_driver {
@ -203,6 +211,10 @@ function init_vmware_nsx_v3 {
die $LINENO "Native support does not require DHCP and Metadata agents!"
fi
fi
# Generate client certificate
if [[ "$NSX_USE_CLIENT_CERT_AUTH" == "True" ]]; then
nsxadmin -o generate -r certificate
fi
if ! is_set NSX_GATEWAY_NETWORK_INTERFACE; then
echo "NSX_GATEWAY_NETWORK_INTERFACE not set not configuring routes"
return
@ -257,6 +269,9 @@ function stop_vmware_nsx_v3 {
for address in $addresses; do
sudo ip addr add dev $NSX_GATEWAY_NETWORK_INTERFACE $address
done
# Clean client certificate if exists
nsxadmin -o clean -r certificate
}
# Restore xtrace

View File

@ -128,6 +128,9 @@ class NSXClient(object):
headers['Authorization'] = "Basic %s" % auth
headers['Content-Type'] = content_type
headers['Accept'] = accept_type
# allow admin user to delete entities created
# under openstack principal identity
headers['X-Allow-Overwrite'] = "True"
self.headers = headers
def get(self, endpoint=None, params=None):

View File

@ -294,6 +294,21 @@ Orphaned DHCP Servers
nsxadmin -r orphaned-dhcp-servers -o nsx-clean
Client Certificate
~~~~~~~~~~~~~~~~~~
- Generate new client certificate (this command will delete previous certificate if exists)::
nsxadmin -r certificate -o generate --property username=<username> --property password=<password>
- Delete client certificate::
nsxadmin -r certificate -o clean
- Show client certificate details::
nsxadmin -r certificate -o show
Upgrade Steps (Version 1.0.0 to Version 1.1.0)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

View File

@ -272,6 +272,17 @@ nsx_v3_opts = [
"[<scheme>://]<ip_adress>[:<port>]\nIf scheme is not "
"provided https is used. If port is not provided port "
"80 is used for http and port 443 for https.")),
cfg.BoolOpt('nsx_use_client_auth',
default=False,
help=_("Use client certificate in NSX manager "
"authentication")),
cfg.StrOpt('nsx_client_cert_file',
default='',
help=_("File to contain client certificate and private key")),
cfg.StrOpt('nsx_client_cert_storage',
default='nsx-db',
choices=['nsx-db', 'none'],
help=_("Storage type for client certificate sensitive data")),
cfg.StrOpt('default_overlay_tz',
deprecated_name='default_overlay_tz_uuid',
help=_("This is the name or UUID of the default NSX overlay "

View File

@ -23,6 +23,10 @@ class NsxPluginException(n_exc.NeutronException):
message = _("An unexpected error occurred in the NSX Plugin: %(err_msg)s")
class ClientCertificateException(NsxPluginException):
message = _("Client certificate error: %(err_msg)s")
class InvalidVersion(NsxPluginException):
message = _("Unable to fulfill request with version %(version)s.")

View File

@ -448,3 +448,27 @@ def del_nsx_ipam_subnet_pool(session, subnet_id, nsx_pool_id):
return (session.query(nsx_models.NsxSubnetIpam).
filter_by(subnet_id=subnet_id,
nsx_pool_id=nsx_pool_id).delete())
def get_certificate(session, purpose):
try:
cert_entry = session.query(
nsx_models.NsxCertificateRepository).filter_by(
purpose=purpose).one()
return cert_entry.certificate, cert_entry.private_key
except exc.NoResultFound:
return None, None
def save_certificate(session, purpose, cert, pk):
with session.begin(subtransactions=True):
cert_entry = nsx_models.NsxCertificateRepository(
purpose=purpose,
certificate=cert,
private_key=pk)
session.add(cert_entry)
def delete_certificate(session, purpose):
return (session.query(nsx_models.NsxCertificateRepository).
filter_by(purpose=purpose).delete())

View File

@ -1 +1 @@
e816d4fe9d4f
dd9fe5a3a526

View File

@ -0,0 +1,39 @@
# Copyright 2016 VMware, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""NSX Adds certificate table for client certificate management
Revision ID: dd9fe5a3a526
Revises: e816d4fe9d4f
Create Date: 2017-01-06 12:30:01.070022
"""
# revision identifiers, used by Alembic.
revision = 'dd9fe5a3a526'
down_revision = 'e816d4fe9d4f'
from alembic import op
import sqlalchemy as sa
def upgrade():
op.create_table('nsx_certificates',
sa.Column('purpose', sa.String(length=32), nullable=False),
sa.Column('certificate', sa.String(length=9216), nullable=False),
sa.Column('private_key', sa.String(length=5120), nullable=False),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.PrimaryKeyConstraint('purpose'))

View File

@ -377,3 +377,16 @@ class NsxSubnetIpam(model_base.BASEV2, models.TimestampMixin):
# before the pool does
subnet_id = sa.Column(sa.String(36), primary_key=True)
nsx_pool_id = sa.Column(sa.String(36), primary_key=True)
class NsxCertificateRepository(model_base.BASEV2, models.TimestampMixin):
"""Stores certificate and private key per logical purpose.
For now, will have zero or one rows with nsxv3 client certificate
"""
__tablename__ = 'nsx_certificates'
purpose = sa.Column(sa.String(32),
nullable=False,
primary_key=True)
certificate = sa.Column(sa.String(9216), nullable=False)
private_key = sa.Column(sa.String(5120), nullable=False)

View File

@ -0,0 +1,53 @@
# Copyright 2016 VMware, Inc.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from vmware_nsx.db import db as nsx_db
NSX_OPENSTACK_IDENTITY = "com.vmware.nsx.openstack"
class DbCertificateStorageDriver(object):
"""Storage for certificate and private key in neutron DB"""
# TODO(annak): Add private key encryption
def __init__(self, context):
self._context = context
def store_cert(self, purpose, certificate, private_key):
nsx_db.save_certificate(self._context.session, purpose,
certificate, private_key)
def get_cert(self, purpose):
return nsx_db.get_certificate(self._context.session, purpose)
def delete_cert(self, purpose):
return nsx_db.delete_certificate(self._context.session, purpose)
class DummyCertificateStorageDriver(object):
"""Dummy driver API implementation
Used for external certificate import scenario
(nsx_client_cert_storage == None)
"""
def store_cert(self, purpose, certificate, private_key):
pass
def get_cert(self, purpose):
pass
def delete_cert(self, purpose):
pass

View File

@ -91,10 +91,12 @@ from vmware_nsx.extensions import advancedserviceproviders as as_providers
from vmware_nsx.extensions import maclearning as mac_ext
from vmware_nsx.extensions import providersecuritygroup as provider_sg
from vmware_nsx.extensions import securitygrouplogging as sg_logging
from vmware_nsx.plugins.nsx_v3 import cert_utils
from vmware_nsx.plugins.nsx_v3 import utils as v3_utils
from vmware_nsx.services.qos.common import utils as qos_com_utils
from vmware_nsx.services.qos.nsx_v3 import utils as qos_utils
from vmware_nsx.services.trunk.nsx_v3 import driver as trunk_driver
from vmware_nsxlib.v3 import client_cert
from vmware_nsxlib.v3 import exceptions as nsx_lib_exc
from vmware_nsxlib.v3 import nsx_constants as nsxlib_consts
from vmware_nsxlib.v3 import resources as nsx_resources
@ -176,6 +178,10 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
self._extension_manager.initialize()
self.supported_extension_aliases.extend(
self._extension_manager.extension_aliases())
if cfg.CONF.nsx_v3.nsx_use_client_auth:
self._init_client_certificate()
self.nsxlib = v3_utils.get_nsxlib_wrapper()
# reinitialize the cluster upon fork for api workers to ensure each
# process has its own keepalive loops + state
@ -241,6 +247,29 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
db_base_plugin_v2.NeutronDbPluginV2.register_dict_extend_funcs(
attributes.SUBNETS, ['_ext_extend_subnet_dict'])
def _init_client_certificate(self):
"""Load certificate data from storage"""
LOG.info(_LI("NSX authenication will use client certificate "
"with storage type %s"),
cfg.CONF.nsx_v3.nsx_client_cert_storage)
if cfg.CONF.nsx_v3.nsx_client_cert_storage.lower() == 'none':
# nothing to do - admin is responsible for storing cert file
# in the filesystem of each neutron host
return
if cfg.CONF.nsx_v3.nsx_client_cert_storage.lower() == 'nsx-db':
context = q_context.get_admin_context()
db_storage_driver = cert_utils.DbCertificateStorageDriver(
context)
cert_manager = client_cert.ClientCertificateManager(
cert_utils.NSX_OPENSTACK_IDENTITY, None, db_storage_driver)
if not cert_manager.exists():
msg = _("Unable to load from nsx-db")
raise nsx_exc.ClientCertificateException(err_msg=msg)
# TODO(annak): add certificate expiration warning if expires soon
cert_manager.export_pem(cfg.CONF.nsx_v3.nsx_client_cert_file)
def _init_nsx_profiles(self):
LOG.debug("Initializing NSX v3 port spoofguard switching profile")
if not self._init_port_security_profile():

View File

@ -19,15 +19,20 @@ from neutron import version as n_version
from vmware_nsxlib import v3
from vmware_nsxlib.v3 import config
NSX_NEUTRON_PLUGIN = 'NSX Neutron plugin'
OS_NEUTRON_ID_SCOPE = 'os-neutron-id'
def get_nsxlib_wrapper():
def get_nsxlib_wrapper(nsx_username=None, nsx_password=None, basic_auth=False):
client_cert_file = None
if not basic_auth and cfg.CONF.nsx_v3.nsx_use_client_auth:
# if basic auth requested, dont use cert file even if provided
client_cert_file = cfg.CONF.nsx_v3.nsx_client_cert_file
nsxlib_config = config.NsxLibConfig(
username=cfg.CONF.nsx_v3.nsx_api_user,
password=cfg.CONF.nsx_v3.nsx_api_password,
username=nsx_username or cfg.CONF.nsx_v3.nsx_api_user,
password=nsx_password or cfg.CONF.nsx_v3.nsx_api_password,
client_cert_file=client_cert_file,
retries=cfg.CONF.nsx_v3.http_retries,
insecure=cfg.CONF.nsx_v3.insecure,
ca_file=cfg.CONF.nsx_v3.ca_file,

View File

@ -32,6 +32,7 @@ SECURITY_GROUPS = 'security-groups'
PORTS = 'ports'
METADATA_PROXY = 'metadata-proxy'
ORPHANED_DHCP_SERVERS = 'orphaned-dhcp-servers'
CERTIFICATE = 'certificate'
# NSXV Resource Constants
EDGES = 'edges'

View File

@ -0,0 +1,120 @@
# Copyright 2016 VMware, Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
from vmware_nsx._i18n import _LI
from vmware_nsx.plugins.nsx_v3 import cert_utils
from vmware_nsx.shell.admin.plugins.common import constants
from vmware_nsx.shell.admin.plugins.common import utils as admin_utils
from vmware_nsx.shell.admin.plugins.nsxv3.resources import utils
from vmware_nsx.shell import resources as shell
from vmware_nsxlib.v3 import client_cert
from vmware_nsxlib.v3 import trust_management
from neutron.callbacks import registry
from neutron import context
from oslo_config import cfg
LOG = logging.getLogger(__name__)
# default certificate validity period in days (10 years)
DEFAULT_CERT_VALIDITY_PERIOD = 3650
def get_certificate_manager(**kwargs):
username, password = None, None
if kwargs.get('property'):
properties = admin_utils.parse_multi_keyval_opt(kwargs['property'])
username = properties.get('user')
password = properties.get('password')
storage_driver_type = cfg.CONF.nsx_v3.nsx_client_cert_storage.lower()
LOG.info(_LI("Certificate storage is %s"), storage_driver_type)
if storage_driver_type == 'nsx-db':
storage_driver = cert_utils.DbCertificateStorageDriver(
context.get_admin_context())
elif storage_driver_type == 'none':
storage_driver = cert_utils.DummyCertificateStorageDriver()
# TODO(annak) - add support for barbican storage driver
nsx_client = utils.get_nsxv3_client(username, password, True)
nsx_trust = trust_management.NsxLibTrustManagement(nsx_client, {})
return client_cert.ClientCertificateManager(
cert_utils.NSX_OPENSTACK_IDENTITY,
nsx_trust,
storage_driver)
@admin_utils.output_header
def generate_cert(resource, event, trigger, **kwargs):
"""Generate self signed client certificate and private key
"""
cert_manager = get_certificate_manager(**kwargs)
if cert_manager.exists():
# Need to delete cert first
cert_manager.delete()
cert_manager.generate(subject={},
valid_for_days=DEFAULT_CERT_VALIDITY_PERIOD)
@admin_utils.output_header
def delete_cert(resource, event, trigger, **kwargs):
"""Delete client certificate and private key """
cert_manager = get_certificate_manager(**kwargs)
if cert_manager.exists():
cert_manager.delete()
@admin_utils.output_header
def show_cert(resource, event, trigger, **kwargs):
"""Show client certificate details """
cert_manager = get_certificate_manager(**kwargs)
if cert_manager.exists():
cert_pem, key_pem = cert_manager.get_pem()
expires_on = cert_manager.expires_on()
expires_in_days = cert_manager.expires_in_days()
if expires_in_days > 0:
LOG.info(_LI("Client certificate is valid. "
"Expires on %(date)s (in %(days)d days)"),
{'date': expires_on, 'days': expires_in_days})
else:
LOG.info(_LI("Client certificate expired on %s."), expires_on)
LOG.info(cert_pem)
# TODO(annak): show certificate details such as subject and crypto
# and add verification same certificate is registered in NSX.
# For imported certificate, fetch from NSX
else:
LOG.info(_LI("Client certificate was not registered in the system"))
registry.subscribe(generate_cert,
constants.CERTIFICATE,
shell.Operations.GENERATE.value)
registry.subscribe(show_cert,
constants.CERTIFICATE,
shell.Operations.SHOW.value)
registry.subscribe(delete_cert,
constants.CERTIFICATE,
shell.Operations.CLEAN.value)

View File

@ -24,12 +24,23 @@ from vmware_nsxlib.v3 import nsx_constants
_NSXLIB = None
def get_nsxv3_client():
return get_connected_nsxlib().client
def get_nsxv3_client(nsx_username=None, nsx_password=None,
use_basic_auth=False):
return get_connected_nsxlib(nsx_username,
nsx_password,
use_basic_auth).client
def get_connected_nsxlib():
def get_connected_nsxlib(nsx_username=None, nsx_password=None,
use_basic_auth=False):
global _NSXLIB
# for non-default agruments, initiate new lib
if nsx_username or use_basic_auth:
return v3_utils.get_nsxlib_wrapper(nsx_username,
nsx_password,
use_basic_auth)
if _NSXLIB is None:
_NSXLIB = v3_utils.get_nsxlib_wrapper()
return _NSXLIB

View File

@ -52,6 +52,9 @@ class Operations(enum.Enum):
NSX_MIGRATE_V_V3 = 'nsx-migrate-v-v3'
MIGRATE_TO_POLICY = 'migrate-to-policy'
STATUS = 'status'
GENERATE = 'generate'
IMPORT = 'import'
SHOW = 'show'
ops = [op.value for op in Operations]
@ -91,6 +94,11 @@ nsxv3_resources = {
constants.ORPHANED_DHCP_SERVERS: Resource(constants.ORPHANED_DHCP_SERVERS,
[Operations.NSX_LIST.value,
Operations.NSX_CLEAN.value]),
constants.CERTIFICATE: Resource(constants.CERTIFICATE,
[Operations.GENERATE.value,
Operations.SHOW.value,
Operations.CLEAN.value,
Operations.IMPORT.value])
}
# Add supported NSX-V resources in this dictionary

View File

@ -13,6 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import mock
import six
from webob import exc
@ -34,6 +36,7 @@ from neutron.tests.unit.extensions \
import test_l3_ext_gw_mode as test_ext_gw_mode
from neutron.tests.unit.scheduler \
import test_dhcp_agent_scheduler as test_dhcpagent
from neutron.tests.unit import testlib_api
from neutron_lib.api.definitions import portbindings
from neutron_lib.api.definitions import provider_net as pnet
@ -43,6 +46,7 @@ from neutron_lib.plugins import directory
from oslo_config import cfg
from oslo_utils import uuidutils
from vmware_nsx.common import exceptions as nsx_exc
from vmware_nsx.common import utils
from vmware_nsx.plugins.nsx_v3 import plugin as nsx_plugin
from vmware_nsx.tests import unit as vmware
@ -98,6 +102,10 @@ def _mock_nsx_backend_calls():
"vmware_nsxlib.v3.NsxLibBridgeCluster.get_id_by_name_or_id",
return_value=uuidutils.generate_uuid()).start()
mock.patch(
"vmware_nsxlib.v3.NsxLibTransportZone.get_id_by_name_or_id",
return_value=uuidutils.generate_uuid()).start()
mock.patch(
"vmware_nsxlib.v3.NsxLibBridgeEndpoint.create",
side_effect=_return_id_key).start()
@ -134,6 +142,10 @@ def _mock_nsx_backend_calls():
"vmware_nsxlib.v3.resources.LogicalDhcpServer.create_binding",
side_effect=_return_id_key).start()
mock.patch(
"vmware_nsxlib.v3.NsxLib.get_version",
return_value="0.6.0").start()
class NsxV3PluginTestCaseMixin(test_plugin.NeutronDbPluginV2TestCase,
nsxlib_testcase.NsxClientTestCase):
@ -705,3 +717,100 @@ class ExtGwModeTestCase(test_ext_gw_mode.ExtGwModeIntTestCase,
L3NatTest):
def test_router_gateway_set_fail_after_port_create(self):
self.skipTest("TBD")
class NsxV3PluginClientCertTestCase(testlib_api.WebTestCase):
CERT = "-----BEGIN CERTIFICATE-----\n" \
"MIIDJTCCAg0CBFh36j0wDQYJKoZIhvcNAQELBQAwVzELMAkGA1UEBhMCVVMxEzAR\n" \
"BgNVBAgMCkNhbGlmb3JuaWExDjAMBgNVBAoMBU15T3JnMQ8wDQYDVQQLDAZNeVVu\n" \
"aXQxEjAQBgNVBAMMCW15b3JnLmNvbTAeFw0xNzAxMTIyMDQyMzdaFw0yNzAxMTAy\n" \
"MDQyMzdaMFcxCzAJBgNVBAYTAlVTMRMwEQYDVQQIDApDYWxpZm9ybmlhMQ4wDAYD\n" \
"VQQKDAVNeU9yZzEPMA0GA1UECwwGTXlVbml0MRIwEAYDVQQDDAlteW9yZy5jb20w\n" \
"ggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQC/wsYintlWVaSeXwaSrdPa\n" \
"+AHtL1ooH7q0uf6tt+6Rwiy10YRjAVJhapj9995gqgJ2402J+3gzNXLCbXjjDR/D\n" \
"9xjAzKHu61r0AVNd9/0+8yXQrEDuzlwHSCKz+zjq5ZEZ7RkLIUdreaZJFPTCwry3\n" \
"wuTnBfqcE7xWl6WfWR8evooV+ZzIfjQdoSliIyn3YGxNN5pc1P40qt0pxOsNBGXG\n" \
"2FIZXpML8TpKw0ga/wE70CJd6tRvSsAADxQXehfKvGtHvlJYS+3cTahC7reQXJnc\n" \
"qsjgYkiWyhhR4jdcTD/tDlVcJroM1jFVxpsCg/AU3srWWWeAGyVe42ZhqWVf0Urz\n" \
"AgMBAAEwDQYJKoZIhvcNAQELBQADggEBAA/lLfmXe8wPyBhN/VMb5bu5Ey56qz+j\n" \
"jCn7tz7FjRvsB9P0fLUDOBKNwyon3yopDNYJ4hnm4yKoHCHURQLZKWHzm0XKzE+4\n" \
"cA/M13M8OEg5otnVVHhz1FPQWnJq7bLHh/KXYcc5Rkc7UeHEPj0sDjfUjCPGdepc\n" \
"Ghu1ZcgHsL4JCuvcadG+RFGeDTug3yO92Fj2uFy5DlzzWOZSi4otpZRd9JZkAtZ1\n" \
"umZRBJ2A504nJx4MplmNqvLNkmxMLKQdvZYNNiYr6icOavDOJA5RhzgoppJZkV2w\n" \
"v2oC+8BFarXnZSk37HAWjwcaqzBLbIyPYpClW5IYMr8LiixSBACc+4w=\n" \
"-----END CERTIFICATE-----\n"
PKEY = "-----BEGIN PRIVATE KEY-----\n" \
"MIIEwAIBADANBgkqhkiG9w0BAQEFAASCBKowggSmAgEAAoIBAQC/wsYintlWVaSe\n" \
"XwaSrdPa+AHtL1ooH7q0uf6tt+6Rwiy10YRjAVJhapj9995gqgJ2402J+3gzNXLC\n" \
"bXjjDR/D9xjAzKHu61r0AVNd9/0+8yXQrEDuzlwHSCKz+zjq5ZEZ7RkLIUdreaZJ\n" \
"FPTCwry3wuTnBfqcE7xWl6WfWR8evooV+ZzIfjQdoSliIyn3YGxNN5pc1P40qt0p\n" \
"xOsNBGXG2FIZXpML8TpKw0ga/wE70CJd6tRvSsAADxQXehfKvGtHvlJYS+3cTahC\n" \
"7reQXJncqsjgYkiWyhhR4jdcTD/tDlVcJroM1jFVxpsCg/AU3srWWWeAGyVe42Zh\n" \
"qWVf0UrzAgMBAAECggEBAJrGuie9cQy3KZzOdD614RaPMPbhTnKuUYOH0GEk4YFy\n" \
"aaYDS0iiC30njf8HLs10y3JsOuyRNU6X6F24AGe68xW3/pm3UUjHXG0wGLry68wA\n" \
"c1g/gFV/6FXUSnZc4m7uBjUX4yvRm5TK5oV8TaZZifsEar9xWvrZDx4RXpQEWhL0\n" \
"L/TyrOZSfRtBgdWX6Ag4XQVsCfZxJoCi2ZyvaMBsWTH06x9AGo1Io5t1AmA9Hsfb\n" \
"6BsSz186nqb0fq4UMfrWrSCz7M/1s03+hBOVICH2TdaRDZLtDVa1b2x4sFpfdp9t\n" \
"VVxuSHxcmvzOPMIv3NXwj0VitTYYJDBFKoEfx1mzhNkCgYEA59gYyBfpsuCOevP2\n" \
"tn7IeysbtaoKDzHE+ksjs3sAn6Vr2Y0Lbed26NpdIVL6u3HAteJxqrIh0zpkpAtp\n" \
"akdqlj86oRaBUqLXxK3QNpUx19f7KN7UsVAbzUJSOm2n1piPg261ktfhtms2rxnQ\n" \
"+9yluINu+z1wS4FG9SwrRmwwfsUCgYEA072Ma1sj2MER5tmQw1zLANkzP1PAkUdy\n" \
"+oDuJmU9A3/+YSIkm8dGprFglPkLUaf1B15oN6wCJVMpB1lza3PM/YT70rpqc7cq\n" \
"PHJXQlZFMBhyVfIkCv3wICTLD5phhgAWlzlwm094f2uAnbG6WUkrVfZajuh0pW53\n" \
"1i0OTfxAvlcCgYEAkDB2oSM2JhjApDlMbA2HtAqIbkA1h2OlpSDMMFjEd4WTALdW\n" \
"r2CwNHtyRkJsS92gQ750gPvOS6daZifuxLlr0cu7M+piPbmnRdvvzbKWUC40NyP2\n" \
"1dwDnnGr4EjIhI9XWh+lb5EyAJjHZrlAnxOIQawEft6kE2FwdxSkSWUJ+B0CgYEA\n" \
"n2xYDXzRwKGdmPK2zGFRd5IRw9yLYNcq+vGYXdBb4Aa+wOO0LJYd2+Qxk/jvTMvo\n" \
"8WNjlIcuFmxGuAHhpUXLUhaOhFtXS0jdxCVTDd9muI+vhoaKHLyVz53kRhs20m2+\n" \
"lJ3q6wUq9MU8UX8/j3pH5rFV/cOIEAbcs6W4337OQIECgYEAoLtQyqXjH45FlCQx\n" \
"xK8dY+GuxIP+TIwiq23yhu3e+3LIgXJw8DwBFN5yJyH2HMnhGkD4PurEx2sGHeLO\n" \
"EG6L8PNDOxpvSzcgxwmZsUK6j3nAbKycF3PDDXA4kt8WDXBr86OMQsFtpjeO+fGh\n" \
"YWJa+OKc2ExdeMewe9gKIDQ5stw=\n" \
"-----END PRIVATE KEY-----\n"
CERTFILE = '/tmp/client_cert.pem'
def _init_config(self):
cfg.CONF.set_override('default_overlay_tz', NSX_TZ_NAME, 'nsx_v3')
cfg.CONF.set_override('native_dhcp_metadata', False, 'nsx_v3')
cfg.CONF.set_override('dhcp_profile',
NSX_DHCP_PROFILE_ID, 'nsx_v3')
cfg.CONF.set_override('metadata_proxy',
NSX_METADATA_PROXY_ID, 'nsx_v3')
cfg.CONF.set_override('nsx_use_client_auth', True, 'nsx_v3')
cfg.CONF.set_override('nsx_client_cert_file', self.CERTFILE, 'nsx_v3')
cfg.CONF.set_override('nsx_client_cert_storage', 'nsx-db', 'nsx_v3')
def _init_plugin(self):
self._tenant_id = test_plugin.TEST_TENANT_ID
self._init_config()
self.setup_coreplugin(PLUGIN_NAME, load_plugins=True)
def test_init_without_cert(self):
# certificate not generated - exception should be raised
self.assertRaises(nsx_exc.ClientCertificateException,
self._init_plugin)
def test_init_with_cert(self):
mock.patch(
"vmware_nsx.db.db.get_certificate",
return_value=(self.CERT, self.PKEY)).start()
_mock_nsx_backend_calls()
self._init_plugin()
# verify cert data was exported to CERTFILE
expected = self.CERT + self.PKEY
with open(self.CERTFILE, 'r') as f:
actual = f.read()
self.assertEqual(expected, actual)
# delete CERTFILE
os.remove(self.CERTFILE)
# TODO(annak): add test that verifies bad crypto data raises exception
# when OPENSSL exception wrapper is available from NSXLIB