Allow v2 client authentication with trust_id
It should be possible to authenticate against the v2 tokens
API with a trust_id, because it suports rescoping an existing
token to a trust, this patch adds client support for this.
Note with the current keystone code it's necessary to pass the
trustor tenant ID when rescoping with a trust where
impersonation==True, e.g:
c = client_v2.Client(username=TRUSTEE_USERNAME,
                     password=TRUSTEE_USERNAME,
                     tenant_name=TRUSTEE_TENANT_NAME,
                     auth_url=OS_AUTH_URL_V2)
c.authenticate(trust_id=trust_i.id, tenant_id=TRUSTOR_TENANT_ID)
Change-Id: I177c41af298b7437e2c6fb437aa9ce9a09773b9d
Closes-Bug: #1231483
			
			
This commit is contained in:
		@@ -391,11 +391,11 @@ class AccessInfoV2(AccessInfo):
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def trust_id(self):
 | 
			
		||||
        return None
 | 
			
		||||
        return self.get('trust', {}).get('id')
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def trust_scoped(self):
 | 
			
		||||
        return False
 | 
			
		||||
        return 'trust' in self
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def project_id(self):
 | 
			
		||||
 
 | 
			
		||||
@@ -30,6 +30,7 @@ class TestCase(testtools.TestCase):
 | 
			
		||||
    TEST_TENANT_ID = '1'
 | 
			
		||||
    TEST_TENANT_NAME = 'aTenant'
 | 
			
		||||
    TEST_TOKEN = 'aToken'
 | 
			
		||||
    TEST_TRUST_ID = 'aTrust'
 | 
			
		||||
    TEST_USER = 'test'
 | 
			
		||||
 | 
			
		||||
    TEST_ROOT_URL = 'http://127.0.0.1:5000/'
 | 
			
		||||
 
 | 
			
		||||
@@ -173,6 +173,24 @@ class AuthenticateAgainstKeystoneTests(utils.TestCase):
 | 
			
		||||
                         self.TEST_RESPONSE_DICT["access"]["token"]["id"])
 | 
			
		||||
        self.assertRequestBodyIs(json=self.TEST_REQUEST_BODY)
 | 
			
		||||
 | 
			
		||||
    @httpretty.activate
 | 
			
		||||
    def test_authenticate_success_token_scoped_trust(self):
 | 
			
		||||
        del self.TEST_REQUEST_BODY['auth']['passwordCredentials']
 | 
			
		||||
        self.TEST_REQUEST_BODY['auth']['token'] = {'id': self.TEST_TOKEN}
 | 
			
		||||
        self.TEST_REQUEST_BODY['auth']['trust_id'] = self.TEST_TRUST_ID
 | 
			
		||||
        response = self.TEST_RESPONSE_DICT.copy()
 | 
			
		||||
        response['access']['trust'] = {"trustee_user_id": self.TEST_USER,
 | 
			
		||||
                                       "id": self.TEST_TRUST_ID}
 | 
			
		||||
        self.stub_auth(json=response)
 | 
			
		||||
 | 
			
		||||
        cs = client.Client(token=self.TEST_TOKEN,
 | 
			
		||||
                           tenant_id=self.TEST_TENANT_ID,
 | 
			
		||||
                           trust_id=self.TEST_TRUST_ID,
 | 
			
		||||
                           auth_url=self.TEST_URL)
 | 
			
		||||
        self.assertTrue(cs.auth_ref.trust_scoped)
 | 
			
		||||
        self.assertEqual(cs.auth_ref.trust_id, self.TEST_TRUST_ID)
 | 
			
		||||
        self.assertRequestBodyIs(json=self.TEST_REQUEST_BODY)
 | 
			
		||||
 | 
			
		||||
    @httpretty.activate
 | 
			
		||||
    def test_authenticate_success_token_unscoped(self):
 | 
			
		||||
        del self.TEST_REQUEST_BODY['auth']['passwordCredentials']
 | 
			
		||||
 
 | 
			
		||||
@@ -142,6 +142,7 @@ class Client(httpclient.HTTPClient):
 | 
			
		||||
                                            password=None, tenant_name=None,
 | 
			
		||||
                                            tenant_id=None, token=None,
 | 
			
		||||
                                            project_name=None, project_id=None,
 | 
			
		||||
                                            trust_id=None,
 | 
			
		||||
                                            **kwargs):
 | 
			
		||||
        """Authenticate against the v2 Identity API.
 | 
			
		||||
 | 
			
		||||
@@ -157,6 +158,7 @@ class Client(httpclient.HTTPClient):
 | 
			
		||||
                                    tenant_id=project_id or tenant_id,
 | 
			
		||||
                                    tenant_name=project_name or tenant_name,
 | 
			
		||||
                                    password=password,
 | 
			
		||||
                                    trust_id=trust_id,
 | 
			
		||||
                                    token=token)
 | 
			
		||||
        except (exceptions.AuthorizationFailure, exceptions.Unauthorized):
 | 
			
		||||
            _logger.debug("Authorization Failed.")
 | 
			
		||||
@@ -166,11 +168,13 @@ class Client(httpclient.HTTPClient):
 | 
			
		||||
                                                  "%s" % e)
 | 
			
		||||
 | 
			
		||||
    def _base_authN(self, auth_url, username=None, password=None,
 | 
			
		||||
                    tenant_name=None, tenant_id=None, token=None):
 | 
			
		||||
                    tenant_name=None, tenant_id=None, trust_id=None,
 | 
			
		||||
                    token=None):
 | 
			
		||||
        """Takes a username, password, and optionally a tenant_id or
 | 
			
		||||
        tenant_name to get an authentication token from keystone.
 | 
			
		||||
        May also take a token and a tenant_id to re-scope a token
 | 
			
		||||
        to a tenant.
 | 
			
		||||
        to a tenant, or a token, tenant_id and trust_id and re-scope
 | 
			
		||||
        the token to the trust
 | 
			
		||||
        """
 | 
			
		||||
        headers = {}
 | 
			
		||||
        if auth_url is None:
 | 
			
		||||
@@ -188,5 +192,7 @@ class Client(httpclient.HTTPClient):
 | 
			
		||||
            params['auth']['tenantId'] = tenant_id
 | 
			
		||||
        elif tenant_name:
 | 
			
		||||
            params['auth']['tenantName'] = tenant_name
 | 
			
		||||
        if trust_id:
 | 
			
		||||
            params['auth']['trust_id'] = trust_id
 | 
			
		||||
        resp, body = self.request(url, 'POST', body=params, headers=headers)
 | 
			
		||||
        return resp, body
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user