Merge "heat_keystoneclient convert get_ec2_keypair to v3 API"

This commit is contained in:
Jenkins 2014-01-30 06:20:06 +00:00 committed by Gerrit Code Review
commit 5261b5beff
2 changed files with 107 additions and 6 deletions

View File

@ -31,6 +31,8 @@ from heat.openstack.common.gettextutils import _
logger = logging.getLogger('heat.common.keystoneclient')
AccessKey = namedtuple('AccessKey', ['id', 'access', 'secret'])
class KeystoneClient(object):
"""
@ -283,9 +285,31 @@ class KeystoneClient(object):
def delete_ec2_keypair(self, user_id, accesskey):
self.client_v2.ec2.delete(user_id, accesskey)
def get_ec2_keypair(self, access, user_id=None):
uid = user_id or self.client_v2.auth_ref.user_id
return self.client_v2.ec2.get(uid, access)
def get_ec2_keypair(self, credential_id=None, access=None, user_id=None):
'''Get an ec2 keypair via v3/credentials, by id or access.'''
# Note v3/credentials does not support filtering by access
# because it's stored in the credential blob, so we expect
# all resources to pass credential_id except where backwards
# compatibility is required (resource only has acccess stored)
# then we'll have to to a brute-force lookup locally
if credential_id:
cred = self.client_v3.credentials.get(credential_id)
ec2_creds = json.loads(cred.blob)
return AccessKey(id=cred.id,
access=ec2_creds['access'],
secret=ec2_creds['secret'])
elif access:
# FIXME(shardy): add filtering for user_id when keystoneclient
# extensible-crud-manager-operations bp lands
credentials = self.client_v3.credentials.list()
for cr in credentials:
ec2_creds = json.loads(cr.blob)
if ec2_creds.get('access') == access:
return AccessKey(id=cr.id,
access=ec2_creds['access'],
secret=ec2_creds['secret'])
else:
raise ValueError("Must specify either credential_id or access")
def create_ec2_keypair(self, user_id=None):
user_id = user_id or self.client_v3.auth_ref.user_id
@ -296,11 +320,10 @@ class KeystoneClient(object):
user=user_id, type='ec2', data=json.dumps(data_blob),
project=project_id)
# Return a namedtuple for easier access to the blob contents
# Return a AccessKey namedtuple for easier access to the blob contents
# We return the id as the v3 api provides no way to filter by
# access in the blob contents, so it will be much more efficient
# if we manage credentials by ID instead
AccessKey = namedtuple('AccessKey', ['id', 'access', 'secret'])
return AccessKey(id=ec2_creds.id,
access=data_blob['access'],
secret=data_blob['secret'])

View File

@ -96,7 +96,7 @@ class KeystoneClientTest(HeatTestCase):
self.mock_ks_client.auth_ref.auth_token = 'atrusttoken'
self.mock_ks_client.auth_ref.user_id = user_id
def _stubs_v3(self, method='token', auth_ok=True):
def _stubs_v3(self, method='token', auth_ok=True, user_id=None):
self._stub_config()
self.m.StubOutClassWithMocks(heat_keystoneclient.kc_v3, "Client")
@ -132,6 +132,9 @@ class KeystoneClientTest(HeatTestCase):
insecure=False,
key=None)
self.mock_ks_v3_client.authenticate().AndReturn(auth_ok)
if user_id:
self.mock_ks_v3_client.auth_ref = self.m.CreateMockAnything()
self.mock_ks_v3_client.auth_ref.user_id = user_id
def test_username_length(self):
"""Test that user names >64 characters are properly truncated."""
@ -505,3 +508,78 @@ class KeystoneClientTest(HeatTestCase):
self.assertEqual('123456', ec2_cred.id)
self.assertEqual('dummy_access', ec2_cred.access)
self.assertEqual('dummy_secret', ec2_cred.secret)
def test_get_ec2_keypair_id(self):
"""Test getting ec2 credential by id."""
user_id = 'atestuser'
self._stubs_v3(user_id=user_id)
ctx = utils.dummy_context()
ctx.trust_id = None
ex_data = {'access': 'access123',
'secret': 'secret456'}
ex_data_json = json.dumps(ex_data)
# Create a mock credential response
credential_id = 'acredential123'
mock_credential = self.m.CreateMockAnything()
mock_credential.id = credential_id
mock_credential.user_id = user_id
mock_credential.blob = ex_data_json
mock_credential.type = 'ec2'
# mock keystone client get function
self.mock_ks_v3_client.credentials = self.m.CreateMockAnything()
self.mock_ks_v3_client.credentials.get(
credential_id).AndReturn(mock_credential)
self.m.ReplayAll()
heat_ks_client = heat_keystoneclient.KeystoneClient(ctx)
ec2_cred = heat_ks_client.get_ec2_keypair(credential_id=credential_id)
self.assertEqual(credential_id, ec2_cred.id)
self.assertEqual('access123', ec2_cred.access)
self.assertEqual('secret456', ec2_cred.secret)
def test_get_ec2_keypair_access(self):
"""Test getting ec2 credential by access."""
user_id = 'atestuser'
self._stubs_v3(user_id=user_id)
ctx = utils.dummy_context()
ctx.trust_id = None
# Create a mock credential list response
mock_credential_list = []
for x in (1, 2, 3):
mock_credential = self.m.CreateMockAnything()
mock_credential.id = 'credential_id%s' % x
mock_credential.user_id = user_id
mock_credential.blob = json.dumps({'access': 'access%s' % x,
'secret': 'secret%s' % x})
mock_credential.type = 'ec2'
mock_credential_list.append(mock_credential)
# mock keystone client list function
self.mock_ks_v3_client.credentials = self.m.CreateMockAnything()
self.mock_ks_v3_client.credentials.list().AndReturn(
mock_credential_list)
self.m.ReplayAll()
heat_ks_client = heat_keystoneclient.KeystoneClient(ctx)
ec2_cred = heat_ks_client.get_ec2_keypair(access='access2')
self.assertEqual('credential_id2', ec2_cred.id)
self.assertEqual('access2', ec2_cred.access)
self.assertEqual('secret2', ec2_cred.secret)
def test_get_ec2_keypair_error(self):
"""Test getting ec2 credential error path."""
ctx = utils.dummy_context()
ctx.trust_id = None
heat_ks_client = heat_keystoneclient.KeystoneClient(ctx)
self.assertRaises(ValueError, heat_ks_client.get_ec2_keypair)