Merge "Restrict keystone cross-tenant ACLs to IDs"
This commit is contained in:
commit
034fae630c
@ -99,6 +99,7 @@ CS :ref:`container-sync`
|
||||
TA :ref:`common_tempauth`
|
||||
DLO :ref:`dynamic-large-objects`
|
||||
LE :ref:`list_endpoints`
|
||||
KS :ref:`keystoneauth`
|
||||
======================= =============================
|
||||
|
||||
|
||||
|
@ -285,6 +285,16 @@ user_test_tester3 = testing3
|
||||
# operator_roles = admin, swiftoperator
|
||||
# The reseller admin role has the ability to create and delete accounts
|
||||
# reseller_admin_role = ResellerAdmin
|
||||
# For backwards compatibility, keystoneauth will match names in cross-tenant
|
||||
# access control lists (ACLs) when both the requesting user and the tenant
|
||||
# are in the default domain i.e the domain to which existing tenants are
|
||||
# migrated. The default_domain_id value configured here should be the same as
|
||||
# the value used during migration of tenants to keystone domains.
|
||||
# default_domain_id = default
|
||||
# For a new installation, or an installation in which keystone projects may
|
||||
# move between domains, you should disable backwards compatible name matching
|
||||
# in ACLs by setting allow_names_in_acls to false:
|
||||
# allow_names_in_acls = true
|
||||
|
||||
[filter:healthcheck]
|
||||
use = egg:swift#healthcheck
|
||||
|
@ -13,11 +13,20 @@
|
||||
# under the License.
|
||||
|
||||
from swift.common import utils as swift_utils
|
||||
from swift.common.http import is_success
|
||||
from swift.common.middleware import acl as swift_acl
|
||||
from swift.common.request_helpers import get_sys_meta_prefix
|
||||
from swift.common.swob import HTTPNotFound, HTTPForbidden, HTTPUnauthorized
|
||||
from swift.common.utils import register_swift_info
|
||||
from swift.proxy.controllers.base import get_account_info
|
||||
import functools
|
||||
|
||||
PROJECT_DOMAIN_ID_HEADER = 'x-account-project-domain-id'
|
||||
PROJECT_DOMAIN_ID_SYSMETA_HEADER = \
|
||||
get_sys_meta_prefix('account') + 'project-domain-id'
|
||||
# a string that is unique w.r.t valid ids
|
||||
UNKNOWN_ID = '_unknown'
|
||||
|
||||
|
||||
class KeystoneAuth(object):
|
||||
"""Swift middleware to Keystone authorization system.
|
||||
@ -69,6 +78,37 @@ class KeystoneAuth(object):
|
||||
|
||||
reseller_prefix = NEWAUTH
|
||||
|
||||
The keystoneauth middleware supports cross-tenant access control using
|
||||
the syntax <tenant>:<user> in container Access Control Lists (ACLs). For
|
||||
a request to be granted by an ACL, <tenant> must match the UUID of the
|
||||
tenant to which the request token is scoped and <user> must match the
|
||||
UUID of the user authenticated by the request token.
|
||||
|
||||
Note that names must no longer be used in cross-tenant ACLs because with
|
||||
the introduction of domains in keystone names are no longer globally
|
||||
unique. For backwards compatibility, ACLs using names will be granted by
|
||||
keystoneauth when it can be established that both the grantee and the
|
||||
tenant being accessed are either not yet in a domain (e.g. the request
|
||||
token has been obtained via the keystone v2 API) or are both in the
|
||||
default domain to which legacy accounts would have been migrated. The id
|
||||
of the default domain is specified by the config option
|
||||
``default_domain_id``:
|
||||
|
||||
default_domain_id = default
|
||||
|
||||
The backwards compatible behavior can be disabled by setting the config
|
||||
option ``allow_names_in_acls`` to false::
|
||||
|
||||
allow_names_in_acls = false
|
||||
|
||||
To enable this backwards compatibility, keystoneauth will attempt to
|
||||
determine the domain id of a tenant when any new account is created,
|
||||
and persist this as account metadata. If an account is created for a tenant
|
||||
using a token with reselleradmin role that is not scoped on that tenant,
|
||||
keystoneauth is unable to determine the domain id of the tenant;
|
||||
keystoneauth will assume that the tenant may not be in the default domain
|
||||
and therefore not match names in ACLs for that account.
|
||||
|
||||
:param app: The next WSGI app in the pipeline
|
||||
:param conf: The dict of configuration values
|
||||
"""
|
||||
@ -87,6 +127,9 @@ class KeystoneAuth(object):
|
||||
self.is_admin = swift_utils.config_true_value(config_is_admin)
|
||||
config_overrides = conf.get('allow_overrides', 't').lower()
|
||||
self.allow_overrides = swift_utils.config_true_value(config_overrides)
|
||||
self.default_domain_id = conf.get('default_domain_id', 'default')
|
||||
self.allow_names_in_acls = swift_utils.config_true_value(
|
||||
conf.get('allow_names_in_acls', 'true'))
|
||||
|
||||
def __call__(self, environ, start_response):
|
||||
identity = self._keystone_identity(environ)
|
||||
@ -116,7 +159,18 @@ class KeystoneAuth(object):
|
||||
|
||||
environ['swift.clean_acl'] = swift_acl.clean_acl
|
||||
|
||||
return self.app(environ, start_response)
|
||||
def keystone_start_response(status, response_headers, exc_info=None):
|
||||
project_domain_id = None
|
||||
for key, val in response_headers:
|
||||
if key.lower() == PROJECT_DOMAIN_ID_SYSMETA_HEADER:
|
||||
project_domain_id = val
|
||||
break
|
||||
if project_domain_id:
|
||||
response_headers.append((PROJECT_DOMAIN_ID_HEADER,
|
||||
project_domain_id))
|
||||
return start_response(status, response_headers, exc_info)
|
||||
|
||||
return self.app(environ, keystone_start_response)
|
||||
|
||||
def _keystone_identity(self, environ):
|
||||
"""Extract the identity from the Keystone auth component."""
|
||||
@ -147,6 +201,21 @@ class KeystoneAuth(object):
|
||||
'tenant': (environ.get('HTTP_X_TENANT_ID'),
|
||||
environ.get('HTTP_X_TENANT_NAME')),
|
||||
'roles': roles}
|
||||
token_info = environ.get('keystone.token_info', {})
|
||||
auth_version = 0
|
||||
user_domain = project_domain = (None, None)
|
||||
if 'access' in token_info:
|
||||
# ignore any domain id headers that authtoken may have set
|
||||
auth_version = 2
|
||||
elif 'token' in token_info:
|
||||
auth_version = 3
|
||||
user_domain = (environ.get('HTTP_X_USER_DOMAIN_ID'),
|
||||
environ.get('HTTP_X_USER_DOMAIN_NAME'))
|
||||
project_domain = (environ.get('HTTP_X_PROJECT_DOMAIN_ID'),
|
||||
environ.get('HTTP_X_PROJECT_DOMAIN_NAME'))
|
||||
identity['user_domain'] = user_domain
|
||||
identity['project_domain'] = project_domain
|
||||
identity['auth_version'] = auth_version
|
||||
return identity
|
||||
|
||||
def _get_account_for_tenant(self, tenant_id):
|
||||
@ -156,8 +225,80 @@ class KeystoneAuth(object):
|
||||
"""Check reseller prefix."""
|
||||
return account == self._get_account_for_tenant(tenant_id)
|
||||
|
||||
def _get_project_domain_id(self, environ):
|
||||
info = get_account_info(environ, self.app, 'KS')
|
||||
domain_id = info.get('sysmeta', {}).get('project-domain-id')
|
||||
exists = is_success(info.get('status', 0))
|
||||
return exists, domain_id
|
||||
|
||||
def _set_project_domain_id(self, req, path_parts, env_identity):
|
||||
'''
|
||||
Try to determine the project domain id and save it as
|
||||
account metadata. Do this for a PUT or POST to the
|
||||
account, and also for a container PUT in case that
|
||||
causes the account to be auto-created.
|
||||
'''
|
||||
if PROJECT_DOMAIN_ID_SYSMETA_HEADER in req.headers:
|
||||
return
|
||||
|
||||
version, account, container, obj = path_parts
|
||||
method = req.method
|
||||
if (obj or (container and method != 'PUT')
|
||||
or method not in ['PUT', 'POST']):
|
||||
return
|
||||
|
||||
tenant_id, tenant_name = env_identity['tenant']
|
||||
exists, sysmeta_id = self._get_project_domain_id(req.environ)
|
||||
req_has_id, req_id, new_id = False, None, None
|
||||
if self._reseller_check(account, tenant_id):
|
||||
# domain id can be inferred from request (may be None)
|
||||
req_has_id = True
|
||||
req_id = env_identity['project_domain'][0]
|
||||
if not exists:
|
||||
# new account so set a domain id
|
||||
new_id = req_id if req_has_id else UNKNOWN_ID
|
||||
elif sysmeta_id is None and req_id == self.default_domain_id:
|
||||
# legacy account, update if default domain id in req
|
||||
new_id = req_id
|
||||
elif sysmeta_id == UNKNOWN_ID and req_has_id:
|
||||
# unknown domain, update if req confirms domain
|
||||
new_id = req_id or ''
|
||||
elif req_has_id and sysmeta_id != req_id:
|
||||
self.logger.warn("Inconsistent project domain id: " +
|
||||
"%s in token vs %s in account metadata."
|
||||
% (req_id, sysmeta_id))
|
||||
|
||||
if new_id is not None:
|
||||
req.headers[PROJECT_DOMAIN_ID_SYSMETA_HEADER] = new_id
|
||||
|
||||
def _is_name_allowed_in_acl(self, req, path_parts, identity):
|
||||
if not self.allow_names_in_acls:
|
||||
return False
|
||||
user_domain_id = identity['user_domain'][0]
|
||||
if user_domain_id and user_domain_id != self.default_domain_id:
|
||||
return False
|
||||
|
||||
proj_domain_id = identity['project_domain'][0]
|
||||
if proj_domain_id and proj_domain_id != self.default_domain_id:
|
||||
return False
|
||||
|
||||
# request user and scoped project are both in default domain
|
||||
tenant_id, tenant_name = identity['tenant']
|
||||
version, account, container, obj = path_parts
|
||||
if self._reseller_check(account, tenant_id):
|
||||
# account == scoped project, so account is also in default domain
|
||||
allow = True
|
||||
else:
|
||||
# retrieve account project domain id from account sysmeta
|
||||
exists, acc_domain_id = self._get_project_domain_id(req.environ)
|
||||
allow = exists and acc_domain_id in [self.default_domain_id, None]
|
||||
if allow:
|
||||
self.logger.debug("Names allowed in acls.")
|
||||
return allow
|
||||
|
||||
def _authorize_cross_tenant(self, user_id, user_name,
|
||||
tenant_id, tenant_name, roles):
|
||||
tenant_id, tenant_name, roles,
|
||||
allow_names=True):
|
||||
"""Check cross-tenant ACLs.
|
||||
|
||||
Match tenant:user, tenant and user could be its id, name or '*'
|
||||
@ -167,14 +308,21 @@ class KeystoneAuth(object):
|
||||
:param tenant_id: The tenant ID from the identity token.
|
||||
:param tenant_name: The tenant name from the identity token.
|
||||
:param roles: The given container ACL.
|
||||
:param allow_names: If True then attempt to match tenant and user names
|
||||
as well as id's.
|
||||
|
||||
:returns: matched string if tenant(name/id/*):user(name/id/*) matches
|
||||
the given ACL.
|
||||
None otherwise.
|
||||
|
||||
"""
|
||||
for tenant in [tenant_id, tenant_name, '*']:
|
||||
for user in [user_id, user_name, '*']:
|
||||
tenant_match = [tenant_id, '*']
|
||||
user_match = [user_id, '*']
|
||||
if allow_names:
|
||||
tenant_match = tenant_match + [tenant_name]
|
||||
user_match = user_match + [user_name]
|
||||
for tenant in tenant_match:
|
||||
for user in user_match:
|
||||
s = '%s:%s' % (tenant, user)
|
||||
if s in roles:
|
||||
return s
|
||||
@ -195,6 +343,8 @@ class KeystoneAuth(object):
|
||||
except ValueError:
|
||||
return HTTPNotFound(request=req)
|
||||
|
||||
self._set_project_domain_id(req, part, env_identity)
|
||||
|
||||
user_roles = [r.lower() for r in env_identity.get('roles', [])]
|
||||
|
||||
# Give unconditional access to a user with the reseller_admin
|
||||
@ -214,9 +364,12 @@ class KeystoneAuth(object):
|
||||
return self.denied_response(req)
|
||||
|
||||
# cross-tenant authorization
|
||||
matched_acl = None
|
||||
if roles:
|
||||
allow_names = self._is_name_allowed_in_acl(req, part, env_identity)
|
||||
matched_acl = self._authorize_cross_tenant(user_id, user_name,
|
||||
tenant_id, tenant_name,
|
||||
roles)
|
||||
roles, allow_names)
|
||||
if matched_acl is not None:
|
||||
log_msg = 'user %s allowed in ACL authorizing.'
|
||||
self.logger.debug(log_msg, matched_acl)
|
||||
|
@ -121,7 +121,7 @@ class AccountController(Controller):
|
||||
req, self.app.account_ring, account_partition, 'POST',
|
||||
req.swift_entity_path, [headers] * len(accounts))
|
||||
if resp.status_int == HTTP_NOT_FOUND and self.app.account_autocreate:
|
||||
self.autocreate_account(req.environ, self.account_name)
|
||||
self.autocreate_account(req, self.account_name)
|
||||
resp = self.make_requests(
|
||||
req, self.app.account_ring, account_partition, 'POST',
|
||||
req.swift_entity_path, [headers] * len(accounts))
|
||||
|
@ -1157,7 +1157,7 @@ class Controller(object):
|
||||
"""
|
||||
return self.GETorHEAD(req)
|
||||
|
||||
def autocreate_account(self, env, account):
|
||||
def autocreate_account(self, req, account):
|
||||
"""
|
||||
Autocreate an account
|
||||
|
||||
@ -1169,12 +1169,17 @@ class Controller(object):
|
||||
headers = {'X-Timestamp': Timestamp(time.time()).internal,
|
||||
'X-Trans-Id': self.trans_id,
|
||||
'Connection': 'close'}
|
||||
# transfer any x-account-sysmeta headers from original request
|
||||
# to the autocreate PUT
|
||||
headers.update((k, v)
|
||||
for k, v in req.headers.iteritems()
|
||||
if is_sys_meta('account', k))
|
||||
resp = self.make_requests(Request.blank('/v1' + path),
|
||||
self.app.account_ring, partition, 'PUT',
|
||||
path, [headers] * len(nodes))
|
||||
if is_success(resp.status_int):
|
||||
self.app.logger.info('autocreate account %r' % path)
|
||||
clear_info_cache(self.app, env, account)
|
||||
clear_info_cache(self.app, req.environ, account)
|
||||
else:
|
||||
self.app.logger.warning('Could not autocreate account %r' % path)
|
||||
|
||||
|
@ -136,7 +136,7 @@ class ContainerController(Controller):
|
||||
account_partition, accounts, container_count = \
|
||||
self.account_info(self.account_name, req)
|
||||
if not accounts and self.app.account_autocreate:
|
||||
self.autocreate_account(req.environ, self.account_name)
|
||||
self.autocreate_account(req, self.account_name)
|
||||
account_partition, accounts, container_count = \
|
||||
self.account_info(self.account_name, req)
|
||||
if not accounts:
|
||||
|
@ -83,10 +83,13 @@ normalized_urls = None
|
||||
# If no config was read, we will fall back to old school env vars
|
||||
swift_test_auth_version = None
|
||||
swift_test_auth = os.environ.get('SWIFT_TEST_AUTH')
|
||||
swift_test_user = [os.environ.get('SWIFT_TEST_USER'), None, None]
|
||||
swift_test_key = [os.environ.get('SWIFT_TEST_KEY'), None, None]
|
||||
swift_test_tenant = ['', '', '']
|
||||
swift_test_perm = ['', '', '']
|
||||
swift_test_user = [os.environ.get('SWIFT_TEST_USER'), None, None, '']
|
||||
swift_test_key = [os.environ.get('SWIFT_TEST_KEY'), None, None, '']
|
||||
swift_test_tenant = ['', '', '', '']
|
||||
swift_test_perm = ['', '', '', '']
|
||||
swift_test_domain = ['', '', '', '']
|
||||
swift_test_user_id = ['', '', '', '']
|
||||
swift_test_tenant_id = ['', '', '', '']
|
||||
|
||||
skip, skip2, skip3 = False, False, False
|
||||
|
||||
@ -432,6 +435,7 @@ def setup_package():
|
||||
global swift_test_key
|
||||
global swift_test_tenant
|
||||
global swift_test_perm
|
||||
global swift_test_domain
|
||||
|
||||
if config:
|
||||
swift_test_auth_version = str(config.get('auth_version', '1'))
|
||||
@ -488,8 +492,13 @@ def setup_package():
|
||||
swift_test_user[2] = config['username3']
|
||||
swift_test_tenant[2] = config['account']
|
||||
swift_test_key[2] = config['password3']
|
||||
if 'username4' in config:
|
||||
swift_test_user[3] = config['username4']
|
||||
swift_test_tenant[3] = config['account4']
|
||||
swift_test_key[3] = config['password4']
|
||||
swift_test_domain[3] = config['domain4']
|
||||
|
||||
for _ in range(3):
|
||||
for _ in range(4):
|
||||
swift_test_perm[_] = swift_test_tenant[_] + ':' \
|
||||
+ swift_test_user[_]
|
||||
|
||||
@ -511,6 +520,15 @@ def setup_package():
|
||||
print >>sys.stderr, \
|
||||
'SKIPPING THIRD ACCOUNT FUNCTIONAL TESTS DUE TO NO CONFIG FOR THEM'
|
||||
|
||||
global skip_if_not_v3
|
||||
skip_if_not_v3 = (swift_test_auth_version != '3'
|
||||
or not all([not skip,
|
||||
swift_test_user[3],
|
||||
swift_test_key[3]]))
|
||||
if not skip and skip_if_not_v3:
|
||||
print >>sys.stderr, \
|
||||
'SKIPPING FUNCTIONAL TESTS SPECIFIC TO AUTH VERSION 3'
|
||||
|
||||
get_cluster_info()
|
||||
|
||||
|
||||
@ -549,10 +567,10 @@ class InternalServerError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
url = [None, None, None]
|
||||
token = [None, None, None]
|
||||
parsed = [None, None, None]
|
||||
conn = [None, None, None]
|
||||
url = [None, None, None, None]
|
||||
token = [None, None, None, None]
|
||||
parsed = [None, None, None, None]
|
||||
conn = [None, None, None, None]
|
||||
|
||||
|
||||
def connection(url):
|
||||
@ -579,7 +597,8 @@ def retry(func, *args, **kwargs):
|
||||
|
||||
# access our own account by default
|
||||
url_account = kwargs.pop('url_account', use_account + 1) - 1
|
||||
|
||||
os_options = {'user_domain_name': swift_test_domain[use_account],
|
||||
'project_domain_name': swift_test_domain[use_account]}
|
||||
while attempts <= retries:
|
||||
attempts += 1
|
||||
try:
|
||||
@ -590,7 +609,7 @@ def retry(func, *args, **kwargs):
|
||||
snet=False,
|
||||
tenant_name=swift_test_tenant[use_account],
|
||||
auth_version=swift_test_auth_version,
|
||||
os_options={})
|
||||
os_options=os_options)
|
||||
parsed[use_account] = conn[use_account] = None
|
||||
if not parsed[use_account] or not conn[use_account]:
|
||||
parsed[use_account], conn[use_account] = \
|
||||
|
@ -809,5 +809,33 @@ class TestAccount(unittest.TestCase):
|
||||
self.assertEqual(resp.status, 400)
|
||||
|
||||
|
||||
class TestAccountInNonDefaultDomain(unittest.TestCase):
|
||||
def setUp(self):
|
||||
if tf.skip or tf.skip2 or tf.skip_if_not_v3:
|
||||
raise SkipTest('AUTH VERSION 3 SPECIFIC TEST')
|
||||
|
||||
def test_project_domain_id_header(self):
|
||||
# make sure account exists (assumes account auto create)
|
||||
def post(url, token, parsed, conn):
|
||||
conn.request('POST', parsed.path, '',
|
||||
{'X-Auth-Token': token})
|
||||
return check_response(conn)
|
||||
|
||||
resp = retry(post, use_account=4)
|
||||
resp.read()
|
||||
self.assertEqual(resp.status, 204)
|
||||
|
||||
# account in non-default domain should have a project domain id
|
||||
def head(url, token, parsed, conn):
|
||||
conn.request('HEAD', parsed.path, '',
|
||||
{'X-Auth-Token': token})
|
||||
return check_response(conn)
|
||||
|
||||
resp = retry(head, use_account=4)
|
||||
resp.read()
|
||||
self.assertEqual(resp.status, 204)
|
||||
self.assertTrue('X-Account-Project-Domain-Id' in resp.headers)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
@ -1511,5 +1511,179 @@ class TestContainer(unittest.TestCase):
|
||||
policy['name'])
|
||||
|
||||
|
||||
class BaseTestContainerACLs(unittest.TestCase):
|
||||
# subclasses can change the account in which container
|
||||
# is created/deleted by setUp/tearDown
|
||||
account = 1
|
||||
|
||||
def _get_account(self, url, token, parsed, conn):
|
||||
return parsed.path
|
||||
|
||||
def _get_tenant_id(self, url, token, parsed, conn):
|
||||
account = parsed.path
|
||||
return account.replace('/v1/AUTH_', '', 1)
|
||||
|
||||
def setUp(self):
|
||||
if tf.skip or tf.skip2 or tf.skip_if_not_v3:
|
||||
raise SkipTest('AUTH VERSION 3 SPECIFIC TEST')
|
||||
self.name = uuid4().hex
|
||||
|
||||
def put(url, token, parsed, conn):
|
||||
conn.request('PUT', parsed.path + '/' + self.name, '',
|
||||
{'X-Auth-Token': token})
|
||||
return check_response(conn)
|
||||
|
||||
resp = retry(put, use_account=self.account)
|
||||
resp.read()
|
||||
self.assertEqual(resp.status, 201)
|
||||
|
||||
def tearDown(self):
|
||||
if tf.skip or tf.skip2 or tf.skip_if_not_v3:
|
||||
raise SkipTest
|
||||
|
||||
def get(url, token, parsed, conn):
|
||||
conn.request('GET', parsed.path + '/' + self.name + '?format=json',
|
||||
'', {'X-Auth-Token': token})
|
||||
return check_response(conn)
|
||||
|
||||
def delete(url, token, parsed, conn, obj):
|
||||
conn.request('DELETE',
|
||||
'/'.join([parsed.path, self.name, obj['name']]), '',
|
||||
{'X-Auth-Token': token})
|
||||
return check_response(conn)
|
||||
|
||||
while True:
|
||||
resp = retry(get, use_account=self.account)
|
||||
body = resp.read()
|
||||
self.assert_(resp.status // 100 == 2, resp.status)
|
||||
objs = json.loads(body)
|
||||
if not objs:
|
||||
break
|
||||
for obj in objs:
|
||||
resp = retry(delete, obj, use_account=self.account)
|
||||
resp.read()
|
||||
self.assertEqual(resp.status, 204)
|
||||
|
||||
def delete(url, token, parsed, conn):
|
||||
conn.request('DELETE', parsed.path + '/' + self.name, '',
|
||||
{'X-Auth-Token': token})
|
||||
return check_response(conn)
|
||||
|
||||
resp = retry(delete, use_account=self.account)
|
||||
resp.read()
|
||||
self.assertEqual(resp.status, 204)
|
||||
|
||||
def _assert_cross_account_acl_granted(self, granted, grantee_account, acl):
|
||||
'''
|
||||
Check whether a given container ACL is granted when a user specified
|
||||
by account_b attempts to access a container.
|
||||
'''
|
||||
# Obtain the first account's string
|
||||
first_account = retry(self._get_account, use_account=self.account)
|
||||
|
||||
# Ensure we can't access the container with the grantee account
|
||||
def get2(url, token, parsed, conn):
|
||||
conn.request('GET', first_account + '/' + self.name, '',
|
||||
{'X-Auth-Token': token})
|
||||
return check_response(conn)
|
||||
|
||||
resp = retry(get2, use_account=grantee_account)
|
||||
resp.read()
|
||||
self.assertEqual(resp.status, 403)
|
||||
|
||||
def put2(url, token, parsed, conn):
|
||||
conn.request('PUT', first_account + '/' + self.name + '/object',
|
||||
'test object', {'X-Auth-Token': token})
|
||||
return check_response(conn)
|
||||
|
||||
resp = retry(put2, use_account=grantee_account)
|
||||
resp.read()
|
||||
self.assertEqual(resp.status, 403)
|
||||
|
||||
# Post ACL to the container
|
||||
def post(url, token, parsed, conn):
|
||||
conn.request('POST', parsed.path + '/' + self.name, '',
|
||||
{'X-Auth-Token': token,
|
||||
'X-Container-Read': acl,
|
||||
'X-Container-Write': acl})
|
||||
return check_response(conn)
|
||||
|
||||
resp = retry(post, use_account=self.account)
|
||||
resp.read()
|
||||
self.assertEqual(resp.status, 204)
|
||||
|
||||
# Check access to container from grantee account with ACL in place
|
||||
resp = retry(get2, use_account=grantee_account)
|
||||
resp.read()
|
||||
expected = 204 if granted else 403
|
||||
self.assertEqual(resp.status, expected)
|
||||
|
||||
resp = retry(put2, use_account=grantee_account)
|
||||
resp.read()
|
||||
expected = 201 if granted else 403
|
||||
self.assertEqual(resp.status, expected)
|
||||
|
||||
# Make the container private again
|
||||
def post(url, token, parsed, conn):
|
||||
conn.request('POST', parsed.path + '/' + self.name, '',
|
||||
{'X-Auth-Token': token, 'X-Container-Read': '',
|
||||
'X-Container-Write': ''})
|
||||
return check_response(conn)
|
||||
|
||||
resp = retry(post, use_account=self.account)
|
||||
resp.read()
|
||||
self.assertEqual(resp.status, 204)
|
||||
|
||||
# Ensure we can't access the container with the grantee account again
|
||||
resp = retry(get2, use_account=grantee_account)
|
||||
resp.read()
|
||||
self.assertEqual(resp.status, 403)
|
||||
|
||||
resp = retry(put2, use_account=grantee_account)
|
||||
resp.read()
|
||||
self.assertEqual(resp.status, 403)
|
||||
|
||||
|
||||
class TestContainerACLsAccount1(BaseTestContainerACLs):
|
||||
def test_cross_account_acl_names_with_user_in_non_default_domain(self):
|
||||
# names in acls are disallowed when grantee is in a non-default domain
|
||||
acl = '%s:%s' % (tf.swift_test_tenant[3], tf.swift_test_user[3])
|
||||
self._assert_cross_account_acl_granted(False, 4, acl)
|
||||
|
||||
def test_cross_account_acl_ids_with_user_in_non_default_domain(self):
|
||||
# ids are allowed in acls when grantee is in a non-default domain
|
||||
tenant_id = retry(self._get_tenant_id, use_account=4)
|
||||
acl = '%s:%s' % (tenant_id, '*')
|
||||
self._assert_cross_account_acl_granted(True, 4, acl)
|
||||
|
||||
def test_cross_account_acl_names_in_default_domain(self):
|
||||
# names are allowed in acls when grantee and project are in
|
||||
# the default domain
|
||||
acl = '%s:%s' % (tf.swift_test_tenant[1], tf.swift_test_user[1])
|
||||
self._assert_cross_account_acl_granted(True, 2, acl)
|
||||
|
||||
def test_cross_account_acl_ids_in_default_domain(self):
|
||||
# ids are allowed in acls when grantee and project are in
|
||||
# the default domain
|
||||
tenant_id = retry(self._get_tenant_id, use_account=2)
|
||||
acl = '%s:%s' % (tenant_id, '*')
|
||||
self._assert_cross_account_acl_granted(True, 2, acl)
|
||||
|
||||
|
||||
class TestContainerACLsAccount4(BaseTestContainerACLs):
|
||||
account = 4
|
||||
|
||||
def test_cross_account_acl_names_with_project_in_non_default_domain(self):
|
||||
# names in acls are disallowed when project is in a non-default domain
|
||||
acl = '%s:%s' % (tf.swift_test_tenant[0], tf.swift_test_user[0])
|
||||
self._assert_cross_account_acl_granted(False, 1, acl)
|
||||
|
||||
def test_cross_account_acl_ids_with_project_in_non_default_domain(self):
|
||||
# ids are allowed in acls when project is in a non-default domain
|
||||
tenant_id = retry(self._get_tenant_id, use_account=1)
|
||||
acl = '%s:%s' % (tenant_id, '*')
|
||||
self._assert_cross_account_acl_granted(True, 1, acl)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
@ -4,7 +4,8 @@ auth_host = 127.0.0.1
|
||||
auth_port = 8080
|
||||
auth_ssl = no
|
||||
auth_prefix = /auth/
|
||||
## sample config for Swift with Keystone
|
||||
## sample config for Swift with Keystone v2 API
|
||||
# For keystone v3 change auth_version to 3 and auth_prefix to /v3/
|
||||
#auth_version = 2
|
||||
#auth_host = localhost
|
||||
#auth_port = 5000
|
||||
@ -25,6 +26,13 @@ password2 = testing2
|
||||
username3 = tester3
|
||||
password3 = testing3
|
||||
|
||||
# Fourth user is required for keystone v3 specific tests.
|
||||
# Account must be in a non-default domain.
|
||||
#account4 = test4
|
||||
#username4 = tester4
|
||||
#password4 = testing4
|
||||
#domain4 = test-domain
|
||||
|
||||
collate = C
|
||||
|
||||
# Only necessary if a pre-exising server uses self-signed certificate
|
||||
|
@ -18,12 +18,23 @@ import unittest
|
||||
from swift.common.middleware import keystoneauth
|
||||
from swift.common.swob import Request, Response
|
||||
from swift.common.http import HTTP_FORBIDDEN
|
||||
from swift.proxy.controllers.base import _get_cache_key
|
||||
from test.unit import FakeLogger
|
||||
|
||||
UNKNOWN_ID = keystoneauth.UNKNOWN_ID
|
||||
|
||||
|
||||
def _fake_token_info(version='2'):
|
||||
if version == '2':
|
||||
return {'access': 'fake_value'}
|
||||
if version == '3':
|
||||
return {'token': 'fake_value'}
|
||||
|
||||
|
||||
class FakeApp(object):
|
||||
def __init__(self, status_headers_body_iter=None):
|
||||
self.calls = 0
|
||||
self.call_contexts = []
|
||||
self.status_headers_body_iter = status_headers_body_iter
|
||||
if not self.status_headers_body_iter:
|
||||
self.status_headers_body_iter = iter([('404 Not Found', {}, '')])
|
||||
@ -35,6 +46,9 @@ class FakeApp(object):
|
||||
resp = env['swift.authorize'](self.request)
|
||||
if resp:
|
||||
return resp(env, start_response)
|
||||
context = {'method': self.request.method,
|
||||
'headers': self.request.headers}
|
||||
self.call_contexts.append(context)
|
||||
status, headers, body = self.status_headers_body_iter.next()
|
||||
return Response(status=status, headers=headers,
|
||||
body=body)(env, start_response)
|
||||
@ -51,12 +65,23 @@ class SwiftAuth(unittest.TestCase):
|
||||
return Request.blank(path, headers=headers, **kwargs)
|
||||
|
||||
def _get_identity_headers(self, status='Confirmed', tenant_id='1',
|
||||
tenant_name='acct', user='usr', role=''):
|
||||
tenant_name='acct', project_domain_name='domA',
|
||||
project_domain_id='99',
|
||||
user_name='usr', user_id='42',
|
||||
user_domain_name='domA', user_domain_id='99',
|
||||
role='admin'):
|
||||
return dict(X_IDENTITY_STATUS=status,
|
||||
X_TENANT_ID=tenant_id,
|
||||
X_TENANT_NAME=tenant_name,
|
||||
X_PROJECT_ID=tenant_id,
|
||||
X_PROJECT_NAME=tenant_name,
|
||||
X_PROJECT_DOMAIN_ID=project_domain_id,
|
||||
X_PROJECT_DOMAIN_NAME=project_domain_name,
|
||||
X_ROLES=role,
|
||||
X_USER_NAME=user)
|
||||
X_USER_NAME=user_name,
|
||||
X_USER_ID=user_id,
|
||||
X_USER_DOMAIN_NAME=user_domain_name,
|
||||
X_USER_DOMAIN_ID=user_domain_id)
|
||||
|
||||
def _get_successful_middleware(self):
|
||||
response_iter = iter([('200 OK', {}, '')])
|
||||
@ -172,8 +197,105 @@ class SwiftAuth(unittest.TestCase):
|
||||
self.assertEqual(resp.status_int, 401)
|
||||
self.assertTrue('Www-Authenticate' in resp.headers)
|
||||
|
||||
def test_project_domain_id_sysmeta_set(self):
|
||||
proj_id = '12345678'
|
||||
proj_domain_id = '13'
|
||||
headers = self._get_identity_headers(tenant_id=proj_id,
|
||||
project_domain_id=proj_domain_id)
|
||||
account = self.test_auth._get_account_for_tenant(proj_id)
|
||||
path = '/v1/' + account
|
||||
# fake cached account info
|
||||
_, info_key = _get_cache_key(account, None)
|
||||
env = {info_key: {'status': 0, 'sysmeta': {}},
|
||||
'keystone.token_info': _fake_token_info(version='3')}
|
||||
req = Request.blank(path, environ=env, headers=headers)
|
||||
req.method = 'POST'
|
||||
headers_out = {'X-Account-Sysmeta-Project-Domain-Id': proj_domain_id}
|
||||
fake_app = FakeApp(iter([('200 OK', headers_out, '')]))
|
||||
test_auth = keystoneauth.filter_factory({})(fake_app)
|
||||
resp = req.get_response(test_auth)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
self.assertEqual(len(fake_app.call_contexts), 1)
|
||||
headers_sent = fake_app.call_contexts[0]['headers']
|
||||
self.assertTrue('X-Account-Sysmeta-Project-Domain-Id' in headers_sent,
|
||||
headers_sent)
|
||||
self.assertEqual(headers_sent['X-Account-Sysmeta-Project-Domain-Id'],
|
||||
proj_domain_id)
|
||||
self.assertTrue('X-Account-Project-Domain-Id' in resp.headers)
|
||||
self.assertEqual(resp.headers['X-Account-Project-Domain-Id'],
|
||||
proj_domain_id)
|
||||
|
||||
class TestAuthorize(unittest.TestCase):
|
||||
def test_project_domain_id_sysmeta_set_to_unknown(self):
|
||||
proj_id = '12345678'
|
||||
# token scoped to a different project
|
||||
headers = self._get_identity_headers(tenant_id='87654321',
|
||||
project_domain_id='default',
|
||||
role='reselleradmin')
|
||||
account = self.test_auth._get_account_for_tenant(proj_id)
|
||||
path = '/v1/' + account
|
||||
# fake cached account info
|
||||
_, info_key = _get_cache_key(account, None)
|
||||
env = {info_key: {'status': 0, 'sysmeta': {}},
|
||||
'keystone.token_info': _fake_token_info(version='3')}
|
||||
req = Request.blank(path, environ=env, headers=headers)
|
||||
req.method = 'POST'
|
||||
fake_app = FakeApp(iter([('200 OK', {}, '')]))
|
||||
test_auth = keystoneauth.filter_factory({})(fake_app)
|
||||
resp = req.get_response(test_auth)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
self.assertEqual(len(fake_app.call_contexts), 1)
|
||||
headers_sent = fake_app.call_contexts[0]['headers']
|
||||
self.assertTrue('X-Account-Sysmeta-Project-Domain-Id' in headers_sent,
|
||||
headers_sent)
|
||||
self.assertEqual(headers_sent['X-Account-Sysmeta-Project-Domain-Id'],
|
||||
UNKNOWN_ID)
|
||||
|
||||
def test_project_domain_id_sysmeta_not_set(self):
|
||||
proj_id = '12345678'
|
||||
headers = self._get_identity_headers(tenant_id=proj_id, role='admin')
|
||||
account = self.test_auth._get_account_for_tenant(proj_id)
|
||||
path = '/v1/' + account
|
||||
_, info_key = _get_cache_key(account, None)
|
||||
# v2 token
|
||||
env = {info_key: {'status': 0, 'sysmeta': {}},
|
||||
'keystone.token_info': _fake_token_info(version='2')}
|
||||
req = Request.blank(path, environ=env, headers=headers)
|
||||
req.method = 'POST'
|
||||
fake_app = FakeApp(iter([('200 OK', {}, '')]))
|
||||
test_auth = keystoneauth.filter_factory({})(fake_app)
|
||||
resp = req.get_response(test_auth)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
self.assertEqual(len(fake_app.call_contexts), 1)
|
||||
headers_sent = fake_app.call_contexts[0]['headers']
|
||||
self.assertFalse('X-Account-Sysmeta-Project-Domain-Id' in headers_sent,
|
||||
headers_sent)
|
||||
|
||||
def test_project_domain_id_sysmeta_set_unknown_with_v2(self):
|
||||
proj_id = '12345678'
|
||||
# token scoped to a different project
|
||||
headers = self._get_identity_headers(tenant_id='87654321',
|
||||
role='reselleradmin')
|
||||
account = self.test_auth._get_account_for_tenant(proj_id)
|
||||
path = '/v1/' + account
|
||||
_, info_key = _get_cache_key(account, None)
|
||||
# v2 token
|
||||
env = {info_key: {'status': 0, 'sysmeta': {}},
|
||||
'keystone.token_info': _fake_token_info(version='2')}
|
||||
req = Request.blank(path, environ=env, headers=headers)
|
||||
req.method = 'POST'
|
||||
fake_app = FakeApp(iter([('200 OK', {}, '')]))
|
||||
test_auth = keystoneauth.filter_factory({})(fake_app)
|
||||
resp = req.get_response(test_auth)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
self.assertEqual(len(fake_app.call_contexts), 1)
|
||||
headers_sent = fake_app.call_contexts[0]['headers']
|
||||
self.assertTrue('X-Account-Sysmeta-Project-Domain-Id' in headers_sent,
|
||||
headers_sent)
|
||||
self.assertEqual(headers_sent['X-Account-Sysmeta-Project-Domain-Id'],
|
||||
UNKNOWN_ID)
|
||||
|
||||
|
||||
class BaseTestAuthorize(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.test_auth = keystoneauth.filter_factory({})(FakeApp())
|
||||
self.test_auth.logger = FakeLogger()
|
||||
@ -188,18 +310,39 @@ class TestAuthorize(unittest.TestCase):
|
||||
identity['HTTP_X_TENANT_ID'])
|
||||
|
||||
def _get_identity(self, tenant_id='tenant_id', tenant_name='tenant_name',
|
||||
user_id='user_id', user_name='user_name', roles=None):
|
||||
user_id='user_id', user_name='user_name', roles=None,
|
||||
project_domain_name='domA', project_domain_id='foo',
|
||||
user_domain_name='domA', user_domain_id='foo'):
|
||||
if roles is None:
|
||||
roles = []
|
||||
if isinstance(roles, list):
|
||||
roles = ','.join(roles)
|
||||
return {'HTTP_X_USER_ID': user_id,
|
||||
'HTTP_X_USER_NAME': user_name,
|
||||
'HTTP_X_USER_DOMAIN_NAME': user_domain_name,
|
||||
'HTTP_X_USER_DOMAIN_ID': user_domain_id,
|
||||
'HTTP_X_TENANT_ID': tenant_id,
|
||||
'HTTP_X_TENANT_NAME': tenant_name,
|
||||
'HTTP_X_PROJECT_DOMAIN_ID': project_domain_id,
|
||||
'HTTP_X_PROJECT_DOMAIN_NAME': project_domain_name,
|
||||
'HTTP_X_ROLES': roles,
|
||||
'HTTP_X_IDENTITY_STATUS': 'Confirmed'}
|
||||
|
||||
def _get_env_id(self, tenant_id='tenant_id', tenant_name='tenant_name',
|
||||
user_id='user_id', user_name='user_name', roles=[],
|
||||
project_domain_name='domA', project_domain_id='99',
|
||||
user_domain_name='domA', user_domain_id='99',
|
||||
auth_version='3'):
|
||||
env = self._get_identity(tenant_id, tenant_name, user_id, user_name,
|
||||
roles, project_domain_name,
|
||||
project_domain_id, user_domain_name,
|
||||
user_domain_id)
|
||||
token_info = _fake_token_info(version=auth_version)
|
||||
env.update({'keystone.token_info': token_info})
|
||||
return self.test_auth._integral_keystone_identity(env)
|
||||
|
||||
|
||||
class TestAuthorize(BaseTestAuthorize):
|
||||
def _check_authenticate(self, account=None, identity=None, headers=None,
|
||||
exception=None, acl=None, env=None, path=None):
|
||||
if not identity:
|
||||
@ -208,7 +351,10 @@ class TestAuthorize(unittest.TestCase):
|
||||
account = self._get_account(identity)
|
||||
if not path:
|
||||
path = '/v1/%s/c' % account
|
||||
default_env = {'REMOTE_USER': identity['HTTP_X_TENANT_ID']}
|
||||
# fake cached account info
|
||||
_, info_key = _get_cache_key(account, None)
|
||||
default_env = {'REMOTE_USER': identity['HTTP_X_TENANT_ID'],
|
||||
info_key: {'status': 200, 'sysmeta': {}}}
|
||||
default_env.update(identity)
|
||||
if env:
|
||||
default_env.update(env)
|
||||
@ -380,6 +526,49 @@ class TestAuthorize(unittest.TestCase):
|
||||
['tenantXYZ:userA']),
|
||||
None)
|
||||
|
||||
def test_cross_tenant_authorization_allow_names(self):
|
||||
# tests that the allow_names arg does the right thing
|
||||
self.assertEqual(
|
||||
self.test_auth._authorize_cross_tenant(
|
||||
'userID', 'userA', 'tenantID', 'tenantNAME',
|
||||
['tenantNAME:userA'], allow_names=True),
|
||||
'tenantNAME:userA')
|
||||
self.assertEqual(
|
||||
self.test_auth._authorize_cross_tenant(
|
||||
'userID', 'userA', 'tenantID', 'tenantNAME',
|
||||
['tenantNAME:userID'], allow_names=True),
|
||||
'tenantNAME:userID')
|
||||
self.assertEqual(
|
||||
self.test_auth._authorize_cross_tenant(
|
||||
'userID', 'userA', 'tenantID', 'tenantNAME',
|
||||
['tenantID:userA'], allow_names=True),
|
||||
'tenantID:userA')
|
||||
self.assertEqual(
|
||||
self.test_auth._authorize_cross_tenant(
|
||||
'userID', 'userA', 'tenantID', 'tenantNAME',
|
||||
['tenantID:userID'], allow_names=True),
|
||||
'tenantID:userID')
|
||||
self.assertEqual(
|
||||
self.test_auth._authorize_cross_tenant(
|
||||
'userID', 'userA', 'tenantID', 'tenantNAME',
|
||||
['tenantNAME:userA'], allow_names=False),
|
||||
None)
|
||||
self.assertEqual(
|
||||
self.test_auth._authorize_cross_tenant(
|
||||
'userID', 'userA', 'tenantID', 'tenantNAME',
|
||||
['tenantID:userA'], allow_names=False),
|
||||
None)
|
||||
self.assertEqual(
|
||||
self.test_auth._authorize_cross_tenant(
|
||||
'userID', 'userA', 'tenantID', 'tenantNAME',
|
||||
['tenantNAME:userID'], allow_names=False),
|
||||
None)
|
||||
self.assertEqual(
|
||||
self.test_auth._authorize_cross_tenant(
|
||||
'userID', 'userA', 'tenantID', 'tenantNAME',
|
||||
['tenantID:userID'], allow_names=False),
|
||||
'tenantID:userID')
|
||||
|
||||
def test_delete_own_account_not_allowed(self):
|
||||
roles = self.test_auth.operator_roles.split(',')
|
||||
identity = self._get_identity(roles=roles)
|
||||
@ -415,5 +604,576 @@ class TestAuthorize(unittest.TestCase):
|
||||
authorize_resp = the_env['swift.authorize'](subreq)
|
||||
self.assertEqual(authorize_resp, None)
|
||||
|
||||
def test_names_disallowed_in_acls_outside_default_domain(self):
|
||||
id = self._get_identity(user_domain_id='non-default',
|
||||
project_domain_id='non-default')
|
||||
env = {'keystone.token_info': _fake_token_info(version='3')}
|
||||
acl = '%s:%s' % (id['HTTP_X_TENANT_NAME'], id['HTTP_X_USER_NAME'])
|
||||
self._check_authenticate(acl=acl, identity=id, env=env,
|
||||
exception=HTTP_FORBIDDEN)
|
||||
acl = '%s:%s' % (id['HTTP_X_TENANT_NAME'], id['HTTP_X_USER_ID'])
|
||||
self._check_authenticate(acl=acl, identity=id, env=env,
|
||||
exception=HTTP_FORBIDDEN)
|
||||
acl = '%s:%s' % (id['HTTP_X_TENANT_ID'], id['HTTP_X_USER_NAME'])
|
||||
self._check_authenticate(acl=acl, identity=id, env=env,
|
||||
exception=HTTP_FORBIDDEN)
|
||||
acl = '%s:%s' % (id['HTTP_X_TENANT_ID'], id['HTTP_X_USER_ID'])
|
||||
self._check_authenticate(acl=acl, identity=id, env=env)
|
||||
|
||||
def test_names_allowed_in_acls_inside_default_domain(self):
|
||||
id = self._get_identity(user_domain_id='default',
|
||||
project_domain_id='default')
|
||||
env = {'keystone.token_info': _fake_token_info(version='3')}
|
||||
acl = '%s:%s' % (id['HTTP_X_TENANT_NAME'], id['HTTP_X_USER_NAME'])
|
||||
self._check_authenticate(acl=acl, identity=id, env=env)
|
||||
acl = '%s:%s' % (id['HTTP_X_TENANT_NAME'], id['HTTP_X_USER_ID'])
|
||||
self._check_authenticate(acl=acl, identity=id, env=env)
|
||||
acl = '%s:%s' % (id['HTTP_X_TENANT_ID'], id['HTTP_X_USER_NAME'])
|
||||
self._check_authenticate(acl=acl, identity=id, env=env)
|
||||
acl = '%s:%s' % (id['HTTP_X_TENANT_ID'], id['HTTP_X_USER_ID'])
|
||||
self._check_authenticate(acl=acl, identity=id, env=env)
|
||||
|
||||
def test_names_allowed_in_acls_inside_default_domain_with_config(self):
|
||||
conf = {'allow_names_in_acls': 'yes'}
|
||||
self.test_auth = keystoneauth.filter_factory(conf)(FakeApp())
|
||||
self.test_auth.logger = FakeLogger()
|
||||
id = self._get_identity(user_domain_id='default',
|
||||
project_domain_id='default')
|
||||
env = {'keystone.token_info': _fake_token_info(version='3')}
|
||||
acl = '%s:%s' % (id['HTTP_X_TENANT_NAME'], id['HTTP_X_USER_NAME'])
|
||||
self._check_authenticate(acl=acl, identity=id, env=env)
|
||||
acl = '%s:%s' % (id['HTTP_X_TENANT_NAME'], id['HTTP_X_USER_ID'])
|
||||
self._check_authenticate(acl=acl, identity=id, env=env)
|
||||
acl = '%s:%s' % (id['HTTP_X_TENANT_ID'], id['HTTP_X_USER_NAME'])
|
||||
self._check_authenticate(acl=acl, identity=id, env=env)
|
||||
acl = '%s:%s' % (id['HTTP_X_TENANT_ID'], id['HTTP_X_USER_ID'])
|
||||
self._check_authenticate(acl=acl, identity=id, env=env)
|
||||
|
||||
def test_names_disallowed_in_acls_inside_default_domain(self):
|
||||
conf = {'allow_names_in_acls': 'false'}
|
||||
self.test_auth = keystoneauth.filter_factory(conf)(FakeApp())
|
||||
self.test_auth.logger = FakeLogger()
|
||||
id = self._get_identity(user_domain_id='default',
|
||||
project_domain_id='default')
|
||||
env = {'keystone.token_info': _fake_token_info(version='3')}
|
||||
acl = '%s:%s' % (id['HTTP_X_TENANT_NAME'], id['HTTP_X_USER_NAME'])
|
||||
self._check_authenticate(acl=acl, identity=id, env=env,
|
||||
exception=HTTP_FORBIDDEN)
|
||||
acl = '%s:%s' % (id['HTTP_X_TENANT_NAME'], id['HTTP_X_USER_ID'])
|
||||
self._check_authenticate(acl=acl, identity=id, env=env,
|
||||
exception=HTTP_FORBIDDEN)
|
||||
acl = '%s:%s' % (id['HTTP_X_TENANT_ID'], id['HTTP_X_USER_NAME'])
|
||||
self._check_authenticate(acl=acl, identity=id, env=env,
|
||||
exception=HTTP_FORBIDDEN)
|
||||
acl = '%s:%s' % (id['HTTP_X_TENANT_ID'], id['HTTP_X_USER_ID'])
|
||||
self._check_authenticate(acl=acl, identity=id, env=env)
|
||||
|
||||
def test_integral_keystone_identity(self):
|
||||
user = ('U_ID', 'U_NAME')
|
||||
roles = ('ROLE1', 'ROLE2')
|
||||
project = ('P_ID', 'P_NAME')
|
||||
user_domain = ('UD_ID', 'UD_NAME')
|
||||
project_domain = ('PD_ID', 'PD_NAME')
|
||||
|
||||
# no valid identity info in headers
|
||||
req = Request.blank('/v/a/c/o')
|
||||
data = self.test_auth._integral_keystone_identity(req.environ)
|
||||
self.assertEqual(None, data)
|
||||
|
||||
# valid identity info in headers, but status unconfirmed
|
||||
req.headers.update({'X-Identity-Status': 'Blah',
|
||||
'X-Roles': '%s,%s' % roles,
|
||||
'X-User-Id': user[0],
|
||||
'X-User-Name': user[1],
|
||||
'X-Tenant-Id': project[0],
|
||||
'X-Tenant-Name': project[1],
|
||||
'X-User-Domain-Id': user_domain[0],
|
||||
'X-User-Domain-Name': user_domain[1],
|
||||
'X-Project-Domain-Id': project_domain[0],
|
||||
'X-Project-Domain-Name': project_domain[1]})
|
||||
data = self.test_auth._integral_keystone_identity(req.environ)
|
||||
self.assertEqual(None, data)
|
||||
|
||||
# valid identity info in headers, no token info in environ
|
||||
req.headers.update({'X-Identity-Status': 'Confirmed'})
|
||||
expected = {'user': user,
|
||||
'tenant': project,
|
||||
'roles': list(roles),
|
||||
'user_domain': (None, None),
|
||||
'project_domain': (None, None),
|
||||
'auth_version': 0}
|
||||
data = self.test_auth._integral_keystone_identity(req.environ)
|
||||
self.assertEquals(expected, data)
|
||||
|
||||
# v2 token info in environ
|
||||
req.environ['keystone.token_info'] = _fake_token_info(version='2')
|
||||
expected = {'user': user,
|
||||
'tenant': project,
|
||||
'roles': list(roles),
|
||||
'user_domain': (None, None),
|
||||
'project_domain': (None, None),
|
||||
'auth_version': 2}
|
||||
data = self.test_auth._integral_keystone_identity(req.environ)
|
||||
self.assertEquals(expected, data)
|
||||
|
||||
# v3 token info in environ
|
||||
req.environ['keystone.token_info'] = _fake_token_info(version='3')
|
||||
expected = {'user': user,
|
||||
'tenant': project,
|
||||
'roles': list(roles),
|
||||
'user_domain': user_domain,
|
||||
'project_domain': project_domain,
|
||||
'auth_version': 3}
|
||||
data = self.test_auth._integral_keystone_identity(req.environ)
|
||||
self.assertEquals(expected, data)
|
||||
|
||||
def test_get_project_domain_id(self):
|
||||
sysmeta = {}
|
||||
info = {'sysmeta': sysmeta}
|
||||
_, info_key = _get_cache_key('AUTH_1234', None)
|
||||
env = {'PATH_INFO': '/v1/AUTH_1234',
|
||||
info_key: info}
|
||||
|
||||
# account does not exist
|
||||
info['status'] = 404
|
||||
self.assertEqual(self.test_auth._get_project_domain_id(env),
|
||||
(False, None))
|
||||
info['status'] = 0
|
||||
self.assertEqual(self.test_auth._get_project_domain_id(env),
|
||||
(False, None))
|
||||
|
||||
# account exists, no project domain id in sysmeta
|
||||
info['status'] = 200
|
||||
self.assertEqual(self.test_auth._get_project_domain_id(env),
|
||||
(True, None))
|
||||
|
||||
# account exists with project domain id in sysmeta
|
||||
sysmeta['project-domain-id'] = 'default'
|
||||
self.assertEqual(self.test_auth._get_project_domain_id(env),
|
||||
(True, 'default'))
|
||||
|
||||
|
||||
class TestIsNameAllowedInACL(BaseTestAuthorize):
|
||||
def setUp(self):
|
||||
super(TestIsNameAllowedInACL, self).setUp()
|
||||
self.default_id = 'default'
|
||||
|
||||
def _assert_names_allowed(self, expected, user_domain_id=None,
|
||||
req_project_domain_id=None,
|
||||
sysmeta_project_domain_id=None,
|
||||
scoped='account'):
|
||||
project_name = 'foo'
|
||||
account_id = '12345678'
|
||||
account = self.test_auth._get_account_for_tenant(account_id)
|
||||
parts = ('v1', account, None, None)
|
||||
path = '/%s/%s' % parts[0:2]
|
||||
|
||||
sysmeta = {}
|
||||
if sysmeta_project_domain_id:
|
||||
sysmeta = {'project-domain-id': sysmeta_project_domain_id}
|
||||
|
||||
# pretend account exists
|
||||
info = {'status': 200, 'sysmeta': sysmeta}
|
||||
_, info_key = _get_cache_key(account, None)
|
||||
req = Request.blank(path, environ={info_key: info})
|
||||
|
||||
if scoped == 'account':
|
||||
project_name = 'account_name'
|
||||
project_id = account_id
|
||||
elif scoped == 'other':
|
||||
project_name = 'other_name'
|
||||
project_id = '87654321'
|
||||
else:
|
||||
# unscoped token
|
||||
project_name, project_id, req_project_domain_id = None, None, None
|
||||
|
||||
if user_domain_id:
|
||||
id = self._get_env_id(tenant_name=project_name,
|
||||
tenant_id=project_id,
|
||||
user_domain_id=user_domain_id,
|
||||
project_domain_id=req_project_domain_id)
|
||||
else:
|
||||
# must be v2 token info
|
||||
id = self._get_env_id(tenant_name=project_name,
|
||||
tenant_id=project_id,
|
||||
auth_version='2')
|
||||
|
||||
actual = self.test_auth._is_name_allowed_in_acl(req, parts, id)
|
||||
self.assertEqual(actual, expected, '%s, %s, %s, %s'
|
||||
% (user_domain_id, req_project_domain_id,
|
||||
sysmeta_project_domain_id, scoped))
|
||||
|
||||
def test_is_name_allowed_in_acl_with_token_scoped_to_tenant(self):
|
||||
# no user or project domain ids in request token so must be v2,
|
||||
# user and project should be assumed to be in default domain
|
||||
self._assert_names_allowed(True, user_domain_id=None,
|
||||
req_project_domain_id=None,
|
||||
sysmeta_project_domain_id=None)
|
||||
self._assert_names_allowed(True, user_domain_id=None,
|
||||
req_project_domain_id=None,
|
||||
sysmeta_project_domain_id=self.default_id)
|
||||
self._assert_names_allowed(True, user_domain_id=None,
|
||||
req_project_domain_id=None,
|
||||
sysmeta_project_domain_id=UNKNOWN_ID)
|
||||
self._assert_names_allowed(True, user_domain_id=None,
|
||||
req_project_domain_id=None,
|
||||
sysmeta_project_domain_id='foo')
|
||||
|
||||
# user in default domain, project domain in token info takes precedence
|
||||
self._assert_names_allowed(True, user_domain_id=self.default_id,
|
||||
req_project_domain_id=self.default_id,
|
||||
sysmeta_project_domain_id=None)
|
||||
self._assert_names_allowed(True, user_domain_id=self.default_id,
|
||||
req_project_domain_id=self.default_id,
|
||||
sysmeta_project_domain_id=UNKNOWN_ID)
|
||||
self._assert_names_allowed(True, user_domain_id=self.default_id,
|
||||
req_project_domain_id=self.default_id,
|
||||
sysmeta_project_domain_id='bar')
|
||||
self._assert_names_allowed(False, user_domain_id=self.default_id,
|
||||
req_project_domain_id='foo',
|
||||
sysmeta_project_domain_id=None)
|
||||
self._assert_names_allowed(False, user_domain_id=self.default_id,
|
||||
req_project_domain_id='foo',
|
||||
sysmeta_project_domain_id=self.default_id)
|
||||
self._assert_names_allowed(False, user_domain_id=self.default_id,
|
||||
req_project_domain_id='foo',
|
||||
sysmeta_project_domain_id='foo')
|
||||
|
||||
# user in non-default domain so names should never be allowed
|
||||
self._assert_names_allowed(False, user_domain_id='foo',
|
||||
req_project_domain_id=self.default_id,
|
||||
sysmeta_project_domain_id=None)
|
||||
self._assert_names_allowed(False, user_domain_id='foo',
|
||||
req_project_domain_id=self.default_id,
|
||||
sysmeta_project_domain_id=self.default_id)
|
||||
self._assert_names_allowed(False, user_domain_id='foo',
|
||||
req_project_domain_id=self.default_id,
|
||||
sysmeta_project_domain_id=UNKNOWN_ID)
|
||||
self._assert_names_allowed(False, user_domain_id='foo',
|
||||
req_project_domain_id=self.default_id,
|
||||
sysmeta_project_domain_id='foo')
|
||||
|
||||
def test_is_name_allowed_in_acl_with_unscoped_token(self):
|
||||
# user in default domain
|
||||
self._assert_names_allowed(True, user_domain_id=self.default_id,
|
||||
sysmeta_project_domain_id=None,
|
||||
scoped=False)
|
||||
self._assert_names_allowed(True, user_domain_id=self.default_id,
|
||||
sysmeta_project_domain_id=self.default_id,
|
||||
scoped=False)
|
||||
self._assert_names_allowed(False, user_domain_id=self.default_id,
|
||||
sysmeta_project_domain_id=UNKNOWN_ID,
|
||||
scoped=False)
|
||||
self._assert_names_allowed(False, user_domain_id=self.default_id,
|
||||
sysmeta_project_domain_id='foo',
|
||||
scoped=False)
|
||||
|
||||
# user in non-default domain so names should never be allowed
|
||||
self._assert_names_allowed(False, user_domain_id='foo',
|
||||
sysmeta_project_domain_id=None,
|
||||
scoped=False)
|
||||
self._assert_names_allowed(False, user_domain_id='foo',
|
||||
sysmeta_project_domain_id=self.default_id,
|
||||
scoped=False)
|
||||
self._assert_names_allowed(False, user_domain_id='foo',
|
||||
sysmeta_project_domain_id=UNKNOWN_ID,
|
||||
scoped=False)
|
||||
self._assert_names_allowed(False, user_domain_id='foo',
|
||||
sysmeta_project_domain_id='foo',
|
||||
scoped=False)
|
||||
|
||||
def test_is_name_allowed_in_acl_with_token_scoped_to_other_tenant(self):
|
||||
# user and scoped tenant in default domain
|
||||
self._assert_names_allowed(True, user_domain_id=self.default_id,
|
||||
req_project_domain_id=self.default_id,
|
||||
sysmeta_project_domain_id=None,
|
||||
scoped='other')
|
||||
self._assert_names_allowed(True, user_domain_id=self.default_id,
|
||||
req_project_domain_id=self.default_id,
|
||||
sysmeta_project_domain_id=self.default_id,
|
||||
scoped='other')
|
||||
self._assert_names_allowed(False, user_domain_id=self.default_id,
|
||||
req_project_domain_id=self.default_id,
|
||||
sysmeta_project_domain_id=UNKNOWN_ID,
|
||||
scoped='other')
|
||||
self._assert_names_allowed(False, user_domain_id=self.default_id,
|
||||
req_project_domain_id=self.default_id,
|
||||
sysmeta_project_domain_id='foo',
|
||||
scoped='other')
|
||||
|
||||
# user in default domain, but scoped tenant in non-default domain
|
||||
self._assert_names_allowed(False, user_domain_id=self.default_id,
|
||||
req_project_domain_id='foo',
|
||||
sysmeta_project_domain_id=None,
|
||||
scoped='other')
|
||||
self._assert_names_allowed(False, user_domain_id=self.default_id,
|
||||
req_project_domain_id='foo',
|
||||
sysmeta_project_domain_id=self.default_id,
|
||||
scoped='other')
|
||||
self._assert_names_allowed(False, user_domain_id=self.default_id,
|
||||
req_project_domain_id='foo',
|
||||
sysmeta_project_domain_id=UNKNOWN_ID,
|
||||
scoped='other')
|
||||
self._assert_names_allowed(False, user_domain_id=self.default_id,
|
||||
req_project_domain_id='foo',
|
||||
sysmeta_project_domain_id='foo',
|
||||
scoped='other')
|
||||
|
||||
# user in non-default domain, scoped tenant in default domain
|
||||
self._assert_names_allowed(False, user_domain_id='foo',
|
||||
req_project_domain_id=self.default_id,
|
||||
sysmeta_project_domain_id=None,
|
||||
scoped='other')
|
||||
self._assert_names_allowed(False, user_domain_id='foo',
|
||||
req_project_domain_id=self.default_id,
|
||||
sysmeta_project_domain_id=self.default_id,
|
||||
scoped='other')
|
||||
self._assert_names_allowed(False, user_domain_id='foo',
|
||||
req_project_domain_id=self.default_id,
|
||||
sysmeta_project_domain_id=UNKNOWN_ID,
|
||||
scoped='other')
|
||||
self._assert_names_allowed(False, user_domain_id='foo',
|
||||
req_project_domain_id=self.default_id,
|
||||
sysmeta_project_domain_id='foo',
|
||||
scoped='other')
|
||||
|
||||
|
||||
class TestIsNameAllowedInACLWithConfiguredDomain(TestIsNameAllowedInACL):
|
||||
def setUp(self):
|
||||
super(TestIsNameAllowedInACLWithConfiguredDomain, self).setUp()
|
||||
conf = {'default_domain_id': 'mydefault'}
|
||||
self.test_auth = keystoneauth.filter_factory(conf)(FakeApp())
|
||||
self.test_auth.logger = FakeLogger()
|
||||
self.default_id = 'mydefault'
|
||||
|
||||
|
||||
class TestSetProjectDomain(BaseTestAuthorize):
|
||||
def _assert_set_project_domain(self, expected, account, req_project_id,
|
||||
req_project_domain_id,
|
||||
sysmeta_project_domain_id,
|
||||
warning=False):
|
||||
hdr = 'X-Account-Sysmeta-Project-Domain-Id'
|
||||
|
||||
# set up fake account info in req env
|
||||
status = 0 if sysmeta_project_domain_id is None else 200
|
||||
sysmeta = {}
|
||||
if sysmeta_project_domain_id:
|
||||
sysmeta['project-domain-id'] = sysmeta_project_domain_id
|
||||
info = {'status': status, 'sysmeta': sysmeta}
|
||||
_, info_key = _get_cache_key(account, None)
|
||||
env = {info_key: info}
|
||||
|
||||
# create fake env identity
|
||||
env_id = self._get_env_id(tenant_id=req_project_id,
|
||||
project_domain_id=req_project_domain_id)
|
||||
|
||||
# reset fake logger
|
||||
self.test_auth.logger = FakeLogger()
|
||||
num_warnings = 0
|
||||
|
||||
# check account requests
|
||||
path = '/v1/%s' % account
|
||||
for method in ['PUT', 'POST']:
|
||||
req = Request.blank(path, environ=env)
|
||||
req.method = method
|
||||
path_parts = req.split_path(1, 4, True)
|
||||
self.test_auth._set_project_domain_id(req, path_parts, env_id)
|
||||
if warning:
|
||||
num_warnings += 1
|
||||
warnings = self.test_auth.logger.get_lines_for_level('warning')
|
||||
self.assertEqual(len(warnings), num_warnings)
|
||||
self.assertTrue(warnings[-1].startswith('Inconsistent proj'))
|
||||
if expected is not None:
|
||||
self.assertTrue(hdr in req.headers)
|
||||
self.assertEqual(req.headers[hdr], expected)
|
||||
else:
|
||||
self.assertFalse(hdr in req.headers, req.headers)
|
||||
|
||||
for method in ['GET', 'HEAD', 'DELETE', 'OPTIONS']:
|
||||
req = Request.blank(path, environ=env)
|
||||
req.method = method
|
||||
self.test_auth._set_project_domain_id(req, path_parts, env_id)
|
||||
self.assertFalse(hdr in req.headers)
|
||||
|
||||
# check container requests
|
||||
path = '/v1/%s/c' % account
|
||||
for method in ['PUT']:
|
||||
req = Request.blank(path, environ=env)
|
||||
req.method = method
|
||||
path_parts = req.split_path(1, 4, True)
|
||||
self.test_auth._set_project_domain_id(req, path_parts, env_id)
|
||||
if warning:
|
||||
num_warnings += 1
|
||||
warnings = self.test_auth.logger.get_lines_for_level('warning')
|
||||
self.assertEqual(len(warnings), num_warnings)
|
||||
self.assertTrue(warnings[-1].startswith('Inconsistent proj'))
|
||||
if expected is not None:
|
||||
self.assertTrue(hdr in req.headers)
|
||||
self.assertEqual(req.headers[hdr], expected)
|
||||
else:
|
||||
self.assertFalse(hdr in req.headers)
|
||||
|
||||
for method in ['POST', 'GET', 'HEAD', 'DELETE', 'OPTIONS']:
|
||||
req = Request.blank(path, environ=env)
|
||||
req.method = method
|
||||
self.test_auth._set_project_domain_id(req, path_parts, env_id)
|
||||
self.assertFalse(hdr in req.headers)
|
||||
|
||||
# never set for object requests
|
||||
path = '/v1/%s/c/o' % account
|
||||
for method in ['PUT', 'COPY', 'POST', 'GET', 'HEAD', 'DELETE',
|
||||
'OPTIONS']:
|
||||
req = Request.blank(path, environ=env)
|
||||
req.method = method
|
||||
path_parts = req.split_path(1, 4, True)
|
||||
self.test_auth._set_project_domain_id(req, path_parts, env_id)
|
||||
self.assertFalse(hdr in req.headers)
|
||||
|
||||
def test_set_project_domain_id_new_account(self):
|
||||
# scoped token with project domain info
|
||||
self._assert_set_project_domain('test_id',
|
||||
account='AUTH_1234',
|
||||
req_project_id='1234',
|
||||
req_project_domain_id='test_id',
|
||||
sysmeta_project_domain_id=None)
|
||||
|
||||
# scoped v2 token without project domain id
|
||||
self._assert_set_project_domain(None,
|
||||
account='AUTH_1234',
|
||||
req_project_id='1234',
|
||||
req_project_domain_id=None,
|
||||
sysmeta_project_domain_id=None)
|
||||
|
||||
# unscoped v2 token without project domain id
|
||||
self._assert_set_project_domain(UNKNOWN_ID,
|
||||
account='AUTH_1234',
|
||||
req_project_id=None,
|
||||
req_project_domain_id=None,
|
||||
sysmeta_project_domain_id=None)
|
||||
|
||||
# token scoped on another project
|
||||
self._assert_set_project_domain(UNKNOWN_ID,
|
||||
account='AUTH_1234',
|
||||
req_project_id='4321',
|
||||
req_project_domain_id='default',
|
||||
sysmeta_project_domain_id=None)
|
||||
|
||||
def test_set_project_domain_id_existing_v2_account(self):
|
||||
# project domain id provided in scoped request token,
|
||||
# update empty value
|
||||
self._assert_set_project_domain('default',
|
||||
account='AUTH_1234',
|
||||
req_project_id='1234',
|
||||
req_project_domain_id='default',
|
||||
sysmeta_project_domain_id='')
|
||||
|
||||
# inconsistent project domain id provided in scoped request token,
|
||||
# leave known value
|
||||
self._assert_set_project_domain(None,
|
||||
account='AUTH_1234',
|
||||
req_project_id='1234',
|
||||
req_project_domain_id='unexpected_id',
|
||||
sysmeta_project_domain_id='',
|
||||
warning=True)
|
||||
|
||||
# project domain id not provided, scoped request token,
|
||||
# no change to empty value
|
||||
self._assert_set_project_domain(None,
|
||||
account='AUTH_1234',
|
||||
req_project_id='1234',
|
||||
req_project_domain_id=None,
|
||||
sysmeta_project_domain_id='')
|
||||
|
||||
# unscoped request token, no change to empty value
|
||||
self._assert_set_project_domain(None,
|
||||
account='AUTH_1234',
|
||||
req_project_id=None,
|
||||
req_project_domain_id=None,
|
||||
sysmeta_project_domain_id='')
|
||||
|
||||
# token scoped on another project,
|
||||
# update empty value
|
||||
self._assert_set_project_domain(None,
|
||||
account='AUTH_1234',
|
||||
req_project_id='4321',
|
||||
req_project_domain_id=None,
|
||||
sysmeta_project_domain_id='')
|
||||
|
||||
def test_set_project_domain_id_existing_account_unknown_domain(self):
|
||||
|
||||
# project domain id provided in scoped request token,
|
||||
# set known value
|
||||
self._assert_set_project_domain('test_id',
|
||||
account='AUTH_1234',
|
||||
req_project_id='1234',
|
||||
req_project_domain_id='test_id',
|
||||
sysmeta_project_domain_id=UNKNOWN_ID)
|
||||
|
||||
# project domain id not provided, scoped request token,
|
||||
# set empty value
|
||||
self._assert_set_project_domain('',
|
||||
account='AUTH_1234',
|
||||
req_project_id='1234',
|
||||
req_project_domain_id=None,
|
||||
sysmeta_project_domain_id=UNKNOWN_ID)
|
||||
|
||||
# project domain id not provided, unscoped request token,
|
||||
# leave unknown value
|
||||
self._assert_set_project_domain(None,
|
||||
account='AUTH_1234',
|
||||
req_project_id=None,
|
||||
req_project_domain_id=None,
|
||||
sysmeta_project_domain_id=UNKNOWN_ID)
|
||||
|
||||
# token scoped on another project, leave unknown value
|
||||
self._assert_set_project_domain(None,
|
||||
account='AUTH_1234',
|
||||
req_project_id='4321',
|
||||
req_project_domain_id='default',
|
||||
sysmeta_project_domain_id=UNKNOWN_ID)
|
||||
|
||||
def test_set_project_domain_id_existing_known_domain(self):
|
||||
# project domain id provided in scoped request token,
|
||||
# leave known value
|
||||
self._assert_set_project_domain(None,
|
||||
account='AUTH_1234',
|
||||
req_project_id='1234',
|
||||
req_project_domain_id='test_id',
|
||||
sysmeta_project_domain_id='test_id')
|
||||
|
||||
# inconsistent project domain id provided in scoped request token,
|
||||
# leave known value
|
||||
self._assert_set_project_domain(None,
|
||||
account='AUTH_1234',
|
||||
req_project_id='1234',
|
||||
req_project_domain_id='unexpected_id',
|
||||
sysmeta_project_domain_id='test_id',
|
||||
warning=True)
|
||||
|
||||
# project domain id not provided, scoped request token,
|
||||
# leave known value
|
||||
self._assert_set_project_domain(None,
|
||||
account='AUTH_1234',
|
||||
req_project_id='1234',
|
||||
req_project_domain_id=None,
|
||||
sysmeta_project_domain_id='test_id')
|
||||
|
||||
# project domain id not provided, unscoped request token,
|
||||
# leave known value
|
||||
self._assert_set_project_domain(None,
|
||||
account='AUTH_1234',
|
||||
req_project_id=None,
|
||||
req_project_domain_id=None,
|
||||
sysmeta_project_domain_id='test_id')
|
||||
|
||||
# project domain id not provided, token scoped on another project,
|
||||
# leave known value
|
||||
self._assert_set_project_domain(None,
|
||||
account='AUTH_1234',
|
||||
req_project_id='4321',
|
||||
req_project_domain_id='default',
|
||||
sysmeta_project_domain_id='test_id')
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
@ -317,6 +317,17 @@ def set_http_connect(*args, **kwargs):
|
||||
return new_connect
|
||||
|
||||
|
||||
def _make_callback_func(calls):
|
||||
def callback(ipaddr, port, device, partition, method, path,
|
||||
headers=None, query_string=None, ssl=False):
|
||||
context = {}
|
||||
context['method'] = method
|
||||
context['path'] = path
|
||||
context['headers'] = headers or {}
|
||||
calls.append(context)
|
||||
return callback
|
||||
|
||||
|
||||
# tests
|
||||
class TestController(unittest.TestCase):
|
||||
|
||||
@ -5411,6 +5422,47 @@ class TestContainerController(unittest.TestCase):
|
||||
503, 201, 201), # put container success
|
||||
201, missing_container=True)
|
||||
|
||||
def test_PUT_autocreate_account_with_sysmeta(self):
|
||||
# x-account-sysmeta headers in a container PUT request should be
|
||||
# transferred to the account autocreate PUT request
|
||||
with save_globals():
|
||||
controller = proxy_server.ContainerController(self.app, 'account',
|
||||
'container')
|
||||
|
||||
def test_status_map(statuses, expected, headers=None, **kwargs):
|
||||
set_http_connect(*statuses, **kwargs)
|
||||
self.app.memcache.store = {}
|
||||
req = Request.blank('/v1/a/c', {}, headers=headers)
|
||||
req.content_length = 0
|
||||
self.app.update_request(req)
|
||||
res = controller.PUT(req)
|
||||
expected = str(expected)
|
||||
self.assertEquals(res.status[:len(expected)], expected)
|
||||
|
||||
self.app.account_autocreate = True
|
||||
calls = []
|
||||
callback = _make_callback_func(calls)
|
||||
key, value = 'X-Account-Sysmeta-Blah', 'something'
|
||||
headers = {key: value}
|
||||
|
||||
# all goes according to plan
|
||||
test_status_map(
|
||||
(404, 404, 404, # account_info fails on 404
|
||||
201, 201, 201, # PUT account
|
||||
200, # account_info success
|
||||
201, 201, 201), # put container success
|
||||
201, missing_container=True,
|
||||
headers=headers,
|
||||
give_connect=callback)
|
||||
|
||||
self.assertEqual(10, len(calls))
|
||||
for call in calls[3:6]:
|
||||
self.assertEqual('/account', call['path'])
|
||||
self.assertTrue(key in call['headers'],
|
||||
'%s call, key %s missing in headers %s' %
|
||||
(call['method'], key, call['headers']))
|
||||
self.assertEqual(value, call['headers'][key])
|
||||
|
||||
def test_POST(self):
|
||||
with save_globals():
|
||||
controller = proxy_server.ContainerController(self.app, 'account',
|
||||
@ -6244,10 +6296,12 @@ class TestAccountController(unittest.TestCase):
|
||||
account_ring=FakeRing(),
|
||||
container_ring=FakeRing())
|
||||
|
||||
def assert_status_map(self, method, statuses, expected, env_expected=None):
|
||||
def assert_status_map(self, method, statuses, expected, env_expected=None,
|
||||
headers=None, **kwargs):
|
||||
headers = headers or {}
|
||||
with save_globals():
|
||||
set_http_connect(*statuses)
|
||||
req = Request.blank('/v1/a', {})
|
||||
set_http_connect(*statuses, **kwargs)
|
||||
req = Request.blank('/v1/a', {}, headers=headers)
|
||||
self.app.update_request(req)
|
||||
res = method(req)
|
||||
self.assertEquals(res.status_int, expected)
|
||||
@ -6406,6 +6460,33 @@ class TestAccountController(unittest.TestCase):
|
||||
controller.POST,
|
||||
(404, 404, 404, 403, 403, 403, 400, 400, 400), 400)
|
||||
|
||||
def test_POST_autocreate_with_sysmeta(self):
|
||||
with save_globals():
|
||||
controller = proxy_server.AccountController(self.app, 'account')
|
||||
self.app.memcache = FakeMemcacheReturnsNone()
|
||||
# first test with autocreate being False
|
||||
self.assertFalse(self.app.account_autocreate)
|
||||
self.assert_status_map(controller.POST,
|
||||
(404, 404, 404), 404)
|
||||
# next turn it on and test account being created than updated
|
||||
controller.app.account_autocreate = True
|
||||
calls = []
|
||||
callback = _make_callback_func(calls)
|
||||
key, value = 'X-Account-Sysmeta-Blah', 'something'
|
||||
headers = {key: value}
|
||||
self.assert_status_map(
|
||||
controller.POST,
|
||||
(404, 404, 404, 202, 202, 202, 201, 201, 201), 201,
|
||||
# POST , autocreate PUT, POST again
|
||||
headers=headers,
|
||||
give_connect=callback)
|
||||
self.assertEqual(9, len(calls))
|
||||
for call in calls:
|
||||
self.assertTrue(key in call['headers'],
|
||||
'%s call, key %s missing in headers %s' %
|
||||
(call['method'], key, call['headers']))
|
||||
self.assertEqual(value, call['headers'][key])
|
||||
|
||||
def test_connection_refused(self):
|
||||
self.app.account_ring.get_nodes('account')
|
||||
for dev in self.app.account_ring.devs:
|
||||
|
Loading…
Reference in New Issue
Block a user