Merge "Rewrite existing ACL tests with ddt, yaml"

This commit is contained in:
Zuul 2021-01-11 18:33:32 +00:00 committed by Gerrit Code Review
commit e7a372b017
2 changed files with 82 additions and 56 deletions

View File

@ -16,9 +16,9 @@ Tests for ACL. Checks whether certain kinds of requests
are blocked or allowed to be processed. are blocked or allowed to be processed.
""" """
from http import client as http_client import abc
from unittest import mock
import ddt
from oslo_config import cfg from oslo_config import cfg
from ironic.tests.unit.api import base from ironic.tests.unit.api import base
@ -29,75 +29,57 @@ cfg.CONF.import_opt('cache', 'keystonemiddleware.auth_token',
group='keystone_authtoken') group='keystone_authtoken')
class TestACL(base.BaseApiTest): class TestACLBase(base.BaseApiTest):
def setUp(self): def setUp(self):
super(TestACL, self).setUp() super(TestACLBase, self).setUp()
self.environ = {'fake.cache': utils.FakeMemcache()} self.environ = {'fake.cache': utils.FakeMemcache()}
self.fake_db_node = db_utils.get_test_node(chassis_id=None) self.format_data = {}
self.node_path = '/nodes/%s' % self.fake_db_node['uuid'] self._create_test_data()
def get_json(self, path, expect_errors=False, headers=None, q=None,
**param):
q = [] if q is None else q
return super(TestACL, self).get_json(path,
expect_errors=expect_errors,
headers=headers,
q=q,
extra_environ=self.environ,
**param)
def _make_app(self): def _make_app(self):
cfg.CONF.set_override('cache', 'fake.cache', cfg.CONF.set_override('cache', 'fake.cache',
group='keystone_authtoken') group='keystone_authtoken')
cfg.CONF.set_override('auth_strategy', 'keystone') cfg.CONF.set_override('auth_strategy', 'keystone')
return super(TestACL, self)._make_app() return super(TestACLBase, self)._make_app()
def test_non_authenticated(self): @abc.abstractmethod
response = self.get_json(self.node_path, expect_errors=True) def _create_test_data(self):
self.assertEqual(http_client.UNAUTHORIZED, response.status_int) pass
def test_authenticated(self):
with mock.patch.object(self.dbapi, 'get_node_by_uuid',
autospec=True) as mock_get_node:
mock_get_node.return_value = self.fake_db_node
def _test_request(self, path, params=None, headers=None, method='get',
assert_status=None, assert_dict_contains=None):
path = path.format(**self.format_data)
expect_errors = bool(assert_status)
if method == 'get':
response = self.get_json( response = self.get_json(
self.node_path, headers={'X-Auth-Token': utils.ADMIN_TOKEN}) path,
headers=headers,
expect_errors=expect_errors,
extra_environ=self.environ,
path_prefix=''
)
else:
assert False, 'Unimplemented test method: %s' % method
self.assertEqual(self.fake_db_node['uuid'], response['uuid']) if assert_status:
mock_get_node.assert_called_once_with(self.fake_db_node['uuid']) self.assertEqual(assert_status, response.status_int)
def test_non_admin(self): if assert_dict_contains:
response = self.get_json(self.node_path, for k, v in assert_dict_contains.items():
headers={'X-Auth-Token': utils.MEMBER_TOKEN}, self.assertIn(k, response)
expect_errors=True) self.assertEqual(v.format(**self.format_data), response[k])
self.assertEqual(http_client.FORBIDDEN, response.status_int)
def test_non_admin_with_admin_header(self): @ddt.ddt
response = self.get_json(self.node_path, class TestACLBasic(TestACLBase):
headers={'X-Auth-Token': utils.MEMBER_TOKEN,
'X-Roles': 'admin'},
expect_errors=True)
self.assertEqual(http_client.FORBIDDEN, response.status_int) def _create_test_data(self):
fake_db_node = db_utils.create_test_node(chassis_id=None)
self.format_data['node_uuid'] = fake_db_node['uuid']
def test_public_api(self): @ddt.file_data('test_acl_basic.yaml')
# expect_errors should be set to True: If expect_errors is set to False @ddt.unpack
# the response gets converted to JSON and we cannot read the response def test_basic(self, **kwargs):
# code so easy. self._test_request(**kwargs)
for route in ('/', '/v1'):
response = self.get_json(route,
path_prefix='', expect_errors=True)
self.assertEqual(http_client.OK, response.status_int)
def test_public_api_with_path_extensions(self):
routes = {'/v1/': http_client.OK,
'/v1.json': http_client.OK,
'/v1.xml': http_client.NOT_FOUND}
for url in routes:
response = self.get_json(url,
path_prefix='', expect_errors=True)
self.assertEqual(routes[url], response.status_int)

View File

@ -0,0 +1,44 @@
non_authenticated:
path: &node_path '/v1/nodes/{node_uuid}'
assert_status: 401
authenticated:
path: *node_path
headers:
X-Auth-Token: &admin_token '4562138218392831'
assert_dict_contains:
uuid: '{node_uuid}'
driver: 'fake-hardware'
non_admin:
path: *node_path
headers:
X-Auth-Token: &member_token '4562138218392832'
assert_status: 403
non_admin_with_admin_header:
path: *node_path
headers:
X-Auth-Token: *member_token
X-Roles: admin
assert_status: 403
public_api:
path: /
assert_status: 200
public_api_v1:
path: /v1
assert_status: 200
public_api_v1_slash:
path: /v1/
assert_status: 200
public_api_v1_json:
path: /v1.json
assert_status: 200
public_api_v1_xml:
path: /v1.xml
assert_status: 404