Merged gundlach's branch.

This commit is contained in:
Eric Day
2010-09-23 11:24:26 -07:00
5 changed files with 60 additions and 73 deletions

View File

@@ -32,14 +32,18 @@ possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
if os.path.exists(os.path.join(possible_topdir, 'nova', '__init__.py')):
sys.path.insert(0, possible_topdir)
from nova import api
from nova import flags
from nova import utils
from nova import wsgi
from nova import server
FLAGS = flags.FLAGS
flags.DEFINE_integer('api_port', 8773, 'API port')
def main(_args):
from nova import api
from nova import wsgi
wsgi.run_server(api.API(), FLAGS.api_port)
if __name__ == '__main__':
utils.default_flagfile()
wsgi.run_server(api.API(), FLAGS.api_port)
server.serve('nova-api', main)

View File

@@ -44,7 +44,7 @@ flags.DEFINE_list('allowed_roles',
# NOTE(vish): a user with one of these roles will be a superuser and
# have access to all api commands
flags.DEFINE_list('superuser_roles', ['cloudadmin'],
'Roles that ignore rbac checking completely')
'Roles that ignore authorization checking completely')
# NOTE(vish): a user with one of these roles will have it for every
# project, even if he or she is not a member of the project
@@ -304,7 +304,7 @@ class AuthManager(object):
return "%s:%s" % (user.access, Project.safe_id(project))
def is_superuser(self, user):
"""Checks for superuser status, allowing user to bypass rbac
"""Checks for superuser status, allowing user to bypass authorization
@type user: User or uid
@param user: User to check.

View File

@@ -18,12 +18,13 @@
import unittest
import logging
import webob
from nova import exception
from nova import flags
from nova import test
from nova.api import ec2
from nova.auth import manager
from nova.auth import rbac
FLAGS = flags.FLAGS
@@ -72,9 +73,17 @@ class AccessTestCase(test.BaseTestCase):
try:
self.project.add_role(self.testsys, 'sysadmin')
except: pass
self.context = Context()
self.context.project = self.project
#user is set in each test
def noopWSGIApp(environ, start_response):
start_response('200 OK', [])
return ['']
self.mw = ec2.Authorizer(noopWSGIApp)
self.mw.action_roles = {'str': {
'_allow_all': ['all'],
'_allow_none': [],
'_allow_project_manager': ['projectmanager'],
'_allow_sys_and_net': ['sysadmin', 'netadmin'],
'_allow_sysadmin': ['sysadmin']}}
def tearDown(self):
um = manager.AuthManager()
@@ -87,76 +96,46 @@ class AccessTestCase(test.BaseTestCase):
um.delete_user('testsys')
super(AccessTestCase, self).tearDown()
def response_status(self, user, methodName):
context = Context()
context.project = self.project
context.user = user
environ = {'ec2.context' : context,
'ec2.controller': 'some string',
'ec2.action': methodName}
req = webob.Request.blank('/', environ)
resp = req.get_response(self.mw)
return resp.status_int
def shouldAllow(self, user, methodName):
self.assertEqual(200, self.response_status(user, methodName))
def shouldDeny(self, user, methodName):
self.assertEqual(401, self.response_status(user, methodName))
def test_001_allow_all(self):
self.context.user = self.testadmin
self.assertTrue(self._allow_all(self.context))
self.context.user = self.testpmsys
self.assertTrue(self._allow_all(self.context))
self.context.user = self.testnet
self.assertTrue(self._allow_all(self.context))
self.context.user = self.testsys
self.assertTrue(self._allow_all(self.context))
users = [self.testadmin, self.testpmsys, self.testnet, self.testsys]
for user in users:
self.shouldAllow(user, '_allow_all')
def test_002_allow_none(self):
self.context.user = self.testadmin
self.assertTrue(self._allow_none(self.context))
self.context.user = self.testpmsys
self.assertRaises(exception.NotAuthorized, self._allow_none, self.context)
self.context.user = self.testnet
self.assertRaises(exception.NotAuthorized, self._allow_none, self.context)
self.context.user = self.testsys
self.assertRaises(exception.NotAuthorized, self._allow_none, self.context)
self.shouldAllow(self.testadmin, '_allow_none')
users = [self.testpmsys, self.testnet, self.testsys]
for user in users:
self.shouldDeny(user, '_allow_none')
def test_003_allow_project_manager(self):
self.context.user = self.testadmin
self.assertTrue(self._allow_project_manager(self.context))
self.context.user = self.testpmsys
self.assertTrue(self._allow_project_manager(self.context))
self.context.user = self.testnet
self.assertRaises(exception.NotAuthorized, self._allow_project_manager, self.context)
self.context.user = self.testsys
self.assertRaises(exception.NotAuthorized, self._allow_project_manager, self.context)
for user in [self.testadmin, self.testpmsys]:
self.shouldAllow(user, '_allow_project_manager')
for user in [self.testnet, self.testsys]:
self.shouldDeny(user, '_allow_project_manager')
def test_004_allow_sys_and_net(self):
self.context.user = self.testadmin
self.assertTrue(self._allow_sys_and_net(self.context))
self.context.user = self.testpmsys # doesn't have the per project sysadmin
self.assertRaises(exception.NotAuthorized, self._allow_sys_and_net, self.context)
self.context.user = self.testnet
self.assertTrue(self._allow_sys_and_net(self.context))
self.context.user = self.testsys
self.assertTrue(self._allow_sys_and_net(self.context))
def test_005_allow_sys_no_pm(self):
self.context.user = self.testadmin
self.assertTrue(self._allow_sys_no_pm(self.context))
self.context.user = self.testpmsys
self.assertRaises(exception.NotAuthorized, self._allow_sys_no_pm, self.context)
self.context.user = self.testnet
self.assertRaises(exception.NotAuthorized, self._allow_sys_no_pm, self.context)
self.context.user = self.testsys
self.assertTrue(self._allow_sys_no_pm(self.context))
@rbac.allow('all')
def _allow_all(self, context):
return True
@rbac.allow('none')
def _allow_none(self, context):
return True
@rbac.allow('projectmanager')
def _allow_project_manager(self, context):
return True
@rbac.allow('sysadmin', 'netadmin')
def _allow_sys_and_net(self, context):
return True
@rbac.allow('sysadmin')
@rbac.deny('projectmanager')
def _allow_sys_no_pm(self, context):
return True
for user in [self.testadmin, self.testnet, self.testsys]:
self.shouldAllow(user, '_allow_sys_and_net')
# denied because it doesn't have the per project sysadmin
for user in [self.testpmsys]:
self.shouldDeny(user, '_allow_sys_and_net')
if __name__ == "__main__":
# TODO: Implement use_fake as an option

View File

@@ -25,12 +25,17 @@ import random
import StringIO
import webob
from nova import flags
from nova import test
from nova import api
from nova.api.ec2 import cloud
from nova.auth import manager
FLAGS = flags.FLAGS
FLAGS.FAKE_subdomain = 'ec2'
class FakeHttplibSocket(object):
"""a fake socket implementation for httplib.HTTPResponse, trivial"""
def __init__(self, response_string):

View File

@@ -49,8 +49,7 @@ from nova import datastore
from nova import flags
from nova import twistd
#TODO(gundlach): rewrite and readd this after merge
#from nova.tests.access_unittest import *
from nova.tests.access_unittest import *
from nova.tests.auth_unittest import *
from nova.tests.api_unittest import *
from nova.tests.cloud_unittest import *