heat_keystoneclient: Fix consuming trusts via v2 API

Contrary to my understanding when I first wrote this code, it is
possible to consume trusts (ie get a trust scoped token) via the
v2 keystone API.  So this patch reworks the heat_keystoneclient
so we:
- Consume trusts via the v2 client (so we can access the ec2tokens
  extension which is v2-only)
- Only create a v3 client when required (when creating or deleting
  a trust), so we minimise the requests to keystone.
- Similarly it's only necessary to create the v2 client when creating
  the client object if we need to consume a trust
- When a trust is found in the context, we always consume it
- Don't modify the request context, as this has undesired side-effects
  if the context is subsequently used to perform actions which require
  obtaining a keystone token, instead create a context containing a
  trust_id and trustor_user_id when the stack is stored.

Note this change depends on this patch to python-keystoneclient:
https://review.openstack.org/#/c/48462/, so consuming trusts won't
work unless a version of keystoneclient including this change is
used.

Change-Id: I61b380bc63d606c128ce029f1960c6812a3324e3
Closes-Bug: #1227901
This commit is contained in:
Steven Hardy
2013-09-27 08:11:03 +01:00
parent 003a3b978d
commit 288ae018eb
7 changed files with 200 additions and 237 deletions

View File

@@ -13,10 +13,10 @@
# License for the specific language governing permissions and limitations
# under the License.
from heat.common import context
from heat.common import exception
import eventlet
import hashlib
from keystoneclient.v2_0 import client as kc
from keystoneclient.v3 import client as kc_v3
@@ -38,45 +38,57 @@ class KeystoneClient(object):
directly instantiate instances of this class inside resources themselves
"""
def __init__(self, context):
self.context = context
# We have to maintain two clients authenticated with keystone:
# - ec2 interface is v2.0 only
# - trusts is v3 only
# - passing a v2 auth_token to the v3 client won't work until lp bug
# #1212778 is fixed
# - passing a v3 token to the v2 client works but we have to either
# md5sum it or use the nocatalog option to auth/tokens (not yet
# supported by keystoneclient), or we hit the v2 8192byte size limit
# If a trust_id is specified in the context, we immediately
# authenticate so we can populate the context with a trust token
# otherwise, we delay client authentication until needed to avoid
# unnecessary calls to keystone.
#
# Note that when you obtain a token using a trust, it cannot be
# used to reauthenticate and get another token, so we have to
# get a new trust-token even if context.auth_token is set.
#
# - context.auth_url is expected to contain the v2.0 keystone endpoint
if cfg.CONF.deferred_auth_method == 'trusts':
self.context = context
self._client_v2 = None
self._client_v3 = None
if self.context.trust_id:
# Create a connection to the v2 API, with the trust_id, this
# populates self.context.auth_token with a trust-scoped token
self._client_v2 = self._v2_client_init()
@property
def client_v3(self):
if not self._client_v3:
# Create connection to v3 API
self.client_v3 = self._v3_client_init()
self._client_v3 = self._v3_client_init()
return self._client_v3
# Set context auth_token to md5sum of v3 token
auth_token = self.client_v3.auth_ref.get('auth_token')
self.context.auth_token = self._md5_token(auth_token)
# Create the connection to the v2 API, reusing the md5-ified token
self.client_v2 = self._v2_client_init()
else:
# Create the connection to the v2 API, using the context creds
self.client_v2 = self._v2_client_init()
self.client_v3 = None
def _md5_token(self, auth_token):
# Get the md5sum of the v3 token, which we can pass instead of the
# actual token to avoid v2 8192byte size limit on the v2 token API
m_enc = hashlib.md5()
m_enc.update(auth_token)
return m_enc.hexdigest()
@property
def client_v2(self):
if not self._client_v2:
self._client_v2 = self._v2_client_init()
return self._client_v2
def _v2_client_init(self):
kwargs = {
'auth_url': self.context.auth_url
}
# Note check for auth_token first so we use existing token if
# available from v3 auth
if self.context.auth_token is not None:
auth_kwargs = {}
# Note try trust_id first, as we can't reuse auth_token in that case
if self.context.trust_id is not None:
# We got a trust_id, so we use the admin credentials
# to authenticate, then re-scope the token to the
# trust impersonating the trustor user.
# Note that this currently requires the trustor tenant_id
# to be passed to the authenticate(), unlike the v3 call
kwargs.update(self._service_admin_creds(api_version=2))
auth_kwargs['trust_id'] = self.context.trust_id
auth_kwargs['tenant_id'] = self.context.tenant_id
elif self.context.auth_token is not None:
kwargs['tenant_name'] = self.context.tenant
kwargs['token'] = self.context.auth_token
elif self.context.password is not None:
@@ -89,9 +101,19 @@ class KeystoneClient(object):
"auth_token!")
raise exception.AuthorizationFailure()
client_v2 = kc.Client(**kwargs)
if not client_v2.authenticate():
logger.error("Keystone v2 API authentication failed")
raise exception.AuthorizationFailure()
client_v2.authenticate(**auth_kwargs)
# If we are authenticating with a trust auth_kwargs are set, so set
# the context auth_token with the re-scoped trust token
if auth_kwargs:
# Sanity check
if not client_v2.auth_ref.trust_scoped:
logger.error("v2 trust token re-scoping failed!")
raise exception.AuthorizationFailure()
# All OK so update the context with the token
self.context.auth_token = client_v2.auth_ref.auth_token
self.context.auth_url = kwargs.get('auth_url')
return client_v2
@staticmethod
@@ -139,27 +161,24 @@ class KeystoneClient(object):
"auth_token!")
raise exception.AuthorizationFailure()
client_v3 = kc_v3.Client(**kwargs)
if not client_v3.authenticate():
logger.error("Keystone v3 API authentication failed")
raise exception.AuthorizationFailure()
return client_v3
client = kc_v3.Client(**kwargs)
# Have to explicitly authenticate() or client.auth_ref is None
client.authenticate()
return client
def create_trust_context(self):
"""
If cfg.CONF.deferred_auth_method is trusts, we create a
trust using the trustor identity in the current context, with the
trustee as the heat service user
trustee as the heat service user and return a context containing
the new trust_id
If deferred_auth_method != trusts, we do nothing
If the current context already contains a trust_id, we do nothing
If deferred_auth_method != trusts, or the current context already
contains a trust_id, we do nothing and return the current context
"""
if cfg.CONF.deferred_auth_method != 'trusts':
return
if self.context.trust_id:
return
return self.context
# We need the service admin user ID (not name), as the trustor user
# can't lookup the ID in keystoneclient unless they're admin
@@ -167,34 +186,27 @@ class KeystoneClient(object):
# then getting the user ID from the auth_ref
admin_creds = self._service_admin_creds()
admin_client = kc.Client(**admin_creds)
if not admin_client.authenticate():
logger.error("Keystone v2 API admin authentication failed")
raise exception.AuthorizationFailure()
trustee_user_id = admin_client.auth_ref['user']['id']
trustor_user_id = self.client_v3.auth_ref['user']['id']
trustor_project_id = self.client_v3.auth_ref['project']['id']
trustee_user_id = admin_client.auth_ref.user_id
trustor_user_id = self.client_v3.auth_ref.user_id
trustor_project_id = self.client_v3.auth_ref.project_id
roles = cfg.CONF.trusts_delegated_roles
trust = self.client_v3.trusts.create(trustor_user=trustor_user_id,
trustee_user=trustee_user_id,
project=trustor_project_id,
impersonation=True,
role_names=roles)
self.context.trust_id = trust.id
self.context.trustor_user_id = trustor_user_id
def delete_trust_context(self):
trust_context = context.RequestContext.from_dict(
self.context.to_dict())
trust_context.trust_id = trust.id
trust_context.trustor_user_id = trustor_user_id
return trust_context
def delete_trust(self, trust_id):
"""
If a trust_id exists in the context, we delete it
Delete the specified trust.
"""
if not self.context.trust_id:
return
self.client_v3.trusts.delete(self.context.trust_id)
self.context.trust_id = None
self.context.trustor_user_id = None
self.client_v3.trusts.delete(trust_id)
def create_stack_user(self, username, password=''):
"""

View File

@@ -16,6 +16,8 @@
import functools
import re
from oslo.config import cfg
from heat.engine import environment
from heat.common import exception
from heat.engine import dependencies
@@ -208,7 +210,13 @@ class Stack(object):
if self.id:
db_api.stack_update(self.context, self.id, s)
else:
new_creds = db_api.user_creds_create(self.context)
# Create a context containing a trust_id and trustor_user_id
# if trusts are enabled
if cfg.CONF.deferred_auth_method == 'trusts':
trust_context = self.clients.keystone().create_trust_context()
new_creds = db_api.user_creds_create(trust_context)
else:
new_creds = db_api.user_creds_create(self.context)
s['user_creds_id'] = new_creds.id
new_s = db_api.stack_create(self.context, s)
self.id = new_s.id
@@ -552,6 +560,13 @@ class Stack(object):
self.state_set(action, stack_status, reason)
if stack_status != self.FAILED:
# If we created a trust, delete it
stack = db_api.stack_get(self.context, self.id)
user_creds = db_api.user_creds_get(stack.user_creds_id)
trust_id = user_creds.get('trust_id')
if trust_id:
self.clients.keystone().delete_trust(trust_id)
# delete the stack
db_api.stack_delete(self.context, self.id)
self.id = None

View File

@@ -272,11 +272,6 @@ class EngineService(service.Service):
stack.validate()
# Creates a trust and sets the trust_id and trustor_user_id in
# the current context, before we store it in stack.store()
# Does nothing if deferred_auth_method is 'password'
stack.clients.keystone().create_trust_context()
stack_id = stack.store()
self._start_in_thread(stack_id, _stack_create, stack)
@@ -411,13 +406,6 @@ class EngineService(service.Service):
stack = parser.Stack.load(cnxt, stack=st)
# If we created a trust, delete it
# Note this is using the current request context, not the stored
# context, as it seems it's not possible to delete a trust with
# a token obtained via that trust. This means that only the user
# who created the stack can delete it when using trusts atm.
stack.clients.keystone().delete_trust_context()
# Kill any pending threads by calling ThreadGroup.stop()
if st.id in self.stg:
self.stg[st.id].stop()

View File

@@ -23,7 +23,6 @@ from testtools import matchers
from oslo.config import cfg
from heat.engine import environment
from heat.common import heat_keystoneclient as hkc
from heat.common import exception
from heat.tests.v1_1 import fakes
import heat.rpc.api as engine_api
@@ -300,16 +299,6 @@ class StackServiceCreateUpdateDeleteTest(HeatTestCase):
self.m.StubOutWithMock(stack, 'validate')
stack.validate().AndReturn(None)
self.m.StubOutClassWithMocks(hkc.kc, "Client")
mock_ks_client = hkc.kc.Client(
auth_url=mox.IgnoreArg(),
tenant_name='test_tenant',
token='abcd1234')
mock_ks_client.authenticate().AndReturn(True)
self.m.StubOutWithMock(hkc.KeystoneClient, 'create_trust_context')
hkc.KeystoneClient.create_trust_context().AndReturn(None)
self.m.StubOutWithMock(threadgroup, 'ThreadGroup')
threadgroup.ThreadGroup().AndReturn(DummyThreadGroup())
@@ -455,16 +444,6 @@ class StackServiceCreateUpdateDeleteTest(HeatTestCase):
stack.t,
stack.env).AndReturn(stack)
self.m.StubOutClassWithMocks(hkc.kc, "Client")
mock_ks_client = hkc.kc.Client(
auth_url=mox.IgnoreArg(),
tenant_name='test_tenant',
token='abcd1234')
mock_ks_client.authenticate().AndReturn(True)
self.m.StubOutWithMock(hkc.KeystoneClient, 'create_trust_context')
hkc.KeystoneClient.create_trust_context().AndReturn(None)
self.m.ReplayAll()
cfg.CONF.set_override('max_resources_per_stack', 3)
@@ -530,16 +509,6 @@ class StackServiceCreateUpdateDeleteTest(HeatTestCase):
parser.Stack.load(self.ctx, stack=s).AndReturn(stack)
self.m.StubOutClassWithMocks(hkc.kc, "Client")
mock_ks_client = hkc.kc.Client(
auth_url=mox.IgnoreArg(),
tenant_name='test_tenant',
token='abcd1234')
mock_ks_client.authenticate().AndReturn(True)
self.m.StubOutWithMock(hkc.KeystoneClient, 'delete_trust_context')
hkc.KeystoneClient.delete_trust_context().AndReturn(None)
self.man.tg = DummyThreadGroup()
self.m.ReplayAll()

View File

@@ -44,7 +44,8 @@ class KeystoneClientTest(HeatTestCase):
group='keystone_authtoken')
self.addCleanup(self.m.VerifyAll)
def _stubs_v2(self, method='token', auth_ok=True):
def _stubs_v2(self, method='token', auth_ok=True,
trust_scoped=True):
self.m.StubOutClassWithMocks(heat_keystoneclient.kc, "Client")
if method == 'token':
self.mock_ks_client = heat_keystoneclient.kc.Client(
@@ -60,9 +61,20 @@ class KeystoneClientTest(HeatTestCase):
username='test_username',
password='password')
self.mock_ks_client.authenticate().AndReturn(auth_ok)
if method == 'trust':
self.mock_ks_client = heat_keystoneclient.kc.Client(
auth_url='http://server.test:5000/v2.0',
password='verybadpass',
tenant_name='service',
username='heat')
self.mock_ks_client.authenticate(trust_id='atrust123',
tenant_id='test_tenant_id'
).AndReturn(auth_ok)
self.mock_ks_client.auth_ref = self.m.CreateMockAnything()
self.mock_ks_client.auth_ref.trust_scoped = trust_scoped
self.mock_ks_client.auth_ref.auth_token = 'atrusttoken'
def _stubs_v3(self, method='token', auth_ok=True):
self.m.StubOutClassWithMocks(heat_keystoneclient.kc, "Client")
self.m.StubOutClassWithMocks(heat_keystoneclient.kc_v3, "Client")
if method == 'token':
@@ -83,19 +95,8 @@ class KeystoneClientTest(HeatTestCase):
username='heat',
password='verybadpass',
project_name='service',
auth_url='http://server.test:5000/v3',
trust_id='atrust123')
auth_url='http://server.test:5000/v3')
self.mock_ks_v3_client.authenticate().AndReturn(auth_ok)
if auth_ok:
self.mock_ks_v3_client.auth_ref = self.m.CreateMockAnything()
self.mock_ks_v3_client.auth_ref.get('auth_token').AndReturn(
'av3token')
self.mock_ks_client = heat_keystoneclient.kc.Client(
auth_url=mox.IgnoreArg(),
tenant_name='test_tenant',
token='4b97cc1b2454e137ee2e8261e115bbe8')
self.mock_ks_client.authenticate().AndReturn(auth_ok)
def test_username_length(self):
"""Test that user names >64 characters are properly truncated."""
@@ -123,22 +124,24 @@ class KeystoneClientTest(HeatTestCase):
# the cleanup VerifyAll should verify that though we passed
# long_user_name, keystone was actually called with a truncated
# user name
heat_ks_client = heat_keystoneclient.KeystoneClient(
utils.dummy_context())
ctx = utils.dummy_context()
ctx.trust_id = None
heat_ks_client = heat_keystoneclient.KeystoneClient(ctx)
heat_ks_client.create_stack_user(long_user_name, password='password')
def test_init_v2_password(self):
"""Test creating the client without trusts, user/password context."""
"""Test creating the client, user/password context."""
self._stubs_v2(method='password')
self.m.ReplayAll()
ctx = utils.dummy_context()
ctx.auth_token = None
ctx.trust_id = None
heat_ks_client = heat_keystoneclient.KeystoneClient(ctx)
self.assertIsNotNone(heat_ks_client.client_v2)
self.assertIsNone(heat_ks_client.client_v3)
self.assertIsNone(heat_ks_client._client_v3)
def test_init_v2_bad_nocreds(self):
@@ -148,26 +151,14 @@ class KeystoneClientTest(HeatTestCase):
ctx.auth_token = None
ctx.username = None
ctx.password = None
ctx.trust_id = None
heat_ks_client = heat_keystoneclient.KeystoneClient(ctx)
self.assertRaises(exception.AuthorizationFailure,
heat_keystoneclient.KeystoneClient, ctx)
def test_init_v2_bad_denied(self):
"""Test creating the client without trusts, auth failure."""
self._stubs_v2(method='password', auth_ok=False)
self.m.ReplayAll()
ctx = utils.dummy_context()
ctx.auth_token = None
self.assertRaises(exception.AuthorizationFailure,
heat_keystoneclient.KeystoneClient, ctx)
heat_ks_client._v2_client_init)
def test_init_v3_token(self):
"""Test creating the client with trusts, token auth."""
cfg.CONF.set_override('deferred_auth_method', 'trusts')
"""Test creating the client, token auth."""
self._stubs_v3()
self.m.ReplayAll()
@@ -175,15 +166,15 @@ class KeystoneClientTest(HeatTestCase):
ctx = utils.dummy_context()
ctx.username = None
ctx.password = None
ctx.trust_id = None
heat_ks_client = heat_keystoneclient.KeystoneClient(ctx)
self.assertIsNotNone(heat_ks_client.client_v2)
self.assertIsNotNone(heat_ks_client.client_v3)
heat_ks_client.client_v3
self.assertIsNotNone(heat_ks_client._client_v3)
self.assertIsNone(heat_ks_client._client_v2)
def test_init_v3_password(self):
"""Test creating the client with trusts, password auth."""
cfg.CONF.set_override('deferred_auth_method', 'trusts')
"""Test creating the client, password auth."""
self._stubs_v3(method='password')
self.m.ReplayAll()
@@ -192,87 +183,60 @@ class KeystoneClientTest(HeatTestCase):
ctx.auth_token = None
ctx.trust_id = None
heat_ks_client = heat_keystoneclient.KeystoneClient(ctx)
self.assertIsNotNone(heat_ks_client.client_v2)
self.assertIsNotNone(heat_ks_client.client_v3)
client_v3 = heat_ks_client.client_v3
self.assertIsNotNone(client_v3)
self.assertIsNone(heat_ks_client._client_v2)
def test_init_v3_bad_nocreds(self):
"""Test creating the client with trusts, no credentials."""
cfg.CONF.set_override('deferred_auth_method', 'trusts')
"""Test creating the client, no credentials."""
ctx = utils.dummy_context()
ctx.auth_token = None
ctx.trust_id = None
ctx.username = None
ctx.password = None
self.assertRaises(exception.AuthorizationFailure,
heat_keystoneclient.KeystoneClient, ctx)
def test_init_v3_bad_denied(self):
"""Test creating the client with trusts, auth failure."""
cfg.CONF.set_override('deferred_auth_method', 'trusts')
self._stubs_v3(method='password', auth_ok=False)
self.m.ReplayAll()
ctx = utils.dummy_context()
ctx.auth_token = None
ctx.trust_id = None
self.assertRaises(exception.AuthorizationFailure,
heat_keystoneclient.KeystoneClient, ctx)
def test_create_trust_context_notrust(self):
"""Test create_trust_context with trusts disabled."""
self._stubs_v2(method='password')
self.m.ReplayAll()
ctx = utils.dummy_context()
ctx.auth_token = None
ctx.trust_id = None
heat_ks_client = heat_keystoneclient.KeystoneClient(ctx)
self.assertIsNone(heat_ks_client.create_trust_context())
self.assertRaises(exception.AuthorizationFailure,
heat_ks_client._v3_client_init)
def test_create_trust_context_trust_id(self):
"""Test create_trust_context with existing trust_id."""
cfg.CONF.set_override('deferred_auth_method', 'trusts')
self._stubs_v3()
self._stubs_v2(method='trust')
self.m.ReplayAll()
cfg.CONF.set_override('deferred_auth_method', 'trusts')
ctx = utils.dummy_context()
ctx.trust_id = 'atrust123'
heat_ks_client = heat_keystoneclient.KeystoneClient(ctx)
self.assertIsNone(heat_ks_client.create_trust_context())
trust_context = heat_ks_client.create_trust_context()
self.assertEqual(trust_context.to_dict(), ctx.to_dict())
def test_create_trust_context_trust_create(self):
"""Test create_trust_context when creating a new trust."""
"""Test create_trust_context when creating a trust."""
cfg.CONF.set_override('deferred_auth_method', 'trusts')
class MockTrust(object):
id = 'atrust123'
self._stubs_v3()
self.m.StubOutClassWithMocks(heat_keystoneclient.kc, "Client")
mock_admin_client = heat_keystoneclient.kc.Client(
auth_url=mox.IgnoreArg(),
username='heat',
password='verybadpass',
tenant_name='service')
mock_admin_client.authenticate().AndReturn(True)
mock_admin_client.auth_ref = self.m.CreateMockAnything()
mock_admin_client.auth_ref.__getitem__('user').AndReturn(
{'id': '1234'})
self.mock_ks_v3_client.auth_ref.__getitem__('user').AndReturn(
{'id': '5678'})
self.mock_ks_v3_client.auth_ref.__getitem__('project').AndReturn(
{'id': '42'})
mock_admin_client.auth_ref.user_id = '1234'
self._stubs_v3()
self.mock_ks_v3_client.auth_ref = self.m.CreateMockAnything()
self.mock_ks_v3_client.auth_ref.user_id = '5678'
self.mock_ks_v3_client.auth_ref.project_id = '42'
self.mock_ks_v3_client.trusts = self.m.CreateMockAnything()
self.mock_ks_v3_client.trusts.create(
trustor_user='5678',
@@ -286,30 +250,9 @@ class KeystoneClientTest(HeatTestCase):
ctx = utils.dummy_context()
ctx.trust_id = None
heat_ks_client = heat_keystoneclient.KeystoneClient(ctx)
self.assertIsNone(heat_ks_client.create_trust_context())
self.assertEqual(ctx.trust_id, 'atrust123')
self.assertEqual(ctx.trustor_user_id, '5678')
def test_create_trust_context_denied(self):
"""Test create_trust_context when creating admin auth fails."""
cfg.CONF.set_override('deferred_auth_method', 'trusts')
self._stubs_v3()
mock_admin_client = heat_keystoneclient.kc.Client(
auth_url=mox.IgnoreArg(),
username='heat',
password='verybadpass',
tenant_name='service')
mock_admin_client.authenticate().AndReturn(False)
self.m.ReplayAll()
ctx = utils.dummy_context()
ctx.trust_id = None
heat_ks_client = heat_keystoneclient.KeystoneClient(ctx)
self.assertRaises(exception.AuthorizationFailure,
heat_ks_client.create_trust_context)
trust_context = heat_ks_client.create_trust_context()
self.assertEqual(trust_context.trust_id, 'atrust123')
self.assertEqual(trust_context.trustor_user_id, '5678')
def test_trust_init(self):
@@ -317,18 +260,67 @@ class KeystoneClientTest(HeatTestCase):
cfg.CONF.set_override('deferred_auth_method', 'trusts')
self._stubs_v3(method='trust')
self._stubs_v2(method='trust')
self.m.ReplayAll()
ctx = utils.dummy_context()
ctx.username = None
ctx.password = None
ctx.auth_token = None
ctx.trust_id = 'atrust123'
heat_ks_client = heat_keystoneclient.KeystoneClient(ctx)
client_v2 = heat_ks_client.client_v2
self.assertIsNotNone(client_v2)
def test_delete_trust_context(self):
def test_trust_init_fail(self):
"""Test delete_trust_context when deleting trust."""
"""Test consuming a trust when initializing, error scoping."""
cfg.CONF.set_override('deferred_auth_method', 'trusts')
self._stubs_v2(method='trust', trust_scoped=False)
self.m.ReplayAll()
ctx = utils.dummy_context()
ctx.username = None
ctx.password = None
ctx.auth_token = None
ctx.trust_id = 'atrust123'
self.assertRaises(exception.AuthorizationFailure,
heat_keystoneclient.KeystoneClient, ctx)
def test_trust_init_pw(self):
"""Test trust_id is takes precedence username/password specified."""
self._stubs_v2(method='trust')
self.m.ReplayAll()
ctx = utils.dummy_context()
ctx.auth_token = None
ctx.trust_id = 'atrust123'
heat_ks_client = heat_keystoneclient.KeystoneClient(ctx)
self.assertIsNotNone(heat_ks_client._client_v2)
self.assertIsNone(heat_ks_client._client_v3)
def test_trust_init_token(self):
"""Test trust_id takes precedence when token specified."""
self._stubs_v2(method='trust')
self.m.ReplayAll()
ctx = utils.dummy_context()
ctx.username = None
ctx.password = None
ctx.trust_id = 'atrust123'
heat_ks_client = heat_keystoneclient.KeystoneClient(ctx)
self.assertIsNotNone(heat_ks_client._client_v2)
self.assertIsNone(heat_ks_client._client_v3)
def test_delete_trust(self):
"""Test delete_trust when deleting trust."""
cfg.CONF.set_override('deferred_auth_method', 'trusts')
@@ -339,17 +331,4 @@ class KeystoneClientTest(HeatTestCase):
self.m.ReplayAll()
ctx = utils.dummy_context()
heat_ks_client = heat_keystoneclient.KeystoneClient(ctx)
self.assertIsNone(heat_ks_client.delete_trust_context())
def test_delete_trust_context_notrust(self):
"""Test delete_trust_context no trust_id specified."""
cfg.CONF.set_override('deferred_auth_method', 'trusts')
self._stubs_v3()
self.m.ReplayAll()
ctx = utils.dummy_context()
ctx.trust_id = None
heat_ks_client = heat_keystoneclient.KeystoneClient(ctx)
self.assertIsNone(heat_ks_client.delete_trust_context())
self.assertIsNone(heat_ks_client.delete_trust(trust_id='atrust123'))

View File

@@ -587,7 +587,8 @@ class StackTest(HeatTestCase):
ctx = utils.dummy_context()
ctx.auth_token = None
self.m.StubOutWithMock(clients.OpenStackClients, 'keystone')
clients.OpenStackClients.keystone().AndReturn(FakeKeystoneClient())
clients.OpenStackClients.keystone().MultipleTimes().AndReturn(
FakeKeystoneClient())
self.m.ReplayAll()
stack = parser.Stack(ctx, 'test_stack', parser.Template({}))

View File

@@ -136,7 +136,6 @@ def dummy_context(user='test_username', tenant_id='test_tenant_id',
'username': user,
'password': password,
'roles': roles,
'trust_id': 'atrust123',
'auth_url': 'http://server.test:5000/v2.0',
'auth_token': 'abcd1234'
})