- 204 No Content should be mocked with empty response bodies - Content-Type headers should not be mocked with empty response bodies - httplib2 would never return None as a response body - The Identity API never expects a req/resp body with a string value of "null" Change-Id: Ie22e8e5288573268165ed06049978195955f8ca6
		
			
				
	
	
		
			223 lines
		
	
	
		
			8.1 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			223 lines
		
	
	
		
			8.1 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import urlparse
 | 
						|
import json
 | 
						|
 | 
						|
import httplib2
 | 
						|
 | 
						|
from keystoneclient.v2_0 import users
 | 
						|
from tests import utils
 | 
						|
 | 
						|
 | 
						|
class UserTests(utils.TestCase):
 | 
						|
    def setUp(self):
 | 
						|
        super(UserTests, self).setUp()
 | 
						|
        self.TEST_REQUEST_HEADERS = {
 | 
						|
            'X-Auth-Token': 'aToken',
 | 
						|
            'User-Agent': 'python-keystoneclient',
 | 
						|
        }
 | 
						|
        self.TEST_POST_HEADERS = {
 | 
						|
            'Content-Type': 'application/json',
 | 
						|
            'X-Auth-Token': 'aToken',
 | 
						|
            'User-Agent': 'python-keystoneclient',
 | 
						|
        }
 | 
						|
        self.TEST_USERS = {
 | 
						|
            "users": {
 | 
						|
                "values": [
 | 
						|
                    {
 | 
						|
                        "email": "None",
 | 
						|
                        "enabled": True,
 | 
						|
                        "id": 1,
 | 
						|
                        "name": "admin",
 | 
						|
                    },
 | 
						|
                    {
 | 
						|
                        "email": "None",
 | 
						|
                        "enabled": True,
 | 
						|
                        "id": 2,
 | 
						|
                        "name": "demo",
 | 
						|
                    },
 | 
						|
                ]
 | 
						|
            }
 | 
						|
        }
 | 
						|
 | 
						|
    def test_create(self):
 | 
						|
        req_body = {
 | 
						|
            "user": {
 | 
						|
                "name": "gabriel",
 | 
						|
                "password": "test",
 | 
						|
                "tenantId": 2,
 | 
						|
                "email": "test@example.com",
 | 
						|
                "enabled": True,
 | 
						|
            }
 | 
						|
        }
 | 
						|
        resp_body = {
 | 
						|
            "user": {
 | 
						|
                "name": "gabriel",
 | 
						|
                "enabled": True,
 | 
						|
                "tenantId": 2,
 | 
						|
                "id": 3,
 | 
						|
                "password": "test",
 | 
						|
                "email": "test@example.com",
 | 
						|
            }
 | 
						|
        }
 | 
						|
        resp = httplib2.Response({
 | 
						|
            "status": 200,
 | 
						|
            "body": json.dumps(resp_body),
 | 
						|
        })
 | 
						|
 | 
						|
        httplib2.Http.request(urlparse.urljoin(self.TEST_URL, 'v2.0/users'),
 | 
						|
                              'POST',
 | 
						|
                              body=json.dumps(req_body),
 | 
						|
                              headers=self.TEST_POST_HEADERS) \
 | 
						|
            .AndReturn((resp, resp['body']))
 | 
						|
        self.mox.ReplayAll()
 | 
						|
 | 
						|
        user = self.client.users.create(req_body['user']['name'],
 | 
						|
                                        req_body['user']['password'],
 | 
						|
                                        req_body['user']['email'],
 | 
						|
                                        tenant_id=req_body['user']['tenantId'],
 | 
						|
                                        enabled=req_body['user']['enabled'])
 | 
						|
        self.assertTrue(isinstance(user, users.User))
 | 
						|
        self.assertEqual(user.id, 3)
 | 
						|
        self.assertEqual(user.name, "gabriel")
 | 
						|
        self.assertEqual(user.email, "test@example.com")
 | 
						|
 | 
						|
    def test_delete(self):
 | 
						|
        resp = httplib2.Response({
 | 
						|
            "status": 204,
 | 
						|
            "body": "",
 | 
						|
        })
 | 
						|
        httplib2.Http.request(urlparse.urljoin(self.TEST_URL, 'v2.0/users/1'),
 | 
						|
                              'DELETE',
 | 
						|
                              headers=self.TEST_REQUEST_HEADERS) \
 | 
						|
            .AndReturn((resp, resp['body']))
 | 
						|
        self.mox.ReplayAll()
 | 
						|
 | 
						|
        self.client.users.delete(1)
 | 
						|
 | 
						|
    def test_get(self):
 | 
						|
        resp = httplib2.Response({
 | 
						|
            "status": 200,
 | 
						|
            "body": json.dumps({
 | 
						|
                'user': self.TEST_USERS['users']['values'][0],
 | 
						|
            })
 | 
						|
        })
 | 
						|
        httplib2.Http.request(urlparse.urljoin(self.TEST_URL,
 | 
						|
                              'v2.0/users/1'),
 | 
						|
                              'GET',
 | 
						|
                              headers=self.TEST_REQUEST_HEADERS) \
 | 
						|
            .AndReturn((resp, resp['body']))
 | 
						|
        self.mox.ReplayAll()
 | 
						|
 | 
						|
        u = self.client.users.get(1)
 | 
						|
        self.assertTrue(isinstance(u, users.User))
 | 
						|
        self.assertEqual(u.id, 1)
 | 
						|
        self.assertEqual(u.name, 'admin')
 | 
						|
 | 
						|
    def test_list(self):
 | 
						|
        resp = httplib2.Response({
 | 
						|
            "status": 200,
 | 
						|
            "body": json.dumps(self.TEST_USERS),
 | 
						|
        })
 | 
						|
 | 
						|
        httplib2.Http.request(urlparse.urljoin(self.TEST_URL,
 | 
						|
                              'v2.0/users'),
 | 
						|
                              'GET',
 | 
						|
                              headers=self.TEST_REQUEST_HEADERS) \
 | 
						|
            .AndReturn((resp, resp['body']))
 | 
						|
        self.mox.ReplayAll()
 | 
						|
 | 
						|
        user_list = self.client.users.list()
 | 
						|
        [self.assertTrue(isinstance(u, users.User)) for u in user_list]
 | 
						|
 | 
						|
    def test_list_limit(self):
 | 
						|
        resp = httplib2.Response({
 | 
						|
            "status": 200,
 | 
						|
            "body": json.dumps(self.TEST_USERS),
 | 
						|
        })
 | 
						|
 | 
						|
        httplib2.Http.request(urlparse.urljoin(self.TEST_URL,
 | 
						|
                              'v2.0/users?limit=1'),
 | 
						|
                              'GET',
 | 
						|
                              headers=self.TEST_REQUEST_HEADERS) \
 | 
						|
            .AndReturn((resp, resp['body']))
 | 
						|
        self.mox.ReplayAll()
 | 
						|
 | 
						|
        user_list = self.client.users.list(limit=1)
 | 
						|
        [self.assertTrue(isinstance(u, users.User)) for u in user_list]
 | 
						|
 | 
						|
    def test_list_marker(self):
 | 
						|
        resp = httplib2.Response({
 | 
						|
            "status": 200,
 | 
						|
            "body": json.dumps(self.TEST_USERS),
 | 
						|
        })
 | 
						|
 | 
						|
        httplib2.Http.request(urlparse.urljoin(self.TEST_URL,
 | 
						|
                              'v2.0/users?marker=1'),
 | 
						|
                              'GET',
 | 
						|
                              headers=self.TEST_REQUEST_HEADERS) \
 | 
						|
            .AndReturn((resp, resp['body']))
 | 
						|
        self.mox.ReplayAll()
 | 
						|
 | 
						|
        user_list = self.client.users.list(marker=1)
 | 
						|
        [self.assertTrue(isinstance(u, users.User)) for u in user_list]
 | 
						|
 | 
						|
    def test_list_limit_marker(self):
 | 
						|
        resp = httplib2.Response({
 | 
						|
            "status": 200,
 | 
						|
            "body": json.dumps(self.TEST_USERS),
 | 
						|
        })
 | 
						|
 | 
						|
        httplib2.Http.request(urlparse.urljoin(self.TEST_URL,
 | 
						|
                              'v2.0/users?marker=1&limit=1'),
 | 
						|
                              'GET',
 | 
						|
                              headers=self.TEST_REQUEST_HEADERS) \
 | 
						|
            .AndReturn((resp, resp['body']))
 | 
						|
        self.mox.ReplayAll()
 | 
						|
 | 
						|
        user_list = self.client.users.list(limit=1, marker=1)
 | 
						|
        [self.assertTrue(isinstance(u, users.User)) for u in user_list]
 | 
						|
 | 
						|
    def test_update(self):
 | 
						|
        req_1 = {"user": {"password": "swordfish", "id": 2}}
 | 
						|
        req_2 = {"user": {"id": 2,
 | 
						|
                          "email": "gabriel@example.com",
 | 
						|
                          "name": "gabriel"}}
 | 
						|
        req_3 = {"user": {"tenantId": 1, "id": 2}}
 | 
						|
        req_4 = {"user": {"enabled": False, "id": 2}}
 | 
						|
        # Keystone basically echoes these back... including the password :-/
 | 
						|
        resp_1 = httplib2.Response({"status": 200, "body": json.dumps(req_1)})
 | 
						|
        resp_2 = httplib2.Response({"status": 200, "body": json.dumps(req_2)})
 | 
						|
        resp_3 = httplib2.Response({"status": 200, "body": json.dumps(req_3)})
 | 
						|
        resp_4 = httplib2.Response({"status": 200, "body": json.dumps(req_3)})
 | 
						|
 | 
						|
        httplib2.Http.request(urlparse.urljoin(self.TEST_URL, 'v2.0/users/2'),
 | 
						|
                              'PUT',
 | 
						|
                              body=json.dumps(req_2),
 | 
						|
                              headers=self.TEST_POST_HEADERS) \
 | 
						|
            .AndReturn((resp_2, resp_2['body']))
 | 
						|
        httplib2.Http.request(urlparse.urljoin(self.TEST_URL,
 | 
						|
                              'v2.0/users/2/OS-KSADM/password'),
 | 
						|
                              'PUT',
 | 
						|
                              body=json.dumps(req_1),
 | 
						|
                              headers=self.TEST_POST_HEADERS) \
 | 
						|
            .AndReturn((resp_1, resp_1['body']))
 | 
						|
        httplib2.Http.request(urlparse.urljoin(self.TEST_URL,
 | 
						|
                              'v2.0/users/2/OS-KSADM/tenant'),
 | 
						|
                              'PUT',
 | 
						|
                              body=json.dumps(req_3),
 | 
						|
                              headers=self.TEST_POST_HEADERS) \
 | 
						|
            .AndReturn((resp_3, resp_3['body']))
 | 
						|
        httplib2.Http.request(urlparse.urljoin(self.TEST_URL,
 | 
						|
                              'v2.0/users/2/OS-KSADM/enabled'),
 | 
						|
                              'PUT',
 | 
						|
                              body=json.dumps(req_4),
 | 
						|
                              headers=self.TEST_POST_HEADERS) \
 | 
						|
            .AndReturn((resp_4, resp_4['body']))
 | 
						|
        self.mox.ReplayAll()
 | 
						|
 | 
						|
        user = self.client.users.update(2,
 | 
						|
                                        name='gabriel',
 | 
						|
                                        email='gabriel@example.com')
 | 
						|
        user = self.client.users.update_password(2, 'swordfish')
 | 
						|
        user = self.client.users.update_tenant(2, 1)
 | 
						|
        user = self.client.users.update_enabled(2, False)
 |