Initial Trusts support
Implements client support for the basic trusts API operations, note this does not include support for the roles subpath operations, support for those can be added in a subsequent patch. Change-Id: I0c6ba12bad5cc8f3f10697d2a3dcf4f3be8c7ece blueprint: delegation-impersonation-support
This commit is contained in:
@@ -199,6 +199,23 @@ class AccessInfo(dict):
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
@property
|
||||
def trust_id(self):
|
||||
"""Returns the trust id associated with the authentication token.
|
||||
|
||||
:returns: str or None (if no trust associated with the token)
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
@property
|
||||
def trust_scoped(self):
|
||||
"""Returns true if the authorization token was scoped as delegated in a
|
||||
trust, via the OS-TRUST v3 extension.
|
||||
|
||||
:returns: bool
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
@property
|
||||
def project_id(self):
|
||||
"""Returns the project ID associated with the authentication
|
||||
@@ -330,6 +347,14 @@ class AccessInfoV2(AccessInfo):
|
||||
def domain_scoped(self):
|
||||
return False
|
||||
|
||||
@property
|
||||
def trust_id(self):
|
||||
return None
|
||||
|
||||
@property
|
||||
def trust_scoped(self):
|
||||
return False
|
||||
|
||||
@property
|
||||
def project_id(self):
|
||||
tenant_dict = self['token'].get('tenant', None)
|
||||
@@ -448,6 +473,14 @@ class AccessInfoV3(AccessInfo):
|
||||
def domain_scoped(self):
|
||||
return 'domain' in self
|
||||
|
||||
@property
|
||||
def trust_id(self):
|
||||
return self.get('OS-TRUST:trust', {}).get('id')
|
||||
|
||||
@property
|
||||
def trust_scoped(self):
|
||||
return 'OS-TRUST:trust' in self
|
||||
|
||||
@property
|
||||
def auth_url(self):
|
||||
if self.service_catalog:
|
||||
|
@@ -126,7 +126,7 @@ class HTTPClient(object):
|
||||
stale_duration=None, user_id=None, user_domain_id=None,
|
||||
user_domain_name=None, domain_id=None, domain_name=None,
|
||||
project_id=None, project_name=None, project_domain_id=None,
|
||||
project_domain_name=None):
|
||||
project_domain_name=None, trust_id=None):
|
||||
"""Construct a new http client
|
||||
|
||||
:param string user_id: User ID for authentication. (optional)
|
||||
@@ -196,6 +196,7 @@ class HTTPClient(object):
|
||||
:param string tenant_id: Tenant id. (optional)
|
||||
The tenant_id keyword argument is
|
||||
deprecated, use project_id instead.
|
||||
:param string trust_id: Trust ID for trust scoping. (optional)
|
||||
|
||||
"""
|
||||
# set baseline defaults
|
||||
@@ -217,6 +218,8 @@ class HTTPClient(object):
|
||||
self.management_url = None
|
||||
self.timeout = float(timeout) if timeout is not None else None
|
||||
|
||||
self.trust_id = None
|
||||
|
||||
# if loading from a dictionary passed in via auth_ref,
|
||||
# load values from AccessInfo parsing that dictionary
|
||||
if auth_ref:
|
||||
@@ -233,6 +236,7 @@ class HTTPClient(object):
|
||||
self.auth_url = self.auth_ref.auth_url[0]
|
||||
self.management_url = self.auth_ref.management_url[0]
|
||||
self.auth_token = self.auth_ref.auth_token
|
||||
self.trust_id = self.auth_ref.trust_id
|
||||
else:
|
||||
self.auth_ref = None
|
||||
|
||||
@@ -276,6 +280,10 @@ class HTTPClient(object):
|
||||
if project_domain_name:
|
||||
self.project_domain_name = project_domain_name
|
||||
|
||||
# trust-related attributes
|
||||
if trust_id:
|
||||
self.trust_id = trust_id
|
||||
|
||||
# endpoint selection
|
||||
if auth_url:
|
||||
self.auth_url = auth_url.rstrip('/')
|
||||
@@ -361,7 +369,7 @@ class HTTPClient(object):
|
||||
user_id=None, domain_name=None, domain_id=None,
|
||||
project_name=None, project_id=None, user_domain_id=None,
|
||||
user_domain_name=None, project_domain_id=None,
|
||||
project_domain_name=None):
|
||||
project_domain_name=None, trust_id=None):
|
||||
"""Authenticate user.
|
||||
|
||||
Uses the data provided at instantiation to authenticate against
|
||||
@@ -382,6 +390,9 @@ class HTTPClient(object):
|
||||
will be 'unscoped' and limited in capabilities until a fully-scoped
|
||||
token is acquired.
|
||||
|
||||
With the v3 API, with the OS-TRUST extension enabled, the trust_id can
|
||||
be provided to allow project-specific role delegation between users
|
||||
|
||||
If successful, sets the self.auth_ref and self.auth_token with
|
||||
the returned token. If not already set, will also set
|
||||
self.management_url from the details provided in the token.
|
||||
@@ -416,6 +427,8 @@ class HTTPClient(object):
|
||||
project_domain_id = project_domain_id or self.project_domain_id
|
||||
project_domain_name = project_domain_name or self.project_domain_name
|
||||
|
||||
trust_id = trust_id or self.trust_id
|
||||
|
||||
if not token:
|
||||
token = self.auth_token_from_user
|
||||
if (not token and self.auth_ref and not
|
||||
@@ -434,7 +447,8 @@ class HTTPClient(object):
|
||||
'project_name': project_name,
|
||||
'project_domain_id': project_domain_id,
|
||||
'project_domain_name': project_domain_name,
|
||||
'token': token
|
||||
'token': token,
|
||||
'trust_id': trust_id,
|
||||
}
|
||||
(keyring_key, auth_ref) = self.get_auth_ref_from_keyring(**kwargs)
|
||||
new_token_needed = False
|
||||
@@ -535,7 +549,8 @@ class HTTPClient(object):
|
||||
domain_id=None, domain_name=None,
|
||||
project_id=None, project_name=None,
|
||||
project_domain_id=None,
|
||||
project_domain_name=None):
|
||||
project_domain_name=None,
|
||||
trust_id=None):
|
||||
"""Authenticate against the Identity API and get a token.
|
||||
|
||||
Not implemented here because auth protocols should be API
|
||||
|
@@ -17,6 +17,7 @@ import logging
|
||||
|
||||
from keystoneclient import exceptions
|
||||
from keystoneclient import httpclient
|
||||
from keystoneclient.v3.contrib import trusts
|
||||
from keystoneclient.v3 import credentials
|
||||
from keystoneclient.v3 import domains
|
||||
from keystoneclient.v3 import endpoints
|
||||
@@ -97,6 +98,7 @@ class Client(httpclient.HTTPClient):
|
||||
self.roles = roles.RoleManager(self)
|
||||
self.services = services.ServiceManager(self)
|
||||
self.users = users.UserManager(self)
|
||||
self.trusts = trusts.TrustManager(self)
|
||||
|
||||
if self.management_url is None:
|
||||
self.authenticate()
|
||||
@@ -128,7 +130,9 @@ class Client(httpclient.HTTPClient):
|
||||
project_id=None, project_name=None,
|
||||
project_domain_id=None,
|
||||
project_domain_name=None,
|
||||
token=None, **kwargs):
|
||||
token=None,
|
||||
trust_id=None,
|
||||
**kwargs):
|
||||
"""Authenticate against the v3 Identity API.
|
||||
|
||||
:returns: (``resp``, ``body``) if authentication was successful.
|
||||
@@ -151,7 +155,8 @@ class Client(httpclient.HTTPClient):
|
||||
project_name=project_name,
|
||||
project_domain_id=project_domain_id,
|
||||
project_domain_name=project_domain_name,
|
||||
token=token)
|
||||
token=token,
|
||||
trust_id=trust_id)
|
||||
except (exceptions.AuthorizationFailure, exceptions.Unauthorized):
|
||||
_logger.debug('Authorization failed.')
|
||||
raise
|
||||
@@ -163,7 +168,7 @@ class Client(httpclient.HTTPClient):
|
||||
user_domain_id=None, user_domain_name=None, password=None,
|
||||
domain_id=None, domain_name=None,
|
||||
project_id=None, project_name=None, project_domain_id=None,
|
||||
project_domain_name=None, token=None):
|
||||
project_domain_name=None, token=None, trust_id=None):
|
||||
headers = {}
|
||||
url = auth_url + "/auth/tokens"
|
||||
body = {'auth': {'identity': {}}}
|
||||
@@ -226,6 +231,12 @@ class Client(httpclient.HTTPClient):
|
||||
elif project_domain_name:
|
||||
scope['project']['domain']['name'] = project_domain_name
|
||||
|
||||
if trust_id:
|
||||
body['auth']['scope'] = {}
|
||||
scope = body['auth']['scope']
|
||||
scope['OS-TRUST:trust'] = {}
|
||||
scope['OS-TRUST:trust']['id'] = trust_id
|
||||
|
||||
if not (ident or token):
|
||||
raise ValueError('Authentication method required (e.g. password)')
|
||||
|
||||
|
0
keystoneclient/v3/contrib/__init__.py
Normal file
0
keystoneclient/v3/contrib/__init__.py
Normal file
86
keystoneclient/v3/contrib/trusts.py
Normal file
86
keystoneclient/v3/contrib/trusts.py
Normal file
@@ -0,0 +1,86 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from keystoneclient import base
|
||||
from keystoneclient import exceptions
|
||||
from keystoneclient.openstack.common import timeutils
|
||||
|
||||
|
||||
class Trust(base.Resource):
|
||||
"""Represents a Trust.
|
||||
|
||||
Attributes:
|
||||
* id: a uuid that identifies the trust
|
||||
* impersonation: allow explicit impersonation
|
||||
* project_id: project ID
|
||||
* trustee_user_id: a uuid that identifies the trustee
|
||||
* trustor_user_id: a uuid that identifies the trustor
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
class TrustManager(base.CrudManager):
|
||||
"""Manager class for manipulating Trusts."""
|
||||
resource_class = Trust
|
||||
collection_key = 'trusts'
|
||||
key = 'trust'
|
||||
base_url = '/OS-TRUST'
|
||||
|
||||
def create(self, trustee_user, trustor_user, role_names=None,
|
||||
project=None, impersonation=False, expires_at=None):
|
||||
"""Create a Trust.
|
||||
:param string trustee_user: user who's authorization is being delegated
|
||||
:param string trustor_user: user who is capable of consuming the trust
|
||||
:param string role_names: subset of trustor's roles to be granted
|
||||
:param string project: project which the trustor is delegating
|
||||
:param boolean impersonation: enable explicit impersonation
|
||||
:param datetime.datetime expires_at: expiry time
|
||||
"""
|
||||
# Convert role_names list into list-of-dict API format
|
||||
if role_names:
|
||||
roles = [{'name': n} for n in role_names]
|
||||
else:
|
||||
roles = None
|
||||
|
||||
# Convert datetime.datetime expires_at to iso format string
|
||||
if expires_at:
|
||||
expires_str = timeutils.isotime(at=expires_at, subsecond=True)
|
||||
else:
|
||||
expires_str = None
|
||||
|
||||
return super(TrustManager, self).create(
|
||||
expires_at=expires_str,
|
||||
impersonation=impersonation,
|
||||
project_id=base.getid(project),
|
||||
roles=roles,
|
||||
trustee_user_id=base.getid(trustee_user),
|
||||
trustor_user_id=base.getid(trustor_user))
|
||||
|
||||
def update(self):
|
||||
raise exceptions.HTTPNotImplemented("Update not supported for trusts")
|
||||
|
||||
def list(self, trustee_user=None, trustor_user=None):
|
||||
"""List Trusts."""
|
||||
trustee_user_id = base.getid(trustee_user)
|
||||
trustor_user_id = base.getid(trustor_user)
|
||||
return super(TrustManager, self).list(trustee_user_id=trustee_user_id,
|
||||
trustor_user_id=trustor_user_id)
|
||||
|
||||
def get(self, trust):
|
||||
"""Get a specific trust."""
|
||||
return super(TrustManager, self).get(trust_id=base.getid(trust))
|
||||
|
||||
def delete(self, trust):
|
||||
"""Delete a trust."""
|
||||
return super(TrustManager, self).delete(trust_id=base.getid(trust))
|
@@ -32,6 +32,7 @@ class AccessInfoTest(utils.TestCase):
|
||||
self.assertFalse(auth_ref.scoped)
|
||||
self.assertFalse(auth_ref.domain_scoped)
|
||||
self.assertFalse(auth_ref.project_scoped)
|
||||
self.assertFalse(auth_ref.trust_scoped)
|
||||
|
||||
self.assertEquals(auth_ref.expires, timeutils.parse_isotime(
|
||||
UNSCOPED_TOKEN['access']['token']['expires']))
|
||||
|
@@ -33,6 +33,8 @@ class KeystoneClientTest(utils.TestCase):
|
||||
self.assertFalse(c.auth_ref.scoped)
|
||||
self.assertFalse(c.auth_ref.domain_scoped)
|
||||
self.assertFalse(c.auth_ref.project_scoped)
|
||||
self.assertIsNone(c.auth_ref.trust_id)
|
||||
self.assertFalse(c.auth_ref.trust_scoped)
|
||||
|
||||
def test_scoped_init(self):
|
||||
with mock.patch.object(requests, "request", self.scoped_mock_req):
|
||||
@@ -44,6 +46,8 @@ class KeystoneClientTest(utils.TestCase):
|
||||
self.assertTrue(c.auth_ref.scoped)
|
||||
self.assertTrue(c.auth_ref.project_scoped)
|
||||
self.assertFalse(c.auth_ref.domain_scoped)
|
||||
self.assertIsNone(c.auth_ref.trust_id)
|
||||
self.assertFalse(c.auth_ref.trust_scoped)
|
||||
|
||||
def test_auth_ref_load(self):
|
||||
with mock.patch.object(requests, "request", self.scoped_mock_req):
|
||||
@@ -57,6 +61,8 @@ class KeystoneClientTest(utils.TestCase):
|
||||
self.assertTrue(new_client.auth_ref.scoped)
|
||||
self.assertTrue(new_client.auth_ref.project_scoped)
|
||||
self.assertFalse(new_client.auth_ref.domain_scoped)
|
||||
self.assertIsNone(new_client.auth_ref.trust_id)
|
||||
self.assertFalse(new_client.auth_ref.trust_scoped)
|
||||
self.assertEquals(new_client.username, 'exampleuser')
|
||||
self.assertIsNone(new_client.password)
|
||||
self.assertEqual(new_client.management_url,
|
||||
@@ -77,6 +83,8 @@ class KeystoneClientTest(utils.TestCase):
|
||||
self.assertTrue(new_client.auth_ref.scoped)
|
||||
self.assertTrue(new_client.auth_ref.project_scoped)
|
||||
self.assertFalse(new_client.auth_ref.domain_scoped)
|
||||
self.assertIsNone(new_client.auth_ref.trust_id)
|
||||
self.assertFalse(new_client.auth_ref.trust_scoped)
|
||||
self.assertEquals(new_client.auth_url, new_auth_url)
|
||||
self.assertEquals(new_client.username, 'exampleuser')
|
||||
self.assertIsNone(new_client.password)
|
||||
|
@@ -232,3 +232,40 @@ AUTH_RESPONSE_BODY = {
|
||||
}]
|
||||
}
|
||||
}
|
||||
|
||||
TRUST_TOKEN = {
|
||||
u'token': {
|
||||
u'methods': [
|
||||
u'password'
|
||||
],
|
||||
u'catalog': {},
|
||||
u'expires_at': u'2010-11-01T03:32:15-05:00',
|
||||
"OS-TRUST:trust": {
|
||||
"id": "fe0aef",
|
||||
"impersonation": False,
|
||||
"links": {
|
||||
"self": "http://identity:35357/v3/trusts/fe0aef"
|
||||
},
|
||||
"trustee_user": {
|
||||
"id": "0ca8f6",
|
||||
"links": {
|
||||
"self": "http://identity:35357/v3/users/0ca8f6"
|
||||
}
|
||||
},
|
||||
"trustor_user": {
|
||||
"id": "bd263c",
|
||||
"links": {
|
||||
"self": "http://identity:35357/v3/users/bd263c"
|
||||
}
|
||||
}
|
||||
},
|
||||
u'user': {
|
||||
u'domain': {
|
||||
u'id': u'4e6893b7ba0b4006840c3845660b86ed',
|
||||
u'name': u'exampledomain'
|
||||
},
|
||||
u'id': u'0ca8f6',
|
||||
u'name': u'exampleuser',
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@@ -36,6 +36,13 @@ class KeystoneClientTest(utils.TestCase):
|
||||
})
|
||||
self.unscoped_mock_req = mock.Mock(return_value=unscoped_fake_resp)
|
||||
|
||||
trust_fake_resp = utils.TestResponse({
|
||||
"status_code": 200,
|
||||
"text": json.dumps(client_fixtures.TRUST_TOKEN),
|
||||
"headers": client_fixtures.AUTH_RESPONSE_HEADERS
|
||||
})
|
||||
self.trust_mock_req = mock.Mock(return_value=trust_fake_resp)
|
||||
|
||||
def test_unscoped_init(self):
|
||||
with mock.patch.object(requests, "request", self.unscoped_mock_req):
|
||||
c = client.Client(user_domain_name='exampledomain',
|
||||
@@ -119,3 +126,17 @@ class KeystoneClientTest(utils.TestCase):
|
||||
self.assertIsNone(new_client.password)
|
||||
self.assertEqual(new_client.management_url,
|
||||
'http://admin:35357/v3')
|
||||
|
||||
def test_trust_init(self):
|
||||
with mock.patch.object(requests, "request", self.trust_mock_req):
|
||||
c = client.Client(user_domain_name='exampledomain',
|
||||
username='exampleuser',
|
||||
password='password',
|
||||
auth_url='http://somewhere/',
|
||||
trust_id='fe0aef')
|
||||
self.assertIsNotNone(c.auth_ref)
|
||||
self.assertFalse(c.auth_ref.domain_scoped)
|
||||
self.assertFalse(c.auth_ref.project_scoped)
|
||||
self.assertEqual(c.auth_ref.trust_id, 'fe0aef')
|
||||
self.assertTrue(c.auth_ref.trust_scoped)
|
||||
self.assertEquals(c.auth_user_id, '0ca8f6')
|
||||
|
103
tests/v3/test_trusts.py
Normal file
103
tests/v3/test_trusts.py
Normal file
@@ -0,0 +1,103 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
|
||||
from keystoneclient import exceptions
|
||||
from keystoneclient.openstack.common import timeutils
|
||||
from keystoneclient.v3.contrib import trusts
|
||||
from tests.v3 import utils
|
||||
import uuid
|
||||
|
||||
|
||||
class TrustTests(utils.TestCase, utils.CrudTests):
|
||||
def setUp(self):
|
||||
super(TrustTests, self).setUp()
|
||||
self.additionalSetUp()
|
||||
self.key = 'trust'
|
||||
self.collection_key = 'trusts'
|
||||
self.model = trusts.Trust
|
||||
self.manager = self.client.trusts
|
||||
self.path_prefix = 'OS-TRUST'
|
||||
|
||||
def new_ref(self, **kwargs):
|
||||
kwargs = super(TrustTests, self).new_ref(**kwargs)
|
||||
kwargs.setdefault('project_id', uuid.uuid4().hex)
|
||||
return kwargs
|
||||
|
||||
def test_create(self):
|
||||
ref = self.new_ref()
|
||||
ref['trustor_user_id'] = uuid.uuid4().hex
|
||||
ref['trustee_user_id'] = uuid.uuid4().hex
|
||||
ref['impersonation'] = False
|
||||
super(TrustTests, self).test_create(ref=ref)
|
||||
|
||||
def test_create_roles(self):
|
||||
ref = self.new_ref()
|
||||
ref['trustor_user_id'] = uuid.uuid4().hex
|
||||
ref['trustee_user_id'] = uuid.uuid4().hex
|
||||
ref['impersonation'] = False
|
||||
req_ref = ref.copy()
|
||||
|
||||
# Note the TrustManager takes a list of role_names, and converts
|
||||
# internally to the slightly odd list-of-dict API format, so we
|
||||
# have to pass the expected request data to allow correct stubbing
|
||||
ref['role_names'] = ['atestrole']
|
||||
req_ref['roles'] = [{'name': 'atestrole'}]
|
||||
super(TrustTests, self).test_create(ref=ref, req_ref=req_ref)
|
||||
|
||||
def test_create_expires(self):
|
||||
ref = self.new_ref()
|
||||
ref['trustor_user_id'] = uuid.uuid4().hex
|
||||
ref['trustee_user_id'] = uuid.uuid4().hex
|
||||
ref['impersonation'] = False
|
||||
ref['expires_at'] = timeutils.parse_isotime(
|
||||
'2013-03-04T12:00:01.000000Z')
|
||||
req_ref = ref.copy()
|
||||
|
||||
# Note the TrustManager takes a datetime.datetime object for
|
||||
# expires_at, and converts it internally into an iso format datestamp
|
||||
req_ref['expires_at'] = '2013-03-04T12:00:01.000000Z'
|
||||
super(TrustTests, self).test_create(ref=ref, req_ref=req_ref)
|
||||
|
||||
def test_create_imp(self):
|
||||
ref = self.new_ref()
|
||||
ref['trustor_user_id'] = uuid.uuid4().hex
|
||||
ref['trustee_user_id'] = uuid.uuid4().hex
|
||||
ref['impersonation'] = True
|
||||
super(TrustTests, self).test_create(ref=ref)
|
||||
|
||||
def test_create_roles_imp(self):
|
||||
ref = self.new_ref()
|
||||
ref['trustor_user_id'] = uuid.uuid4().hex
|
||||
ref['trustee_user_id'] = uuid.uuid4().hex
|
||||
ref['impersonation'] = True
|
||||
req_ref = ref.copy()
|
||||
ref['role_names'] = ['atestrole']
|
||||
req_ref['roles'] = [{'name': 'atestrole'}]
|
||||
super(TrustTests, self).test_create(ref=ref, req_ref=req_ref)
|
||||
|
||||
def test_list_filter_trustor(self):
|
||||
ep = 'v3/OS-TRUST/trusts?trustor_user_id=12345'
|
||||
super(TrustTests, self).test_list(expected_path=ep,
|
||||
trustor_user='12345')
|
||||
|
||||
def test_list_filter_trustee(self):
|
||||
ep = 'v3/OS-TRUST/trusts?trustee_user_id=12345'
|
||||
super(TrustTests, self).test_list(expected_path=ep,
|
||||
trustee_user='12345')
|
||||
|
||||
def test_update(self):
|
||||
# Update not supported for the OS-TRUST API
|
||||
self.assertRaises(exceptions.HTTPNotImplemented, self.manager.update)
|
@@ -185,6 +185,7 @@ class CrudTests(testtools.TestCase):
|
||||
collection_key = None
|
||||
model = None
|
||||
manager = None
|
||||
path_prefix = None
|
||||
|
||||
def new_ref(self, **kwargs):
|
||||
kwargs.setdefault('id', uuid.uuid4().hex)
|
||||
@@ -212,33 +213,48 @@ class CrudTests(testtools.TestCase):
|
||||
return json.dumps({self.collection_key: entity}, sort_keys=True)
|
||||
raise NotImplementedError('Are you sure you want to serialize that?')
|
||||
|
||||
def test_create(self, ref=None):
|
||||
def _req_path(self):
|
||||
if self.path_prefix:
|
||||
return 'v3/%s/%s' % (self.path_prefix, self.collection_key)
|
||||
else:
|
||||
return 'v3/%s' % self.collection_key
|
||||
|
||||
def test_create(self, ref=None, req_ref=None):
|
||||
ref = ref or self.new_ref()
|
||||
manager_ref = ref.copy()
|
||||
manager_ref.pop('id')
|
||||
|
||||
# req_ref argument allows you to specify a different
|
||||
# signature for the request when the manager does some
|
||||
# conversion before doing the request (e.g converting
|
||||
# from datetime object to timestamp string)
|
||||
req_ref = req_ref or ref.copy()
|
||||
req_ref.pop('id')
|
||||
data = self.serialize(req_ref)
|
||||
resp = TestResponse({
|
||||
"status_code": 201,
|
||||
"text": self.serialize(ref),
|
||||
"text": data,
|
||||
})
|
||||
|
||||
method = 'POST'
|
||||
req_ref = ref.copy()
|
||||
req_ref.pop('id')
|
||||
kwargs = copy.copy(self.TEST_REQUEST_BASE)
|
||||
kwargs['headers'] = self.headers[method]
|
||||
kwargs['data'] = self.serialize(req_ref)
|
||||
kwargs['data'] = data
|
||||
|
||||
requests.request(
|
||||
method,
|
||||
urlparse.urljoin(
|
||||
self.TEST_URL,
|
||||
'v3/%s' % self.collection_key),
|
||||
self._req_path()),
|
||||
**kwargs).AndReturn((resp))
|
||||
self.mox.ReplayAll()
|
||||
|
||||
returned = self.manager.create(**parameterize(req_ref))
|
||||
returned = self.manager.create(**parameterize(manager_ref))
|
||||
self.assertTrue(isinstance(returned, self.model))
|
||||
for attr in ref:
|
||||
for attr in req_ref:
|
||||
self.assertEqual(
|
||||
getattr(returned, attr),
|
||||
ref[attr],
|
||||
req_ref[attr],
|
||||
'Expected different %s' % attr)
|
||||
|
||||
def test_get(self, ref=None):
|
||||
@@ -255,7 +271,7 @@ class CrudTests(testtools.TestCase):
|
||||
method,
|
||||
urlparse.urljoin(
|
||||
self.TEST_URL,
|
||||
'v3/%s/%s' % (self.collection_key, ref['id'])),
|
||||
'%s/%s' % (self._req_path(), ref['id'])),
|
||||
**kwargs).AndReturn((resp))
|
||||
self.mox.ReplayAll()
|
||||
|
||||
@@ -281,7 +297,7 @@ class CrudTests(testtools.TestCase):
|
||||
method,
|
||||
urlparse.urljoin(
|
||||
self.TEST_URL,
|
||||
expected_path or 'v3/%s' % self.collection_key),
|
||||
expected_path or self._req_path()),
|
||||
**kwargs).AndReturn((resp))
|
||||
self.mox.ReplayAll()
|
||||
|
||||
@@ -305,7 +321,7 @@ class CrudTests(testtools.TestCase):
|
||||
method,
|
||||
urlparse.urljoin(
|
||||
self.TEST_URL,
|
||||
'v3/%s%s' % (self.collection_key, query)),
|
||||
'%s%s' % (self._req_path(), query)),
|
||||
**kwargs).AndReturn((resp))
|
||||
self.mox.ReplayAll()
|
||||
|
||||
@@ -334,7 +350,7 @@ class CrudTests(testtools.TestCase):
|
||||
method,
|
||||
urlparse.urljoin(
|
||||
self.TEST_URL,
|
||||
'v3/%s/%s' % (self.collection_key, ref['id'])),
|
||||
'%s/%s' % (self._req_path(), ref['id'])),
|
||||
**kwargs).AndReturn((resp))
|
||||
self.mox.ReplayAll()
|
||||
|
||||
@@ -360,7 +376,7 @@ class CrudTests(testtools.TestCase):
|
||||
method,
|
||||
urlparse.urljoin(
|
||||
self.TEST_URL,
|
||||
'v3/%s/%s' % (self.collection_key, ref['id'])),
|
||||
'%s/%s' % (self._req_path(), ref['id'])),
|
||||
**kwargs).AndReturn((resp))
|
||||
self.mox.ReplayAll()
|
||||
|
||||
|
Reference in New Issue
Block a user