@ -13,6 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
#
import os
from neutron_lib import exceptions
from OpenSSL import crypto
@ -24,7 +25,7 @@ from vmware_nsxlib.tests.unit.v3 import test_client
from vmware_nsxlib . v3 import client
from vmware_nsxlib . v3 import client_cert
from vmware_nsxlib . v3 import exceptions as nsxlib_exc
from vmware_nsxlib . v3 import trust_management
from vmware_nsxlib . v3 import trust_management as tm
class DummyStorageDriver ( dict ) :
@ -53,6 +54,7 @@ class NsxV3ClientCertificateTestCase(nsxlib_testcase.NsxClientTestCase):
identity = ' drumknott '
cert_id = " 00000000-1111-2222-3333-444444444444 "
identity_id = " 55555555-6666-7777-8888-999999999999 "
node_id = " meh "
def _get_mocked_response ( self , status_code , results ) :
return mocks . MockRequestsResponse (
@ -67,17 +69,31 @@ class NsxV3ClientCertificateTestCase(nsxlib_testcase.NsxClientTestCase):
' module_name ' : ' never mind ' ,
' error message ' : ' bad luck ' } ) )
def _get_mocked_trust ( self , action ):
def _get_mocked_trust ( self , action , cert_pem ):
fake_responses = [ ]
if action == ' create ' :
if ' create ' in action :
# import cert and return its id
results = [ { ' id ' : self . cert_id } ]
fake_responses . append ( self . _get_mocked_response ( 201 , results ) )
# and then bind this id to principal identity
fake_responses . append ( self . _get_mocked_response ( 201 , [ ] ) )
elif action == ' delete ' :
if ' delete ' in action :
nsx_style_pem = tm . NsxLibTrustManagement . remove_newlines_from_pem (
cert_pem )
# get certs list, including same cert imported twice edge case
results = [ { ' resource_type ' : ' Certificate ' ,
' id ' : ' dont care ' ,
' pem_encoded ' : ' some junk ' } ,
{ ' resource_type ' : ' Certificate ' ,
' id ' : ' some_other_cert_id ' ,
' pem_encoded ' : nsx_style_pem } ,
{ ' resource_type ' : ' Certificate ' ,
' id ' : self . cert_id ,
' pem_encoded ' : nsx_style_pem } ]
fake_responses . append ( self . _get_mocked_response ( 200 , results ) )
# get principal identities list
results = [ { ' resource_type ' : ' Principal Identity ' ,
' id ' : ' dont care ' ,
@ -97,21 +113,62 @@ class NsxV3ClientCertificateTestCase(nsxlib_testcase.NsxClientTestCase):
client . JSONRESTClient ,
url_prefix = ' api/v1 ' , session_response = fake_responses )
return trust_management . NsxLibTrustManagement ( mock_client , { } )
return tm . NsxLibTrustManagement ( mock_client , { } )
def _verify_backend_create ( self , mocked_trust , cert_pem ) :
""" Verify API calls to create cert and identity on backend """
# verify API call to import cert on backend
cert_pem = mocked_trust . remove_newlines_from_pem ( cert_pem )
base_uri = ' https://1.2.3.4/api/v1/trust-management '
uri = base_uri + ' /certificates?action=import '
expected_body = { ' pem_encoded ' : cert_pem }
test_client . assert_json_call ( ' post ' , mocked_trust . client , uri ,
single_call = False ,
data = jsonutils . dumps ( expected_body ) )
# verify API call to bind cert to identity on backend
uri = base_uri + ' /principal-identities '
expected_body = { ' name ' : self . identity ,
' node_id ' : self . node_id ,
' permission_group ' : ' read_write_api_users ' ,
' certificate_id ' : self . cert_id ,
' is_protected ' : True }
test_client . assert_json_call ( ' post ' , mocked_trust . client , uri ,
single_call = False ,
data = jsonutils . dumps ( expected_body ,
sort_keys = True ) )
def _verify_backend_delete ( self , mocked_trust ) :
""" Verify API calls to fetch and delete cert and identity """
# verify API call to query identities in order to get cert id
base_uri = ' https://1.2.3.4/api/v1/trust-management '
uri = base_uri + ' /principal-identities '
test_client . assert_json_call ( ' get ' , mocked_trust . client , uri ,
single_call = False )
# verify API call to delete openstack principal identity
uri = uri + ' / ' + self . identity_id
test_client . assert_json_call ( ' delete ' , mocked_trust . client , uri ,
single_call = False )
# verify API call to delete certificate
uri = base_uri + ' /certificates/ ' + self . cert_id
test_client . assert_json_call ( ' delete ' , mocked_trust . client , uri ,
single_call = False )
def test_generate_cert ( self ) :
""" Test startup without certificate + certificate generation """
storage_driver = DummyStorageDriver ( )
# Prepare fake trust management for "cert create" requests
mocked_trust = self . _get_mocked_trust ( ' create ' )
cert_pem , key_pem = storage_driver . get_cert ( self . identity )
mocked_trust = self . _get_mocked_trust ( ' create ' , cert_pem )
cert = client_cert . ClientCertificateManager ( self . identity ,
mocked_trust ,
storage_driver )
self . assertFalse ( cert . exists ( ) )
cert . generate ( subject = { } , key_size = 2048 , valid_for_days = 333 ,
node_id = ' meh ' )
node_id = self . node_id )
# verify client cert was generated and makes sense
self . assertTrue ( cert . exists ( ) )
@ -123,26 +180,8 @@ class NsxV3ClientCertificateTestCase(nsxlib_testcase.NsxClientTestCase):
self . assertEqual ( cert_pem , stored_cert )
self . assertEqual ( key_pem , stored_key )
# verify API call to import cert on backend
cert_pem = mocked_trust . remove_newlines_from_pem ( cert_pem )
base_uri = ' https://1.2.3.4/api/v1/trust-management '
uri = base_uri + ' /certificates?action=import '
expected_body = { ' pem_encoded ' : cert_pem }
test_client . assert_json_call ( ' post ' , mocked_trust . client , uri ,
single_call = False ,
data = jsonutils . dumps ( expected_body ) )
# verify API call to bind cert to identity on backend
uri = base_uri + ' /principal-identities '
expected_body = { ' name ' : self . identity ,
' node_id ' : ' meh ' ,
' permission_group ' : ' read_write_api_users ' ,
' certificate_id ' : self . cert_id ,
' is_protected ' : True }
test_client . assert_json_call ( ' post ' , mocked_trust . client , uri ,
single_call = False ,
data = jsonutils . dumps ( expected_body ,
sort_keys = True ) )
# verify backend API calls
self . _verify_backend_create ( mocked_trust , cert_pem )
# try to generate cert again and fail
self . assertRaises ( nsxlib_exc . ObjectAlreadyExists ,
@ -170,7 +209,8 @@ class NsxV3ClientCertificateTestCase(nsxlib_testcase.NsxClientTestCase):
# get mocked backend driver for trust management,
# prepared for get request, that preceeds delete operation
mocked_trust = self . _get_mocked_trust ( ' delete ' )
cert_pem , key_pem = storage_driver . get_cert ( self . identity )
mocked_trust = self . _get_mocked_trust ( ' delete ' , cert_pem )
cert = client_cert . ClientCertificateManager ( self . identity ,
mocked_trust ,
@ -182,21 +222,42 @@ class NsxV3ClientCertificateTestCase(nsxlib_testcase.NsxClientTestCase):
self . assertFalse ( cert . exists ( ) )
self . assertTrue ( storage_driver . is_empty ( self . identity ) )
# verify API call to query identities in order to get cert id
base_uri = ' https://1.2.3.4/api/v1/trust-management '
uri = base_uri + ' /principal-identities '
test_client . assert_json_call ( ' get ' , mocked_trust . client , uri ,
single_call = False )
self . _verify_backend_delete ( mocked_trust )
# verify API call to delete openstack principal identity
uri = uri + ' / ' + self . identity_id
test_client . assert_json_call ( ' delete ' , mocked_trust . client , uri ,
single_call = False )
def _test_import_and_delete_cert ( self , with_pkey = True ) :
filename = ' /tmp/test.pem '
# this driver simulates storage==none scenario
noop_driver = DummyStorageDriver ( )
cert , key = client_cert . generate_self_signed_cert_pair ( 4096 ,
20 ,
' sha256 ' ,
{ } )
# verify API call to delete certificate
uri = base_uri + ' /certificates/ ' + self . cert_id
test_client . assert_json_call ( ' delete ' , mocked_trust . client , uri ,
single_call = False )
cert_pem = crypto . dump_certificate ( crypto . FILETYPE_PEM , cert )
key_pem = crypto . dump_privatekey ( crypto . FILETYPE_PEM , key )
with open ( filename , ' wb ' ) as f :
f . write ( cert_pem )
if with_pkey :
f . write ( key_pem )
mocked_trust = self . _get_mocked_trust ( ' create_delete ' ,
cert_pem )
cert = client_cert . ClientCertificateManager ( self . identity ,
mocked_trust ,
noop_driver )
cert . import_pem ( filename , self . node_id )
self . _verify_backend_create ( mocked_trust , cert_pem )
cert . delete_pem ( filename )
self . _verify_backend_delete ( mocked_trust )
os . remove ( filename )
def test_import_and_delete_cert_pkey ( self ) :
self . _test_import_and_delete_cert ( True )
def test_import_and_delete_cert_only ( self ) :
self . _test_import_and_delete_cert ( False )
def test_get_certificate_details ( self ) :
""" Test retrieving cert details for existing cert """