support ec2 crud calls

This is to support blueprint generate-ec2-access-secret

Change-Id: I4474acc7d2193c4b04ecb11028d8ecb13e523266
This commit is contained in:
Jesse Andrews
2012-01-06 16:06:24 -08:00
parent 64c42c838e
commit 716d5ef25a
3 changed files with 199 additions and 0 deletions

View File

@@ -17,6 +17,7 @@ import logging
from keystoneclient import client
from keystoneclient import exceptions
from keystoneclient import service_catalog
from keystoneclient.v2_0 import ec2
from keystoneclient.v2_0 import roles
from keystoneclient.v2_0 import services
from keystoneclient.v2_0 import tenants
@@ -69,6 +70,10 @@ class Client(client.HTTPClient):
self.users = users.UserManager(self)
# NOTE(gabriel): If we have a pre-defined endpoint then we can
# get away with lazy auth. Otherwise auth immediately.
# extensions
self.ec2 = ec2.CredentialsManager(self)
if endpoint is None:
self.authenticate()
else:

View File

@@ -0,0 +1,62 @@
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from keystoneclient import base
class EC2(base.Resource):
def __repr__(self):
return "<EC2 %s>" % self._info
def delete(self):
return self.manager.delete(self)
class CredentialsManager(base.ManagerWithFind):
resource_class = EC2
def create(self, user_id, tenant_id):
"""
Create a new access/secret pair for the user/tenant pair
:rtype: object of type :class:`EC2`
"""
params = {'tenant_id': tenant_id}
return self._create('/users/%s/credentials/OS-EC2' % user_id,
params, "credential")
def list(self, user_id):
"""
Get a list of access/secret pairs for a user_id
:rtype: list of :class:`EC2`
"""
return self._list("/users/%s/credentials/OS-EC2" % user_id,
"credentials")
def get(self, user_id, access):
"""
Get the access/secret pair for a given access key
:rtype: object of type :class:`EC2`
"""
return self._get("/users/%s/credentials/OS-EC2/%s" %
(user_id, base.getid(access)), "credential")
def delete(self, user_id, access):
"""
Delete an access/secret pair for a user
"""
return self._delete("/users/%s/credentials/OS-EC2/%s" %
(user_id, base.getid(access)))

132
tests/v2_0/test_ec2.py Normal file
View File

@@ -0,0 +1,132 @@
import urlparse
import json
import httplib2
from keystoneclient.v2_0 import ec2
from tests import utils
class EC2Tests(utils.TestCase):
def setUp(self):
super(EC2Tests, self).setUp()
self.TEST_REQUEST_HEADERS = {'X-Auth-Token': 'aToken',
'User-Agent': 'python-keystoneclient'}
self.TEST_POST_HEADERS = {'Content-Type': 'application/json',
'X-Auth-Token': 'aToken',
'User-Agent': 'python-keystoneclient'}
def test_create(self):
user_id = 'usr'
tenant_id = 'tnt'
req_body = {"tenant_id": tenant_id}
resp_body = {"credential": {"access_key": "access",
"secret_key": "secret",
"tenant_id": tenant_id,
"created": "12/12/12",
"enabled": True}}
resp = httplib2.Response({
"status": 200,
"body": json.dumps(resp_body),
})
url = urlparse.urljoin(self.TEST_URL,
'v2.0/users/%s/credentials/OS-EC2' % user_id)
httplib2.Http.request(url,
'POST',
body=json.dumps(req_body),
headers=self.TEST_POST_HEADERS) \
.AndReturn((resp, resp['body']))
self.mox.ReplayAll()
cred = self.client.ec2.create(user_id, tenant_id)
self.assertTrue(isinstance(cred, ec2.EC2))
self.assertEqual(cred.tenant_id, tenant_id)
self.assertEqual(cred.enabled, True)
self.assertEqual(cred.access_key, 'access')
self.assertEqual(cred.secret_key, 'secret')
def test_get(self):
user_id = 'usr'
tenant_id = 'tnt'
resp_body = {"credential": {"access_key": "access",
"secret_key": "secret",
"tenant_id": tenant_id,
"created": "12/12/12",
"enabled": True}}
resp = httplib2.Response({
"status": 200,
"body": json.dumps(resp_body),
})
url = urlparse.urljoin(self.TEST_URL,
'v2.0/users/%s/credentials/OS-EC2/%s'
'?fresh=1234' % (user_id, 'access'))
httplib2.Http.request(url,
'GET',
headers=self.TEST_REQUEST_HEADERS) \
.AndReturn((resp, resp['body']))
self.mox.ReplayAll()
cred = self.client.ec2.get(user_id, 'access')
self.assertTrue(isinstance(cred, ec2.EC2))
self.assertEqual(cred.tenant_id, tenant_id)
self.assertEqual(cred.enabled, True)
self.assertEqual(cred.access_key, 'access')
self.assertEqual(cred.secret_key, 'secret')
def test_list(self):
user_id = 'usr'
tenant_id = 'tnt'
resp_body = {"credentials": {
"values": [
{"access_key": "access",
"secret_key": "secret",
"tenant_id": tenant_id,
"created": "12/12/12",
"enabled": True},
{"access_key": "another",
"secret_key": "key",
"tenant_id": tenant_id,
"created": "12/12/31",
"enabled": True}]}}
resp = httplib2.Response({
"status": 200,
"body": json.dumps(resp_body),
})
url = urlparse.urljoin(self.TEST_URL,
'v2.0/users/%s/credentials/OS-EC2?fresh=1234' % user_id)
httplib2.Http.request(url,
'GET',
headers=self.TEST_REQUEST_HEADERS) \
.AndReturn((resp, resp['body']))
self.mox.ReplayAll()
creds = self.client.ec2.list(user_id)
self.assertTrue(len(creds), 2)
cred = creds[0]
self.assertTrue(isinstance(cred, ec2.EC2))
self.assertEqual(cred.tenant_id, tenant_id)
self.assertEqual(cred.enabled, True)
self.assertEqual(cred.access_key, 'access')
self.assertEqual(cred.secret_key, 'secret')
def test_delete(self):
user_id = 'usr'
access = 'access'
resp = httplib2.Response({
"status": 200,
"body": ""
})
url = urlparse.urljoin(self.TEST_URL,
'v2.0/users/%s/credentials/OS-EC2/%s' % (user_id, access))
httplib2.Http.request(url,
'DELETE',
headers=self.TEST_REQUEST_HEADERS) \
.AndReturn((resp, resp['body']))
self.mox.ReplayAll()
self.client.ec2.delete(user_id, access)