Merge trunk.

This commit is contained in:
Soren Hansen
2010-09-29 13:29:45 +02:00
17 changed files with 628 additions and 204 deletions

View File

@@ -135,15 +135,48 @@ class VpnCommands(object):
class ShellCommands(object): class ShellCommands(object):
def run(self): def bpython(self):
"Runs a Python interactive interpreter. Tries to use IPython, if it's available." """Runs a bpython shell.
try:
import IPython Falls back to Ipython/python shell if unavailable"""
# Explicitly pass an empty list as arguments, because otherwise IPython self.run('bpython')
# would use sys.argv from this script.
shell = IPython.Shell.IPShell(argv=[]) def ipython(self):
shell.mainloop() """Runs an Ipython shell.
except ImportError:
Falls back to Python shell if unavailable"""
self.run('ipython')
def python(self):
"""Runs a python shell.
Falls back to Python shell if unavailable"""
self.run('python')
def run(self, shell=None):
"""Runs a Python interactive interpreter.
args: [shell=bpython]"""
if not shell:
shell = 'bpython'
if shell == 'bpython':
try:
import bpython
bpython.embed()
except ImportError:
shell = 'ipython'
if shell == 'ipython':
try:
import IPython
# Explicitly pass an empty list as arguments, because otherwise IPython
# would use sys.argv from this script.
shell = IPython.Shell.IPShell(argv=[])
shell.mainloop()
except ImportError:
shell = 'python'
if shell == 'python':
import code import code
try: # Try activating rlcompleter, because it's handy. try: # Try activating rlcompleter, because it's handy.
import readline import readline
@@ -156,6 +189,11 @@ class ShellCommands(object):
readline.parse_and_bind("tab:complete") readline.parse_and_bind("tab:complete")
code.interact() code.interact()
def script(self, path):
"""Runs the script from the specifed path with flags set properly.
arguments: path"""
exec(compile(open(path).read(), path, 'exec'), locals(), globals())
class RoleCommands(object): class RoleCommands(object):
"""Class for managing roles.""" """Class for managing roles."""
@@ -228,6 +266,19 @@ class UserCommands(object):
for user in self.manager.get_users(): for user in self.manager.get_users():
print user.name print user.name
def modify(self, name, access_key, secret_key, is_admin):
"""update a users keys & admin flag
arguments: accesskey secretkey admin
leave any field blank to ignore it, admin should be 'T', 'F', or blank
"""
if not is_admin:
is_admin = None
elif is_admin.upper()[0] == 'T':
is_admin = True
else:
is_admin = False
print "is_admin: %r" % is_admin
self.manager.modify_user(name, access_key, secret_key, is_admin)
class ProjectCommands(object): class ProjectCommands(object):
"""Class for managing projects.""" """Class for managing projects."""
@@ -253,7 +304,7 @@ class ProjectCommands(object):
def environment(self, project_id, user_id, filename='novarc'): def environment(self, project_id, user_id, filename='novarc'):
"""Exports environment variables to an sourcable file """Exports environment variables to an sourcable file
arguments: project_id user_id [filename='novarc]""" arguments: project_id user_id [filename='novarc]"""
rc = self.manager.get_environment_rc(project_id, user_id) rc = self.manager.get_environment_rc(user_id, project_id)
with open(filename, 'w') as f: with open(filename, 'w') as f:
f.write(rc) f.write(rc)
@@ -316,7 +367,7 @@ class FloatingIpCommands(object):
for floating_ip in floating_ips: for floating_ip in floating_ips:
instance = None instance = None
if floating_ip['fixed_ip']: if floating_ip['fixed_ip']:
instance = floating_ip['fixed_ip']['instance']['str_id'] instance = floating_ip['fixed_ip']['instance']['ec2_id']
print "%s\t%s\t%s" % (floating_ip['host'], print "%s\t%s\t%s" % (floating_ip['host'],
floating_ip['address'], floating_ip['address'],
instance) instance)

View File

@@ -20,11 +20,17 @@ Nova User API client library.
""" """
import base64 import base64
import boto import boto
import httplib
from boto.ec2.regioninfo import RegionInfo from boto.ec2.regioninfo import RegionInfo
DEFAULT_CLC_URL='http://127.0.0.1:8773'
DEFAULT_REGION='nova'
DEFAULT_ACCESS_KEY='admin'
DEFAULT_SECRET_KEY='admin'
class UserInfo(object): class UserInfo(object):
""" """
Information about a Nova user, as parsed through SAX Information about a Nova user, as parsed through SAX
@@ -68,13 +74,13 @@ class UserRole(object):
def __init__(self, connection=None): def __init__(self, connection=None):
self.connection = connection self.connection = connection
self.role = None self.role = None
def __repr__(self): def __repr__(self):
return 'UserRole:%s' % self.role return 'UserRole:%s' % self.role
def startElement(self, name, attrs, connection): def startElement(self, name, attrs, connection):
return None return None
def endElement(self, name, value, connection): def endElement(self, name, value, connection):
if name == 'role': if name == 'role':
self.role = value self.role = value
@@ -128,20 +134,20 @@ class ProjectMember(object):
def __init__(self, connection=None): def __init__(self, connection=None):
self.connection = connection self.connection = connection
self.memberId = None self.memberId = None
def __repr__(self): def __repr__(self):
return 'ProjectMember:%s' % self.memberId return 'ProjectMember:%s' % self.memberId
def startElement(self, name, attrs, connection): def startElement(self, name, attrs, connection):
return None return None
def endElement(self, name, value, connection): def endElement(self, name, value, connection):
if name == 'member': if name == 'member':
self.memberId = value self.memberId = value
else: else:
setattr(self, name, str(value)) setattr(self, name, str(value))
class HostInfo(object): class HostInfo(object):
""" """
Information about a Nova Host, as parsed through SAX: Information about a Nova Host, as parsed through SAX:
@@ -171,35 +177,56 @@ class HostInfo(object):
class NovaAdminClient(object): class NovaAdminClient(object):
def __init__(self, clc_ip='127.0.0.1', region='nova', access_key='admin', def __init__(self, clc_url=DEFAULT_CLC_URL, region=DEFAULT_REGION,
secret_key='admin', **kwargs): access_key=DEFAULT_ACCESS_KEY, secret_key=DEFAULT_SECRET_KEY,
self.clc_ip = clc_ip **kwargs):
parts = self.split_clc_url(clc_url)
self.clc_url = clc_url
self.region = region self.region = region
self.access = access_key self.access = access_key
self.secret = secret_key self.secret = secret_key
self.apiconn = boto.connect_ec2(aws_access_key_id=access_key, self.apiconn = boto.connect_ec2(aws_access_key_id=access_key,
aws_secret_access_key=secret_key, aws_secret_access_key=secret_key,
is_secure=False, is_secure=parts['is_secure'],
region=RegionInfo(None, region, clc_ip), region=RegionInfo(None,
port=8773, region,
parts['ip']),
port=parts['port'],
path='/services/Admin', path='/services/Admin',
**kwargs) **kwargs)
self.apiconn.APIVersion = 'nova' self.apiconn.APIVersion = 'nova'
def connection_for(self, username, project, **kwargs): def connection_for(self, username, project, clc_url=None, region=None,
**kwargs):
""" """
Returns a boto ec2 connection for the given username. Returns a boto ec2 connection for the given username.
""" """
if not clc_url:
clc_url = self.clc_url
if not region:
region = self.region
parts = self.split_clc_url(clc_url)
user = self.get_user(username) user = self.get_user(username)
access_key = '%s:%s' % (user.accesskey, project) access_key = '%s:%s' % (user.accesskey, project)
return boto.connect_ec2( return boto.connect_ec2(aws_access_key_id=access_key,
aws_access_key_id=access_key, aws_secret_access_key=user.secretkey,
aws_secret_access_key=user.secretkey, is_secure=parts['is_secure'],
is_secure=False, region=RegionInfo(None,
region=RegionInfo(None, self.region, self.clc_ip), self.region,
port=8773, parts['ip']),
path='/services/Cloud', port=parts['port'],
**kwargs) path='/services/Cloud',
**kwargs)
def split_clc_url(self, clc_url):
"""
Splits a cloud controller endpoint url.
"""
parts = httplib.urlsplit(clc_url)
is_secure = parts.scheme == 'https'
ip, port = parts.netloc.split(':')
return {'ip': ip, 'port': int(port), 'is_secure': is_secure}
def get_users(self): def get_users(self):
""" grabs the list of all users """ """ grabs the list of all users """
@@ -289,7 +316,7 @@ class NovaAdminClient(object):
if project.projectname != None: if project.projectname != None:
return project return project
def create_project(self, projectname, manager_user, description=None, def create_project(self, projectname, manager_user, description=None,
member_users=None): member_users=None):
""" """
@@ -322,7 +349,7 @@ class NovaAdminClient(object):
Adds a user to a project. Adds a user to a project.
""" """
return self.modify_project_member(user, project, operation='add') return self.modify_project_member(user, project, operation='add')
def remove_project_member(self, user, project): def remove_project_member(self, user, project):
""" """
Removes a user from a project. Removes a user from a project.

View File

@@ -256,8 +256,7 @@ class LdapDriver(object):
if not self.__user_exists(uid): if not self.__user_exists(uid):
raise exception.NotFound("User %s doesn't exist" % uid) raise exception.NotFound("User %s doesn't exist" % uid)
self.__remove_from_all(uid) self.__remove_from_all(uid)
self.conn.delete_s('uid=%s,%s' % (uid, self.conn.delete_s(self.__uid_to_dn(uid))
FLAGS.ldap_user_subtree))
def delete_project(self, project_id): def delete_project(self, project_id):
"""Delete a project""" """Delete a project"""
@@ -265,6 +264,19 @@ class LdapDriver(object):
self.__delete_roles(project_dn) self.__delete_roles(project_dn)
self.__delete_group(project_dn) self.__delete_group(project_dn)
def modify_user(self, uid, access_key=None, secret_key=None, admin=None):
"""Modify an existing project"""
if not access_key and not secret_key and admin is None:
return
attr = []
if access_key:
attr.append((self.ldap.MOD_REPLACE, 'accessKey', access_key))
if secret_key:
attr.append((self.ldap.MOD_REPLACE, 'secretKey', secret_key))
if admin is not None:
attr.append((self.ldap.MOD_REPLACE, 'isAdmin', str(admin).upper()))
self.conn.modify_s(self.__uid_to_dn(uid), attr)
def __user_exists(self, uid): def __user_exists(self, uid):
"""Check if user exists""" """Check if user exists"""
return self.get_user(uid) != None return self.get_user(uid) != None

View File

@@ -632,6 +632,12 @@ class AuthManager(object):
with self.driver() as drv: with self.driver() as drv:
drv.delete_user(uid) drv.delete_user(uid)
def modify_user(self, user, access_key=None, secret_key=None, admin=None):
"""Modify credentials for a user"""
uid = User.safe_id(user)
with self.driver() as drv:
drv.modify_user(uid, access_key, secret_key, admin)
def get_credentials(self, user, project=None): def get_credentials(self, user, project=None):
"""Get credential zip for user in project""" """Get credential zip for user in project"""
if not isinstance(user, User): if not isinstance(user, User):

View File

@@ -188,6 +188,8 @@ DEFINE_string('rabbit_userid', 'guest', 'rabbit userid')
DEFINE_string('rabbit_password', 'guest', 'rabbit password') DEFINE_string('rabbit_password', 'guest', 'rabbit password')
DEFINE_string('rabbit_virtual_host', '/', 'rabbit virtual host') DEFINE_string('rabbit_virtual_host', '/', 'rabbit virtual host')
DEFINE_string('control_exchange', 'nova', 'the main exchange to connect to') DEFINE_string('control_exchange', 'nova', 'the main exchange to connect to')
DEFINE_string('cc_host', '127.0.0.1', 'ip of api server')
DEFINE_integer('cc_port', 8773, 'cloud controller port')
DEFINE_string('ec2_url', 'http://127.0.0.1:8773/services/Cloud', DEFINE_string('ec2_url', 'http://127.0.0.1:8773/services/Cloud',
'Url to ec2 api server') 'Url to ec2 api server')

View File

@@ -37,3 +37,10 @@ class Manager(object):
if not db_driver: if not db_driver:
db_driver = FLAGS.db_driver db_driver = FLAGS.db_driver
self.db = utils.import_object(db_driver) # pylint: disable-msg=C0103 self.db = utils.import_object(db_driver) # pylint: disable-msg=C0103
def init_host(self):
"""Do any initialization that needs to be run if this is a standalone service.
Child classes should override this method.
"""
pass

View File

@@ -84,19 +84,6 @@ class Consumer(messaging.Consumer):
self.failed_connection = False self.failed_connection = False
super(Consumer, self).__init__(*args, **kwargs) super(Consumer, self).__init__(*args, **kwargs)
# TODO(termie): it would be nice to give these some way of automatically
# cleaning up after themselves
def attach_to_tornado(self, io_inst=None):
"""Attach a callback to tornado that fires 10 times a second"""
from tornado import ioloop
if io_inst is None:
io_inst = ioloop.IOLoop.instance()
injected = ioloop.PeriodicCallback(
lambda: self.fetch(enable_callbacks=True), 100, io_loop=io_inst)
injected.start()
return injected
def fetch(self, no_ack=None, auto_ack=None, enable_callbacks=False): def fetch(self, no_ack=None, auto_ack=None, enable_callbacks=False):
"""Wraps the parent fetch with some logic for failed connections""" """Wraps the parent fetch with some logic for failed connections"""
# TODO(vish): the logic for failed connections and logging should be # TODO(vish): the logic for failed connections and logging should be
@@ -124,6 +111,7 @@ class Consumer(messaging.Consumer):
"""Attach a callback to twisted that fires 10 times a second""" """Attach a callback to twisted that fires 10 times a second"""
loop = task.LoopingCall(self.fetch, enable_callbacks=True) loop = task.LoopingCall(self.fetch, enable_callbacks=True)
loop.start(interval=0.1) loop.start(interval=0.1)
return loop
class Publisher(messaging.Publisher): class Publisher(messaging.Publisher):
@@ -294,6 +282,37 @@ def call(topic, msg):
return wait_msg.result return wait_msg.result
def call_twisted(topic, msg):
"""Sends a message on a topic and wait for a response"""
LOG.debug("Making asynchronous call...")
msg_id = uuid.uuid4().hex
msg.update({'_msg_id': msg_id})
LOG.debug("MSG_ID is %s" % (msg_id))
conn = Connection.instance()
d = defer.Deferred()
consumer = DirectConsumer(connection=conn, msg_id=msg_id)
def deferred_receive(data, message):
"""Acks message and callbacks or errbacks"""
message.ack()
if data['failure']:
return d.errback(RemoteError(*data['failure']))
else:
return d.callback(data['result'])
consumer.register_callback(deferred_receive)
injected = consumer.attach_to_twisted()
# clean up after the injected listened and return x
d.addCallback(lambda x: injected.stop() and x or x)
publisher = TopicPublisher(connection=conn, topic=topic)
publisher.send(msg)
publisher.close()
return d
def cast(topic, msg): def cast(topic, msg):
"""Sends a message on a topic without waiting for a response""" """Sends a message on a topic without waiting for a response"""
LOG.debug("Making asynchronous cast...") LOG.debug("Making asynchronous cast...")

View File

@@ -50,6 +50,7 @@ class Service(object, service.Service):
self.topic = topic self.topic = topic
manager_class = utils.import_class(manager) manager_class = utils.import_class(manager)
self.manager = manager_class(host=host, *args, **kwargs) self.manager = manager_class(host=host, *args, **kwargs)
self.manager.init_host()
self.model_disconnected = False self.model_disconnected = False
super(Service, self).__init__(*args, **kwargs) super(Service, self).__init__(*args, **kwargs)
try: try:

View File

@@ -34,6 +34,7 @@ from twisted.trial import unittest
from nova import db from nova import db
from nova import fakerabbit from nova import fakerabbit
from nova import flags from nova import flags
from nova import rpc
FLAGS = flags.FLAGS FLAGS = flags.FLAGS
@@ -63,20 +64,30 @@ class TrialTestCase(unittest.TestCase):
self.mox = mox.Mox() self.mox = mox.Mox()
self.stubs = stubout.StubOutForTesting() self.stubs = stubout.StubOutForTesting()
self.flag_overrides = {} self.flag_overrides = {}
self.injected = []
self._monkeyPatchAttach()
def tearDown(self): # pylint: disable-msg=C0103 def tearDown(self): # pylint: disable-msg=C0103
"""Runs after each test method to finalize/tear down test environment""" """Runs after each test method to finalize/tear down test environment"""
super(TrialTestCase, self).tearDown()
self.reset_flags() self.reset_flags()
self.mox.UnsetStubs() self.mox.UnsetStubs()
self.stubs.UnsetAll() self.stubs.UnsetAll()
self.stubs.SmartUnsetAll() self.stubs.SmartUnsetAll()
self.mox.VerifyAll() self.mox.VerifyAll()
rpc.Consumer.attach_to_twisted = self.originalAttach
for x in self.injected:
try:
x.stop()
except AssertionError:
pass
if FLAGS.fake_rabbit: if FLAGS.fake_rabbit:
fakerabbit.reset_all() fakerabbit.reset_all()
db.security_group_destroy_all(None) db.security_group_destroy_all(None)
super(TrialTestCase, self).tearDown()
def flags(self, **kw): def flags(self, **kw):
"""Override flag variables for a test""" """Override flag variables for a test"""
for k, v in kw.iteritems(): for k, v in kw.iteritems():
@@ -92,16 +103,51 @@ class TrialTestCase(unittest.TestCase):
for k, v in self.flag_overrides.iteritems(): for k, v in self.flag_overrides.iteritems():
setattr(FLAGS, k, v) setattr(FLAGS, k, v)
def run(self, result=None):
test_method = getattr(self, self._testMethodName)
setattr(self,
self._testMethodName,
self._maybeInlineCallbacks(test_method, result))
rv = super(TrialTestCase, self).run(result)
setattr(self, self._testMethodName, test_method)
return rv
def _maybeInlineCallbacks(self, func, result):
def _wrapped():
g = func()
if isinstance(g, defer.Deferred):
return g
if not hasattr(g, 'send'):
return defer.succeed(g)
inlined = defer.inlineCallbacks(func)
d = inlined()
return d
_wrapped.func_name = func.func_name
return _wrapped
def _monkeyPatchAttach(self):
self.originalAttach = rpc.Consumer.attach_to_twisted
def _wrapped(innerSelf):
rv = self.originalAttach(innerSelf)
self.injected.append(rv)
return rv
_wrapped.func_name = self.originalAttach.func_name
rpc.Consumer.attach_to_twisted = _wrapped
class BaseTestCase(TrialTestCase): class BaseTestCase(TrialTestCase):
# TODO(jaypipes): Can this be moved into the TrialTestCase class? # TODO(jaypipes): Can this be moved into the TrialTestCase class?
"""Base test case class for all unit tests.""" """Base test case class for all unit tests.
DEPRECATED: This is being removed once Tornado is gone, use TrialTestCase.
"""
def setUp(self): # pylint: disable-msg=C0103 def setUp(self): # pylint: disable-msg=C0103
"""Run before each test method to initialize test environment""" """Run before each test method to initialize test environment"""
super(BaseTestCase, self).setUp() super(BaseTestCase, self).setUp()
# TODO(termie): we could possibly keep a more global registry of # TODO(termie): we could possibly keep a more global registry of
# the injected listeners... this is fine for now though # the injected listeners... this is fine for now though
self.injected = []
self.ioloop = ioloop.IOLoop.instance() self.ioloop = ioloop.IOLoop.instance()
self._waiting = None self._waiting = None
@@ -111,8 +157,6 @@ class BaseTestCase(TrialTestCase):
def tearDown(self):# pylint: disable-msg=C0103 def tearDown(self):# pylint: disable-msg=C0103
"""Runs after each test method to finalize/tear down test environment""" """Runs after each test method to finalize/tear down test environment"""
super(BaseTestCase, self).tearDown() super(BaseTestCase, self).tearDown()
for x in self.injected:
x.stop()
if FLAGS.fake_rabbit: if FLAGS.fake_rabbit:
fakerabbit.reset_all() fakerabbit.reset_all()

View File

@@ -31,7 +31,7 @@ FLAGS = flags.FLAGS
class Context(object): class Context(object):
pass pass
class AccessTestCase(test.BaseTestCase): class AccessTestCase(test.TrialTestCase):
def setUp(self): def setUp(self):
super(AccessTestCase, self).setUp() super(AccessTestCase, self).setUp()
um = manager.AuthManager() um = manager.AuthManager()

View File

@@ -28,25 +28,71 @@ from nova.api.ec2 import cloud
FLAGS = flags.FLAGS FLAGS = flags.FLAGS
class user_generator(object):
def __init__(self, manager, **user_state):
if 'name' not in user_state:
user_state['name'] = 'test1'
self.manager = manager
self.user = manager.create_user(**user_state)
class AuthTestCase(test.BaseTestCase): def __enter__(self):
return self.user
def __exit__(self, value, type, trace):
self.manager.delete_user(self.user)
class project_generator(object):
def __init__(self, manager, **project_state):
if 'name' not in project_state:
project_state['name'] = 'testproj'
if 'manager_user' not in project_state:
project_state['manager_user'] = 'test1'
self.manager = manager
self.project = manager.create_project(**project_state)
def __enter__(self):
return self.project
def __exit__(self, value, type, trace):
self.manager.delete_project(self.project)
class user_and_project_generator(object):
def __init__(self, manager, user_state={}, project_state={}):
self.manager = manager
if 'name' not in user_state:
user_state['name'] = 'test1'
if 'name' not in project_state:
project_state['name'] = 'testproj'
if 'manager_user' not in project_state:
project_state['manager_user'] = 'test1'
self.user = manager.create_user(**user_state)
self.project = manager.create_project(**project_state)
def __enter__(self):
return (self.user, self.project)
def __exit__(self, value, type, trace):
self.manager.delete_user(self.user)
self.manager.delete_project(self.project)
class AuthManagerTestCase(test.TrialTestCase):
def setUp(self): def setUp(self):
super(AuthTestCase, self).setUp() super(AuthManagerTestCase, self).setUp()
self.flags(connection_type='fake') self.flags(connection_type='fake')
self.manager = manager.AuthManager() self.manager = manager.AuthManager()
def test_001_can_create_users(self): def test_create_and_find_user(self):
self.manager.create_user('test1', 'access', 'secret') with user_generator(self.manager):
self.manager.create_user('test2') self.assert_(self.manager.get_user('test1'))
def test_002_can_get_user(self): def test_create_and_find_with_properties(self):
user = self.manager.get_user('test1') with user_generator(self.manager, name="herbert", secret="classified",
access="private-party"):
def test_003_can_retreive_properties(self): u = self.manager.get_user('herbert')
user = self.manager.get_user('test1') self.assertEqual('herbert', u.id)
self.assertEqual('test1', user.id) self.assertEqual('herbert', u.name)
self.assertEqual('access', user.access) self.assertEqual('classified', u.secret)
self.assertEqual('secret', user.secret) self.assertEqual('private-party', u.access)
def test_004_signature_is_valid(self): def test_004_signature_is_valid(self):
#self.assertTrue(self.manager.authenticate( **boto.generate_url ... ? ? ? )) #self.assertTrue(self.manager.authenticate( **boto.generate_url ... ? ? ? ))
@@ -63,133 +109,216 @@ class AuthTestCase(test.BaseTestCase):
'export S3_URL="http://127.0.0.1:3333/"\n' + 'export S3_URL="http://127.0.0.1:3333/"\n' +
'export EC2_USER_ID="test1"\n') 'export EC2_USER_ID="test1"\n')
def test_010_can_list_users(self): def test_can_list_users(self):
users = self.manager.get_users() with user_generator(self.manager):
logging.warn(users) with user_generator(self.manager, name="test2"):
self.assertTrue(filter(lambda u: u.id == 'test1', users)) users = self.manager.get_users()
self.assert_(filter(lambda u: u.id == 'test1', users))
self.assert_(filter(lambda u: u.id == 'test2', users))
self.assert_(not filter(lambda u: u.id == 'test3', users))
def test_can_add_and_remove_user_role(self):
with user_generator(self.manager):
self.assertFalse(self.manager.has_role('test1', 'itsec'))
self.manager.add_role('test1', 'itsec')
self.assertTrue(self.manager.has_role('test1', 'itsec'))
self.manager.remove_role('test1', 'itsec')
self.assertFalse(self.manager.has_role('test1', 'itsec'))
def test_101_can_add_user_role(self): def test_can_create_and_get_project(self):
self.assertFalse(self.manager.has_role('test1', 'itsec')) with user_and_project_generator(self.manager) as (u,p):
self.manager.add_role('test1', 'itsec') self.assert_(self.manager.get_user('test1'))
self.assertTrue(self.manager.has_role('test1', 'itsec')) self.assert_(self.manager.get_user('test1'))
self.assert_(self.manager.get_project('testproj'))
def test_199_can_remove_user_role(self): def test_can_list_projects(self):
self.assertTrue(self.manager.has_role('test1', 'itsec')) with user_and_project_generator(self.manager):
self.manager.remove_role('test1', 'itsec') with project_generator(self.manager, name="testproj2"):
self.assertFalse(self.manager.has_role('test1', 'itsec')) projects = self.manager.get_projects()
self.assert_(filter(lambda p: p.name == 'testproj', projects))
self.assert_(filter(lambda p: p.name == 'testproj2', projects))
self.assert_(not filter(lambda p: p.name == 'testproj3',
projects))
def test_201_can_create_project(self): def test_can_create_and_get_project_with_attributes(self):
project = self.manager.create_project('testproj', 'test1', 'A test project', ['test1']) with user_generator(self.manager):
self.assertTrue(filter(lambda p: p.name == 'testproj', self.manager.get_projects())) with project_generator(self.manager, description='A test project'):
self.assertEqual(project.name, 'testproj') project = self.manager.get_project('testproj')
self.assertEqual(project.description, 'A test project') self.assertEqual('A test project', project.description)
self.assertEqual(project.project_manager_id, 'test1')
self.assertTrue(project.has_member('test1'))
def test_202_user1_is_project_member(self): def test_can_create_project_with_manager(self):
self.assertTrue(self.manager.get_user('test1').is_project_member('testproj')) with user_and_project_generator(self.manager) as (user, project):
self.assertEqual('test1', project.project_manager_id)
self.assertTrue(self.manager.is_project_manager(user, project))
def test_203_user2_is_not_project_member(self): def test_create_project_assigns_manager_to_members(self):
self.assertFalse(self.manager.get_user('test2').is_project_member('testproj')) with user_and_project_generator(self.manager) as (user, project):
self.assertTrue(self.manager.is_project_member(user, project))
def test_204_user1_is_project_manager(self): def test_no_extra_project_members(self):
self.assertTrue(self.manager.get_user('test1').is_project_manager('testproj')) with user_generator(self.manager, name='test2') as baduser:
with user_and_project_generator(self.manager) as (user, project):
self.assertFalse(self.manager.is_project_member(baduser,
project))
def test_205_user2_is_not_project_manager(self): def test_no_extra_project_managers(self):
self.assertFalse(self.manager.get_user('test2').is_project_manager('testproj')) with user_generator(self.manager, name='test2') as baduser:
with user_and_project_generator(self.manager) as (user, project):
self.assertFalse(self.manager.is_project_manager(baduser,
project))
def test_206_can_add_user_to_project(self): def test_can_add_user_to_project(self):
self.manager.add_to_project('test2', 'testproj') with user_generator(self.manager, name='test2') as user:
self.assertTrue(self.manager.get_project('testproj').has_member('test2')) with user_and_project_generator(self.manager) as (_user, project):
self.manager.add_to_project(user, project)
project = self.manager.get_project('testproj')
self.assertTrue(self.manager.is_project_member(user, project))
def test_207_can_remove_user_from_project(self): def test_can_remove_user_from_project(self):
self.manager.remove_from_project('test2', 'testproj') with user_generator(self.manager, name='test2') as user:
self.assertFalse(self.manager.get_project('testproj').has_member('test2')) with user_and_project_generator(self.manager) as (_user, project):
self.manager.add_to_project(user, project)
project = self.manager.get_project('testproj')
self.assertTrue(self.manager.is_project_member(user, project))
self.manager.remove_from_project(user, project)
project = self.manager.get_project('testproj')
self.assertFalse(self.manager.is_project_member(user, project))
def test_208_can_remove_add_user_with_role(self): def test_can_add_remove_user_with_role(self):
self.manager.add_to_project('test2', 'testproj') with user_generator(self.manager, name='test2') as user:
self.manager.add_role('test2', 'developer', 'testproj') with user_and_project_generator(self.manager) as (_user, project):
self.manager.remove_from_project('test2', 'testproj') # NOTE(todd): after modifying users you must reload project
self.assertFalse(self.manager.has_role('test2', 'developer', 'testproj')) self.manager.add_to_project(user, project)
self.manager.add_to_project('test2', 'testproj') project = self.manager.get_project('testproj')
self.manager.remove_from_project('test2', 'testproj') self.manager.add_role(user, 'developer', project)
self.assertTrue(self.manager.is_project_member(user, project))
self.manager.remove_from_project(user, project)
project = self.manager.get_project('testproj')
self.assertFalse(self.manager.has_role(user, 'developer',
project))
self.assertFalse(self.manager.is_project_member(user, project))
def test_209_can_generate_x509(self): def test_can_generate_x509(self):
# MUST HAVE RUN CLOUD SETUP BY NOW # NOTE(todd): this doesn't assert against the auth manager
self.cloud = cloud.CloudController() # so it probably belongs in crypto_unittest
self.cloud.setup() # but I'm leaving it where I found it.
_key, cert_str = self.manager._generate_x509_cert('test1', 'testproj') with user_and_project_generator(self.manager) as (user, project):
logging.debug(cert_str) # NOTE(todd): Should mention why we must setup controller first
# (somebody please clue me in)
cloud_controller = cloud.CloudController()
cloud_controller.setup()
_key, cert_str = self.manager._generate_x509_cert('test1',
'testproj')
logging.debug(cert_str)
# Need to verify that it's signed by the right intermediate CA # Need to verify that it's signed by the right intermediate CA
full_chain = crypto.fetch_ca(project_id='testproj', chain=True) full_chain = crypto.fetch_ca(project_id='testproj', chain=True)
int_cert = crypto.fetch_ca(project_id='testproj', chain=False) int_cert = crypto.fetch_ca(project_id='testproj', chain=False)
cloud_cert = crypto.fetch_ca() cloud_cert = crypto.fetch_ca()
logging.debug("CA chain:\n\n =====\n%s\n\n=====" % full_chain) logging.debug("CA chain:\n\n =====\n%s\n\n=====" % full_chain)
signed_cert = X509.load_cert_string(cert_str) signed_cert = X509.load_cert_string(cert_str)
chain_cert = X509.load_cert_string(full_chain) chain_cert = X509.load_cert_string(full_chain)
int_cert = X509.load_cert_string(int_cert) int_cert = X509.load_cert_string(int_cert)
cloud_cert = X509.load_cert_string(cloud_cert) cloud_cert = X509.load_cert_string(cloud_cert)
self.assertTrue(signed_cert.verify(chain_cert.get_pubkey())) self.assertTrue(signed_cert.verify(chain_cert.get_pubkey()))
self.assertTrue(signed_cert.verify(int_cert.get_pubkey())) self.assertTrue(signed_cert.verify(int_cert.get_pubkey()))
if not FLAGS.use_intermediate_ca:
self.assertTrue(signed_cert.verify(cloud_cert.get_pubkey()))
else:
self.assertFalse(signed_cert.verify(cloud_cert.get_pubkey()))
if not FLAGS.use_intermediate_ca: def test_adding_role_to_project_is_ignored_unless_added_to_user(self):
self.assertTrue(signed_cert.verify(cloud_cert.get_pubkey())) with user_and_project_generator(self.manager) as (user, project):
else: self.assertFalse(self.manager.has_role(user, 'sysadmin', project))
self.assertFalse(signed_cert.verify(cloud_cert.get_pubkey())) self.manager.add_role(user, 'sysadmin', project)
# NOTE(todd): it will still show up in get_user_roles(u, project)
self.assertFalse(self.manager.has_role(user, 'sysadmin', project))
self.manager.add_role(user, 'sysadmin')
self.assertTrue(self.manager.has_role(user, 'sysadmin', project))
def test_210_can_add_project_role(self): def test_add_user_role_doesnt_infect_project_roles(self):
project = self.manager.get_project('testproj') with user_and_project_generator(self.manager) as (user, project):
self.assertFalse(project.has_role('test1', 'sysadmin')) self.assertFalse(self.manager.has_role(user, 'sysadmin', project))
self.manager.add_role('test1', 'sysadmin') self.manager.add_role(user, 'sysadmin')
self.assertFalse(project.has_role('test1', 'sysadmin')) self.assertFalse(self.manager.has_role(user, 'sysadmin', project))
project.add_role('test1', 'sysadmin')
self.assertTrue(project.has_role('test1', 'sysadmin'))
def test_211_can_list_project_roles(self): def test_can_list_user_roles(self):
project = self.manager.get_project('testproj') with user_and_project_generator(self.manager) as (user, project):
user = self.manager.get_user('test1') self.manager.add_role(user, 'sysadmin')
self.manager.add_role(user, 'netadmin', project) roles = self.manager.get_user_roles(user)
roles = self.manager.get_user_roles(user) self.assertTrue('sysadmin' in roles)
self.assertTrue('sysadmin' in roles) self.assertFalse('netadmin' in roles)
self.assertFalse('netadmin' in roles)
project_roles = self.manager.get_user_roles(user, project)
self.assertTrue('sysadmin' in project_roles)
self.assertTrue('netadmin' in project_roles)
# has role should be false because global role is missing
self.assertFalse(self.manager.has_role(user, 'netadmin', project))
def test_can_list_project_roles(self):
with user_and_project_generator(self.manager) as (user, project):
self.manager.add_role(user, 'sysadmin')
self.manager.add_role(user, 'sysadmin', project)
self.manager.add_role(user, 'netadmin', project)
project_roles = self.manager.get_user_roles(user, project)
self.assertTrue('sysadmin' in project_roles)
self.assertTrue('netadmin' in project_roles)
# has role should be false user-level role is missing
self.assertFalse(self.manager.has_role(user, 'netadmin', project))
def test_212_can_remove_project_role(self): def test_can_remove_user_roles(self):
project = self.manager.get_project('testproj') with user_and_project_generator(self.manager) as (user, project):
self.assertTrue(project.has_role('test1', 'sysadmin')) self.manager.add_role(user, 'sysadmin')
project.remove_role('test1', 'sysadmin') self.assertTrue(self.manager.has_role(user, 'sysadmin'))
self.assertFalse(project.has_role('test1', 'sysadmin')) self.manager.remove_role(user, 'sysadmin')
self.manager.remove_role('test1', 'sysadmin') self.assertFalse(self.manager.has_role(user, 'sysadmin'))
self.assertFalse(project.has_role('test1', 'sysadmin'))
def test_214_can_retrieve_project_by_user(self): def test_removing_user_role_hides_it_from_project(self):
project = self.manager.create_project('testproj2', 'test2', 'Another test project', ['test2']) with user_and_project_generator(self.manager) as (user, project):
self.assert_(len(self.manager.get_projects()) > 1) self.manager.add_role(user, 'sysadmin')
self.assertEqual(len(self.manager.get_projects('test2')), 1) self.manager.add_role(user, 'sysadmin', project)
self.assertTrue(self.manager.has_role(user, 'sysadmin', project))
self.manager.remove_role(user, 'sysadmin')
self.assertFalse(self.manager.has_role(user, 'sysadmin', project))
def test_220_can_modify_project(self): def test_can_remove_project_role_but_keep_user_role(self):
self.manager.modify_project('testproj', 'test2', 'new description') with user_and_project_generator(self.manager) as (user, project):
project = self.manager.get_project('testproj') self.manager.add_role(user, 'sysadmin')
self.assertEqual(project.project_manager_id, 'test2') self.manager.add_role(user, 'sysadmin', project)
self.assertEqual(project.description, 'new description') self.assertTrue(self.manager.has_role(user, 'sysadmin'))
self.manager.remove_role(user, 'sysadmin', project)
self.assertFalse(self.manager.has_role(user, 'sysadmin', project))
self.assertTrue(self.manager.has_role(user, 'sysadmin'))
def test_299_can_delete_project(self): def test_can_retrieve_project_by_user(self):
self.manager.delete_project('testproj') with user_and_project_generator(self.manager) as (user, project):
self.assertFalse(filter(lambda p: p.name == 'testproj', self.manager.get_projects())) self.assertEqual(1, len(self.manager.get_projects('test1')))
self.manager.delete_project('testproj2')
def test_999_can_delete_users(self): def test_can_modify_project(self):
with user_and_project_generator(self.manager):
with user_generator(self.manager, name='test2'):
self.manager.modify_project('testproj', 'test2', 'new desc')
project = self.manager.get_project('testproj')
self.assertEqual('test2', project.project_manager_id)
self.assertEqual('new desc', project.description)
def test_can_delete_project(self):
with user_generator(self.manager):
self.manager.create_project('testproj', 'test1')
self.assert_(self.manager.get_project('testproj'))
self.manager.delete_project('testproj')
projectlist = self.manager.get_projects()
self.assert_(not filter(lambda p: p.name == 'testproj',
projectlist))
def test_can_delete_user(self):
self.manager.create_user('test1')
self.assert_(self.manager.get_user('test1'))
self.manager.delete_user('test1') self.manager.delete_user('test1')
users = self.manager.get_users() userlist = self.manager.get_users()
self.assertFalse(filter(lambda u: u.id == 'test1', users)) self.assert_(not filter(lambda u: u.id == 'test1', userlist))
self.manager.delete_user('test2')
self.assertEqual(self.manager.get_user('test2'), None) def test_can_modify_users(self):
with user_generator(self.manager):
self.manager.modify_user('test1', 'access', 'secret', True)
user = self.manager.get_user('test1')
self.assertEqual('access', user.access)
self.assertEqual('secret', user.secret)
self.assertTrue(user.is_admin())
if __name__ == "__main__": if __name__ == "__main__":

View File

@@ -16,10 +16,13 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import json
import logging import logging
from M2Crypto import BIO from M2Crypto import BIO
from M2Crypto import RSA from M2Crypto import RSA
import os
import StringIO import StringIO
import tempfile
import time import time
from twisted.internet import defer from twisted.internet import defer
@@ -36,15 +39,22 @@ from nova.auth import manager
from nova.compute import power_state from nova.compute import power_state
from nova.api.ec2 import context from nova.api.ec2 import context
from nova.api.ec2 import cloud from nova.api.ec2 import cloud
from nova.objectstore import image
FLAGS = flags.FLAGS FLAGS = flags.FLAGS
class CloudTestCase(test.BaseTestCase): # Temp dirs for working with image attributes through the cloud controller
# (stole this from objectstore_unittest.py)
OSS_TEMPDIR = tempfile.mkdtemp(prefix='test_oss-')
IMAGES_PATH = os.path.join(OSS_TEMPDIR, 'images')
os.makedirs(IMAGES_PATH)
class CloudTestCase(test.TrialTestCase):
def setUp(self): def setUp(self):
super(CloudTestCase, self).setUp() super(CloudTestCase, self).setUp()
self.flags(connection_type='fake') self.flags(connection_type='fake', images_path=IMAGES_PATH)
self.conn = rpc.Connection.instance() self.conn = rpc.Connection.instance()
logging.getLogger().setLevel(logging.DEBUG) logging.getLogger().setLevel(logging.DEBUG)
@@ -55,9 +65,9 @@ class CloudTestCase(test.BaseTestCase):
# set up a service # set up a service
self.compute = utils.import_class(FLAGS.compute_manager) self.compute = utils.import_class(FLAGS.compute_manager)
self.compute_consumer = rpc.AdapterConsumer(connection=self.conn, self.compute_consumer = rpc.AdapterConsumer(connection=self.conn,
topic=FLAGS.compute_topic, topic=FLAGS.compute_topic,
proxy=self.compute) proxy=self.compute)
self.injected.append(self.compute_consumer.attach_to_tornado(self.ioloop)) self.compute_consumer.attach_to_twisted()
self.manager = manager.AuthManager() self.manager = manager.AuthManager()
self.user = self.manager.create_user('admin', 'admin', 'admin', True) self.user = self.manager.create_user('admin', 'admin', 'admin', True)
@@ -68,7 +78,7 @@ class CloudTestCase(test.BaseTestCase):
def tearDown(self): def tearDown(self):
self.manager.delete_project(self.project) self.manager.delete_project(self.project)
self.manager.delete_user(self.user) self.manager.delete_user(self.user)
super(CloudTestCase, self).setUp() super(CloudTestCase, self).tearDown()
def _create_key(self, name): def _create_key(self, name):
# NOTE(vish): create depends on pool, so just call helper directly # NOTE(vish): create depends on pool, so just call helper directly
@@ -191,3 +201,67 @@ class CloudTestCase(test.BaseTestCase):
#for i in xrange(4): #for i in xrange(4):
# data = self.cloud.get_metadata(instance(i)['private_dns_name']) # data = self.cloud.get_metadata(instance(i)['private_dns_name'])
# self.assert_(data['meta-data']['ami-id'] == 'ami-%s' % i) # self.assert_(data['meta-data']['ami-id'] == 'ami-%s' % i)
@staticmethod
def _fake_set_image_description(ctxt, image_id, description):
from nova.objectstore import handler
class req:
pass
request = req()
request.context = ctxt
request.args = {'image_id': [image_id],
'description': [description]}
resource = handler.ImagesResource()
resource.render_POST(request)
def test_user_editable_image_endpoint(self):
pathdir = os.path.join(FLAGS.images_path, 'ami-testing')
os.mkdir(pathdir)
info = {'isPublic': False}
with open(os.path.join(pathdir, 'info.json'), 'w') as f:
json.dump(info, f)
img = image.Image('ami-testing')
# self.cloud.set_image_description(self.context, 'ami-testing',
# 'Foo Img')
# NOTE(vish): Above won't work unless we start objectstore or create
# a fake version of api/ec2/images.py conn that can
# call methods directly instead of going through boto.
# for now, just cheat and call the method directly
self._fake_set_image_description(self.context, 'ami-testing',
'Foo Img')
self.assertEqual('Foo Img', img.metadata['description'])
self._fake_set_image_description(self.context, 'ami-testing', '')
self.assertEqual('', img.metadata['description'])
def test_update_of_instance_display_fields(self):
inst = db.instance_create({}, {})
self.cloud.update_instance(self.context, inst['ec2_id'],
display_name='c00l 1m4g3')
inst = db.instance_get({}, inst['id'])
self.assertEqual('c00l 1m4g3', inst['display_name'])
db.instance_destroy({}, inst['id'])
def test_update_of_instance_wont_update_private_fields(self):
inst = db.instance_create({}, {})
self.cloud.update_instance(self.context, inst['id'],
mac_address='DE:AD:BE:EF')
inst = db.instance_get({}, inst['id'])
self.assertEqual(None, inst['mac_address'])
db.instance_destroy({}, inst['id'])
def test_update_of_volume_display_fields(self):
vol = db.volume_create({}, {})
self.cloud.update_volume(self.context, vol['id'],
display_name='c00l v0lum3')
vol = db.volume_get({}, vol['id'])
self.assertEqual('c00l v0lum3', vol['display_name'])
db.volume_destroy({}, vol['id'])
def test_update_of_volume_wont_update_private_fields(self):
vol = db.volume_create({}, {})
self.cloud.update_volume(self.context, vol['id'],
mountpoint='/not/here')
vol = db.volume_get({}, vol['id'])
self.assertEqual(None, vol['mountpoint'])
db.volume_destroy({}, vol['id'])

View File

@@ -53,7 +53,7 @@ os.makedirs(os.path.join(OSS_TEMPDIR, 'images'))
os.makedirs(os.path.join(OSS_TEMPDIR, 'buckets')) os.makedirs(os.path.join(OSS_TEMPDIR, 'buckets'))
class ObjectStoreTestCase(test.BaseTestCase): class ObjectStoreTestCase(test.TrialTestCase):
"""Test objectstore API directly.""" """Test objectstore API directly."""
def setUp(self): # pylint: disable-msg=C0103 def setUp(self): # pylint: disable-msg=C0103
@@ -164,6 +164,12 @@ class ObjectStoreTestCase(test.BaseTestCase):
self.context.project = self.auth_manager.get_project('proj2') self.context.project = self.auth_manager.get_project('proj2')
self.assertFalse(my_img.is_authorized(self.context)) self.assertFalse(my_img.is_authorized(self.context))
# change user-editable fields
my_img.update_user_editable_fields({'display_name': 'my cool image'})
self.assertEqual('my cool image', my_img.metadata['displayName'])
my_img.update_user_editable_fields({'display_name': ''})
self.assert_(not my_img.metadata['displayName'])
class TestHTTPChannel(http.HTTPChannel): class TestHTTPChannel(http.HTTPChannel):
"""Dummy site required for twisted.web""" """Dummy site required for twisted.web"""

View File

@@ -30,7 +30,7 @@ from nova import test
FLAGS = flags.FLAGS FLAGS = flags.FLAGS
class RpcTestCase(test.BaseTestCase): class RpcTestCase(test.TrialTestCase):
"""Test cases for rpc""" """Test cases for rpc"""
def setUp(self): # pylint: disable-msg=C0103 def setUp(self): # pylint: disable-msg=C0103
super(RpcTestCase, self).setUp() super(RpcTestCase, self).setUp()
@@ -39,14 +39,13 @@ class RpcTestCase(test.BaseTestCase):
self.consumer = rpc.AdapterConsumer(connection=self.conn, self.consumer = rpc.AdapterConsumer(connection=self.conn,
topic='test', topic='test',
proxy=self.receiver) proxy=self.receiver)
self.consumer.attach_to_twisted()
self.injected.append(self.consumer.attach_to_tornado(self.ioloop))
def test_call_succeed(self): def test_call_succeed(self):
"""Get a value through rpc call""" """Get a value through rpc call"""
value = 42 value = 42
result = yield rpc.call('test', {"method": "echo", result = yield rpc.call_twisted('test', {"method": "echo",
"args": {"value": value}}) "args": {"value": value}})
self.assertEqual(value, result) self.assertEqual(value, result)
def test_call_exception(self): def test_call_exception(self):
@@ -57,12 +56,12 @@ class RpcTestCase(test.BaseTestCase):
to an int in the test. to an int in the test.
""" """
value = 42 value = 42
self.assertFailure(rpc.call('test', {"method": "fail", self.assertFailure(rpc.call_twisted('test', {"method": "fail",
"args": {"value": value}}), "args": {"value": value}}),
rpc.RemoteError) rpc.RemoteError)
try: try:
yield rpc.call('test', {"method": "fail", yield rpc.call_twisted('test', {"method": "fail",
"args": {"value": value}}) "args": {"value": value}})
self.fail("should have thrown rpc.RemoteError") self.fail("should have thrown rpc.RemoteError")
except rpc.RemoteError as exc: except rpc.RemoteError as exc:
self.assertEqual(int(exc.value), value) self.assertEqual(int(exc.value), value)

View File

@@ -21,14 +21,17 @@
Utility methods for working with WSGI servers Utility methods for working with WSGI servers
""" """
import json
import logging import logging
import sys import sys
from xml.dom import minidom
import eventlet import eventlet
import eventlet.wsgi import eventlet.wsgi
eventlet.patcher.monkey_patch(all=False, socket=True) eventlet.patcher.monkey_patch(all=False, socket=True)
import routes import routes
import routes.middleware import routes.middleware
import webob
import webob.dec import webob.dec
import webob.exc import webob.exc
@@ -230,7 +233,7 @@ class Controller(object):
class Serializer(object): class Serializer(object):
""" """
Serializes a dictionary to a Content Type specified by a WSGI environment. Serializes and deserializes dictionaries to certain MIME types.
""" """
def __init__(self, environ, metadata=None): def __init__(self, environ, metadata=None):
@@ -239,31 +242,74 @@ class Serializer(object):
'metadata' is an optional dict mapping MIME types to information 'metadata' is an optional dict mapping MIME types to information
needed to serialize a dictionary to that type. needed to serialize a dictionary to that type.
""" """
self.environ = environ
self.metadata = metadata or {} self.metadata = metadata or {}
self._methods = { req = webob.Request(environ)
'application/json': self._to_json, suffix = req.path_info.split('.')[-1].lower()
'application/xml': self._to_xml} if suffix == 'json':
self.handler = self._to_json
elif suffix == 'xml':
self.handler = self._to_xml
elif 'application/json' in req.accept:
self.handler = self._to_json
elif 'application/xml' in req.accept:
self.handler = self._to_xml
else:
self.handler = self._to_json # default
def to_content_type(self, data): def to_content_type(self, data):
""" """
Serialize a dictionary into a string. The format of the string Serialize a dictionary into a string.
will be decided based on the Content Type requested in self.environ:
by Accept: header, or by URL suffix. The format of the string will be decided based on the Content Type
requested in self.environ: by Accept: header, or by URL suffix.
""" """
mimetype = 'application/xml' return self.handler(data)
# TODO(gundlach): determine mimetype from request
return self._methods.get(mimetype, repr)(data) def deserialize(self, datastring):
"""
Deserialize a string to a dictionary.
The string must be in the format of a supported MIME type.
"""
datastring = datastring.strip()
is_xml = (datastring[0] == '<')
if not is_xml:
return json.loads(datastring)
return self._from_xml(datastring)
def _from_xml(self, datastring):
xmldata = self.metadata.get('application/xml', {})
plurals = set(xmldata.get('plurals', {}))
node = minidom.parseString(datastring).childNodes[0]
return {node.nodeName: self._from_xml_node(node, plurals)}
def _from_xml_node(self, node, listnames):
"""
Convert a minidom node to a simple Python type.
listnames is a collection of names of XML nodes whose subnodes should
be considered list items.
"""
if len(node.childNodes) == 1 and node.childNodes[0].nodeType == 3:
return node.childNodes[0].nodeValue
elif node.nodeName in listnames:
return [self._from_xml_node(n, listnames) for n in node.childNodes]
else:
result = dict()
for attr in node.attributes.keys():
result[attr] = node.attributes[attr].nodeValue
for child in node.childNodes:
if child.nodeType != node.TEXT_NODE:
result[child.nodeName] = self._from_xml_node(child, listnames)
return result
def _to_json(self, data): def _to_json(self, data):
import json
return json.dumps(data) return json.dumps(data)
def _to_xml(self, data): def _to_xml(self, data):
metadata = self.metadata.get('application/xml', {}) metadata = self.metadata.get('application/xml', {})
# We expect data to contain a single key which is the XML root. # We expect data to contain a single key which is the XML root.
root_key = data.keys()[0] root_key = data.keys()[0]
from xml.dom import minidom
doc = minidom.Document() doc = minidom.Document()
node = self._to_xml_node(doc, metadata, root_key, data[root_key]) node = self._to_xml_node(doc, metadata, root_key, data[root_key])
return node.toprettyxml(indent=' ') return node.toprettyxml(indent=' ')

View File

@@ -1,7 +1,8 @@
[Messages Control] [Messages Control]
# W0511: TODOs in code comments are fine. # W0511: TODOs in code comments are fine.
# W0142: *args and **kwargs are fine. # W0142: *args and **kwargs are fine.
disable-msg=W0511,W0142 # W0622: Redefining id is fine.
disable-msg=W0511,W0142,W0622
[Basic] [Basic]
# Variable names can be 1 to 31 characters long, with lowercase and underscores # Variable names can be 1 to 31 characters long, with lowercase and underscores

View File

@@ -54,5 +54,5 @@ setup(name='nova',
'bin/nova-manage', 'bin/nova-manage',
'bin/nova-network', 'bin/nova-network',
'bin/nova-objectstore', 'bin/nova-objectstore',
'bin/nova-api-new', 'bin/nova-scheduler',
'bin/nova-volume']) 'bin/nova-volume'])