heat/contrib/rackspace/heat_keystoneclient_v2/tests/test_client.py

275 lines
11 KiB
Python

#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
import mox
from oslo_config import cfg
from oslo_utils import importutils
from heat.common import exception
from heat.tests import common
from heat.tests import utils
from .. import client as heat_keystoneclient # noqa
class KeystoneClientTest(common.HeatTestCase):
"""Test cases for heat.common.heat_keystoneclient."""
def setUp(self):
super(KeystoneClientTest, self).setUp()
self.ctx = utils.dummy_context()
# Import auth_token to have keystone_authtoken settings setup.
importutils.import_module('keystonemiddleware.auth_token')
dummy_url = 'http://server.test:5000/v2.0'
cfg.CONF.set_override('auth_uri', dummy_url,
group='keystone_authtoken')
cfg.CONF.set_override('admin_user', 'heat',
group='keystone_authtoken')
cfg.CONF.set_override('admin_password', 'verybadpass',
group='keystone_authtoken')
cfg.CONF.set_override('admin_tenant_name', 'service',
group='keystone_authtoken')
self.addCleanup(self.m.VerifyAll)
def _stubs_v2(self, method='token', auth_ok=True, trust_scoped=True,
user_id='trustor_user_id', region=None):
self.mock_ks_client = self.m.CreateMock(heat_keystoneclient.kc.Client)
self.m.StubOutWithMock(heat_keystoneclient.kc, "Client")
if method == 'token':
heat_keystoneclient.kc.Client(
auth_url=mox.IgnoreArg(),
endpoint=mox.IgnoreArg(),
tenant_name='test_tenant',
token='abcd1234',
cacert=None,
cert=None,
insecure=False,
region_name=region,
key=None).AndReturn(self.mock_ks_client)
self.mock_ks_client.authenticate().AndReturn(auth_ok)
elif method == 'password':
heat_keystoneclient.kc.Client(
auth_url=mox.IgnoreArg(),
endpoint=mox.IgnoreArg(),
tenant_name='test_tenant',
tenant_id='test_tenant_id',
username='test_username',
password='password',
cacert=None,
cert=None,
insecure=False,
region_name=region,
key=None).AndReturn(self.mock_ks_client)
self.mock_ks_client.authenticate().AndReturn(auth_ok)
if method == 'trust':
heat_keystoneclient.kc.Client(
auth_url='http://server.test:5000/v2.0',
endpoint='http://server.test:5000/v2.0',
password='verybadpass',
tenant_name='service',
username='heat',
cacert=None,
cert=None,
insecure=False,
region_name=region,
key=None).AndReturn(self.mock_ks_client)
self.mock_ks_client.authenticate(trust_id='atrust123',
tenant_id='test_tenant_id'
).AndReturn(auth_ok)
self.mock_ks_client.auth_ref = self.m.CreateMockAnything()
self.mock_ks_client.auth_ref.trust_scoped = trust_scoped
self.mock_ks_client.auth_ref.auth_token = 'atrusttoken'
self.mock_ks_client.auth_ref.user_id = user_id
def test_username_length(self):
"""Test that user names >64 characters are properly truncated."""
self._stubs_v2()
# a >64 character user name and the expected version
long_user_name = 'U' * 64 + 'S'
good_user_name = long_user_name[-64:]
# mock keystone client user functions
self.mock_ks_client.users = self.m.CreateMockAnything()
mock_user = self.m.CreateMockAnything()
# when keystone is called, the name should have been truncated
# to the last 64 characters of the long name
(self.mock_ks_client.users.create(good_user_name, 'password',
mox.IgnoreArg(), enabled=True,
tenant_id=mox.IgnoreArg())
.AndReturn(mock_user))
# mock out the call to roles; will send an error log message but does
# not raise an exception
self.mock_ks_client.roles = self.m.CreateMockAnything()
self.mock_ks_client.roles.list().AndReturn([])
self.m.ReplayAll()
# call create_stack_user with a long user name.
# the cleanup VerifyAll should verify that though we passed
# long_user_name, keystone was actually called with a truncated
# user name
self.ctx.trust_id = None
heat_ks_client = heat_keystoneclient.KeystoneClientV2(self.ctx)
heat_ks_client.create_stack_user(long_user_name, password='password')
def test_init_v2_password(self):
"""Test creating the client, user/password context."""
self._stubs_v2(method='password')
self.m.ReplayAll()
self.ctx.auth_token = None
self.ctx.trust_id = None
heat_ks_client = heat_keystoneclient.KeystoneClientV2(self.ctx)
self.assertIsNotNone(heat_ks_client.client)
def test_init_v2_bad_nocreds(self):
"""Test creating the client without trusts, no credentials."""
self.ctx.auth_token = None
self.ctx.username = None
self.ctx.password = None
self.ctx.trust_id = None
heat_ks_client = heat_keystoneclient.KeystoneClientV2(self.ctx)
self.assertRaises(exception.AuthorizationFailure,
heat_ks_client._v2_client_init)
def test_trust_init(self):
"""Test consuming a trust when initializing."""
self._stubs_v2(method='trust')
self.m.ReplayAll()
self.ctx.username = None
self.ctx.password = None
self.ctx.auth_token = None
self.ctx.trust_id = 'atrust123'
self.ctx.trustor_user_id = 'trustor_user_id'
heat_ks_client = heat_keystoneclient.KeystoneClientV2(self.ctx)
client = heat_ks_client.client
self.assertIsNotNone(client)
def test_trust_init_fail(self):
"""Test consuming a trust when initializing, error scoping."""
self._stubs_v2(method='trust', trust_scoped=False)
self.m.ReplayAll()
self.ctx.username = None
self.ctx.password = None
self.ctx.auth_token = None
self.ctx.trust_id = 'atrust123'
self.ctx.trustor_user_id = 'trustor_user_id'
self.assertRaises(exception.AuthorizationFailure,
heat_keystoneclient.KeystoneClientV2, self.ctx)
def test_trust_init_fail_impersonation(self):
"""Test consuming a trust when initializing, impersonation error."""
self._stubs_v2(method='trust', user_id='wrong_user_id')
self.m.ReplayAll()
self.ctx.username = 'heat'
self.ctx.password = None
self.ctx.auth_token = None
self.ctx.trust_id = 'atrust123'
self.ctx.trustor_user_id = 'trustor_user_id'
self.assertRaises(exception.AuthorizationFailure,
heat_keystoneclient.KeystoneClientV2, self.ctx)
def test_trust_init_pw(self):
"""Test trust_id is takes precedence username/password specified."""
self._stubs_v2(method='trust')
self.m.ReplayAll()
self.ctx.auth_token = None
self.ctx.trust_id = 'atrust123'
self.ctx.trustor_user_id = 'trustor_user_id'
heat_ks_client = heat_keystoneclient.KeystoneClientV2(self.ctx)
self.assertIsNotNone(heat_ks_client._client)
def test_trust_init_token(self):
"""Test trust_id takes precedence when token specified."""
self._stubs_v2(method='trust')
self.m.ReplayAll()
self.ctx.username = None
self.ctx.password = None
self.ctx.trust_id = 'atrust123'
self.ctx.trustor_user_id = 'trustor_user_id'
heat_ks_client = heat_keystoneclient.KeystoneClientV2(self.ctx)
self.assertIsNotNone(heat_ks_client._client)
def test_region_name(self):
"""Test region_name is used when specified."""
self._stubs_v2(method='trust', region='region123')
self.m.ReplayAll()
self.ctx.username = None
self.ctx.password = None
self.ctx.auth_token = None
self.ctx.trust_id = 'atrust123'
self.ctx.trustor_user_id = 'trustor_user_id'
self.ctx.region_name = 'region123'
heat_keystoneclient.KeystoneClientV2(self.ctx)
self.m.VerifyAll()
# ##################### #
# V3 Compatible Methods #
# ##################### #
def test_create_stack_domain_user_pass_through_to_create_stack_user(self):
heat_ks_client = heat_keystoneclient.KeystoneClientV2(self.ctx)
mock_create_stack_user = mock.Mock()
heat_ks_client.create_stack_user = mock_create_stack_user
heat_ks_client.create_stack_domain_user('username', 'project_id',
'password')
mock_create_stack_user.assert_called_once_with('username', 'password')
def test_delete_stack_domain_user_pass_through_to_delete_stack_user(self):
heat_ks_client = heat_keystoneclient.KeystoneClientV2(self.ctx)
mock_delete_stack_user = mock.Mock()
heat_ks_client.delete_stack_user = mock_delete_stack_user
heat_ks_client.delete_stack_domain_user('user_id', 'project_id')
mock_delete_stack_user.assert_called_once_with('user_id')
def test_create_stack_domain_project(self):
tenant_id = self.ctx.tenant_id
ks = heat_keystoneclient.KeystoneClientV2(self.ctx)
self.assertEqual(tenant_id, ks.create_stack_domain_project('fakeid'))
def test_delete_stack_domain_project(self):
heat_ks_client = heat_keystoneclient.KeystoneClientV2(self.ctx)
self.assertIsNone(heat_ks_client.delete_stack_domain_project('fakeid'))
# ###################### #
# V3 Unsupported Methods #
# ###################### #
def test_create_trust_context(self):
heat_ks_client = heat_keystoneclient.KeystoneClientV2(self.ctx)
self.assertRaises(exception.NotSupported,
heat_ks_client.create_trust_context)
def test_delete_trust(self):
heat_ks_client = heat_keystoneclient.KeystoneClientV2(self.ctx)
self.assertRaises(exception.NotSupported,
heat_ks_client.delete_trust,
'fake_trust_id')