# Copyright (c) 2012 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import unittest

from swift.common.middleware import keystoneauth
from swift.common.swob import Request, Response
from swift.common.http import HTTP_FORBIDDEN
from swift.common.utils import split_path
from swift.proxy.controllers.base import get_cache_key
from test.unit import FakeLogger

UNKNOWN_ID = keystoneauth.UNKNOWN_ID


def _fake_token_info(version='2'):
    if version == '2':
        return {'access': 'fake_value'}
    if version == '3':
        return {'token': 'fake_value'}


def operator_roles(test_auth):
    # Return copy -- not a reference
    return list(test_auth.account_rules[test_auth.reseller_prefixes[0]].get(
                'operator_roles'))


def get_account_for_tenant(test_auth, tenant_id):
        """Convenience function reduces unit test churn"""
        return '%s%s' % (test_auth.reseller_prefixes[0], tenant_id)


def get_identity_headers(status='Confirmed', tenant_id='1',
                         tenant_name='acct', project_domain_name='domA',
                         project_domain_id='99',
                         user_name='usr', user_id='42',
                         user_domain_name='domA', user_domain_id='99',
                         role='admin',
                         service_role=None):
    if role is None:
        role = []
    if isinstance(role, list):
        role = ','.join(role)
    res = dict(X_IDENTITY_STATUS=status,
               X_TENANT_ID=tenant_id,
               X_TENANT_NAME=tenant_name,
               X_PROJECT_ID=tenant_id,
               X_PROJECT_NAME=tenant_name,
               X_PROJECT_DOMAIN_ID=project_domain_id,
               X_PROJECT_DOMAIN_NAME=project_domain_name,
               X_ROLES=role,
               X_USER_NAME=user_name,
               X_USER_ID=user_id,
               X_USER_DOMAIN_NAME=user_domain_name,
               X_USER_DOMAIN_ID=user_domain_id)
    if service_role:
        res.update(X_SERVICE_ROLES=service_role)
    return res


class FakeApp(object):
    def __init__(self, status_headers_body_iter=None):
        self.calls = 0
        self.call_contexts = []
        self.status_headers_body_iter = status_headers_body_iter
        if not self.status_headers_body_iter:
            self.status_headers_body_iter = iter([('404 Not Found', {}, '')])

    def __call__(self, env, start_response):
        self.calls += 1
        self.request = Request.blank('', environ=env)
        if 'swift.authorize' in env:
            resp = env['swift.authorize'](self.request)
            if resp:
                return resp(env, start_response)
        context = {'method': self.request.method,
                   'headers': self.request.headers}
        self.call_contexts.append(context)
        status, headers, body = next(self.status_headers_body_iter)
        return Response(status=status, headers=headers,
                        body=body)(env, start_response)


class SwiftAuth(unittest.TestCase):
    def setUp(self):
        self.test_auth = keystoneauth.filter_factory({})(FakeApp())
        self.test_auth.logger = FakeLogger()

    def _make_request(self, path=None, headers=None, **kwargs):
        if not path:
            path = '/v1/%s/c/o' % get_account_for_tenant(self.test_auth, 'foo')
        return Request.blank(path, headers=headers, **kwargs)

    def _get_successful_middleware(self):
        response_iter = iter([('200 OK', {}, '')])
        return keystoneauth.filter_factory({})(FakeApp(response_iter))

    def test_invalid_request_authorized(self):
        role = self.test_auth.reseller_admin_role
        headers = get_identity_headers(role=role)
        req = self._make_request('/', headers=headers)
        resp = req.get_response(self._get_successful_middleware())
        self.assertEqual(resp.status_int, 404)

    def test_invalid_request_non_authorized(self):
        req = self._make_request('/')
        resp = req.get_response(self._get_successful_middleware())
        self.assertEqual(resp.status_int, 404)

    def test_confirmed_identity_is_authorized(self):
        role = self.test_auth.reseller_admin_role
        headers = get_identity_headers(role=role)
        req = self._make_request('/v1/AUTH_acct/c', headers)
        resp = req.get_response(self._get_successful_middleware())
        self.assertEqual(resp.status_int, 200)

    def test_detect_reseller_request(self):
        role = self.test_auth.reseller_admin_role
        headers = get_identity_headers(role=role)
        req = self._make_request('/v1/AUTH_acct/c', headers)
        req.get_response(self._get_successful_middleware())
        self.assertTrue(req.environ.get('reseller_request'))

    def test_confirmed_identity_is_not_authorized(self):
        headers = get_identity_headers()
        req = self._make_request('/v1/AUTH_acct/c', headers)
        resp = req.get_response(self.test_auth)
        self.assertEqual(resp.status_int, 403)

    def test_anonymous_is_authorized_for_permitted_referrer(self):
        req = self._make_request(headers={'X_IDENTITY_STATUS': 'Invalid'})
        req.acl = '.r:*'
        resp = req.get_response(self._get_successful_middleware())
        self.assertEqual(resp.status_int, 200)

    def test_anonymous_with_validtoken_authorized_for_permitted_referrer(self):
        req = self._make_request(headers={'X_IDENTITY_STATUS': 'Confirmed'})
        req.acl = '.r:*'
        resp = req.get_response(self._get_successful_middleware())
        self.assertEqual(resp.status_int, 200)

    def test_anonymous_is_not_authorized_for_unknown_reseller_prefix(self):
        req = self._make_request(path='/v1/BLAH_foo/c/o',
                                 headers={'X_IDENTITY_STATUS': 'Invalid'})
        resp = req.get_response(self.test_auth)
        self.assertEqual(resp.status_int, 401)

    def test_denied_responses(self):

        def get_resp_status(headers):
            req = self._make_request(headers=headers)
            resp = req.get_response(self.test_auth)
            return resp.status_int

        self.assertEqual(get_resp_status({'X_IDENTITY_STATUS': 'Confirmed'}),
                         403)
        self.assertEqual(get_resp_status(
                         {'X_IDENTITY_STATUS': 'Confirmed',
                          'X_SERVICE_IDENTITY_STATUS': 'Confirmed'}), 403)
        self.assertEqual(get_resp_status({}), 401)
        self.assertEqual(get_resp_status(
                         {'X_IDENTITY_STATUS': 'Invalid'}), 401)
        self.assertEqual(get_resp_status(
                         {'X_IDENTITY_STATUS': 'Invalid',
                          'X_SERVICE_IDENTITY_STATUS': 'Confirmed'}), 401)
        self.assertEqual(get_resp_status(
                         {'X_IDENTITY_STATUS': 'Confirmed',
                          'X_SERVICE_IDENTITY_STATUS': 'Invalid'}), 401)
        self.assertEqual(get_resp_status(
                         {'X_IDENTITY_STATUS': 'Invalid',
                          'X_SERVICE_IDENTITY_STATUS': 'Invalid'}), 401)

    def test_blank_reseller_prefix(self):
        conf = {'reseller_prefix': ''}
        test_auth = keystoneauth.filter_factory(conf)(FakeApp())
        account = tenant_id = 'foo'
        self.assertTrue(test_auth._account_matches_tenant(account, tenant_id))

    def test_reseller_prefix_added_underscore(self):
        conf = {'reseller_prefix': 'AUTH'}
        test_auth = keystoneauth.filter_factory(conf)(FakeApp())
        self.assertEqual(test_auth.reseller_prefixes[0], "AUTH_")

    def test_reseller_prefix_not_added_double_underscores(self):
        conf = {'reseller_prefix': 'AUTH_'}
        test_auth = keystoneauth.filter_factory(conf)(FakeApp())
        self.assertEqual(test_auth.reseller_prefixes[0], "AUTH_")

    def test_override_asked_for_but_not_allowed(self):
        conf = {'allow_overrides': 'false'}
        self.test_auth = keystoneauth.filter_factory(conf)(FakeApp())
        req = self._make_request('/v1/AUTH_account',
                                 environ={'swift.authorize_override': True})
        resp = req.get_response(self.test_auth)
        self.assertEqual(resp.status_int, 401)

    def test_override_asked_for_and_allowed(self):
        conf = {'allow_overrides': 'true'}
        self.test_auth = keystoneauth.filter_factory(conf)(FakeApp())
        req = self._make_request('/v1/AUTH_account',
                                 environ={'swift.authorize_override': True})
        resp = req.get_response(self.test_auth)
        self.assertEqual(resp.status_int, 404)

    def test_override_default_allowed(self):
        req = self._make_request('/v1/AUTH_account',
                                 environ={'swift.authorize_override': True})
        resp = req.get_response(self.test_auth)
        self.assertEqual(resp.status_int, 404)

    def test_anonymous_options_allowed(self):
        req = self._make_request('/v1/AUTH_account',
                                 environ={'REQUEST_METHOD': 'OPTIONS'})
        resp = req.get_response(self._get_successful_middleware())
        self.assertEqual(resp.status_int, 200)

    def test_identified_options_allowed(self):
        headers = get_identity_headers()
        headers['REQUEST_METHOD'] = 'OPTIONS'
        req = self._make_request('/v1/AUTH_account',
                                 headers=get_identity_headers(),
                                 environ={'REQUEST_METHOD': 'OPTIONS'})
        resp = req.get_response(self._get_successful_middleware())
        self.assertEqual(resp.status_int, 200)

    def test_auth_scheme(self):
        req = self._make_request(path='/v1/BLAH_foo/c/o',
                                 headers={'X_IDENTITY_STATUS': 'Invalid'})
        resp = req.get_response(self.test_auth)
        self.assertEqual(resp.status_int, 401)
        self.assertTrue('Www-Authenticate' in resp.headers)

    def test_project_domain_id_sysmeta_set(self):
        proj_id = '12345678'
        proj_domain_id = '13'
        headers = get_identity_headers(tenant_id=proj_id,
                                       project_domain_id=proj_domain_id)
        account = get_account_for_tenant(self.test_auth, proj_id)
        path = '/v1/' + account
        # fake cached account info
        info_key = get_cache_key(account)
        env = {'swift.infocache': {info_key: {'status': 0, 'sysmeta': {}}},
               'keystone.token_info': _fake_token_info(version='3')}
        req = Request.blank(path, environ=env, headers=headers)
        req.method = 'POST'
        headers_out = {'X-Account-Sysmeta-Project-Domain-Id': proj_domain_id}
        fake_app = FakeApp(iter([('200 OK', headers_out, '')]))
        test_auth = keystoneauth.filter_factory({})(fake_app)
        resp = req.get_response(test_auth)
        self.assertEqual(resp.status_int, 200)
        self.assertEqual(len(fake_app.call_contexts), 1)
        headers_sent = fake_app.call_contexts[0]['headers']
        self.assertTrue('X-Account-Sysmeta-Project-Domain-Id' in headers_sent,
                        headers_sent)
        self.assertEqual(headers_sent['X-Account-Sysmeta-Project-Domain-Id'],
                         proj_domain_id)
        self.assertTrue('X-Account-Project-Domain-Id' in resp.headers)
        self.assertEqual(resp.headers['X-Account-Project-Domain-Id'],
                         proj_domain_id)

    def test_project_domain_id_sysmeta_set_to_unknown(self):
        proj_id = '12345678'
        # token scoped to a different project
        headers = get_identity_headers(tenant_id='87654321',
                                       project_domain_id='default',
                                       role='reselleradmin')
        account = get_account_for_tenant(self.test_auth, proj_id)
        path = '/v1/' + account
        # fake cached account info
        info_key = get_cache_key(account)
        env = {'swift.infocache': {info_key: {'status': 0, 'sysmeta': {}}},
               'keystone.token_info': _fake_token_info(version='3')}
        req = Request.blank(path, environ=env, headers=headers)
        req.method = 'POST'
        fake_app = FakeApp(iter([('200 OK', {}, '')]))
        test_auth = keystoneauth.filter_factory({})(fake_app)
        resp = req.get_response(test_auth)
        self.assertEqual(resp.status_int, 200)
        self.assertEqual(len(fake_app.call_contexts), 1)
        headers_sent = fake_app.call_contexts[0]['headers']
        self.assertTrue('X-Account-Sysmeta-Project-Domain-Id' in headers_sent,
                        headers_sent)
        self.assertEqual(headers_sent['X-Account-Sysmeta-Project-Domain-Id'],
                         UNKNOWN_ID)

    def test_project_domain_id_sysmeta_not_set(self):
        proj_id = '12345678'
        headers = get_identity_headers(tenant_id=proj_id, role='admin')
        account = get_account_for_tenant(self.test_auth, proj_id)
        path = '/v1/' + account
        info_key = get_cache_key(account)
        # v2 token
        env = {'swift.infocache': {info_key: {'status': 0, 'sysmeta': {}}},
               'keystone.token_info': _fake_token_info(version='2')}
        req = Request.blank(path, environ=env, headers=headers)
        req.method = 'POST'
        fake_app = FakeApp(iter([('200 OK', {}, '')]))
        test_auth = keystoneauth.filter_factory({})(fake_app)
        resp = req.get_response(test_auth)
        self.assertEqual(resp.status_int, 200)
        self.assertEqual(len(fake_app.call_contexts), 1)
        headers_sent = fake_app.call_contexts[0]['headers']
        self.assertFalse('X-Account-Sysmeta-Project-Domain-Id' in headers_sent,
                         headers_sent)

    def test_project_domain_id_sysmeta_set_unknown_with_v2(self):
        proj_id = '12345678'
        # token scoped to a different project
        headers = get_identity_headers(tenant_id='87654321',
                                       role='reselleradmin')
        account = get_account_for_tenant(self.test_auth, proj_id)
        path = '/v1/' + account
        info_key = get_cache_key(account)
        # v2 token
        env = {'swift.infocache': {info_key: {'status': 0, 'sysmeta': {}}},
               'keystone.token_info': _fake_token_info(version='2')}
        req = Request.blank(path, environ=env, headers=headers)
        req.method = 'POST'
        fake_app = FakeApp(iter([('200 OK', {}, '')]))
        test_auth = keystoneauth.filter_factory({})(fake_app)
        resp = req.get_response(test_auth)
        self.assertEqual(resp.status_int, 200)
        self.assertEqual(len(fake_app.call_contexts), 1)
        headers_sent = fake_app.call_contexts[0]['headers']
        self.assertTrue('X-Account-Sysmeta-Project-Domain-Id' in headers_sent,
                        headers_sent)
        self.assertEqual(headers_sent['X-Account-Sysmeta-Project-Domain-Id'],
                         UNKNOWN_ID)


class SwiftAuthMultiple(SwiftAuth):
    """Runs same tests as SwiftAuth with multiple reseller prefixes

    Runs SwiftAuth tests while a second reseller prefix item exists.
    Validates that there is no regression against the original
    single prefix configuration.
    """

    def setUp(self):
        self.test_auth = keystoneauth.filter_factory(
            {'reseller_prefix': 'AUTH, PRE2'})(FakeApp())
        self.test_auth.logger = FakeLogger()


class ServiceTokenFunctionality(unittest.TestCase):

    def _make_authed_request(self, conf, project_id, path, method='GET',
                             user_role='admin', service_role=None,
                             environ=None):
        """Make a request with keystoneauth as auth

        By default, acts as though the user had presented a token
        containing the 'admin' role in X-Auth-Token scoped to the specified
        project_id.

        :param conf: configuration for keystoneauth
        :param project_id: the project_id of the token
        :param path: the path of the request
        :param method: the method (defaults to GET)
        :param user_role: the role of X-Auth-Token (defaults to 'admin')
        :param service_role: the role in X-Service-Token (defaults to none)
        :param environ: a dict of items to be added to the request environ
                       (defaults to none)

        :returns: response object
        """
        headers = get_identity_headers(tenant_id=project_id,
                                       role=user_role,
                                       service_role=service_role)
        (version, account, _junk, _junk) = split_path(path, 2, 4, True)
        info_key = get_cache_key(account)
        env = {'swift.infocache': {info_key: {'status': 0, 'sysmeta': {}}},
               'keystone.token_info': _fake_token_info(version='2')}
        if environ:
            env.update(environ)
        req = Request.blank(path, environ=env, headers=headers)
        req.method = method
        fake_app = FakeApp(iter([('200 OK', {}, '')]))
        test_auth = keystoneauth.filter_factory(conf)(fake_app)
        resp = req.get_response(test_auth)
        return resp

    def test_existing_swift_owner_ignored(self):
        # a request without admin role is denied
        resp = self._make_authed_request(
            {'reseller_prefix': 'AUTH'}, '12345678', '/v1/AUTH_12345678',
            environ={'swift_owner': False},
            user_role='something_else')
        self.assertEqual(resp.status_int, 403)

        # ... even when swift_owner has previously been set True in request env
        resp = self._make_authed_request(
            {'reseller_prefix': 'AUTH'}, '12345678', '/v1/AUTH_12345678',
            environ={'swift_owner': True},
            user_role='something_else')
        self.assertEqual(resp.status_int, 403)

        # a request with admin role but to different account prefix is denied
        resp = self._make_authed_request(
            {'reseller_prefix': 'AUTH'}, '12345678', '/v1/SERVICE_12345678',
            environ={'swift_owner': False})
        self.assertEqual(resp.status_int, 403)

        # ... even when swift_owner has previously been set True in request env
        resp = self._make_authed_request(
            {'reseller_prefix': 'AUTH'}, '12345678', '/v1/SERVICE_12345678',
            environ={'swift_owner': True})
        self.assertEqual(resp.status_int, 403)

    def test_unknown_prefix(self):
        resp = self._make_authed_request({}, '12345678', '/v1/BLAH_12345678')
        self.assertEqual(resp.status_int, 403)
        resp = self._make_authed_request(
            {'reseller_prefix': 'AUTH, PRE2'}, '12345678', '/v1/BLAH_12345678')
        self.assertEqual(resp.status_int, 403)

    def test_authed_for_path_single(self):
        resp = self._make_authed_request({}, '12345678', '/v1/AUTH_12345678')
        self.assertEqual(resp.status_int, 200)
        resp = self._make_authed_request(
            {'reseller_prefix': 'AUTH'}, '12345678', '/v1/AUTH_12345678')
        self.assertEqual(resp.status_int, 200)
        resp = self._make_authed_request(
            {'reseller_prefix': 'AUTH'}, '12345678', '/v1/AUTH_12345678/c')
        self.assertEqual(resp.status_int, 200)
        resp = self._make_authed_request(
            {'reseller_prefix': 'AUTH'}, '12345678', '/v1/AUTH_12345678',
            user_role='ResellerAdmin')
        self.assertEqual(resp.status_int, 200)
        resp = self._make_authed_request(
            {'reseller_prefix': 'AUTH'}, '12345678', '/v1/AUTH_anything',
            user_role='ResellerAdmin')
        self.assertEqual(resp.status_int, 200)

    def test_denied_for_path_single(self):
        resp = self._make_authed_request(
            {'reseller_prefix': 'AUTH'}, '12345678', '/v1/AUTH_789')
        self.assertEqual(resp.status_int, 403)
        resp = self._make_authed_request(
            {'reseller_prefix': 'AUTH'}, '12345678', '/v1/AUTH_12345678',
            user_role='something_else')
        self.assertEqual(resp.status_int, 403)
        resp = self._make_authed_request(
            {'reseller_prefix': 'AUTH'}, '12345678', '/v1/AUTH_12345678',
            method='DELETE')
        self.assertEqual(resp.status_int, 403)

    def test_authed_for_primary_path_multiple(self):
        resp = self._make_authed_request(
            {'reseller_prefix': 'AUTH, PRE2',
             'PRE2_service_roles': 'service'},
            '12345678', '/v1/AUTH_12345678')
        self.assertEqual(resp.status_int, 200)

    def test_denied_for_second_path_with_only_operator_role(self):
        # User only presents X-Auth-Token
        resp = self._make_authed_request(
            {'reseller_prefix': 'AUTH, PRE2',
             'PRE2_service_roles': 'service'},
            '12345678', '/v1/PRE2_12345678')
        self.assertEqual(resp.status_int, 403)

        # User puts token in X-Service-Token
        resp = self._make_authed_request(
            {'reseller_prefix': 'AUTH, PRE2',
             'PRE2_service_roles': 'service'},
            '12345678', '/v1/PRE2_12345678',
            user_role='', service_role='admin')
        self.assertEqual(resp.status_int, 403)

        # User puts token in both X-Auth-Token and X-Service-Token
        resp = self._make_authed_request(
            {'reseller_prefix': 'AUTH, PRE2',
             'PRE2_service_roles': 'service'},
            '12345678', '/v1/PRE2_12345678',
            user_role='admin', service_role='admin')
        self.assertEqual(resp.status_int, 403)

    def test_authed_for_second_path_with_operator_role_and_service(self):
        resp = self._make_authed_request(
            {'reseller_prefix': 'AUTH, PRE2',
             'PRE2_service_roles': 'service'},
            '12345678', '/v1/PRE2_12345678', service_role='service')
        self.assertEqual(resp.status_int, 200)

    def test_denied_for_second_path_with_only_service(self):
        resp = self._make_authed_request(
            {'reseller_prefix': 'AUTH, PRE2',
             'PRE2_service_roles': 'service'},
            '12345678', '/v1/PRE2_12345678', user_role='something_else',
            service_role='service')
        self.assertEqual(resp.status_int, 403)

    def test_denied_for_second_path_for_service_user(self):
        # User presents token with 'service' role in X-Auth-Token
        resp = self._make_authed_request(
            {'reseller_prefix': 'AUTH, PRE2',
             'PRE2_service_roles': 'service'},
            '12345678', '/v1/PRE2_12345678', user_role='service')
        self.assertEqual(resp.status_int, 403)

        # User presents token with 'service' role in X-Auth-Token
        # and also in X-Service-Token
        resp = self._make_authed_request(
            {'reseller_prefix': 'AUTH, PRE2',
             'PRE2_service_roles': 'service'},
            '12345678', '/v1/PRE2_12345678', user_role='service',
            service_role='service')
        self.assertEqual(resp.status_int, 403)

    def test_delete_denied_for_second_path(self):
        resp = self._make_authed_request(
            {'reseller_prefix': 'AUTH, PRE2',
             'PRE2_service_roles': 'service'},
            '12345678', '/v1/PRE2_12345678', service_role='service',
            method='DELETE')
        self.assertEqual(resp.status_int, 403)

    def test_delete_of_second_path_by_reseller_admin(self):
        resp = self._make_authed_request(
            {'reseller_prefix': 'AUTH, PRE2',
             'PRE2_service_roles': 'service'},
            '12345678', '/v1/PRE2_12345678', user_role='ResellerAdmin',
            method='DELETE')
        self.assertEqual(resp.status_int, 200)


class BaseTestAuthorize(unittest.TestCase):
    def setUp(self):
        self.test_auth = keystoneauth.filter_factory({})(FakeApp())
        self.test_auth.logger = FakeLogger()

    def _make_request(self, path, **kwargs):
        return Request.blank(path, **kwargs)

    def _get_account(self, identity=None):
        if not identity:
            identity = self._get_identity()
        return get_account_for_tenant(self.test_auth,
                                      identity.get('HTTP_X_PROJECT_ID') or
                                      identity.get('HTTP_X_TENANT_ID'))

    def _get_identity(self, tenant_id='tenant_id', tenant_name='tenant_name',
                      user_id='user_id', user_name='user_name', roles=None,
                      project_domain_name='domA', project_domain_id='foo',
                      user_domain_name='domA', user_domain_id='foo'):
        if roles is None:
            roles = []
        if isinstance(roles, list):
            roles = ','.join(roles)
        return {'HTTP_X_USER_ID': user_id,
                'HTTP_X_USER_NAME': user_name,
                'HTTP_X_USER_DOMAIN_NAME': user_domain_name,
                'HTTP_X_USER_DOMAIN_ID': user_domain_id,
                'HTTP_X_PROJECT_ID': tenant_id,
                'HTTP_X_PROJECT_NAME': tenant_name,
                'HTTP_X_PROJECT_DOMAIN_ID': project_domain_id,
                'HTTP_X_PROJECT_DOMAIN_NAME': project_domain_name,
                'HTTP_X_ROLES': roles,
                'HTTP_X_IDENTITY_STATUS': 'Confirmed'}

    def _get_identity_for_v2(self, **kwargs):
        identity = self._get_identity(**kwargs)
        for suffix in ['ID', 'NAME']:
            identity['HTTP_X_TENANT_{0}'.format(suffix)] = identity.pop(
                'HTTP_X_PROJECT_{0}'.format(suffix))
        return identity

    def _get_env_id(self, tenant_id='tenant_id', tenant_name='tenant_name',
                    user_id='user_id', user_name='user_name', roles=[],
                    project_domain_name='domA', project_domain_id='99',
                    user_domain_name='domA', user_domain_id='99',
                    auth_version='3'):
        env = self._get_identity(tenant_id, tenant_name, user_id, user_name,
                                 roles, project_domain_name,
                                 project_domain_id, user_domain_name,
                                 user_domain_id)
        token_info = _fake_token_info(version=auth_version)
        env.update({'keystone.token_info': token_info})
        return self.test_auth._keystone_identity(env)


class TestAuthorize(BaseTestAuthorize):
    def _check_authenticate(self, account=None, identity=None, headers=None,
                            exception=None, acl=None, env=None, path=None):
        if not identity:
            identity = self._get_identity()
        if not account:
            account = self._get_account(identity)
        if not path:
            path = '/v1/%s/c' % account
        # fake cached account info
        info_key = get_cache_key(account)
        default_env = {
            'REMOTE_USER': (identity.get('HTTP_X_PROJECT_ID') or
                            identity.get('HTTP_X_TENANT_ID')),
            'swift.infocache': {info_key: {'status': 200, 'sysmeta': {}}}}
        default_env.update(identity)
        if env:
            default_env.update(env)
        req = self._make_request(path, headers=headers, environ=default_env)
        req.acl = acl

        env_identity = self.test_auth._keystone_identity(req.environ)
        result = self.test_auth.authorize(env_identity, req)

        # if we have requested an exception but nothing came back then
        if exception and not result:
            self.fail("error %s was not returned" % (str(exception)))
        elif exception:
            self.assertEqual(result.status_int, exception)
        else:
            self.assertIsNone(result)
        return req

    def test_authorize_fails_for_unauthorized_user(self):
        self._check_authenticate(exception=HTTP_FORBIDDEN)

    def test_authorize_fails_for_invalid_reseller_prefix(self):
        self._check_authenticate(account='BLAN_a',
                                 exception=HTTP_FORBIDDEN)

    def test_authorize_succeeds_for_reseller_admin(self):
        roles = [self.test_auth.reseller_admin_role]
        identity = self._get_identity(roles=roles)
        req = self._check_authenticate(identity=identity)
        self.assertTrue(req.environ.get('swift_owner'))

    def test_authorize_succeeds_for_insensitive_reseller_admin(self):
        roles = [self.test_auth.reseller_admin_role.upper()]
        identity = self._get_identity(roles=roles)
        req = self._check_authenticate(identity=identity)
        self.assertTrue(req.environ.get('swift_owner'))

    def test_authorize_succeeds_as_owner_for_operator_role(self):
        roles = operator_roles(self.test_auth)
        identity = self._get_identity(roles=roles)
        req = self._check_authenticate(identity=identity)
        self.assertTrue(req.environ.get('swift_owner'))

    def test_authorize_succeeds_as_owner_for_insensitive_operator_role(self):
        roles = [r.upper() for r in operator_roles(self.test_auth)]
        identity = self._get_identity(roles=roles)
        req = self._check_authenticate(identity=identity)
        self.assertTrue(req.environ.get('swift_owner'))

    def test_authorize_fails_same_user_and_tenant(self):
        # Historically the is_admin option allowed access when user_name
        # matched tenant_name, but it is no longer supported. This test is a
        # sanity check that the option no longer works.
        self.test_auth.is_admin = True
        identity = self._get_identity(user_name='same_name',
                                      tenant_name='same_name')
        req = self._check_authenticate(identity=identity,
                                       exception=HTTP_FORBIDDEN)
        self.assertFalse(bool(req.environ.get('swift_owner')))

    def test_authorize_succeeds_for_container_sync(self):
        env = {'swift_sync_key': 'foo', 'REMOTE_ADDR': '127.0.0.1'}
        headers = {'x-container-sync-key': 'foo', 'x-timestamp': '1'}
        self._check_authenticate(env=env, headers=headers)

    def test_authorize_fails_for_invalid_referrer(self):
        env = {'HTTP_REFERER': 'http://invalid.com/index.html'}
        self._check_authenticate(acl='.r:example.com', env=env,
                                 exception=HTTP_FORBIDDEN)

    def test_authorize_fails_for_referrer_without_rlistings(self):
        env = {'HTTP_REFERER': 'http://example.com/index.html'}
        self._check_authenticate(acl='.r:example.com', env=env,
                                 exception=HTTP_FORBIDDEN)

    def test_authorize_succeeds_for_referrer_with_rlistings(self):
        env = {'HTTP_REFERER': 'http://example.com/index.html'}
        self._check_authenticate(acl='.r:example.com,.rlistings', env=env)

    def test_authorize_succeeds_for_referrer_with_obj(self):
        path = '/v1/%s/c/o' % self._get_account()
        env = {'HTTP_REFERER': 'http://example.com/index.html'}
        self._check_authenticate(acl='.r:example.com', env=env, path=path)

    def test_authorize_succeeds_for_user_role_in_roles(self):
        acl = 'allowme'
        identity = self._get_identity(roles=[acl])
        self._check_authenticate(identity=identity, acl=acl)

    def test_authorize_succeeds_for_tenant_name_user_in_roles(self):
        identity = self._get_identity_for_v2()
        user_name = identity['HTTP_X_USER_NAME']
        user_id = identity['HTTP_X_USER_ID']
        tenant_name = identity['HTTP_X_TENANT_NAME']
        for user in [user_id, user_name, '*']:
            acl = '%s:%s' % (tenant_name, user)
            self._check_authenticate(identity=identity, acl=acl)

    def test_authorize_succeeds_for_project_name_user_in_roles(self):
        identity = self._get_identity()
        user_name = identity['HTTP_X_USER_NAME']
        user_id = identity['HTTP_X_USER_ID']
        project_name = identity['HTTP_X_PROJECT_NAME']
        for user in [user_id, user_name, '*']:
            acl = '%s:%s' % (project_name, user)
            self._check_authenticate(identity=identity, acl=acl)

    def test_authorize_succeeds_for_tenant_id_user_in_roles(self):
        identity = self._get_identity_for_v2()
        user_name = identity['HTTP_X_USER_NAME']
        user_id = identity['HTTP_X_USER_ID']
        tenant_id = identity['HTTP_X_TENANT_ID']
        for user in [user_id, user_name, '*']:
            acl = '%s:%s' % (tenant_id, user)
            self._check_authenticate(identity=identity, acl=acl)

    def test_authorize_succeeds_for_project_id_user_in_roles(self):
        identity = self._get_identity()
        user_name = identity['HTTP_X_USER_NAME']
        user_id = identity['HTTP_X_USER_ID']
        project_id = identity['HTTP_X_PROJECT_ID']
        for user in [user_id, user_name, '*']:
            acl = '%s:%s' % (project_id, user)
            self._check_authenticate(identity=identity, acl=acl)

    def test_authorize_succeeds_for_wildcard_tenant_user_in_roles(self):
        identity = self._get_identity()
        user_name = identity['HTTP_X_USER_NAME']
        user_id = identity['HTTP_X_USER_ID']
        for user in [user_id, user_name, '*']:
            acl = '*:%s' % user
            self._check_authenticate(identity=identity, acl=acl)

    def test_cross_tenant_authorization_success(self):
        self.assertEqual(
            self.test_auth._authorize_cross_tenant(
                'userID', 'userA', 'tenantID', 'tenantNAME',
                ['tenantID:userA']),
            'tenantID:userA')
        self.assertEqual(
            self.test_auth._authorize_cross_tenant(
                'userID', 'userA', 'tenantID', 'tenantNAME',
                ['tenantNAME:userA']),
            'tenantNAME:userA')
        self.assertEqual(
            self.test_auth._authorize_cross_tenant(
                'userID', 'userA', 'tenantID', 'tenantNAME', ['*:userA']),
            '*:userA')

        self.assertEqual(
            self.test_auth._authorize_cross_tenant(
                'userID', 'userA', 'tenantID', 'tenantNAME',
                ['tenantID:userID']),
            'tenantID:userID')
        self.assertEqual(
            self.test_auth._authorize_cross_tenant(
                'userID', 'userA', 'tenantID', 'tenantNAME',
                ['tenantNAME:userID']),
            'tenantNAME:userID')
        self.assertEqual(
            self.test_auth._authorize_cross_tenant(
                'userID', 'userA', 'tenantID', 'tenantNAME', ['*:userID']),
            '*:userID')

        self.assertEqual(
            self.test_auth._authorize_cross_tenant(
                'userID', 'userA', 'tenantID', 'tenantNAME', ['tenantID:*']),
            'tenantID:*')
        self.assertEqual(
            self.test_auth._authorize_cross_tenant(
                'userID', 'userA', 'tenantID', 'tenantNAME', ['tenantNAME:*']),
            'tenantNAME:*')
        self.assertEqual(
            self.test_auth._authorize_cross_tenant(
                'userID', 'userA', 'tenantID', 'tenantNAME', ['*:*']),
            '*:*')

    def test_cross_tenant_authorization_failure(self):
        self.assertEqual(
            self.test_auth._authorize_cross_tenant(
                'userID', 'userA', 'tenantID', 'tenantNAME',
                ['tenantXYZ:userA']),
            None)

    def test_cross_tenant_authorization_allow_names(self):
        # tests that the allow_names arg does the right thing
        self.assertEqual(
            self.test_auth._authorize_cross_tenant(
                'userID', 'userA', 'tenantID', 'tenantNAME',
                ['tenantNAME:userA'], allow_names=True),
            'tenantNAME:userA')
        self.assertEqual(
            self.test_auth._authorize_cross_tenant(
                'userID', 'userA', 'tenantID', 'tenantNAME',
                ['tenantNAME:userID'], allow_names=True),
            'tenantNAME:userID')
        self.assertEqual(
            self.test_auth._authorize_cross_tenant(
                'userID', 'userA', 'tenantID', 'tenantNAME',
                ['tenantID:userA'], allow_names=True),
            'tenantID:userA')
        self.assertEqual(
            self.test_auth._authorize_cross_tenant(
                'userID', 'userA', 'tenantID', 'tenantNAME',
                ['tenantID:userID'], allow_names=True),
            'tenantID:userID')
        self.assertEqual(
            self.test_auth._authorize_cross_tenant(
                'userID', 'userA', 'tenantID', 'tenantNAME',
                ['tenantNAME:userA'], allow_names=False),
            None)
        self.assertEqual(
            self.test_auth._authorize_cross_tenant(
                'userID', 'userA', 'tenantID', 'tenantNAME',
                ['tenantID:userA'], allow_names=False),
            None)
        self.assertEqual(
            self.test_auth._authorize_cross_tenant(
                'userID', 'userA', 'tenantID', 'tenantNAME',
                ['tenantNAME:userID'], allow_names=False),
            None)
        self.assertEqual(
            self.test_auth._authorize_cross_tenant(
                'userID', 'userA', 'tenantID', 'tenantNAME',
                ['tenantID:userID'], allow_names=False),
            'tenantID:userID')

    def test_delete_own_account_not_allowed(self):
        roles = operator_roles(self.test_auth)
        identity = self._get_identity(roles=roles)
        account = self._get_account(identity)
        self._check_authenticate(account=account,
                                 identity=identity,
                                 exception=HTTP_FORBIDDEN,
                                 path='/v1/' + account,
                                 env={'REQUEST_METHOD': 'DELETE'})

    def test_delete_own_account_when_reseller_allowed(self):
        roles = [self.test_auth.reseller_admin_role]
        identity = self._get_identity(roles=roles)
        account = self._get_account(identity)
        req = self._check_authenticate(account=account,
                                       identity=identity,
                                       path='/v1/' + account,
                                       env={'REQUEST_METHOD': 'DELETE'})
        self.assertEqual(bool(req.environ.get('swift_owner')), True)

    def test_identity_set_up_at_call(self):
        def fake_start_response(*args, **kwargs):
            pass
        the_env = self._get_identity(
            tenant_id='test', roles=['reselleradmin'])
        self.test_auth(the_env, fake_start_response)

        subreq = Request.blank(
            '/v1/%s/c/o' % get_account_for_tenant(self.test_auth, 'test'))
        subreq.environ.update(
            self._get_identity(tenant_id='test', roles=['got_erased']))

        authorize_resp = the_env['swift.authorize'](subreq)
        self.assertIsNone(authorize_resp)

    def test_names_disallowed_in_acls_outside_default_domain(self):
        id = self._get_identity_for_v2(user_domain_id='non-default',
                                       project_domain_id='non-default')
        env = {'keystone.token_info': _fake_token_info(version='3')}
        acl = '%s:%s' % (id['HTTP_X_TENANT_NAME'], id['HTTP_X_USER_NAME'])
        self._check_authenticate(acl=acl, identity=id, env=env,
                                 exception=HTTP_FORBIDDEN)
        acl = '%s:%s' % (id['HTTP_X_TENANT_NAME'], id['HTTP_X_USER_ID'])
        self._check_authenticate(acl=acl, identity=id, env=env,
                                 exception=HTTP_FORBIDDEN)
        acl = '%s:%s' % (id['HTTP_X_TENANT_ID'], id['HTTP_X_USER_NAME'])
        self._check_authenticate(acl=acl, identity=id, env=env,
                                 exception=HTTP_FORBIDDEN)
        acl = '%s:%s' % (id['HTTP_X_TENANT_ID'], id['HTTP_X_USER_ID'])
        self._check_authenticate(acl=acl, identity=id, env=env)

        id = self._get_identity(user_domain_id='non-default',
                                project_domain_id='non-default')
        acl = '%s:%s' % (id['HTTP_X_PROJECT_NAME'], id['HTTP_X_USER_NAME'])
        self._check_authenticate(acl=acl, identity=id, env=env,
                                 exception=HTTP_FORBIDDEN)
        acl = '%s:%s' % (id['HTTP_X_PROJECT_NAME'], id['HTTP_X_USER_ID'])
        self._check_authenticate(acl=acl, identity=id, env=env,
                                 exception=HTTP_FORBIDDEN)
        acl = '%s:%s' % (id['HTTP_X_PROJECT_ID'], id['HTTP_X_USER_NAME'])
        self._check_authenticate(acl=acl, identity=id, env=env,
                                 exception=HTTP_FORBIDDEN)
        acl = '%s:%s' % (id['HTTP_X_PROJECT_ID'], id['HTTP_X_USER_ID'])
        self._check_authenticate(acl=acl, identity=id, env=env)

    def test_names_allowed_in_acls_inside_default_domain(self):
        id = self._get_identity_for_v2(user_domain_id='default',
                                       project_domain_id='default')
        env = {'keystone.token_info': _fake_token_info(version='3')}
        acl = '%s:%s' % (id['HTTP_X_TENANT_NAME'], id['HTTP_X_USER_NAME'])
        self._check_authenticate(acl=acl, identity=id, env=env)
        acl = '%s:%s' % (id['HTTP_X_TENANT_NAME'], id['HTTP_X_USER_ID'])
        self._check_authenticate(acl=acl, identity=id, env=env)
        acl = '%s:%s' % (id['HTTP_X_TENANT_ID'], id['HTTP_X_USER_NAME'])
        self._check_authenticate(acl=acl, identity=id, env=env)
        acl = '%s:%s' % (id['HTTP_X_TENANT_ID'], id['HTTP_X_USER_ID'])
        self._check_authenticate(acl=acl, identity=id, env=env)

        id = self._get_identity(user_domain_id='default',
                                project_domain_id='default')
        acl = '%s:%s' % (id['HTTP_X_PROJECT_NAME'], id['HTTP_X_USER_NAME'])
        self._check_authenticate(acl=acl, identity=id, env=env)
        acl = '%s:%s' % (id['HTTP_X_PROJECT_NAME'], id['HTTP_X_USER_ID'])
        self._check_authenticate(acl=acl, identity=id, env=env)
        acl = '%s:%s' % (id['HTTP_X_PROJECT_ID'], id['HTTP_X_USER_NAME'])
        self._check_authenticate(acl=acl, identity=id, env=env)
        acl = '%s:%s' % (id['HTTP_X_PROJECT_ID'], id['HTTP_X_USER_ID'])
        self._check_authenticate(acl=acl, identity=id, env=env)

    def test_names_allowed_in_acls_inside_default_domain_with_config(self):
        conf = {'allow_names_in_acls': 'yes'}
        self.test_auth = keystoneauth.filter_factory(conf)(FakeApp())
        self.test_auth.logger = FakeLogger()
        id = self._get_identity_for_v2(user_domain_id='default',
                                       project_domain_id='default')
        env = {'keystone.token_info': _fake_token_info(version='3')}
        acl = '%s:%s' % (id['HTTP_X_TENANT_NAME'], id['HTTP_X_USER_NAME'])
        self._check_authenticate(acl=acl, identity=id, env=env)
        acl = '%s:%s' % (id['HTTP_X_TENANT_NAME'], id['HTTP_X_USER_ID'])
        self._check_authenticate(acl=acl, identity=id, env=env)
        acl = '%s:%s' % (id['HTTP_X_TENANT_ID'], id['HTTP_X_USER_NAME'])
        self._check_authenticate(acl=acl, identity=id, env=env)
        acl = '%s:%s' % (id['HTTP_X_TENANT_ID'], id['HTTP_X_USER_ID'])
        self._check_authenticate(acl=acl, identity=id, env=env)

        id = self._get_identity(user_domain_id='default',
                                project_domain_id='default')
        acl = '%s:%s' % (id['HTTP_X_PROJECT_NAME'], id['HTTP_X_USER_NAME'])
        self._check_authenticate(acl=acl, identity=id, env=env)
        acl = '%s:%s' % (id['HTTP_X_PROJECT_NAME'], id['HTTP_X_USER_ID'])
        self._check_authenticate(acl=acl, identity=id, env=env)
        acl = '%s:%s' % (id['HTTP_X_PROJECT_ID'], id['HTTP_X_USER_NAME'])
        self._check_authenticate(acl=acl, identity=id, env=env)
        acl = '%s:%s' % (id['HTTP_X_PROJECT_ID'], id['HTTP_X_USER_ID'])
        self._check_authenticate(acl=acl, identity=id, env=env)

    def test_names_disallowed_in_acls_inside_default_domain(self):
        conf = {'allow_names_in_acls': 'false'}
        self.test_auth = keystoneauth.filter_factory(conf)(FakeApp())
        self.test_auth.logger = FakeLogger()
        id = self._get_identity_for_v2(user_domain_id='default',
                                       project_domain_id='default')
        env = {'keystone.token_info': _fake_token_info(version='3')}
        acl = '%s:%s' % (id['HTTP_X_TENANT_NAME'], id['HTTP_X_USER_NAME'])
        self._check_authenticate(acl=acl, identity=id, env=env,
                                 exception=HTTP_FORBIDDEN)
        acl = '%s:%s' % (id['HTTP_X_TENANT_NAME'], id['HTTP_X_USER_ID'])
        self._check_authenticate(acl=acl, identity=id, env=env,
                                 exception=HTTP_FORBIDDEN)
        acl = '%s:%s' % (id['HTTP_X_TENANT_ID'], id['HTTP_X_USER_NAME'])
        self._check_authenticate(acl=acl, identity=id, env=env,
                                 exception=HTTP_FORBIDDEN)
        acl = '%s:%s' % (id['HTTP_X_TENANT_ID'], id['HTTP_X_USER_ID'])
        self._check_authenticate(acl=acl, identity=id, env=env)

        id = self._get_identity(user_domain_id='default',
                                project_domain_id='default')
        acl = '%s:%s' % (id['HTTP_X_PROJECT_NAME'], id['HTTP_X_USER_NAME'])
        self._check_authenticate(acl=acl, identity=id, env=env,
                                 exception=HTTP_FORBIDDEN)
        acl = '%s:%s' % (id['HTTP_X_PROJECT_NAME'], id['HTTP_X_USER_ID'])
        self._check_authenticate(acl=acl, identity=id, env=env,
                                 exception=HTTP_FORBIDDEN)
        acl = '%s:%s' % (id['HTTP_X_PROJECT_ID'], id['HTTP_X_USER_NAME'])
        self._check_authenticate(acl=acl, identity=id, env=env,
                                 exception=HTTP_FORBIDDEN)
        acl = '%s:%s' % (id['HTTP_X_PROJECT_ID'], id['HTTP_X_USER_ID'])
        self._check_authenticate(acl=acl, identity=id, env=env)

    def test_keystone_identity(self):
        user = ('U_ID', 'U_NAME')
        roles = ('ROLE1', 'ROLE2')
        service_roles = ('ROLE3', 'ROLE4')
        project = ('P_ID', 'P_NAME')
        user_domain = ('UD_ID', 'UD_NAME')
        project_domain = ('PD_ID', 'PD_NAME')

        # no valid identity info in headers
        req = Request.blank('/v/a/c/o')
        data = self.test_auth._keystone_identity(req.environ)
        self.assertIsNone(data)

        # valid identity info in headers, but status unconfirmed
        req.headers.update({'X-Identity-Status': 'Blah',
                            'X-Roles': '%s,%s' % roles,
                            'X-User-Id': user[0],
                            'X-User-Name': user[1],
                            'X-Tenant-Id': project[0],
                            'X-Tenant-Name': project[1],
                            'X-User-Domain-Id': user_domain[0],
                            'X-User-Domain-Name': user_domain[1],
                            'X-Project-Domain-Id': project_domain[0],
                            'X-Project-Domain-Name': project_domain[1]})
        data = self.test_auth._keystone_identity(req.environ)
        self.assertIsNone(data)

        # valid identity info in headers, no token info in environ
        req.headers.update({'X-Identity-Status': 'Confirmed'})
        expected = {'user': user,
                    'tenant': project,
                    'roles': list(roles),
                    'service_roles': [],
                    'user_domain': (None, None),
                    'project_domain': (None, None),
                    'auth_version': 0}
        data = self.test_auth._keystone_identity(req.environ)
        self.assertEqual(expected, data)

        # v2 token info in environ
        req.environ['keystone.token_info'] = _fake_token_info(version='2')
        expected = {'user': user,
                    'tenant': project,
                    'roles': list(roles),
                    'service_roles': [],
                    'user_domain': (None, None),
                    'project_domain': (None, None),
                    'auth_version': 2}
        data = self.test_auth._keystone_identity(req.environ)
        self.assertEqual(expected, data)

        # v3 token info in environ
        req.environ['keystone.token_info'] = _fake_token_info(version='3')
        expected = {'user': user,
                    'tenant': project,
                    'roles': list(roles),
                    'service_roles': [],
                    'user_domain': user_domain,
                    'project_domain': project_domain,
                    'auth_version': 3}
        data = self.test_auth._keystone_identity(req.environ)
        self.assertEqual(expected, data)

        # service token in environ
        req.headers.update({'X-Service-Roles': '%s,%s' % service_roles})
        expected = {'user': user,
                    'tenant': project,
                    'roles': list(roles),
                    'service_roles': list(service_roles),
                    'user_domain': user_domain,
                    'project_domain': project_domain,
                    'auth_version': 3}
        data = self.test_auth._keystone_identity(req.environ)
        self.assertEqual(expected, data)

    def test_get_project_domain_id(self):
        sysmeta = {}
        info = {'sysmeta': sysmeta}
        info_key = get_cache_key('AUTH_1234')
        env = {'PATH_INFO': '/v1/AUTH_1234',
               'swift.infocache': {info_key: info}}

        # account does not exist
        info['status'] = 404
        self.assertEqual(self.test_auth._get_project_domain_id(env),
                         (False, None))
        info['status'] = 0
        self.assertEqual(self.test_auth._get_project_domain_id(env),
                         (False, None))

        # account exists, no project domain id in sysmeta
        info['status'] = 200
        self.assertEqual(self.test_auth._get_project_domain_id(env),
                         (True, None))

        # account exists with project domain id in sysmeta
        sysmeta['project-domain-id'] = 'default'
        self.assertEqual(self.test_auth._get_project_domain_id(env),
                         (True, 'default'))


class TestIsNameAllowedInACL(BaseTestAuthorize):
    def setUp(self):
        super(TestIsNameAllowedInACL, self).setUp()
        self.default_id = 'default'

    def _assert_names_allowed(self, expected, user_domain_id=None,
                              req_project_domain_id=None,
                              sysmeta_project_domain_id=None,
                              scoped='account'):
        project_name = 'foo'
        account_id = '12345678'
        account = get_account_for_tenant(self.test_auth, account_id)
        parts = ('v1', account, None, None)
        path = '/%s/%s' % parts[0:2]

        sysmeta = {}
        if sysmeta_project_domain_id:
            sysmeta = {'project-domain-id': sysmeta_project_domain_id}

        # pretend account exists
        info = {'status': 200, 'sysmeta': sysmeta}
        info_key = get_cache_key(account)
        req = Request.blank(path,
                            environ={'swift.infocache': {info_key: info}})

        if scoped == 'account':
            project_name = 'account_name'
            project_id = account_id
        elif scoped == 'other':
            project_name = 'other_name'
            project_id = '87654321'
        else:
            # unscoped token
            project_name, project_id, req_project_domain_id = None, None, None

        if user_domain_id:
            id = self._get_env_id(tenant_name=project_name,
                                  tenant_id=project_id,
                                  user_domain_id=user_domain_id,
                                  project_domain_id=req_project_domain_id)
        else:
            # must be v2 token info
            id = self._get_env_id(tenant_name=project_name,
                                  tenant_id=project_id,
                                  auth_version='2')

        actual = self.test_auth._is_name_allowed_in_acl(req, parts, id)
        self.assertEqual(actual, expected, '%s, %s, %s, %s'
                         % (user_domain_id, req_project_domain_id,
                            sysmeta_project_domain_id, scoped))

    def test_is_name_allowed_in_acl_with_token_scoped_to_tenant(self):
        # no user or project domain ids in request token so must be v2,
        # user and project should be assumed to be in default domain
        self._assert_names_allowed(True, user_domain_id=None,
                                   req_project_domain_id=None,
                                   sysmeta_project_domain_id=None)
        self._assert_names_allowed(True, user_domain_id=None,
                                   req_project_domain_id=None,
                                   sysmeta_project_domain_id=self.default_id)
        self._assert_names_allowed(True, user_domain_id=None,
                                   req_project_domain_id=None,
                                   sysmeta_project_domain_id=UNKNOWN_ID)
        self._assert_names_allowed(True, user_domain_id=None,
                                   req_project_domain_id=None,
                                   sysmeta_project_domain_id='foo')

        # user in default domain, project domain in token info takes precedence
        self._assert_names_allowed(True, user_domain_id=self.default_id,
                                   req_project_domain_id=self.default_id,
                                   sysmeta_project_domain_id=None)
        self._assert_names_allowed(True, user_domain_id=self.default_id,
                                   req_project_domain_id=self.default_id,
                                   sysmeta_project_domain_id=UNKNOWN_ID)
        self._assert_names_allowed(True, user_domain_id=self.default_id,
                                   req_project_domain_id=self.default_id,
                                   sysmeta_project_domain_id='bar')
        self._assert_names_allowed(False, user_domain_id=self.default_id,
                                   req_project_domain_id='foo',
                                   sysmeta_project_domain_id=None)
        self._assert_names_allowed(False, user_domain_id=self.default_id,
                                   req_project_domain_id='foo',
                                   sysmeta_project_domain_id=self.default_id)
        self._assert_names_allowed(False, user_domain_id=self.default_id,
                                   req_project_domain_id='foo',
                                   sysmeta_project_domain_id='foo')

        # user in non-default domain so names should never be allowed
        self._assert_names_allowed(False, user_domain_id='foo',
                                   req_project_domain_id=self.default_id,
                                   sysmeta_project_domain_id=None)
        self._assert_names_allowed(False, user_domain_id='foo',
                                   req_project_domain_id=self.default_id,
                                   sysmeta_project_domain_id=self.default_id)
        self._assert_names_allowed(False, user_domain_id='foo',
                                   req_project_domain_id=self.default_id,
                                   sysmeta_project_domain_id=UNKNOWN_ID)
        self._assert_names_allowed(False, user_domain_id='foo',
                                   req_project_domain_id=self.default_id,
                                   sysmeta_project_domain_id='foo')

    def test_is_name_allowed_in_acl_with_unscoped_token(self):
        # user in default domain
        self._assert_names_allowed(True, user_domain_id=self.default_id,
                                   sysmeta_project_domain_id=None,
                                   scoped=False)
        self._assert_names_allowed(True, user_domain_id=self.default_id,
                                   sysmeta_project_domain_id=self.default_id,
                                   scoped=False)
        self._assert_names_allowed(False, user_domain_id=self.default_id,
                                   sysmeta_project_domain_id=UNKNOWN_ID,
                                   scoped=False)
        self._assert_names_allowed(False, user_domain_id=self.default_id,
                                   sysmeta_project_domain_id='foo',
                                   scoped=False)

        # user in non-default domain so names should never be allowed
        self._assert_names_allowed(False, user_domain_id='foo',
                                   sysmeta_project_domain_id=None,
                                   scoped=False)
        self._assert_names_allowed(False, user_domain_id='foo',
                                   sysmeta_project_domain_id=self.default_id,
                                   scoped=False)
        self._assert_names_allowed(False, user_domain_id='foo',
                                   sysmeta_project_domain_id=UNKNOWN_ID,
                                   scoped=False)
        self._assert_names_allowed(False, user_domain_id='foo',
                                   sysmeta_project_domain_id='foo',
                                   scoped=False)

    def test_is_name_allowed_in_acl_with_token_scoped_to_other_tenant(self):
        # user and scoped tenant in default domain
        self._assert_names_allowed(True, user_domain_id=self.default_id,
                                   req_project_domain_id=self.default_id,
                                   sysmeta_project_domain_id=None,
                                   scoped='other')
        self._assert_names_allowed(True, user_domain_id=self.default_id,
                                   req_project_domain_id=self.default_id,
                                   sysmeta_project_domain_id=self.default_id,
                                   scoped='other')
        self._assert_names_allowed(False, user_domain_id=self.default_id,
                                   req_project_domain_id=self.default_id,
                                   sysmeta_project_domain_id=UNKNOWN_ID,
                                   scoped='other')
        self._assert_names_allowed(False, user_domain_id=self.default_id,
                                   req_project_domain_id=self.default_id,
                                   sysmeta_project_domain_id='foo',
                                   scoped='other')

        # user in default domain, but scoped tenant in non-default domain
        self._assert_names_allowed(False, user_domain_id=self.default_id,
                                   req_project_domain_id='foo',
                                   sysmeta_project_domain_id=None,
                                   scoped='other')
        self._assert_names_allowed(False, user_domain_id=self.default_id,
                                   req_project_domain_id='foo',
                                   sysmeta_project_domain_id=self.default_id,
                                   scoped='other')
        self._assert_names_allowed(False, user_domain_id=self.default_id,
                                   req_project_domain_id='foo',
                                   sysmeta_project_domain_id=UNKNOWN_ID,
                                   scoped='other')
        self._assert_names_allowed(False, user_domain_id=self.default_id,
                                   req_project_domain_id='foo',
                                   sysmeta_project_domain_id='foo',
                                   scoped='other')

        # user in non-default domain, scoped tenant in default domain
        self._assert_names_allowed(False, user_domain_id='foo',
                                   req_project_domain_id=self.default_id,
                                   sysmeta_project_domain_id=None,
                                   scoped='other')
        self._assert_names_allowed(False, user_domain_id='foo',
                                   req_project_domain_id=self.default_id,
                                   sysmeta_project_domain_id=self.default_id,
                                   scoped='other')
        self._assert_names_allowed(False, user_domain_id='foo',
                                   req_project_domain_id=self.default_id,
                                   sysmeta_project_domain_id=UNKNOWN_ID,
                                   scoped='other')
        self._assert_names_allowed(False, user_domain_id='foo',
                                   req_project_domain_id=self.default_id,
                                   sysmeta_project_domain_id='foo',
                                   scoped='other')


class TestIsNameAllowedInACLWithConfiguredDomain(TestIsNameAllowedInACL):
    def setUp(self):
        super(TestIsNameAllowedInACLWithConfiguredDomain, self).setUp()
        conf = {'default_domain_id': 'mydefault'}
        self.test_auth = keystoneauth.filter_factory(conf)(FakeApp())
        self.test_auth.logger = FakeLogger()
        self.default_id = 'mydefault'


class TestSetProjectDomain(BaseTestAuthorize):
    def _assert_set_project_domain(self, expected, account, req_project_id,
                                   req_project_domain_id,
                                   sysmeta_project_domain_id,
                                   warning=False):
        hdr = 'X-Account-Sysmeta-Project-Domain-Id'

        # set up fake account info in req env
        status = 0 if sysmeta_project_domain_id is None else 200
        sysmeta = {}
        if sysmeta_project_domain_id:
            sysmeta['project-domain-id'] = sysmeta_project_domain_id
        info = {'status': status, 'sysmeta': sysmeta}
        info_key = get_cache_key(account)
        env = {'swift.infocache': {info_key: info}}

        # create fake env identity
        env_id = self._get_env_id(tenant_id=req_project_id,
                                  project_domain_id=req_project_domain_id)

        # reset fake logger
        self.test_auth.logger = FakeLogger()
        num_warnings = 0

        # check account requests
        path = '/v1/%s' % account
        for method in ['PUT', 'POST']:
            req = Request.blank(path, environ=env)
            req.method = method
            path_parts = req.split_path(1, 4, True)
            self.test_auth._set_project_domain_id(req, path_parts, env_id)
            if warning:
                num_warnings += 1
                warnings = self.test_auth.logger.get_lines_for_level('warning')
                self.assertEqual(len(warnings), num_warnings)
                self.assertTrue(warnings[-1].startswith('Inconsistent proj'))
            if expected is not None:
                self.assertTrue(hdr in req.headers)
                self.assertEqual(req.headers[hdr], expected)
            else:
                self.assertFalse(hdr in req.headers, req.headers)

        for method in ['GET', 'HEAD', 'DELETE', 'OPTIONS']:
            req = Request.blank(path, environ=env)
            req.method = method
            self.test_auth._set_project_domain_id(req, path_parts, env_id)
            self.assertFalse(hdr in req.headers)

        # check container requests
        path = '/v1/%s/c' % account
        for method in ['PUT']:
            req = Request.blank(path, environ=env)
            req.method = method
            path_parts = req.split_path(1, 4, True)
            self.test_auth._set_project_domain_id(req, path_parts, env_id)
            if warning:
                num_warnings += 1
                warnings = self.test_auth.logger.get_lines_for_level('warning')
                self.assertEqual(len(warnings), num_warnings)
                self.assertTrue(warnings[-1].startswith('Inconsistent proj'))
            if expected is not None:
                self.assertTrue(hdr in req.headers)
                self.assertEqual(req.headers[hdr], expected)
            else:
                self.assertFalse(hdr in req.headers)

        for method in ['POST', 'GET', 'HEAD', 'DELETE', 'OPTIONS']:
            req = Request.blank(path, environ=env)
            req.method = method
            self.test_auth._set_project_domain_id(req, path_parts, env_id)
            self.assertFalse(hdr in req.headers)

        # never set for object requests
        path = '/v1/%s/c/o' % account
        for method in ['PUT', 'COPY', 'POST', 'GET', 'HEAD', 'DELETE',
                       'OPTIONS']:
            req = Request.blank(path, environ=env)
            req.method = method
            path_parts = req.split_path(1, 4, True)
            self.test_auth._set_project_domain_id(req, path_parts, env_id)
            self.assertFalse(hdr in req.headers)

    def test_set_project_domain_id_new_account(self):
        # scoped token with project domain info
        self._assert_set_project_domain('test_id',
                                        account='AUTH_1234',
                                        req_project_id='1234',
                                        req_project_domain_id='test_id',
                                        sysmeta_project_domain_id=None)

        # scoped v2 token without project domain id
        self._assert_set_project_domain(None,
                                        account='AUTH_1234',
                                        req_project_id='1234',
                                        req_project_domain_id=None,
                                        sysmeta_project_domain_id=None)

        # unscoped v2 token without project domain id
        self._assert_set_project_domain(UNKNOWN_ID,
                                        account='AUTH_1234',
                                        req_project_id=None,
                                        req_project_domain_id=None,
                                        sysmeta_project_domain_id=None)

        # token scoped on another project
        self._assert_set_project_domain(UNKNOWN_ID,
                                        account='AUTH_1234',
                                        req_project_id='4321',
                                        req_project_domain_id='default',
                                        sysmeta_project_domain_id=None)

    def test_set_project_domain_id_existing_v2_account(self):
        # project domain id provided in scoped request token,
        # update empty value
        self._assert_set_project_domain('default',
                                        account='AUTH_1234',
                                        req_project_id='1234',
                                        req_project_domain_id='default',
                                        sysmeta_project_domain_id='')

        # inconsistent project domain id provided in scoped request token,
        # leave known value
        self._assert_set_project_domain(None,
                                        account='AUTH_1234',
                                        req_project_id='1234',
                                        req_project_domain_id='unexpected_id',
                                        sysmeta_project_domain_id='',
                                        warning=True)

        # project domain id not provided, scoped request token,
        # no change to empty value
        self._assert_set_project_domain(None,
                                        account='AUTH_1234',
                                        req_project_id='1234',
                                        req_project_domain_id=None,
                                        sysmeta_project_domain_id='')

        # unscoped request token, no change to empty value
        self._assert_set_project_domain(None,
                                        account='AUTH_1234',
                                        req_project_id=None,
                                        req_project_domain_id=None,
                                        sysmeta_project_domain_id='')

        # token scoped on another project,
        # update empty value
        self._assert_set_project_domain(None,
                                        account='AUTH_1234',
                                        req_project_id='4321',
                                        req_project_domain_id=None,
                                        sysmeta_project_domain_id='')

    def test_set_project_domain_id_existing_account_unknown_domain(self):

        # project domain id provided in scoped request token,
        # set known value
        self._assert_set_project_domain('test_id',
                                        account='AUTH_1234',
                                        req_project_id='1234',
                                        req_project_domain_id='test_id',
                                        sysmeta_project_domain_id=UNKNOWN_ID)

        # project domain id not provided, scoped request token,
        # set empty value
        self._assert_set_project_domain('',
                                        account='AUTH_1234',
                                        req_project_id='1234',
                                        req_project_domain_id=None,
                                        sysmeta_project_domain_id=UNKNOWN_ID)

        # project domain id not provided, unscoped request token,
        # leave unknown value
        self._assert_set_project_domain(None,
                                        account='AUTH_1234',
                                        req_project_id=None,
                                        req_project_domain_id=None,
                                        sysmeta_project_domain_id=UNKNOWN_ID)

        # token scoped on another project, leave unknown value
        self._assert_set_project_domain(None,
                                        account='AUTH_1234',
                                        req_project_id='4321',
                                        req_project_domain_id='default',
                                        sysmeta_project_domain_id=UNKNOWN_ID)

    def test_set_project_domain_id_existing_known_domain(self):
        # project domain id provided in scoped request token,
        # leave known value
        self._assert_set_project_domain(None,
                                        account='AUTH_1234',
                                        req_project_id='1234',
                                        req_project_domain_id='test_id',
                                        sysmeta_project_domain_id='test_id')

        # inconsistent project domain id provided in scoped request token,
        # leave known value
        self._assert_set_project_domain(None,
                                        account='AUTH_1234',
                                        req_project_id='1234',
                                        req_project_domain_id='unexpected_id',
                                        sysmeta_project_domain_id='test_id',
                                        warning=True)

        # project domain id not provided, scoped request token,
        # leave known value
        self._assert_set_project_domain(None,
                                        account='AUTH_1234',
                                        req_project_id='1234',
                                        req_project_domain_id=None,
                                        sysmeta_project_domain_id='test_id')

        # project domain id not provided, unscoped request token,
        # leave known value
        self._assert_set_project_domain(None,
                                        account='AUTH_1234',
                                        req_project_id=None,
                                        req_project_domain_id=None,
                                        sysmeta_project_domain_id='test_id')

        # project domain id not provided, token scoped on another project,
        # leave known value
        self._assert_set_project_domain(None,
                                        account='AUTH_1234',
                                        req_project_id='4321',
                                        req_project_domain_id='default',
                                        sysmeta_project_domain_id='test_id')


class ResellerInInfo(unittest.TestCase):

    def setUp(self):
        self.default_rules = {'operator_roles': ['admin', 'swiftoperator'],
                              'service_roles': []}

    def test_defaults(self):
        test_auth = keystoneauth.filter_factory({})(FakeApp())
        self.assertEqual(test_auth.account_rules['AUTH_'], self.default_rules)

    def test_multiple(self):
        conf = {"reseller_prefix": "AUTH, '', PRE2"}
        test_auth = keystoneauth.filter_factory(conf)(FakeApp())
        self.assertEqual(test_auth.account_rules['AUTH_'], self.default_rules)
        self.assertEqual(test_auth.account_rules[''], self.default_rules)
        self.assertEqual(test_auth.account_rules['PRE2_'], self.default_rules)


class PrefixAccount(unittest.TestCase):

    def test_default(self):
        conf = {}
        test_auth = keystoneauth.filter_factory(conf)(FakeApp())
        self.assertEqual(get_account_for_tenant(test_auth,
                         '1234'), 'AUTH_1234')
        self.assertEqual(test_auth._get_account_prefix(
                         'AUTH_1234'), 'AUTH_')
        self.assertEqual(test_auth._get_account_prefix(
                         'JUNK_1234'), None)
        self.assertTrue(test_auth._account_matches_tenant(
                        'AUTH_1234', '1234'))
        self.assertFalse(test_auth._account_matches_tenant(
                         'AUTH_1234', '5678'))
        self.assertFalse(test_auth._account_matches_tenant(
                         'JUNK_1234', '1234'))

    def test_same_as_default(self):
        conf = {'reseller_prefix': 'AUTH'}
        test_auth = keystoneauth.filter_factory(conf)(FakeApp())
        self.assertEqual(get_account_for_tenant(test_auth,
                         '1234'), 'AUTH_1234')
        self.assertEqual(test_auth._get_account_prefix(
                         'AUTH_1234'), 'AUTH_')
        self.assertEqual(test_auth._get_account_prefix(
                         'JUNK_1234'), None)
        self.assertTrue(test_auth._account_matches_tenant(
                        'AUTH_1234', '1234'))
        self.assertFalse(test_auth._account_matches_tenant(
                         'AUTH_1234', '5678'))

    def test_blank_reseller(self):
        conf = {'reseller_prefix': ''}
        test_auth = keystoneauth.filter_factory(conf)(FakeApp())
        self.assertEqual(get_account_for_tenant(test_auth,
                         '1234'), '1234')
        self.assertEqual(test_auth._get_account_prefix(
                         '1234'), '')
        self.assertEqual(test_auth._get_account_prefix(
                         'JUNK_1234'), '')  # yes, it should return ''
        self.assertTrue(test_auth._account_matches_tenant(
                        '1234', '1234'))
        self.assertFalse(test_auth._account_matches_tenant(
                         '1234', '5678'))
        self.assertFalse(test_auth._account_matches_tenant(
                         'JUNK_1234', '1234'))

    def test_multiple_resellers(self):
        conf = {'reseller_prefix': 'AUTH, PRE2'}
        test_auth = keystoneauth.filter_factory(conf)(FakeApp())
        self.assertEqual(get_account_for_tenant(test_auth,
                         '1234'), 'AUTH_1234')
        self.assertEqual(test_auth._get_account_prefix(
                         'AUTH_1234'), 'AUTH_')
        self.assertEqual(test_auth._get_account_prefix(
                         'JUNK_1234'), None)
        self.assertTrue(test_auth._account_matches_tenant(
                        'AUTH_1234', '1234'))
        self.assertTrue(test_auth._account_matches_tenant(
                        'PRE2_1234', '1234'))
        self.assertFalse(test_auth._account_matches_tenant(
                         'AUTH_1234', '5678'))
        self.assertFalse(test_auth._account_matches_tenant(
                         'PRE2_1234', '5678'))

    def test_blank_plus_other_reseller(self):
        conf = {'reseller_prefix': " '', PRE2"}
        test_auth = keystoneauth.filter_factory(conf)(FakeApp())
        self.assertEqual(get_account_for_tenant(test_auth,
                         '1234'), '1234')
        self.assertEqual(test_auth._get_account_prefix(
                         'PRE2_1234'), 'PRE2_')
        self.assertEqual(test_auth._get_account_prefix('JUNK_1234'), '')
        self.assertTrue(test_auth._account_matches_tenant(
                        '1234', '1234'))
        self.assertTrue(test_auth._account_matches_tenant(
                        'PRE2_1234', '1234'))
        self.assertFalse(test_auth._account_matches_tenant(
                         '1234', '5678'))
        self.assertFalse(test_auth._account_matches_tenant(
                         'PRE2_1234', '5678'))

if __name__ == '__main__':
    unittest.main()