@ -12,10 +12,13 @@
import mock
from oslo_config import cfg
from oslo_config import fixture
cfg . CONF . import_group ( ' keystone_authtoken ' ,
' keystonemiddleware.auth_token ' )
from keystoneauth1 import exceptions as ka_exception
from keystoneauth1 import identity as ka_identity
import keystoneclient . exceptions as kc_exception
from magnum . common import exception
@ -25,79 +28,86 @@ from magnum.tests import utils
@mock . patch ( ' keystoneclient.v3.client.Client ' )
class KeystoneClientTest ( base . Base TestCase) :
class KeystoneClientTest ( base . TestCase ) :
def setUp ( self ) :
super ( KeystoneClientTest , self ) . setUp ( )
dummy_url = ' http://server.test:5000/v2.0 '
dummy_url = ' http://server.test:5000/v3 '
self . ctx = utils . dummy_context ( )
self . ctx . auth_url = dummy_url
self . ctx . auth_token = ' abcd1234 '
cfg . CONF . set_override ( ' auth_uri ' , dummy_url ,
group = ' keystone_authtoken ' )
cfg . CONF . set_override ( ' admin_user ' , ' magnum ' ,
group = ' keystone_authtoken ' )
cfg . CONF . set_override ( ' admin_password ' , ' verybadpass ' ,
group = ' keystone_authtoken ' )
cfg . CONF . set_override ( ' admin_tenant_name ' , ' service ' ,
group = ' keystone_authtoken ' )
def test_client_with_token ( self , mock_ks ) :
plugin = keystone . ka_loading . get_plugin_loader ( ' password ' )
opts = keystone . ka_loading . get_auth_plugin_conf_options ( plugin )
cfg_fixture = self . useFixture ( fixture . Config ( ) )
cfg_fixture . register_opts ( opts , group = keystone . CFG_GROUP )
self . config ( auth_type = ' password ' ,
auth_url = dummy_url ,
username = ' fake_user ' ,
password = ' fake_pass ' ,
project_name = ' fake_project ' ,
group = keystone . CFG_GROUP )
self . config ( auth_uri = dummy_url ,
admin_user = ' magnum ' ,
admin_password = ' varybadpass ' ,
admin_tenant_name = ' service ' ,
group = keystone . CFG_LEGACY_GROUP )
def test_client_with_password ( self , mock_ks ) :
self . ctx . is_admin = True
ks_client = keystone . KeystoneClientV3 ( self . ctx )
ks_client . client
self . assertIsNotNone ( ks_client . _client )
mock_ks . assert_called_once_with ( token = ' abcd1234 ' ,
auth_url = ' http://server.test:5000/v3 ' ,
endpoint = ' http://server.test:5000/v3 ' )
def test_client_with_no_credentials ( self , mock_ks ) :
self . ctx . auth_token = None
session = ks_client . session
auth_plugin = session . auth
mock_ks . assert_called_once_with ( session = session , trust_id = None )
self . assertIsInstance ( auth_plugin , ka_identity . Password )
@mock . patch ( ' magnum.common.keystone.ka_loading ' )
@mock . patch ( ' magnum.common.keystone.ka_v3 ' )
def test_client_with_password_legacy ( self , mock_v3 , mock_loading , mock_ks ) :
self . ctx . is_admin = True
mock_loading . load_auth_from_conf_options . side_effect = \
ka_exception . MissingRequiredOptions ( mock . MagicMock ( ) )
ks_client = keystone . KeystoneClientV3 ( self . ctx )
self . assertRaises ( exception . AuthorizationFailure ,
ks_client . _get_ks_client )
def test_client_with_v2_auth_token_info ( self , mock_ks ) :
self . ctx . auth_token_info = { ' access ' : { } }
ks_client . client
session = ks_client . session
self . assertWarnsRegex ( Warning ,
' [keystone_authtoken] section is deprecated ' )
mock_v3 . Password . assert_called_once_with (
auth_url = ' http://server.test:5000/v3 ' , password = ' varybadpass ' ,
project_domain_id = ' default ' , project_name = ' service ' ,
user_domain_id = ' default ' , username = ' magnum ' )
mock_ks . assert_called_once_with ( session = session , trust_id = None )
@mock . patch ( ' magnum.common.keystone.ka_access ' )
def test_client_with_access_info ( self , mock_access , mock_ks ) :
self . ctx . auth_token_info = mock . MagicMock ( )
ks_client = keystone . KeystoneClientV3 ( self . ctx )
ks_client . client
self . assertIsNotNone ( ks_client . _client )
mock_ks . assert_called_once_with ( auth_ref = { ' version ' : ' v2.0 ' } ,
auth_url = ' http://server.test:5000/v3 ' ,
endpoint = ' http://server.test:5000/v3 ' ,
token = ' abcd1234 ' )
def test_client_with_v3_auth_token_info ( self , mock_ks ) :
self . ctx . auth_token_info = { ' token ' : { } }
session = ks_client . session
auth_plugin = session . auth
mock_access . create . assert_called_once_with ( body = mock . ANY ,
auth_token = ' abcd1234 ' )
mock_ks . assert_called_once_with ( session = session , trust_id = None )
self . assertIsInstance ( auth_plugin , ka_identity . access . AccessInfoPlugin )
@mock . patch ( ' magnum.common.keystone.ka_v3 ' )
def test_client_with_token ( self , mock_v3 , mock_ks ) :
ks_client = keystone . KeystoneClientV3 ( self . ctx )
ks_client . client
self . assertIsNotNone ( ks_client . _client )
mock_ks . assert_called_once_with ( auth_ref = { ' version ' : ' v3 ' } ,
auth_url = ' http://server.test:5000/v3 ' ,
endpoint = ' http://server.test:5000/v3 ' ,
token = ' abcd1234 ' )
def test_client_with_invalid_auth_token_info ( self , mock_ks ) :
self . ctx . auth_token_info = { ' not_this ' : ' urg ' }
session = ks_client . session
mock_v3 . Token . assert_called_once_with (
auth_url = ' http://server.test:5000/v3 ' , token = ' abcd1234 ' )
mock_ks . assert_called_once_with ( session = session , trust_id = None )
def test_client_with_no_credentials ( self , mock_ks ) :
self . ctx . auth_token = None
ks_client = keystone . KeystoneClientV3 ( self . ctx )
self . assertRaises ( exception . AuthorizationFailure ,
ks_client . _get_ks_client )
def test_client_with_is_admin ( self , mock_ks ) :
self . ctx . is_admin = True
ks_client = keystone . KeystoneClientV3 ( self . ctx )
ks_client . client
self . assertIsNone ( ks_client . _client )
self . assertIsNotNone ( ks_client . _admin_client )
mock_ks . assert_called_once_with ( auth_url = ' http://server.test:5000/v3 ' ,
username = ' magnum ' ,
password = ' verybadpass ' ,
project_name = ' service ' )
ks_client . _get_auth )
mock_ks . assert_not_called ( )
def test_delete_trust ( self , mock_ks ) :
mock_ks . return_value . trusts . delete . return_value = None
@ -111,9 +121,10 @@ class KeystoneClientTest(base.BaseTestCase):
ks_client = keystone . KeystoneClientV3 ( self . ctx )
self . assertIsNone ( ks_client . delete_trust ( trust_id = ' atrust123 ' ) )
def test_create_trust_with_all_roles ( self , mock_ks ) :
mock_ks . return_value . auth_ref . user_id = ' 123456 '
mock_ks . return_value . auth_ref . project_id = ' 654321 '
@mock . patch ( ' magnum.common.keystone.ka_session.Session ' )
def test_create_trust_with_all_roles ( self , mock_session , mock_ks ) :
mock_session . return_value . get_user_id . return_value = ' 123456 '
mock_session . return_value . get_project_id . return_value = ' 654321 '
self . ctx . roles = [ ' role1 ' , ' role2 ' ]
ks_client = keystone . KeystoneClientV3 ( self . ctx )
@ -125,9 +136,10 @@ class KeystoneClientTest(base.BaseTestCase):
trustee_user = ' 888888 ' , role_names = [ ' role1 ' , ' role2 ' ] ,
impersonation = True )
def test_create_trust_with_limit_roles ( self , mock_ks ) :
mock_ks . return_value . auth_ref . user_id = ' 123456 '
mock_ks . return_value . auth_ref . project_id = ' 654321 '
@mock . patch ( ' magnum.common.keystone.ka_session.Session ' )
def test_create_trust_with_limit_roles ( self , mock_session , mock_ks ) :
mock_session . return_value . get_user_id . return_value = ' 123456 '
mock_session . return_value . get_project_id . return_value = ' 654321 '
self . ctx . roles = [ ' role1 ' , ' role2 ' ]
ks_client = keystone . KeystoneClientV3 ( self . ctx )