Files
python-keystoneclient/tests/v2_0/test_tenants.py
Charles V Bock f9fbca21ae Adds support for passing extra tenant attributes to keystone
Patch adds simple kwargs parsing to tenant update and create
methods allowing users to exercise the extras functionality
of keystone.

As-per keystone data model docs:
http://docs.openstack.org/developer/keystone/architecture.html#data-model

Change-Id: I1c375c70a0ab4519626d2238d65fa0695f953dc5
Fixes: bug #1180317
2013-08-12 15:22:34 -07:00

412 lines
14 KiB
Python

import copy
import json
import urlparse
import requests
from keystoneclient import exceptions
from keystoneclient.v2_0 import tenants
from tests import utils
class TenantTests(utils.TestCase):
def setUp(self):
super(TenantTests, self).setUp()
self.TEST_REQUEST_HEADERS = {
'X-Auth-Token': 'aToken',
'User-Agent': 'python-keystoneclient',
}
self.TEST_POST_HEADERS = {
'Content-Type': 'application/json',
'X-Auth-Token': 'aToken',
'User-Agent': 'python-keystoneclient',
}
self.TEST_TENANTS = {
"tenants": {
"values": [
{
"enabled": True,
"description": "A description change!",
"name": "invisible_to_admin",
"id": 3,
},
{
"enabled": True,
"description": "None",
"name": "demo",
"id": 2,
},
{
"enabled": True,
"description": "None",
"name": "admin",
"id": 1,
},
{
"extravalue01": "metadata01",
"enabled": True,
"description": "For testing extras",
"name": "test_extras",
"id": 4,
}
],
"links": [],
},
}
def test_create(self):
req_body = {
"tenant": {
"name": "tenantX",
"description": "Like tenant 9, but better.",
"enabled": True,
"extravalue01": "metadata01",
},
}
resp_body = {
"tenant": {
"name": "tenantX",
"enabled": True,
"id": 4,
"description": "Like tenant 9, but better.",
"extravalue01": "metadata01",
}
}
resp = utils.TestResponse({
"status_code": 200,
"text": json.dumps(resp_body),
})
kwargs = copy.copy(self.TEST_REQUEST_BASE)
kwargs['headers'] = self.TEST_POST_HEADERS
kwargs['data'] = json.dumps(req_body)
requests.request('POST',
urlparse.urljoin(self.TEST_URL, 'v2.0/tenants'),
**kwargs).AndReturn((resp))
self.mox.ReplayAll()
tenant = self.client.tenants.create(
req_body['tenant']['name'],
req_body['tenant']['description'],
req_body['tenant']['enabled'],
extravalue01=req_body['tenant']['extravalue01'],
name="dont overwrite priors")
self.assertTrue(isinstance(tenant, tenants.Tenant))
self.assertEqual(tenant.id, 4)
self.assertEqual(tenant.name, "tenantX")
self.assertEqual(tenant.description, "Like tenant 9, but better.")
self.assertEqual(tenant.extravalue01, "metadata01")
def test_duplicate_create(self):
req_body = {
"tenant": {
"name": "tenantX",
"description": "The duplicate tenant.",
"enabled": True
},
}
resp_body = {
"error": {
"message": "Conflict occurred attempting to store project.",
"code": 409,
"title": "Conflict",
}
}
resp = utils.TestResponse({
"status_code": 409,
"text": json.dumps(resp_body),
})
kwargs = copy.copy(self.TEST_REQUEST_BASE)
kwargs['headers'] = self.TEST_POST_HEADERS
kwargs['data'] = json.dumps(req_body)
requests.request('POST',
urlparse.urljoin(self.TEST_URL, 'v2.0/tenants'),
**kwargs).AndReturn((resp))
self.mox.ReplayAll()
def create_duplicate_tenant():
self.client.tenants.create(req_body['tenant']['name'],
req_body['tenant']['description'],
req_body['tenant']['enabled'])
self.assertRaises(exceptions.Conflict, create_duplicate_tenant)
def test_delete(self):
resp = utils.TestResponse({
"status_code": 204,
"text": "",
})
kwargs = copy.copy(self.TEST_REQUEST_BASE)
kwargs['headers'] = self.TEST_REQUEST_HEADERS
requests.request('DELETE',
urlparse.urljoin(self.TEST_URL, 'v2.0/tenants/1'),
**kwargs).AndReturn((resp))
self.mox.ReplayAll()
self.client.tenants.delete(1)
def test_get(self):
resp = utils.TestResponse({
"status_code": 200,
"text": json.dumps({
'tenant': self.TEST_TENANTS['tenants']['values'][2],
}),
})
kwargs = copy.copy(self.TEST_REQUEST_BASE)
kwargs['headers'] = self.TEST_REQUEST_HEADERS
requests.request('GET',
urlparse.urljoin(self.TEST_URL, 'v2.0/tenants/1'),
**kwargs).AndReturn((resp))
self.mox.ReplayAll()
t = self.client.tenants.get(1)
self.assertTrue(isinstance(t, tenants.Tenant))
self.assertEqual(t.id, 1)
self.assertEqual(t.name, 'admin')
def test_list(self):
resp = utils.TestResponse({
"status_code": 200,
"text": json.dumps(self.TEST_TENANTS),
})
kwargs = copy.copy(self.TEST_REQUEST_BASE)
kwargs['headers'] = self.TEST_REQUEST_HEADERS
requests.request('GET',
urlparse.urljoin(self.TEST_URL, 'v2.0/tenants'),
**kwargs).AndReturn((resp))
self.mox.ReplayAll()
tenant_list = self.client.tenants.list()
[self.assertTrue(isinstance(t, tenants.Tenant)) for t in tenant_list]
def test_list_limit(self):
resp = utils.TestResponse({
"status_code": 200,
"text": json.dumps(self.TEST_TENANTS),
})
kwargs = copy.copy(self.TEST_REQUEST_BASE)
kwargs['headers'] = self.TEST_REQUEST_HEADERS
requests.request('GET',
urlparse.urljoin(self.TEST_URL,
'v2.0/tenants?limit=1'),
**kwargs).AndReturn((resp))
self.mox.ReplayAll()
tenant_list = self.client.tenants.list(limit=1)
[self.assertTrue(isinstance(t, tenants.Tenant)) for t in tenant_list]
def test_list_marker(self):
resp = utils.TestResponse({
"status_code": 200,
"text": json.dumps(self.TEST_TENANTS),
})
kwargs = copy.copy(self.TEST_REQUEST_BASE)
kwargs['headers'] = self.TEST_REQUEST_HEADERS
requests.request('GET',
urlparse.urljoin(self.TEST_URL,
'v2.0/tenants?marker=1'),
**kwargs).AndReturn((resp))
self.mox.ReplayAll()
tenant_list = self.client.tenants.list(marker=1)
[self.assertTrue(isinstance(t, tenants.Tenant)) for t in tenant_list]
def test_list_limit_marker(self):
resp = utils.TestResponse({
"status_code": 200,
"text": json.dumps(self.TEST_TENANTS),
})
kwargs = copy.copy(self.TEST_REQUEST_BASE)
kwargs['headers'] = self.TEST_REQUEST_HEADERS
requests.request('GET',
urlparse.urljoin(self.TEST_URL,
'v2.0/tenants?marker=1&limit=1'),
**kwargs).AndReturn((resp))
self.mox.ReplayAll()
tenant_list = self.client.tenants.list(limit=1, marker=1)
[self.assertTrue(isinstance(t, tenants.Tenant)) for t in tenant_list]
def test_update(self):
req_body = {
"tenant": {
"id": 4,
"name": "tenantX",
"description": "I changed you!",
"enabled": False,
"extravalue01": "metadataChanged",
#"extraname": "dontoverwrite!",
},
}
resp_body = {
"tenant": {
"name": "tenantX",
"enabled": False,
"id": 4,
"description": "I changed you!",
"extravalue01": "metadataChanged",
},
}
resp = utils.TestResponse({
"status_code": 200,
"text": json.dumps(resp_body),
})
kwargs = copy.copy(self.TEST_REQUEST_BASE)
kwargs['headers'] = self.TEST_POST_HEADERS
kwargs['data'] = json.dumps(req_body)
requests.request('POST',
urlparse.urljoin(self.TEST_URL,
'v2.0/tenants/4'),
**kwargs).AndReturn((resp))
self.mox.ReplayAll()
tenant = self.client.tenants.update(
req_body['tenant']['id'],
req_body['tenant']['name'],
req_body['tenant']['description'],
req_body['tenant']['enabled'],
extravalue01=req_body['tenant']['extravalue01'],
name="dont overwrite priors")
self.assertTrue(isinstance(tenant, tenants.Tenant))
self.assertEqual(tenant.id, 4)
self.assertEqual(tenant.name, "tenantX")
self.assertEqual(tenant.description, "I changed you!")
self.assertFalse(tenant.enabled)
self.assertEqual(tenant.extravalue01, "metadataChanged")
def test_update_empty_description(self):
req_body = {
"tenant": {
"id": 4,
"name": "tenantX",
"description": "",
"enabled": False,
},
}
resp_body = {
"tenant": {
"name": "tenantX",
"enabled": False,
"id": 4,
"description": "",
},
}
resp = utils.TestResponse({
"status_code": 200,
"text": json.dumps(resp_body),
})
kwargs = copy.copy(self.TEST_REQUEST_BASE)
kwargs['headers'] = self.TEST_POST_HEADERS
kwargs['data'] = json.dumps(req_body)
requests.request('POST',
urlparse.urljoin(self.TEST_URL,
'v2.0/tenants/4'),
**kwargs).AndReturn((resp))
self.mox.ReplayAll()
tenant = self.client.tenants.update(req_body['tenant']['id'],
req_body['tenant']['name'],
req_body['tenant']['description'],
req_body['tenant']['enabled'])
self.assertTrue(isinstance(tenant, tenants.Tenant))
self.assertEqual(tenant.id, 4)
self.assertEqual(tenant.name, "tenantX")
self.assertEqual(tenant.description, "")
self.assertFalse(tenant.enabled)
def test_add_user(self):
resp = utils.TestResponse({
"status_code": 204,
"text": '',
})
kwargs = copy.copy(self.TEST_REQUEST_BASE)
kwargs['headers'] = self.TEST_REQUEST_HEADERS
requests.request('PUT',
urlparse.urljoin(self.TEST_URL,
'v2.0/tenants/4/users/foo/roles/OS-KSADM/barrr'),
**kwargs).AndReturn((resp))
self.mox.ReplayAll()
self.client.tenants.add_user('4', 'foo', 'barrr')
def test_remove_user(self):
resp = utils.TestResponse({
"status_code": 204,
"text": '',
})
kwargs = copy.copy(self.TEST_REQUEST_BASE)
kwargs['headers'] = self.TEST_REQUEST_HEADERS
requests.request('DELETE',
urlparse.urljoin(self.TEST_URL,
'v2.0/tenants/4/users/foo/roles/OS-KSADM/barrr'),
**kwargs).AndReturn((resp))
self.mox.ReplayAll()
self.client.tenants.remove_user('4', 'foo', 'barrr')
def test_tenant_add_user(self):
req_body = {
"tenant": {
"id": 4,
"name": "tenantX",
"description": "I changed you!",
"enabled": False,
},
}
resp = utils.TestResponse({
"status_code": 204,
"text": '',
})
kwargs = copy.copy(self.TEST_REQUEST_BASE)
kwargs['headers'] = self.TEST_REQUEST_HEADERS
requests.request('PUT',
urlparse.urljoin(self.TEST_URL,
'v2.0/tenants/4/users/foo/roles/OS-KSADM/barrr'),
**kwargs).AndReturn((resp))
self.mox.ReplayAll()
# make tenant object with manager
tenant = self.client.tenants.resource_class(self.client.tenants,
req_body['tenant'])
tenant.add_user('foo', 'barrr')
self.assertTrue(isinstance(tenant, tenants.Tenant))
def test_tenant_remove_user(self):
req_body = {
"tenant": {
"id": 4,
"name": "tenantX",
"description": "I changed you!",
"enabled": False,
},
}
resp = utils.TestResponse({
"status_code": 204,
"text": '',
})
kwargs = copy.copy(self.TEST_REQUEST_BASE)
kwargs['headers'] = self.TEST_REQUEST_HEADERS
requests.request('DELETE',
urlparse.urljoin(self.TEST_URL,
'v2.0/tenants/4/users/foo/roles/OS-KSADM/barrr'),
**kwargs).AndReturn((resp))
self.mox.ReplayAll()
# make tenant object with manager
tenant = self.client.tenants.resource_class(self.client.tenants,
req_body['tenant'])
tenant.remove_user('foo', 'barrr')
self.assertTrue(isinstance(tenant, tenants.Tenant))