diff --git a/etc/proxy-server.conf-sample b/etc/proxy-server.conf-sample index d28a196055..503ba0483a 100644 --- a/etc/proxy-server.conf-sample +++ b/etc/proxy-server.conf-sample @@ -498,6 +498,14 @@ user_test5_tester5 = testing5 service # move between domains, you should disable backwards compatible name matching # in ACLs by setting allow_names_in_acls to false: # allow_names_in_acls = true +# +# In OpenStack terms, these reader roles are scoped for system: they +# can read anything across projects and domains. +# They are used for auditing and compliance fuctions. +# In Swift terms, these roles are as powerful as the reseller_admin_role, +# only do not modify the cluster. +# By default the list of reader roles is empty. +# system_reader_roles = [filter:s3api] use = egg:swift#s3api diff --git a/swift/common/middleware/keystoneauth.py b/swift/common/middleware/keystoneauth.py index cb1b4c7c39..dd70b73661 100644 --- a/swift/common/middleware/keystoneauth.py +++ b/swift/common/middleware/keystoneauth.py @@ -181,6 +181,9 @@ class KeystoneAuth(object): service_roles=[])) self.reseller_admin_role = conf.get('reseller_admin_role', 'ResellerAdmin').lower() + self.system_reader_roles = {role.lower() for role in list_from_csv( + conf.get('system_reader_roles', ''))} + config_is_admin = conf.get('is_admin', "false").lower() if swift_utils.config_true_value(config_is_admin): self.logger.warning("The 'is_admin' option for keystoneauth is no " @@ -423,6 +426,19 @@ class KeystoneAuth(object): req.environ['swift_owner'] = True return + # The system_reader_role is almost as good as reseller_admin. + if self.system_reader_roles.intersection(user_roles): + # Note that if a system reader is trying to write, we're letting + # the request fall on other access checks below. This way, + # a compliance auditor can write a log file as a normal member. + if req.method in ('GET', 'HEAD'): + msg = 'User %s has system reader authorizing' + self.logger.debug(msg, tenant_id) + # We aren't setting 'swift_owner' nor 'reseller_request' + # because they are only ever used for something that modifies + # the contents of the cluster (setting ACL, deleting accounts). + return + # If we are not reseller admin and user is trying to delete its own # account then deny it. if not container and not obj and req.method == 'DELETE': diff --git a/test/unit/common/middleware/test_keystoneauth.py b/test/unit/common/middleware/test_keystoneauth.py index 34d39f489b..832d857ed8 100644 --- a/test/unit/common/middleware/test_keystoneauth.py +++ b/test/unit/common/middleware/test_keystoneauth.py @@ -593,7 +593,7 @@ class BaseTestAuthorize(unittest.TestCase): return self.test_auth._keystone_identity(env) -class TestAuthorize(BaseTestAuthorize): +class BaseTestAuthorizeCheck(BaseTestAuthorize): def _check_authenticate(self, account=None, identity=None, headers=None, exception=None, acl=None, env=None, path=None): if not identity: @@ -626,6 +626,8 @@ class TestAuthorize(BaseTestAuthorize): self.assertIsNone(result) return req + +class TestAuthorize(BaseTestAuthorizeCheck): def test_authorize_fails_for_unauthorized_user(self): self._check_authenticate(exception=HTTP_FORBIDDEN) @@ -1508,6 +1510,65 @@ class TestSetProjectDomain(BaseTestAuthorize): sysmeta_project_domain_id='test_id') +class TestAuthorizeReader(BaseTestAuthorizeCheck): + + system_reader_role_1 = 'compliance' + system_reader_role_2 = 'integrity' + + # This cannot be in SetUp because it takes arguments from tests. + def _setup(self, system_reader_roles): + self.test_auth = keystoneauth.filter_factory( + {}, system_reader_roles=system_reader_roles)(FakeApp()) + self.test_auth.logger = FakeLogger() + + # Zero test: make sure that reader role has no default access + # when not in the list of system_reader_roles[]. + def test_reader_none(self): + # We could rifle in the KeystoneAuth internals and tweak the list, + # but to create the middleware fresh is a clean, future-resistant way. + self._setup(None) + identity = self._get_identity(roles=[self.system_reader_role_1]) + self._check_authenticate(exception=HTTP_FORBIDDEN, + identity=identity) + + # HEAD is the same, right? No need to check, right? + def test_reader_get(self): + # While we're at it, test that our parsing of CSV works. + self._setup("%s, %s" % + (self.system_reader_role_1, self.system_reader_role_2)) + identity = self._get_identity(roles=[self.system_reader_role_1]) + self._check_authenticate(identity=identity) + + def test_reader_put(self): + self._setup(self.system_reader_role_1) + identity = self._get_identity(roles=[self.system_reader_role_1]) + self._check_authenticate(exception=HTTP_FORBIDDEN, + identity=identity, + env={'REQUEST_METHOD': 'PUT'}) + self._check_authenticate(exception=HTTP_FORBIDDEN, + identity=identity, + env={'REQUEST_METHOD': 'POST'}) + + def test_reader_put_to_own(self): + roles = operator_roles(self.test_auth) + [self.system_reader_role_1] + identity = self._get_identity(roles=roles) + req = self._check_authenticate(identity=identity, + env={'REQUEST_METHOD': 'PUT'}) + self.assertTrue(req.environ.get('swift_owner')) + + # This should not be happening, but let's make sure that reader did not + # obtain any extra authorizations by combining with swiftoperator, + # because that is how reader is going to be used in practice. + def test_reader_put_elsewhere_fails(self): + roles = operator_roles(self.test_auth) + [self.system_reader_role_1] + identity = self._get_identity(roles=roles) + account = "%s%s" % (self._get_account(identity), "2") + self._check_authenticate(exception=HTTP_FORBIDDEN, + identity=identity, + account=account, + env={'REQUEST_METHOD': 'PUT'}) + + class ResellerInInfo(unittest.TestCase): def setUp(self):