Enable UT test_auth.py

Remove the skip line in this file, and make some changes to
make them pass.

Partially Implements: blueprint test-addition-refactoring

Change-Id: I6b747da14bc743dbd2b79a6629e761d316de629f
This commit is contained in:
Yan Xing'an 2018-11-12 21:25:25 +08:00
parent 24f4459f26
commit 929183a893
7 changed files with 135 additions and 60 deletions

View File

@ -14,49 +14,26 @@
from oslo_config import cfg from oslo_config import cfg
from oslo_log import log as logging from oslo_log import log as logging
from oslo_middleware import request_id from oslo_middleware import base
import webob.dec import webob.dec
import webob.exc import webob.exc
from tacker import context from tacker import context
from tacker import wsgi
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
class TackerKeystoneContext(wsgi.Middleware): class TackerKeystoneContext(base.ConfigurableMiddleware):
"""Make a request context from keystone headers.""" """Make a request context from keystone headers."""
@webob.dec.wsgify @webob.dec.wsgify
def __call__(self, req): def __call__(self, req):
# Determine the user ID ctx = context.Context.from_environ(req.environ)
user_id = req.headers.get('X_USER_ID')
if not user_id: if not ctx.user_id:
LOG.debug("X_USER_ID is not found in request") LOG.debug("X_USER_ID is not found in request")
return webob.exc.HTTPUnauthorized() return webob.exc.HTTPUnauthorized()
# Determine the tenant
tenant_id = req.headers.get('X_PROJECT_ID')
# Suck out the roles
roles = [r.strip() for r in req.headers.get('X_ROLES', '').split(',')]
# Human-friendly names
tenant_name = req.headers.get('X_PROJECT_NAME')
user_name = req.headers.get('X_USER_NAME')
# Use request_id if already set
req_id = req.environ.get(request_id.ENV_REQUEST_ID)
# Get the auth token
auth_token = req.headers.get('X_AUTH_TOKEN',
req.headers.get('X_STORAGE_TOKEN'))
# Create a context with the authentication data
ctx = context.Context(user_id, tenant_id, roles=roles,
user_name=user_name, tenant_name=tenant_name,
request_id=req_id, auth_token=auth_token)
# Inject the context... # Inject the context...
req.environ['tacker.context'] = ctx req.environ['tacker.context'] = ctx

View File

@ -36,30 +36,26 @@ class ContextBase(oslo_context.RequestContext):
""" """
def __init__(self, user_id, tenant_id, is_admin=None, roles=None, def __init__(self, user_id=None, tenant_id=None, is_admin=None,
timestamp=None, request_id=None, tenant_name=None, timestamp=None, tenant_name=None, user_name=None,
user_name=None, overwrite=True, auth_token=None, is_advsvc=None, **kwargs):
**kwargs): # NOTE(jamielennox): We maintain this argument in order for tests that
"""Object initialization. # pass arguments positionally.
kwargs.setdefault('project_id', tenant_id)
# prefer project_name, as that's what's going to be set by
# keystone. Fall back to tenant_name if for some reason it's blank.
kwargs.setdefault('project_name', tenant_name)
super(ContextBase, self).__init__(
is_admin=is_admin, user_id=user_id, **kwargs)
:param overwrite: Set to False to ensure that the greenthread local
copy of the index is not overwritten.
:param kwargs: Extra arguments that might be present, but we ignore
because they possibly came in from older rpc messages.
"""
super(ContextBase, self).__init__(auth_token=auth_token,
user=user_id, tenant=tenant_id,
is_admin=is_admin,
request_id=request_id,
overwrite=overwrite,
roles=roles)
self.user_name = user_name self.user_name = user_name
self.tenant_name = tenant_name
if not timestamp: if not timestamp:
timestamp = datetime.datetime.utcnow() timestamp = datetime.datetime.utcnow()
self.timestamp = timestamp self.timestamp = timestamp
# self.is_advsvc = is_advsvc
# if self.is_advsvc is None:
# self.is_advsvc = self.is_admin or policy.check_is_advsvc(self)
if self.is_admin is None: if self.is_admin is None:
self.is_admin = policy.check_is_admin(self) self.is_admin = policy.check_is_admin(self)
@ -75,6 +71,14 @@ class ContextBase(oslo_context.RequestContext):
def tenant_id(self, tenant_id): def tenant_id(self, tenant_id):
self.tenant = tenant_id self.tenant = tenant_id
@property
def tenant_name(self):
return self.project_name
@tenant_name.setter
def tenant_name(self, tenant_name):
self.project_name = tenant_name
@property @property
def user_id(self): def user_id(self):
return self.user return self.user
@ -98,7 +102,35 @@ class ContextBase(oslo_context.RequestContext):
@classmethod @classmethod
def from_dict(cls, values): def from_dict(cls, values):
return cls(**values) return cls(user_id=values.get('user_id', values.get('user')),
tenant_id=values.get('tenant_id', values.get('project_id')),
is_admin=values.get('is_admin'),
roles=values.get('roles'),
timestamp=values.get('timestamp'),
request_id=values.get('request_id'),
tenant_name=values.get('tenant_name'),
user_name=values.get('user_name'),
auth_token=values.get('auth_token'))
def to_policy_values(self):
values = super(ContextBase, self).to_policy_values()
values['tenant_id'] = self.project_id
values['is_admin'] = self.is_admin
# NOTE(jamielennox): These are almost certainly unused and non-standard
# but kept for backwards compatibility. Remove them in Pike
# (oslo.context from Ocata release already issues deprecation warnings
# for non-standard keys).
values['user'] = self.user_id
values['tenant'] = self.project_id
values['domain'] = self.domain_id
values['user_domain'] = self.user_domain_id
values['project_domain'] = self.project_domain_id
values['tenant_name'] = self.project_name
values['project_name'] = self.project_name
values['user_name'] = self.user_name
return values
def elevated(self): def elevated(self):
"""Return a version of this context with admin flag set.""" """Return a version of this context with admin flag set."""

View File

@ -15,7 +15,7 @@
import collections import collections
import re import re
import sys
from oslo_config import cfg from oslo_config import cfg
from oslo_db import exception as db_exc from oslo_db import exception as db_exc
@ -36,6 +36,18 @@ _ENFORCER = None
ADMIN_CTX_POLICY = 'context_is_admin' ADMIN_CTX_POLICY = 'context_is_admin'
_BASE_RULES = [
policy.RuleDefault(
ADMIN_CTX_POLICY,
'role:admin',
description='Rule for cloud admin access'),
# policy.RuleDefault(
# _ADVSVC_CTX_POLICY,
# 'role:advsvc',
# description='Rule for advanced service role access'),
]
def reset(): def reset():
global _ENFORCER global _ENFORCER
if _ENFORCER: if _ENFORCER:
@ -49,6 +61,7 @@ def init(conf=cfg.CONF, policy_file=None):
global _ENFORCER global _ENFORCER
if not _ENFORCER: if not _ENFORCER:
_ENFORCER = policy.Enforcer(conf, policy_file=policy_file) _ENFORCER = policy.Enforcer(conf, policy_file=policy_file)
_ENFORCER.register_defaults(_BASE_RULES)
_ENFORCER.load_rules(True) _ENFORCER.load_rules(True)
@ -407,7 +420,31 @@ def check_is_admin(context):
"""Verify context has admin rights according to policy settings.""" """Verify context has admin rights according to policy settings."""
init() init()
# the target is user-self # the target is user-self
credentials = context.to_dict() credentials = context.to_policy_values()
if ADMIN_CTX_POLICY not in _ENFORCER.rules: try:
return _ENFORCER.authorize(ADMIN_CTX_POLICY, credentials, credentials)
except policy.PolicyNotRegistered:
return False return False
return _ENFORCER.enforce(ADMIN_CTX_POLICY, credentials, credentials)
def get_enforcer():
# NOTE(amotoki): This was borrowed from nova/policy.py.
# This method is for use by oslo.policy CLI scripts. Those scripts need the
# 'output-file' and 'namespace' options, but having those in sys.argv means
# loading the tacker config options will fail as those are not expected to
# be present. So we pass in an arg list with those stripped out.
conf_args = []
# Start at 1 because cfg.CONF expects the equivalent of sys.argv[1:]
i = 1
while i < len(sys.argv):
if sys.argv[i].strip('-') in ['namespace', 'output-file']:
i += 2
continue
conf_args.append(sys.argv[i])
i += 1
# 'project' must be 'tacker' so that get_enforcer looks at
# /etc/tacker/policy.json by default.
cfg.CONF(conf_args, project='tacker')
init()
return _ENFORCER

View File

@ -34,10 +34,10 @@ from tacker.common import config
from tacker.common import rpc as n_rpc from tacker.common import rpc as n_rpc
from tacker import context from tacker import context
from tacker import manager from tacker import manager
from tacker import policy
from tacker.tests import fake_notifier from tacker.tests import fake_notifier
from tacker.tests import post_mortem_debug from tacker.tests import post_mortem_debug
CONF = cfg.CONF CONF = cfg.CONF
CONF.import_opt('state_path', 'tacker.common.config') CONF.import_opt('state_path', 'tacker.common.config')
TRUE_STRING = ['True', '1'] TRUE_STRING = ['True', '1']
@ -105,6 +105,10 @@ class BaseTestCase(testtools.TestCase):
else: else:
conf(args) conf(args)
def setup_config(self, args=None):
"""Tests that need a non-default config can override this method."""
self.config_parse(args=args)
def setUp(self): def setUp(self):
super(BaseTestCase, self).setUp() super(BaseTestCase, self).setUp()
@ -152,6 +156,9 @@ class BaseTestCase(testtools.TestCase):
self.temp_dir = self.useFixture(fixtures.TempDir()).path self.temp_dir = self.useFixture(fixtures.TempDir()).path
cfg.CONF.set_override('state_path', self.temp_dir) cfg.CONF.set_override('state_path', self.temp_dir)
self.setup_config()
policy.init()
self.addCleanup(policy.reset)
self.addCleanup(mock.patch.stopall) self.addCleanup(mock.patch.stopall)
self.addCleanup(CONF.reset) self.addCleanup(CONF.reset)

View File

@ -0,0 +1,10 @@
{
"context_is_admin": "role:admin",
"admin_or_owner": "rule:context_is_admin or tenant_id:%(tenant_id)s",
"admin_only": "rule:context_is_admin",
"regular_user": "",
"shared": "field:vims:shared=True",
"default": "rule:admin_or_owner",
"get_vim": "rule:admin_or_owner or rule:shared"
}

View File

@ -27,8 +27,8 @@ class ConfigurationTest(base.BaseTestCase):
def test_defaults(self): def test_defaults(self):
self.assertEqual('0.0.0.0', cfg.CONF.bind_host) self.assertEqual('0.0.0.0', cfg.CONF.bind_host)
self.assertEqual(9890, cfg.CONF.bind_port) self.assertEqual(9890, cfg.CONF.bind_port)
self.assertEqual('api-paste.ini', cfg.CONF.api_paste_config) self.assertEqual('api-paste.ini.test', cfg.CONF.api_paste_config)
self.assertEqual('', cfg.CONF.api_extensions_path) self.assertEqual('unit/extensions', cfg.CONF.api_extensions_path)
self.assertEqual('policy.json', cfg.CONF.policy_file) self.assertEqual('policy.json', cfg.CONF.policy_file)
self.assertEqual('keystone', cfg.CONF.auth_strategy) self.assertEqual('keystone', cfg.CONF.auth_strategy)
self.assertTrue(cfg.CONF.allow_bulk) self.assertTrue(cfg.CONF.allow_bulk)

View File

@ -23,7 +23,6 @@ from tacker.tests import base
class TackerKeystoneContextTestCase(base.BaseTestCase): class TackerKeystoneContextTestCase(base.BaseTestCase):
def setUp(self): def setUp(self):
super(TackerKeystoneContextTestCase, self).setUp() super(TackerKeystoneContextTestCase, self).setUp()
self.skip("Not ready yet")
@webob.dec.wsgify @webob.dec.wsgify
def fake_app(req): def fake_app(req):
@ -62,8 +61,7 @@ class TackerKeystoneContextTestCase(base.BaseTestCase):
self.request.headers['X_ROLES'] = 'role1, role2 , role3,role4,role5' self.request.headers['X_ROLES'] = 'role1, role2 , role3,role4,role5'
response = self.request.get_response(self.middleware) response = self.request.get_response(self.middleware)
self.assertEqual('200 OK', response.status) self.assertEqual('200 OK', response.status)
self.assertEqual(['role1', 'role2', self.assertEqual(['role1', 'role2', 'role3', 'role4', 'role5'],
'role3', 'role4', 'role5'],
self.context.roles) self.context.roles)
self.assertFalse(self.context.is_admin) self.assertFalse(self.context.is_admin)
@ -74,10 +72,10 @@ class TackerKeystoneContextTestCase(base.BaseTestCase):
'AdMiN') 'AdMiN')
response = self.request.get_response(self.middleware) response = self.request.get_response(self.middleware)
self.assertEqual('200 OK', response.status) self.assertEqual('200 OK', response.status)
self.assertEqual(['role1', 'role2', 'role3', self.assertEqual(['role1', 'role2', 'role3', 'role4', 'role5',
'role4', 'role5', 'AdMiN'], 'AdMiN'],
self.context.roles) self.context.roles)
self.assertEqual(True, self.context.is_admin) self.assertTrue(self.context.is_admin)
def test_with_user_tenant_name(self): def test_with_user_tenant_name(self):
self.request.headers['X_PROJECT_ID'] = 'testtenantid' self.request.headers['X_PROJECT_ID'] = 'testtenantid'
@ -98,3 +96,17 @@ class TackerKeystoneContextTestCase(base.BaseTestCase):
self.request.environ[request_id.ENV_REQUEST_ID] = req_id self.request.environ[request_id.ENV_REQUEST_ID] = req_id
self.request.get_response(self.middleware) self.request.get_response(self.middleware)
self.assertEqual(req_id, self.context.request_id) self.assertEqual(req_id, self.context.request_id)
def test_with_auth_token(self):
self.request.headers['X_PROJECT_ID'] = 'testtenantid'
self.request.headers['X_USER_ID'] = 'testuserid'
response = self.request.get_response(self.middleware)
self.assertEqual('200 OK', response.status)
self.assertEqual('testauthtoken', self.context.auth_token)
def test_without_auth_token(self):
self.request.headers['X_PROJECT_ID'] = 'testtenantid'
self.request.headers['X_USER_ID'] = 'testuserid'
del self.request.headers['X_AUTH_TOKEN']
self.request.get_response(self.middleware)
self.assertIsNone(self.context.auth_token)