* Implement correct certificate verification * Add requests to tools/pip-requires * Fix OS_CACERT env var help text * Add info to README * Rework tests to use requests. Pinned the requests module to < 1.0, as 1.0.2 is the current release on PyPI as of 17 Dec 2012. Change-Id: I120d2c12d6f20ebe2fd7182ec8988cc73f623b80
		
			
				
	
	
		
			118 lines
		
	
	
		
			4.1 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			118 lines
		
	
	
		
			4.1 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import copy
 | 
						|
import urlparse
 | 
						|
import json
 | 
						|
 | 
						|
import requests
 | 
						|
 | 
						|
from keystoneclient.v2_0 import endpoints
 | 
						|
from tests import utils
 | 
						|
 | 
						|
 | 
						|
class EndpointTests(utils.TestCase):
    """Exercise ``self.client.endpoints`` (create, delete, list) for v2.0.

    Every test follows the mox record/replay pattern: it first records
    the single ``requests.request`` call it expects, together with the
    canned response to hand back, then switches mox to replay mode and
    calls the client.

    NOTE(review): ``self.mox``, ``self.client``, ``self.TEST_URL`` and
    ``self.TEST_REQUEST_BASE`` are not defined in this file; presumably
    ``utils.TestCase`` provides them and replaces ``requests.request``
    with a mock -- confirm against tests/utils.py.
    """

    def setUp(self):
        """Build the header and endpoint fixtures shared by the tests."""
        super(EndpointTests, self).setUp()

        # Headers expected on requests that carry no body.
        self.TEST_REQUEST_HEADERS = {
            'X-Auth-Token': 'aToken',
            'User-Agent': 'python-keystoneclient',
        }
        # Headers expected on requests that carry a JSON body.
        self.TEST_POST_HEADERS = {
            'Content-Type': 'application/json',
            'X-Auth-Token': 'aToken',
            'User-Agent': 'python-keystoneclient',
        }
        # Canned body returned for the endpoint listing in test_list.
        self.TEST_ENDPOINTS = {
            'endpoints': [
                {
                    'adminurl': 'http://host-1:8774/v1.1/$(tenant_id)s',
                    'id': '8f9531231e044e218824b0e58688d262',
                    'internalurl': 'http://host-1:8774/v1.1/$(tenant_id)s',
                    'publicurl': 'http://host-1:8774/v1.1/$(tenant_id)s',
                    'region': 'RegionOne',
                },
                {
                    'adminurl': 'http://host-1:8774/v1.1/$(tenant_id)s',
                    'id': '8f9531231e044e218824b0e58688d263',
                    'internalurl': 'http://host-1:8774/v1.1/$(tenant_id)s',
                    'publicurl': 'http://host-1:8774/v1.1/$(tenant_id)s',
                    'region': 'RegionOne',
                },
            ]
        }

    def _expect_request(self, method, path, headers, response, data=None):
        """Record one expected ``requests.request`` call and its response.

        :param method: HTTP verb passed as the first positional argument.
        :param path: path joined onto ``self.TEST_URL`` to form the URL.
        :param headers: dict expected as the ``headers`` keyword.
        :param response: object the recorded call should return.
        :param data: expected ``data`` keyword; the keyword is left out
                     entirely when this is ``None``.
        """
        # Shallow copy so the shared base kwargs are never mutated.
        expected_kwargs = copy.copy(self.TEST_REQUEST_BASE)
        expected_kwargs['headers'] = headers
        if data is not None:
            expected_kwargs['data'] = data

        url = urlparse.urljoin(self.TEST_URL, path)
        requests.request(method, url, **expected_kwargs).AndReturn(response)

    def test_create(self):
        """create() POSTs the endpoint body and returns an Endpoint."""
        req_body = {
            "endpoint": {
                "region": "RegionOne",
                "publicurl": "http://host-3:8774/v1.1/$(tenant_id)s",
                "internalurl": "http://host-3:8774/v1.1/$(tenant_id)s",
                "adminurl": "http://host-3:8774/v1.1/$(tenant_id)s",
                "service_id": "e044e21",
            }
        }
        resp_body = {
            "endpoint": {
                "adminurl": "http://host-3:8774/v1.1/$(tenant_id)s",
                "region": "RegionOne",
                "id": "1fd485b2ffd54f409a5ecd42cba11401",
                "internalurl": "http://host-3:8774/v1.1/$(tenant_id)s",
                "publicurl": "http://host-3:8774/v1.1/$(tenant_id)s",
            }
        }
        resp = utils.TestResponse({
            "status_code": 200,
            "text": json.dumps(resp_body),
        })

        self._expect_request('POST', 'v2.0/endpoints',
                             self.TEST_POST_HEADERS, resp,
                             data=json.dumps(req_body))
        self.mox.ReplayAll()

        wanted = req_body['endpoint']
        endpoint = self.client.endpoints.create(
            region=wanted['region'],
            publicurl=wanted['publicurl'],
            adminurl=wanted['adminurl'],
            internalurl=wanted['internalurl'],
            service_id=wanted['service_id']
        )
        self.assertTrue(isinstance(endpoint, endpoints.Endpoint))

    def test_delete(self):
        """delete() issues a DELETE against the endpoint's URL."""
        resp = utils.TestResponse({
            "status_code": 204,
            "text": "",
        })

        self._expect_request('DELETE', 'v2.0/endpoints/8f953',
                             self.TEST_REQUEST_HEADERS, resp)
        self.mox.ReplayAll()

        self.client.endpoints.delete('8f953')

    def test_list(self):
        """list() GETs the collection and returns Endpoint objects."""
        resp = utils.TestResponse({
            "status_code": 200,
            "text": json.dumps(self.TEST_ENDPOINTS),
        })

        self._expect_request('GET', 'v2.0/endpoints',
                             self.TEST_REQUEST_HEADERS, resp)
        self.mox.ReplayAll()

        endpoint_list = self.client.endpoints.list()

        # Guard against a vacuous pass: without this, an empty result
        # would skip every isinstance check below.
        # NOTE(review): assumes list() yields one object per entry of
        # the 'endpoints' array in the response -- confirm.
        self.assertEqual(len(self.TEST_ENDPOINTS['endpoints']),
                         len(endpoint_list))
        for item in endpoint_list:
            self.assertTrue(isinstance(item, endpoints.Endpoint))
 |