v3 Policies

- v3 policy (bp rbac-keystone-api)
- v3 policy tests (bug 1023935)
- v3 policy implementation (bug 1023939)

Change-Id: I163fbb67726c295fe9ed09b68cd18d2273345d29
This commit is contained in:
Dolph Mathews 2012-08-29 02:57:38 -05:00
parent 71692f7805
commit 827fc4c731
15 changed files with 607 additions and 22 deletions

View File

@ -79,7 +79,7 @@
# expiration = 86400 # expiration = 86400
[policy] [policy]
# driver = keystone.policy.backends.rules.Policy # driver = keystone.policy.backends.sql.Policy
[ec2] [ec2]
# driver = keystone.contrib.ec2.backends.kvs.Ec2 # driver = keystone.contrib.ec2.backends.kvs.Ec2

View File

@ -44,6 +44,7 @@ String = sql.String
ForeignKey = sql.ForeignKey ForeignKey = sql.ForeignKey
DateTime = sql.DateTime DateTime = sql.DateTime
IntegrityError = sql.exc.IntegrityError IntegrityError = sql.exc.IntegrityError
NotFound = sql.orm.exc.NoResultFound
Boolean = sql.Boolean Boolean = sql.Boolean

View File

@ -0,0 +1,36 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 OpenStack LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import migrate
import sqlalchemy as sql
def upgrade(migrate_engine):
meta = sql.MetaData()
meta.bind = migrate_engine
policy_table = sql.Table(
'policy',
meta,
sql.Column('id', sql.String(64), primary_key=True),
sql.Column('type', sql.String(255), nullable=False),
sql.Column('blob', sql.Text(), nullable=False),
sql.Column('extra', sql.Text()))
policy_table.create(migrate_engine, checkfirst=True)
def downgrade(migrate_engine):
pass

View File

@ -161,7 +161,7 @@ register_str('driver', group='catalog',
register_str('driver', group='identity', register_str('driver', group='identity',
default='keystone.identity.backends.sql.Identity') default='keystone.identity.backends.sql.Identity')
register_str('driver', group='policy', register_str('driver', group='policy',
default='keystone.policy.backends.rules.Policy') default='keystone.policy.backends.sql.Policy')
register_str('driver', group='token', register_str('driver', group='token',
default='keystone.token.backends.kvs.Token') default='keystone.token.backends.kvs.Token')
register_str('driver', group='ec2', register_str('driver', group='ec2',

View File

@ -0,0 +1,103 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 OpenStack LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import functools
from keystone.common import sql
from keystone.common.sql import migration
from keystone import exception
from keystone.policy.backends import rules
def handle_conflicts(type='object'):
"""Converts IntegrityError into HTTP 409 Conflict."""
def decorator(method):
@functools.wraps(method)
def wrapper(*args, **kwargs):
try:
return method(*args, **kwargs)
except sql.IntegrityError as e:
raise exception.Conflict(type=type, details=str(e))
return wrapper
return decorator
class PolicyModel(sql.ModelBase, sql.DictBase):
__tablename__ = 'policy'
attributes = ['id', 'blob', 'type']
id = sql.Column(sql.String(64), primary_key=True)
blob = sql.Column(sql.JsonBlob(), nullable=False)
type = sql.Column(sql.String(255), nullable=False)
extra = sql.Column(sql.JsonBlob())
class Policy(sql.Base, rules.Policy):
# Internal interface to manage the database
def db_sync(self):
migration.db_sync()
@handle_conflicts(type='policy')
def create_policy(self, policy_id, policy):
session = self.get_session()
with session.begin():
ref = PolicyModel.from_dict(policy)
session.add(ref)
session.flush()
return ref.to_dict()
def list_policies(self):
session = self.get_session()
refs = session.query(PolicyModel).all()
return [ref.to_dict() for ref in refs]
def _get_policy(self, session, policy_id):
"""Private method to get a policy model object (NOT a dictionary)."""
try:
return session.query(PolicyModel).filter_by(id=policy_id).one()
except sql.NotFound:
raise exception.PolicyNotFound(policy_id=policy_id)
def get_policy(self, policy_id):
session = self.get_session()
return self._get_policy(session, policy_id).to_dict()
@handle_conflicts(type='policy')
def update_policy(self, policy_id, policy):
session = self.get_session()
with session.begin():
ref = self._get_policy(session, policy_id)
old_dict = ref.to_dict()
old_dict.update(policy)
new_policy = PolicyModel.from_dict(old_dict)
ref.blob = new_policy.blob
ref.type = new_policy.type
ref.extra = new_policy.extra
session.flush()
return ref.to_dict()
def delete_policy(self, policy_id):
session = self.get_session()
with session.begin():
ref = self._get_policy(session, policy_id)
session.delete(ref)
session.flush()

View File

@ -68,10 +68,9 @@ class Driver(object):
raise exception.NotImplemented() raise exception.NotImplemented()
def create_policy(self, policy_id, policy): def create_policy(self, policy_id, policy):
"""Store a policy blob for a particular endpoint. """Store a policy blob.
:raises: keystone.exception.EndpointNotFound, :raises: keystone.exception.Conflict
keystone.exception.Conflict
""" """
raise exception.NotImplemented() raise exception.NotImplemented()
@ -91,8 +90,7 @@ class Driver(object):
def update_policy(self, policy_id, policy): def update_policy(self, policy_id, policy):
"""Update a policy blob. """Update a policy blob.
:raises: keystone.exception.PolicyNotFound, :raises: keystone.exception.PolicyNotFound
keystone.exception.EndpointNotFound
""" """
raise exception.NotImplemented() raise exception.NotImplemented()
@ -113,9 +111,6 @@ class PolicyControllerV3(controller.V3Controller):
ref = self._assign_unique_id(self._normalize_dict(policy)) ref = self._assign_unique_id(self._normalize_dict(policy))
self._require_attribute(ref, 'blob') self._require_attribute(ref, 'blob')
self._require_attribute(ref, 'type') self._require_attribute(ref, 'type')
self._require_attribute(ref, 'endpoint_id')
self.catalog_api.get_endpoint(context, ref['endpoint_id'])
ref = self.policy_api.create_policy(context, ref['id'], ref) ref = self.policy_api.create_policy(context, ref['id'], ref)
return {'policy': ref} return {'policy': ref}
@ -123,7 +118,6 @@ class PolicyControllerV3(controller.V3Controller):
def list_policies(self, context): def list_policies(self, context):
self.assert_admin(context) self.assert_admin(context)
refs = self.policy_api.list_policies(context) refs = self.policy_api.list_policies(context)
refs = self._filter_by_attribute(context, refs, 'endpoint_id')
refs = self._filter_by_attribute(context, refs, 'type') refs = self._filter_by_attribute(context, refs, 'type')
return {'policies': self._paginate(context, refs)} return {'policies': self._paginate(context, refs)}
@ -134,10 +128,6 @@ class PolicyControllerV3(controller.V3Controller):
def update_policy(self, context, policy_id, policy): def update_policy(self, context, policy_id, policy):
self.assert_admin(context) self.assert_admin(context)
if 'endpoint_id' in policy:
self.catalog_api.get_endpoint(context, policy['endpoint_id'])
ref = self.policy_api.update_policy(context, policy_id, policy) ref = self.policy_api.update_policy(context, policy_id, policy)
return {'policy': ref} return {'policy': ref}

View File

@ -206,6 +206,7 @@ class TestCase(NoModule, unittest.TestCase):
self.identity_api = importutils.import_object(CONF.identity.driver) self.identity_api = importutils.import_object(CONF.identity.driver)
self.token_api = importutils.import_object(CONF.token.driver) self.token_api = importutils.import_object(CONF.token.driver)
self.catalog_api = importutils.import_object(CONF.catalog.driver) self.catalog_api = importutils.import_object(CONF.catalog.driver)
self.policy_api = importutils.import_object(CONF.policy.driver)
def load_fixtures(self, fixtures): def load_fixtures(self, fixtures):
"""Hacky basic and naive fixture loading based on a python module. """Hacky basic and naive fixture loading based on a python module.

View File

@ -13,3 +13,6 @@ driver = keystone.contrib.ec2.backends.sql.Ec2
[catalog] [catalog]
driver = keystone.catalog.backends.sql.Catalog driver = keystone.catalog.backends.sql.Catalog
[policy]
driver = keystone.policy.backends.sql.Policy

View File

@ -903,3 +903,88 @@ class CatalogTests(object):
self.assertRaises(exception.EndpointNotFound, self.assertRaises(exception.EndpointNotFound,
self.catalog_api.delete_endpoint, self.catalog_api.delete_endpoint,
uuid.uuid4().hex) uuid.uuid4().hex)
class PolicyTests(object):
def _new_policy_ref(self):
return {
'id': uuid.uuid4().hex,
'policy': uuid.uuid4().hex,
'type': uuid.uuid4().hex,
'endpoint_id': uuid.uuid4().hex,
}
def assertEqualPolicies(self, a, b):
self.assertEqual(a['id'], b['id'])
self.assertEqual(a['endpoint_id'], b['endpoint_id'])
self.assertEqual(a['policy'], b['policy'])
self.assertEqual(a['type'], b['type'])
def test_create(self):
ref = self._new_policy_ref()
res = self.policy_api.create_policy(ref['id'], ref)
self.assertEqualPolicies(ref, res)
def test_get(self):
ref = self._new_policy_ref()
res = self.policy_api.create_policy(ref['id'], ref)
res = self.policy_api.get_policy(ref['id'])
self.assertEqualPolicies(ref, res)
def test_list(self):
ref = self._new_policy_ref()
self.policy_api.create_policy(ref['id'], ref)
res = self.policy_api.list_policies()
res = [x for x in res if x['id'] == ref['id']][0]
self.assertEqualPolicies(ref, res)
def test_update(self):
ref = self._new_policy_ref()
self.policy_api.create_policy(ref['id'], ref)
orig = ref
ref = self._new_policy_ref()
# (cannot change policy ID)
self.assertRaises(exception.ValidationError,
self.policy_man.update_policy,
{},
orig['id'],
ref)
ref['id'] = orig['id']
res = self.policy_api.update_policy(orig['id'], ref)
self.assertEqualPolicies(ref, res)
def test_delete(self):
ref = self._new_policy_ref()
self.policy_api.create_policy(ref['id'], ref)
self.policy_api.delete_policy(ref['id'])
self.assertRaises(exception.PolicyNotFound,
self.policy_man.delete_policy, {}, ref['id'])
self.assertRaises(exception.PolicyNotFound,
self.policy_man.get_policy, {}, ref['id'])
res = self.policy_api.list_policies()
self.assertFalse(len([x for x in res if x['id'] == ref['id']]))
def test_get_policy_404(self):
self.assertRaises(exception.PolicyNotFound,
self.policy_man.get_policy,
{},
uuid.uuid4().hex)
def test_update_policy_404(self):
self.assertRaises(exception.PolicyNotFound,
self.policy_man.update_policy,
{},
uuid.uuid4().hex,
{})
def test_delete_policy_404(self):
self.assertRaises(exception.PolicyNotFound,
self.policy_man.delete_policy,
{},
uuid.uuid4().hex)

View File

@ -21,6 +21,7 @@ from keystone import catalog
from keystone import config from keystone import config
from keystone import exception from keystone import exception
from keystone import identity from keystone import identity
from keystone import policy
from keystone import test from keystone import test
from keystone import token from keystone import token
@ -42,11 +43,13 @@ class SqlTests(test.TestCase):
self.catalog_man = catalog.Manager() self.catalog_man = catalog.Manager()
self.identity_man = identity.Manager() self.identity_man = identity.Manager()
self.token_man = token.Manager() self.token_man = token.Manager()
self.policy_man = policy.Manager()
# create shortcut references to each driver # create shortcut references to each driver
self.catalog_api = self.catalog_man.driver self.catalog_api = self.catalog_man.driver
self.identity_api = self.identity_man.driver self.identity_api = self.identity_man.driver
self.token_api = self.token_man.driver self.token_api = self.token_man.driver
self.policy_api = self.policy_man.driver
# populate the engine with tables & fixtures # populate the engine with tables & fixtures
self.load_fixtures(default_fixtures) self.load_fixtures(default_fixtures)
@ -272,3 +275,7 @@ class SqlCatalog(SqlTests, test_backend.CatalogTests):
self.catalog_man.delete_service, {}, "c") self.catalog_man.delete_service, {}, "c")
self.assertRaises(exception.EndpointNotFound, self.assertRaises(exception.EndpointNotFound,
self.catalog_man.delete_endpoint, {}, "d") self.catalog_man.delete_endpoint, {}, "d")
class SqlPolicy(SqlTests, test_backend.PolicyTests):
pass

View File

@ -1097,7 +1097,14 @@ class KcEssex3TestCase(CompatTestCase, KeystoneClientTests):
def test_endpoint_delete_404(self): def test_endpoint_delete_404(self):
raise nose.exc.SkipTest('N/A') raise nose.exc.SkipTest('N/A')
def test_policy_crud(self):
"""Due to lack of endpoint CRUD"""
raise nose.exc.SkipTest('N/A')
class Kc11TestCase(CompatTestCase, KeystoneClientTests): class Kc11TestCase(CompatTestCase, KeystoneClientTests):
def get_checkout(self): def get_checkout(self):
return KEYSTONECLIENT_REPO, '0.1.1' return KEYSTONECLIENT_REPO, '0.1.1'
def test_policy_crud(self):
raise nose.exc.SkipTest('N/A')

View File

@ -16,6 +16,8 @@
import uuid import uuid
import nose.exc
from keystone.common import sql from keystone.common import sql
from keystone import config from keystone import config
from keystone import test from keystone import test
@ -94,3 +96,76 @@ class KcMasterSqlTestCase(test_keystoneclient.KcMasterTestCase):
self.assertRaises(client_exceptions.NotFound, self.assertRaises(client_exceptions.NotFound,
client.endpoints.delete, client.endpoints.delete,
id=uuid.uuid4().hex) id=uuid.uuid4().hex)
def test_policy_crud(self):
# FIXME(dolph): this test was written prior to the v3 implementation of
# the client and essentially refers to a non-existent
# policy manager in the v2 client. this test needs to be
# moved to a test suite running against the v3 api
raise nose.exc.SkipTest('Written prior to v3 client; needs refactor')
from keystoneclient import exceptions as client_exceptions
client = self.get_client(admin=True)
policy_blob = uuid.uuid4().hex
policy_type = uuid.uuid4().hex
service = client.services.create(
name=uuid.uuid4().hex,
service_type=uuid.uuid4().hex,
description=uuid.uuid4().hex)
endpoint = client.endpoints.create(
service_id=service.id,
region=uuid.uuid4().hex,
adminurl=uuid.uuid4().hex,
internalurl=uuid.uuid4().hex,
publicurl=uuid.uuid4().hex)
# create
policy = client.policies.create(
blob=policy_blob,
type=policy_type,
endpoint=endpoint.id)
self.assertEquals(policy_blob, policy.policy)
self.assertEquals(policy_type, policy.type)
self.assertEquals(endpoint.id, policy.endpoint_id)
policy = client.policies.get(policy=policy.id)
self.assertEquals(policy_blob, policy.policy)
self.assertEquals(policy_type, policy.type)
self.assertEquals(endpoint.id, policy.endpoint_id)
endpoints = [x for x in client.endpoints.list() if x.id == endpoint.id]
endpoint = endpoints[0]
self.assertEquals(policy_blob, policy.policy)
self.assertEquals(policy_type, policy.type)
self.assertEquals(endpoint.id, policy.endpoint_id)
# update
policy_blob = uuid.uuid4().hex
policy_type = uuid.uuid4().hex
endpoint = client.endpoints.create(
service_id=service.id,
region=uuid.uuid4().hex,
adminurl=uuid.uuid4().hex,
internalurl=uuid.uuid4().hex,
publicurl=uuid.uuid4().hex)
policy = client.policies.update(
policy=policy.id,
blob=policy_blob,
type=policy_type,
endpoint=endpoint.id)
policy = client.policies.get(policy=policy.id)
self.assertEquals(policy_blob, policy.policy)
self.assertEquals(policy_type, policy.type)
self.assertEquals(endpoint.id, policy.endpoint_id)
# delete
client.policies.delete(policy=policy.id)
self.assertRaises(
client_exceptions.NotFound,
client.policies.get,
policy=policy.id)
policies = [x for x in client.policies.list() if x.id == policy.id]
self.assertEquals(len(policies), 0)

View File

@ -50,8 +50,7 @@ class SqlUpgradeTests(test.TestCase):
super(SqlUpgradeTests, self).tearDown() super(SqlUpgradeTests, self).tearDown()
def test_blank_db_to_start(self): def test_blank_db_to_start(self):
self.assertFalse(self.is_user_table_created(), self.assertTableDoesNotExist('user')
"User should not be defined yet")
def test_start_version_0(self): def test_start_version_0(self):
version = migration.db_version() version = migration.db_version()
@ -66,7 +65,7 @@ class SqlUpgradeTests(test.TestCase):
def test_upgrade_0_to_1(self): def test_upgrade_0_to_1(self):
self.assertEqual(self.schema.version, 0, "DB is at version 0") self.assertEqual(self.schema.version, 0, "DB is at version 0")
self._migrate(self.repo_path, 1) self._migrate(self.repo_path, 1)
self.assertEqual(self.schema.version, 1, "DB is at version 0") self.assertEqual(self.schema.version, 1, "DB is at version 1")
self.assertTableColumns("user", ["id", "name", "extra"]) self.assertTableColumns("user", ["id", "name", "extra"])
self.assertTableColumns("tenant", ["id", "name", "extra"]) self.assertTableColumns("tenant", ["id", "name", "extra"])
self.assertTableColumns("role", ["id", "name"]) self.assertTableColumns("role", ["id", "name"])
@ -75,6 +74,16 @@ class SqlUpgradeTests(test.TestCase):
self.assertTableColumns("metadata", ["user_id", "tenant_id", "data"]) self.assertTableColumns("metadata", ["user_id", "tenant_id", "data"])
self.populate_user_table() self.populate_user_table()
def test_upgrade_5_to_6(self):
self._migrate(self.repo_path, 5)
self.assertEqual(self.schema.version, 5)
self.assertTableDoesNotExist('policy')
self._migrate(self.repo_path, 6)
self.assertEqual(self.schema.version, 6)
self.assertTableExists('policy')
self.assertTableColumns('policy', ['id', 'type', 'blob', 'extra'])
def populate_user_table(self): def populate_user_table(self):
for user in default_fixtures.USERS: for user in default_fixtures.USERS:
extra = copy.deepcopy(user) extra = copy.deepcopy(user)
@ -92,12 +101,21 @@ class SqlUpgradeTests(test.TestCase):
s = sqlalchemy.select([table]) s = sqlalchemy.select([table])
return s return s
def is_user_table_created(self): def assertTableExists(self, table_name):
"""Asserts that a given table exists can be selected by name."""
try: try:
self.select_table("user") self.select_table(table_name)
return True
except sqlalchemy.exc.NoSuchTableError: except sqlalchemy.exc.NoSuchTableError:
return False raise AssertionError('Table "%s" does not exist' % table_name)
def assertTableDoesNotExist(self, table_name):
"""Asserts that a given table exists cannot be selected by name."""
try:
self.assertTableExists(table_name)
except AssertionError:
pass
else:
raise AssertionError('Table "%s" already exists' % table_name)
def _migrate(self, repository, version): def _migrate(self, repository, version):
upgrade = True upgrade = True

181
tests/test_v3.py Normal file
View File

@ -0,0 +1,181 @@
import uuid
from keystone.common.sql import util as sql_util
from keystone import test
import test_content_types
BASE_URL = 'http://127.0.0.1:35357/v3'
class RestfulTestCase(test_content_types.RestfulTestCase):
def setUp(self):
self.config([
test.etcdir('keystone.conf.sample'),
test.testsdir('test_overrides.conf'),
test.testsdir('backend_sql.conf'),
test.testsdir('backend_sql_disk.conf')])
sql_util.setup_test_database()
self.load_backends()
self.public_server = self.serveapp('keystone', name='main')
self.admin_server = self.serveapp('keystone', name='admin')
def tearDown(self):
self.public_server.kill()
self.admin_server.kill()
self.public_server = None
self.admin_server = None
def new_ref(self):
"""Populates a ref with attributes common to all API entities."""
return {
'id': uuid.uuid4().hex,
'name': uuid.uuid4().hex,
'description': uuid.uuid4().hex,
'enabled': True}
def new_service_ref(self):
ref = self.new_ref()
ref['type'] = uuid.uuid4().hex
return ref
def new_endpoint_ref(self, service_id):
ref = self.new_ref()
ref['interface'] = uuid.uuid4().hex
ref['service_id'] = service_id
return ref
def new_domain_ref(self):
ref = self.new_ref()
return ref
def new_project_ref(self, domain_id):
ref = self.new_ref()
ref['domain_id'] = domain_id
return ref
def new_user_ref(self, domain_id, project_id=None):
ref = self.new_ref()
ref['domain_id'] = domain_id
ref['email'] = uuid.uuid4().hex
if project_id:
ref['project_id'] = project_id
return ref
def new_credential_ref(self, user_id, project_id=None):
ref = self.new_ref()
ref['user_id'] = user_id
ref['blob'] = uuid.uuid4().hex
ref['type'] = uuid.uuid4().hex
if project_id:
ref['project_id'] = project_id
return ref
def new_role_ref(self):
ref = self.new_ref()
return ref
def new_policy_ref(self):
ref = self.new_ref()
ref['blob'] = uuid.uuid4().hex
ref['type'] = uuid.uuid4().hex
return ref
def get_scoped_token(self):
"""Convenience method so that we can test authenticated requests."""
# FIXME(dolph): should use real auth
return 'ADMIN'
r = self.admin_request(
method='POST',
path='/v3/tokens',
body={
'auth': {
'passwordCredentials': {
'username': self.user_foo['name'],
'password': self.user_foo['password'],
},
'tenantId': self.tenant_bar['id'],
},
})
return r.body['access']['token']['id']
def v3_request(self, path, **kwargs):
path = '/v3' + path
return self.admin_request(
path=path,
token=self.get_scoped_token(),
**kwargs)
def get(self, path, **kwargs):
return self.v3_request(method='GET', path=path, **kwargs)
def head(self, path, **kwargs):
return self.v3_request(method='HEAD', path=path, **kwargs)
def post(self, path, **kwargs):
return self.v3_request(method='POST', path=path, **kwargs)
def patch(self, path, **kwargs):
return self.v3_request(method='PATCH', path=path, **kwargs)
def delete(self, path, **kwargs):
return self.v3_request(method='DELETE', path=path, **kwargs)
def assertValidListResponse(self, resp, key, entity_validator, ref=None):
"""Make assertions common to all API list responses.
If a reference is provided, it's ID will be searched for in the
response, and asserted to be equal.
"""
entities = resp.body.get(key)
self.assertIsNotNone(entities)
self.assertTrue(len(entities))
for entity in entities:
self.assertIsNotNone(entity)
self.assertValidEntity(entity)
entity_validator(entity)
if ref:
entity = [x for x in entities if x['id'] == ref['id']][0]
self.assertValidEntity(entity, ref)
entity_validator(entity, ref)
return entities
def assertValidResponse(self, resp, key, entity_validator, ref):
"""Make assertions common to all API responses."""
entity = resp.body.get(key)
self.assertIsNotNone(entity)
self.assertValidEntity(entity, ref)
entity_validator(entity, ref)
return entity
def assertValidEntity(self, entity, ref=None):
"""Make assertions common to all API entities.
If a reference is provided, the entity will also be compared against
the reference.
"""
keys = ['name', 'description', 'enabled']
for k in ['id'] + keys:
msg = '%s unnexpectedly None in %s' % (k, entity)
self.assertIsNotNone(entity.get(k), msg)
# FIXME(dolph): need to test this in v3
# self.assertIsNotNone(entity.get('link'))
# self.assertIsNotNone(entity['link'].get('href'))
# self.assertEquals(entity['link'].get('rel'), 'self')
if ref:
for k in keys:
msg = '%s not equal: %s != %s' % (k, ref[k], entity[k])
self.assertEquals(ref[k], entity[k])
return entity
class VersionTestCase(RestfulTestCase):
def test_get_version(self):
pass

78
tests/test_v3_policy.py Normal file
View File

@ -0,0 +1,78 @@
import uuid
import test_v3
class PolicyTestCase(test_v3.RestfulTestCase):
"""Test policy CRUD"""
def setUp(self):
super(PolicyTestCase, self).setUp()
self.policy_id = uuid.uuid4().hex
self.policy = self.new_policy_ref()
self.policy['id'] = self.policy_id
self.policy_api.create_policy(
self.policy_id,
self.policy.copy())
# policy validation
def assertValidPolicyListResponse(self, resp, ref):
return self.assertValidListResponse(
resp,
'policies',
self.assertValidPolicy,
ref)
def assertValidPolicyResponse(self, resp, ref):
return self.assertValidResponse(
resp,
'policy',
self.assertValidPolicy,
ref)
def assertValidPolicy(self, entity, ref=None):
self.assertIsNotNone(entity.get('blob'))
self.assertIsNotNone(entity.get('type'))
if ref:
self.assertEqual(ref['blob'], entity['blob'])
self.assertEqual(ref['type'], entity['type'])
return entity
# policy crud tests
def test_create_policy(self):
"""POST /policies"""
ref = self.new_policy_ref()
r = self.post(
'/policies',
body={'policy': ref})
return self.assertValidPolicyResponse(r, ref)
def test_list_policies(self):
"""GET /policies"""
r = self.get('/policies')
self.assertValidPolicyListResponse(r, self.policy)
def test_get_policy(self):
"""GET /policies/{policy_id}"""
r = self.get(
'/policies/%(policy_id)s' % {
'policy_id': self.policy_id})
self.assertValidPolicyResponse(r, self.policy)
def test_update_policy(self):
"""PATCH /policies/{policy_id}"""
policy = self.new_policy_ref()
policy['id'] = self.policy_id
r = self.patch(
'/policies/%(policy_id)s' % {
'policy_id': self.policy_id},
body={'policy': policy})
self.assertValidPolicyResponse(r, policy)
def test_delete_policy(self):
"""DELETE /policies/{policy_id}"""
self.delete(
'/policies/%(policy_id)s' % {
'policy_id': self.policy_id})