Add a read-only role to keystoneauth

An idea was floated recently of a read-only role that can be used for
cluster-wide audits, and is otherwise safe. It was also included into
the "Consistent and Secure Default Policies" effort in OpenStack,
where it implements "reader" personas in system, domain, and project
scopes. This patch implements it for system scope, where it's most
useful for operators.

Change-Id: I5f5fff2e61a3e5fb4f4464262a8ea558a6e7d7ef
This commit is contained in:
Pete Zaitcev 2020-12-18 11:16:19 -06:00
parent 1f9b879547
commit 98a0275a9d
3 changed files with 86 additions and 1 deletions

View File

@ -498,6 +498,14 @@ user_test5_tester5 = testing5 service
# move between domains, you should disable backwards compatible name matching
# in ACLs by setting allow_names_in_acls to false:
# allow_names_in_acls = true
#
# In OpenStack terms, these reader roles are scoped for system: they
# can read anything across projects and domains.
# They are used for auditing and compliance fuctions.
# In Swift terms, these roles are as powerful as the reseller_admin_role,
# only do not modify the cluster.
# By default the list of reader roles is empty.
# system_reader_roles =
[filter:s3api]
use = egg:swift#s3api

View File

@ -181,6 +181,9 @@ class KeystoneAuth(object):
service_roles=[]))
self.reseller_admin_role = conf.get('reseller_admin_role',
'ResellerAdmin').lower()
self.system_reader_roles = {role.lower() for role in list_from_csv(
conf.get('system_reader_roles', ''))}
config_is_admin = conf.get('is_admin', "false").lower()
if swift_utils.config_true_value(config_is_admin):
self.logger.warning("The 'is_admin' option for keystoneauth is no "
@ -423,6 +426,19 @@ class KeystoneAuth(object):
req.environ['swift_owner'] = True
return
# The system_reader_role is almost as good as reseller_admin.
if self.system_reader_roles.intersection(user_roles):
# Note that if a system reader is trying to write, we're letting
# the request fall on other access checks below. This way,
# a compliance auditor can write a log file as a normal member.
if req.method in ('GET', 'HEAD'):
msg = 'User %s has system reader authorizing'
self.logger.debug(msg, tenant_id)
# We aren't setting 'swift_owner' nor 'reseller_request'
# because they are only ever used for something that modifies
# the contents of the cluster (setting ACL, deleting accounts).
return
# If we are not reseller admin and user is trying to delete its own
# account then deny it.
if not container and not obj and req.method == 'DELETE':

View File

@ -593,7 +593,7 @@ class BaseTestAuthorize(unittest.TestCase):
return self.test_auth._keystone_identity(env)
class TestAuthorize(BaseTestAuthorize):
class BaseTestAuthorizeCheck(BaseTestAuthorize):
def _check_authenticate(self, account=None, identity=None, headers=None,
exception=None, acl=None, env=None, path=None):
if not identity:
@ -626,6 +626,8 @@ class TestAuthorize(BaseTestAuthorize):
self.assertIsNone(result)
return req
class TestAuthorize(BaseTestAuthorizeCheck):
def test_authorize_fails_for_unauthorized_user(self):
self._check_authenticate(exception=HTTP_FORBIDDEN)
@ -1508,6 +1510,65 @@ class TestSetProjectDomain(BaseTestAuthorize):
sysmeta_project_domain_id='test_id')
class TestAuthorizeReader(BaseTestAuthorizeCheck):
system_reader_role_1 = 'compliance'
system_reader_role_2 = 'integrity'
# This cannot be in SetUp because it takes arguments from tests.
def _setup(self, system_reader_roles):
self.test_auth = keystoneauth.filter_factory(
{}, system_reader_roles=system_reader_roles)(FakeApp())
self.test_auth.logger = FakeLogger()
# Zero test: make sure that reader role has no default access
# when not in the list of system_reader_roles[].
def test_reader_none(self):
# We could rifle in the KeystoneAuth internals and tweak the list,
# but to create the middleware fresh is a clean, future-resistant way.
self._setup(None)
identity = self._get_identity(roles=[self.system_reader_role_1])
self._check_authenticate(exception=HTTP_FORBIDDEN,
identity=identity)
# HEAD is the same, right? No need to check, right?
def test_reader_get(self):
# While we're at it, test that our parsing of CSV works.
self._setup("%s, %s" %
(self.system_reader_role_1, self.system_reader_role_2))
identity = self._get_identity(roles=[self.system_reader_role_1])
self._check_authenticate(identity=identity)
def test_reader_put(self):
self._setup(self.system_reader_role_1)
identity = self._get_identity(roles=[self.system_reader_role_1])
self._check_authenticate(exception=HTTP_FORBIDDEN,
identity=identity,
env={'REQUEST_METHOD': 'PUT'})
self._check_authenticate(exception=HTTP_FORBIDDEN,
identity=identity,
env={'REQUEST_METHOD': 'POST'})
def test_reader_put_to_own(self):
roles = operator_roles(self.test_auth) + [self.system_reader_role_1]
identity = self._get_identity(roles=roles)
req = self._check_authenticate(identity=identity,
env={'REQUEST_METHOD': 'PUT'})
self.assertTrue(req.environ.get('swift_owner'))
# This should not be happening, but let's make sure that reader did not
# obtain any extra authorizations by combining with swiftoperator,
# because that is how reader is going to be used in practice.
def test_reader_put_elsewhere_fails(self):
roles = operator_roles(self.test_auth) + [self.system_reader_role_1]
identity = self._get_identity(roles=roles)
account = "%s%s" % (self._get_account(identity), "2")
self._check_authenticate(exception=HTTP_FORBIDDEN,
identity=identity,
account=account,
env={'REQUEST_METHOD': 'PUT'})
class ResellerInInfo(unittest.TestCase):
def setUp(self):