Merge with trunk, resolve conflicts and refactor

This commit is contained in:
Justin Santa Barbara
2011-03-07 14:34:02 -08:00
26 changed files with 800 additions and 234 deletions

View File

@@ -13,3 +13,4 @@ CA/serial*
CA/newcerts/*.pem
CA/private/cakey.pem
nova/vcsversion.py
*.DS_Store

View File

@@ -15,10 +15,12 @@
<corywright@gmail.com> <cory.wright@rackspace.com>
<devin.carlen@gmail.com> <devcamcar@illian.local>
<ewan.mellor@citrix.com> <emellor@silver>
<itoumsn@nttdata.co.jp> <itoumsn@shayol>
<jaypipes@gmail.com> <jpipes@serialcoder>
<jmckenty@gmail.com> <jmckenty@joshua-mckentys-macbook-pro.local>
<jmckenty@gmail.com> <jmckenty@yyj-dhcp171.corp.flock.com>
<jmckenty@gmail.com> <joshua.mckenty@nasa.gov>
<josh@jk0.org> <josh.kearney@rackspace.com>
<justin@fathomdb.com> <justinsb@justinsb-desktop>
<justin@fathomdb.com> <superstack@superstack.org>
<masumotok@nttdata.co.jp> Masumoto<masumotok@nttdata.co.jp>
@@ -40,4 +42,5 @@
<ueno.nachi@lab.ntt.co.jp> <openstack@lab.ntt.co.jp>
<vishvananda@gmail.com> <root@mirror.nasanebula.net>
<vishvananda@gmail.com> <root@ubuntu>
<naveedm9@gmail.com> <naveed.massjouni@rackspace.com>
<vishvananda@gmail.com> <vishvananda@yahoo.com>

View File

@@ -31,13 +31,15 @@ John Dewey <john@dewey.ws>
Jonathan Bryce <jbryce@jbryce.com>
Jordan Rinke <jordan@openstack.org>
Josh Durgin <joshd@hq.newdream.net>
Josh Kearney <josh.kearney@rackspace.com>
Josh Kearney <josh@jk0.org>
Joshua McKenty <jmckenty@gmail.com>
Justin Santa Barbara <justin@fathomdb.com>
Kei Masumoto <masumotok@nttdata.co.jp>
Ken Pepple <ken.pepple@gmail.com>
Kevin L. Mitchell <kevin.mitchell@rackspace.com>
Koji Iida <iida.koji@lab.ntt.co.jp>
Lorin Hochstein <lorin@isi.edu>
Masanori Itoh <itoumsn@nttdata.co.jp>
Matt Dietz <matt.dietz@rackspace.com>
Michael Gundlach <michael.gundlach@rackspace.com>
Monsyne Dragon <mdragon@rackspace.com>
@@ -45,7 +47,8 @@ Monty Taylor <mordred@inaugust.com>
MORITA Kazutaka <morita.kazutaka@gmail.com>
Muneyuki Noguchi <noguchimn@nttdata.co.jp>
Nachi Ueno <ueno.nachi@lab.ntt.co.jp>
Naveed Massjouni <naveed.massjouni@rackspace.com>
Naveed Massjouni <naveedm9@gmail.com>
Nirmal Ranganathan <nirmal.ranganathan@rackspace.com>
Paul Voccio <paul@openstack.org>
Ricardo Carrillo Cruz <emaildericky@gmail.com>
Rick Clark <rick@openstack.org>

View File

@@ -38,3 +38,4 @@ include nova/tests/db/nova.austin.sqlite
include plugins/xenapi/README
include plugins/xenapi/etc/xapi.d/plugins/objectstore
include plugins/xenapi/etc/xapi.d/plugins/pluginlib_nova.py
global-exclude *.pyc

View File

@@ -47,9 +47,11 @@ from nova import utils
from nova import wsgi
FLAGS = flags.FLAGS
flags.DEFINE_integer('ajax_console_idle_timeout', 300,
'Seconds before idle connection destroyed')
flags.DEFINE_flag(flags.HelpFlag())
flags.DEFINE_flag(flags.HelpshortFlag())
flags.DEFINE_flag(flags.HelpXMLFlag())
LOG = logging.getLogger('nova.ajax_console_proxy')
LOG.setLevel(logging.DEBUG)
@@ -61,10 +63,16 @@ class AjaxConsoleProxy(object):
def __call__(self, env, start_response):
try:
req_url = '%s://%s%s?%s' % (env['wsgi.url_scheme'],
env['HTTP_HOST'],
env['PATH_INFO'],
env['QUERY_STRING'])
if 'QUERY_STRING' in env:
req_url = '%s://%s%s?%s' % (env['wsgi.url_scheme'],
env['HTTP_HOST'],
env['PATH_INFO'],
env['QUERY_STRING'])
else:
req_url = '%s://%s%s' % (env['wsgi.url_scheme'],
env['HTTP_HOST'],
env['PATH_INFO'])
if 'HTTP_REFERER' in env:
auth_url = env['HTTP_REFERER']
else:

View File

@@ -34,20 +34,33 @@ if os.path.exists(os.path.join(possible_topdir, 'nova', '__init__.py')):
gettext.install('nova', unicode=1)
from nova import apiservice
from nova import service
from nova import flags
from nova import log as logging
from nova import utils
from nova import version
from nova import wsgi
LOG = logging.getLogger('nova.api')
FLAGS = flags.FLAGS
if __name__ == '__main__':
utils.default_flagfile()
FLAGS(sys.argv)
logging.setup()
LOG.audit(_("Starting nova-api node (version %s)"),
version.version_string_with_vcs())
LOG.debug(_("Full set of FLAGS:"))
for flag in FLAGS:
flag_get = FLAGS.get(flag, None)
LOG.debug("%(flag)s : %(flag_get)s" % locals())
conf = wsgi.paste_config_file('nova-api.conf')
if not conf:
LOG.error(_("No paste configuration found for: %s"), 'nova-api.conf')
sys.exit(1)
else:
service = apiservice.serve(conf)
service = service.serve_wsgi(service.ApiService, conf)
service.wait()

View File

@@ -105,16 +105,7 @@ def main():
logging.setup()
interface = os.environ.get('DNSMASQ_INTERFACE', 'br0')
if int(os.environ.get('TESTING', '0')):
FLAGS.fake_rabbit = True
FLAGS.network_size = 16
FLAGS.connection_type = 'fake'
FLAGS.fake_network = True
FLAGS.auth_driver = 'nova.auth.dbdriver.DbDriver'
FLAGS.num_networks = 5
path = os.path.abspath(os.path.join(os.path.dirname(__file__),
'..',
'nova.sqlite'))
FLAGS.sql_connection = 'sqlite:///%s' % path
from nova.tests import fake_flags
action = argv[1]
if action in ['add', 'del', 'old']:
mac = argv[2]

View File

@@ -45,6 +45,10 @@ from nova.compute import api as compute_api
FLAGS = flags.FLAGS
flags.DEFINE_integer('direct_port', 8001, 'Direct API port')
flags.DEFINE_string('direct_host', '0.0.0.0', 'Direct API host')
flags.DEFINE_flag(flags.HelpFlag())
flags.DEFINE_flag(flags.HelpshortFlag())
flags.DEFINE_flag(flags.HelpXMLFlag())
if __name__ == '__main__':
utils.default_flagfile()

View File

@@ -84,6 +84,7 @@ from nova import utils
from nova.api.ec2.cloud import ec2_id_to_id
from nova.auth import manager
from nova.cloudpipe import pipelib
from nova.compute import instance_types
from nova.db import migration
FLAGS = flags.FLAGS
@@ -93,6 +94,9 @@ flags.DECLARE('network_size', 'nova.network.manager')
flags.DECLARE('vlan_start', 'nova.network.manager')
flags.DECLARE('vpn_start', 'nova.network.manager')
flags.DECLARE('fixed_range_v6', 'nova.network.manager')
flags.DEFINE_flag(flags.HelpFlag())
flags.DEFINE_flag(flags.HelpshortFlag())
flags.DEFINE_flag(flags.HelpXMLFlag())
def param2id(object_id):
@@ -550,7 +554,7 @@ class ServiceCommands(object):
args: [host] [service]"""
ctxt = context.get_admin_context()
now = datetime.datetime.utcnow()
services = db.service_get_all(ctxt)
services = db.service_get_all(ctxt) + db.service_get_all(ctxt, True)
if host:
services = [s for s in services if s['host'] == host]
if service:
@@ -658,6 +662,79 @@ class VolumeCommands(object):
"mountpoint": volume['mountpoint']}})
class InstanceTypeCommands(object):
"""Class for managing instance types / flavors."""
def _print_instance_types(self, n, val):
deleted = ('', ', inactive')[val["deleted"] == 1]
print ("%s: Memory: %sMB, VCPUS: %s, Storage: %sGB, FlavorID: %s, "
"Swap: %sGB, RXTX Quota: %sGB, RXTX Cap: %sMB%s") % (
n, val["memory_mb"], val["vcpus"], val["local_gb"],
val["flavorid"], val["swap"], val["rxtx_quota"],
val["rxtx_cap"], deleted)
def create(self, name, memory, vcpus, local_gb, flavorid,
swap=0, rxtx_quota=0, rxtx_cap=0):
"""Creates instance types / flavors
arguments: name memory vcpus local_gb flavorid [swap] [rxtx_quota]
[rxtx_cap]
"""
try:
instance_types.create(name, memory, vcpus, local_gb,
flavorid, swap, rxtx_quota, rxtx_cap)
except exception.InvalidInputException:
print "Must supply valid parameters to create instance type"
print e
sys.exit(1)
except exception.DBError, e:
print "DB Error: %s" % e
sys.exit(2)
except:
print "Unknown error"
sys.exit(3)
else:
print "%s created" % name
def delete(self, name, purge=None):
"""Marks instance types / flavors as deleted
arguments: name"""
try:
if purge == "--purge":
instance_types.purge(name)
verb = "purged"
else:
instance_types.destroy(name)
verb = "deleted"
except exception.ApiError:
print "Valid instance type name is required"
sys.exit(1)
except exception.DBError, e:
print "DB Error: %s" % e
sys.exit(2)
except:
sys.exit(3)
else:
print "%s %s" % (name, verb)
def list(self, name=None):
"""Lists all active or specific instance types / flavors
arguments: [name]"""
try:
if name == None:
inst_types = instance_types.get_all_types()
elif name == "--all":
inst_types = instance_types.get_all_types(1)
else:
inst_types = instance_types.get_instance_type(name)
except exception.DBError, e:
_db_error(e)
if isinstance(inst_types.values()[0], dict):
for k, v in inst_types.iteritems():
self._print_instance_types(k, v)
else:
self._print_instance_types(name, inst_types)
CATEGORIES = [
('user', UserCommands),
('project', ProjectCommands),
@@ -670,7 +747,9 @@ CATEGORIES = [
('service', ServiceCommands),
('log', LogCommands),
('db', DbCommands),
('volume', VolumeCommands)]
('volume', VolumeCommands),
('instance_type', InstanceTypeCommands),
('flavor', InstanceTypeCommands)]
def lazy_match(name, key_value_tuples):

View File

@@ -23,6 +23,8 @@ import base64
import boto
import boto.exception
import httplib
import re
import string
from boto.ec2.regioninfo import RegionInfo
@@ -165,19 +167,20 @@ class HostInfo(object):
**Fields Include**
* Disk stats
* Running Instances
* Memory stats
* CPU stats
* Network address info
* Firewall info
* Bridge and devices
* Hostname
* Compute service status
* Volume service status
* Instance count
* Volume count
"""
def __init__(self, connection=None):
self.connection = connection
self.hostname = None
self.compute = None
self.volume = None
self.instance_count = 0
self.volume_count = 0
def __repr__(self):
return 'Host:%s' % self.hostname
@@ -188,7 +191,39 @@ class HostInfo(object):
# this is needed by the sax parser, so ignore the ugly name
def endElement(self, name, value, connection):
setattr(self, name, value)
fixed_name = string.lower(re.sub(r'([A-Z])', r'_\1', name))
setattr(self, fixed_name, value)
class Vpn(object):
"""
Information about a Vpn, as parsed through SAX
**Fields Include**
* instance_id
* project_id
* public_ip
* public_port
* created_at
* internal_ip
* state
"""
def __init__(self, connection=None):
self.connection = connection
self.instance_id = None
self.project_id = None
def __repr__(self):
return 'Vpn:%s:%s' % (self.project_id, self.instance_id)
def startElement(self, name, attrs, connection):
return None
def endElement(self, name, value, connection):
fixed_name = string.lower(re.sub(r'([A-Z])', r'_\1', name))
setattr(self, fixed_name, value)
class InstanceType(object):
@@ -422,6 +457,16 @@ class NovaAdminClient(object):
zip = self.apiconn.get_object('GenerateX509ForUser', params, UserInfo)
return zip.file
def start_vpn(self, project):
"""
Starts the vpn for a user
"""
return self.apiconn.get_object('StartVpn', {'Project': project}, Vpn)
def get_vpns(self):
"""Return a list of vpn with project name"""
return self.apiconn.get_list('DescribeVpns', {}, [('item', Vpn)])
def get_hosts(self):
return self.apiconn.get_list('DescribeHosts', {}, [('item', HostInfo)])

View File

@@ -1,99 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Wrapper for API service, makes it look more like the non-WSGI services"""
from nova import flags
from nova import log as logging
from nova import version
from nova import wsgi
LOG = logging.getLogger('nova.api')
FLAGS = flags.FLAGS
flags.DEFINE_string('ec2_listen', "0.0.0.0",
'IP address for EC2 API to listen')
flags.DEFINE_integer('ec2_listen_port', 8773, 'port for ec2 api to listen')
flags.DEFINE_string('osapi_listen', "0.0.0.0",
'IP address for OpenStack API to listen')
flags.DEFINE_integer('osapi_listen_port', 8774, 'port for os api to listen')
API_ENDPOINTS = ['ec2', 'osapi']
def _run_app(paste_config_file):
LOG.debug(_("Using paste.deploy config at: %s"), paste_config_file)
apps = []
for api in API_ENDPOINTS:
config = wsgi.load_paste_configuration(paste_config_file, api)
if config is None:
LOG.debug(_("No paste configuration for app: %s"), api)
continue
LOG.debug(_("App Config: %(api)s\n%(config)r") % locals())
LOG.info(_("Running %s API"), api)
app = wsgi.load_paste_app(paste_config_file, api)
apps.append((app, getattr(FLAGS, "%s_listen_port" % api),
getattr(FLAGS, "%s_listen" % api)))
if len(apps) == 0:
LOG.error(_("No known API applications configured in %s."),
paste_config_file)
return
server = wsgi.Server()
for app in apps:
server.start(*app)
return server
class ApiService(object):
"""Base class for workers that run on hosts."""
def __init__(self, conf):
self.conf = conf
self.wsgi_app = None
def start(self):
self.wsgi_app = _run_app(self.conf)
def wait(self):
self.wsgi_app.wait()
@classmethod
def create(cls):
conf = wsgi.paste_config_file('nova-api.conf')
LOG.audit(_("Starting nova-api node (version %s)"),
version.version_string_with_vcs())
service = cls(conf)
return service
def serve(conf):
LOG.audit(_("Starting nova-api node (version %s)"),
version.version_string_with_vcs())
LOG.debug(_("Full set of FLAGS:"))
for flag in FLAGS:
flag_get = FLAGS.get(flag, None)
LOG.debug("%(flag)s : %(flag_get)s" % locals())
service = ApiService(conf)
service.start()
return service

View File

@@ -160,9 +160,45 @@ class StrWrapper(object):
raise KeyError(name)
FLAGS = FlagValues()
gflags.FLAGS = FLAGS
gflags.DEFINE_flag(gflags.HelpFlag(), FLAGS)
# Copied from gflags with small mods to get the naming correct.
# Originally gflags checks for the first module that is not gflags that is
# in the call chain, we want to check for the first module that is not gflags
# and not this module.
def _GetCallingModule():
"""Returns the name of the module that's calling into this module.
We generally use this function to get the name of the module calling a
DEFINE_foo... function.
"""
# Walk down the stack to find the first globals dict that's not ours.
for depth in range(1, sys.getrecursionlimit()):
if not sys._getframe(depth).f_globals is globals():
module_name = __GetModuleName(sys._getframe(depth).f_globals)
if module_name == 'gflags':
continue
if module_name is not None:
return module_name
raise AssertionError("No module was found")
# Copied from gflags because it is a private function
def __GetModuleName(globals_dict):
"""Given a globals dict, returns the name of the module that defines it.
Args:
globals_dict: A dictionary that should correspond to an environment
providing the values of the globals.
Returns:
A string (the name of the module) or None (if the module could not
be identified.
"""
for name, module in sys.modules.iteritems():
if getattr(module, '__dict__', None) is globals_dict:
if name == '__main__':
return sys.argv[0]
return name
return None
def _wrapper(func):
@@ -173,6 +209,11 @@ def _wrapper(func):
return _wrapped
FLAGS = FlagValues()
gflags.FLAGS = FLAGS
gflags._GetCallingModule = _GetCallingModule
DEFINE = _wrapper(gflags.DEFINE)
DEFINE_string = _wrapper(gflags.DEFINE_string)
DEFINE_integer = _wrapper(gflags.DEFINE_integer)
@@ -185,8 +226,6 @@ DEFINE_spaceseplist = _wrapper(gflags.DEFINE_spaceseplist)
DEFINE_multistring = _wrapper(gflags.DEFINE_multistring)
DEFINE_multi_int = _wrapper(gflags.DEFINE_multi_int)
DEFINE_flag = _wrapper(gflags.DEFINE_flag)
HelpFlag = gflags.HelpFlag
HelpshortFlag = gflags.HelpshortFlag
HelpXMLFlag = gflags.HelpXMLFlag
@@ -285,8 +324,9 @@ DEFINE_string('state_path', os.path.join(os.path.dirname(__file__), '../'),
DEFINE_string('logdir', None, 'output to a per-service log file in named '
'directory')
DEFINE_string('sqlite_db', 'nova.sqlite', 'file name for sqlite')
DEFINE_string('sql_connection',
'sqlite:///$state_path/nova.sqlite',
'sqlite:///$state_path/$sqlite_db',
'connection string for sql database')
DEFINE_integer('sql_idle_timeout',
3600,

View File

@@ -54,7 +54,7 @@ flags.DEFINE_string('logging_default_format_string',
'format string to use for log messages without context')
flags.DEFINE_string('logging_debug_format_suffix',
'from %(processName)s (pid=%(process)d) %(funcName)s'
'from (pid=%(process)d) %(funcName)s'
' %(pathname)s:%(lineno)d',
'data to append to log format when level is DEBUG')
@@ -236,16 +236,17 @@ class NovaRootLogger(NovaLogger):
def __init__(self, name, level=NOTSET):
self.logpath = None
self.filelog = None
self.syslog = SysLogHandler(address='/dev/log')
self.streamlog = StreamHandler()
self.syslog = None
NovaLogger.__init__(self, name, level)
def setup_from_flags(self):
"""Setup logger from flags"""
global _filelog
if FLAGS.use_syslog:
self.syslog = SysLogHandler(address='/dev/log')
self.addHandler(self.syslog)
else:
elif self.syslog:
self.removeHandler(self.syslog)
logpath = _get_log_file_path()
if logpath:

View File

@@ -91,18 +91,19 @@ class Consumer(messaging.Consumer):
super(Consumer, self).__init__(*args, **kwargs)
self.failed_connection = False
break
except: # Catching all because carrot sucks
except Exception as e: # Catching all because carrot sucks
fl_host = FLAGS.rabbit_host
fl_port = FLAGS.rabbit_port
fl_intv = FLAGS.rabbit_retry_interval
LOG.exception(_("AMQP server on %(fl_host)s:%(fl_port)d is"
" unreachable. Trying again in %(fl_intv)d seconds.")
LOG.error(_("AMQP server on %(fl_host)s:%(fl_port)d is"
" unreachable: %(e)s. Trying again in %(fl_intv)d"
" seconds.")
% locals())
self.failed_connection = True
if self.failed_connection:
LOG.exception(_("Unable to connect to AMQP server "
"after %d tries. Shutting down."),
FLAGS.rabbit_max_retries)
LOG.error(_("Unable to connect to AMQP server "
"after %d tries. Shutting down."),
FLAGS.rabbit_max_retries)
sys.exit(1)
def fetch(self, no_ack=None, auto_ack=None, enable_callbacks=False):
@@ -122,7 +123,7 @@ class Consumer(messaging.Consumer):
LOG.error(_("Reconnected to queue"))
self.failed_connection = False
# NOTE(vish): This is catching all errors because we really don't
# exceptions to be logged 10 times a second if some
# want exceptions to be logged 10 times a second if some
# persistent failure occurs.
except Exception: # pylint: disable-msg=W0703
if not self.failed_connection:

View File

@@ -29,8 +29,8 @@ FLAGS.auth_driver = 'nova.auth.dbdriver.DbDriver'
flags.DECLARE('network_size', 'nova.network.manager')
flags.DECLARE('num_networks', 'nova.network.manager')
flags.DECLARE('fake_network', 'nova.network.manager')
FLAGS.network_size = 16
FLAGS.num_networks = 5
FLAGS.network_size = 8
FLAGS.num_networks = 2
FLAGS.fake_network = True
flags.DECLARE('num_shelves', 'nova.volume.driver')
flags.DECLARE('blades_per_shelf', 'nova.volume.driver')
@@ -39,6 +39,5 @@ FLAGS.num_shelves = 2
FLAGS.blades_per_shelf = 4
FLAGS.iscsi_num_targets = 8
FLAGS.verbose = True
FLAGS.sql_connection = 'sqlite:///nova.sqlite'
FLAGS.sqlite_db = "tests.sqlite"
FLAGS.use_ipv6 = True
FLAGS.logfile = 'tests.log'

View File

@@ -311,4 +311,5 @@ class S3APITestCase(test.TestCase):
self.auth_manager.delete_user('admin')
self.auth_manager.delete_project('admin')
stop_listening = defer.maybeDeferred(self.listening_port.stopListening)
super(S3APITestCase, self).tearDown()
return defer.DeferredList([stop_listening])

View File

@@ -66,6 +66,7 @@ class CloudTestCase(test.TestCase):
# set up services
self.compute = self.start_service('compute')
self.scheduter = self.start_service('scheduler')
self.network = self.start_service('network')
self.manager = manager.AuthManager()
@@ -73,8 +74,12 @@ class CloudTestCase(test.TestCase):
self.project = self.manager.create_project('proj', 'admin', 'proj')
self.context = context.RequestContext(user=self.user,
project=self.project)
host = self.network.get_network_host(self.context.elevated())
def tearDown(self):
network_ref = db.project_get_network(self.context,
self.project.id)
db.network_disassociate(self.context, network_ref['id'])
self.manager.delete_project(self.project)
self.manager.delete_user(self.user)
self.compute.kill()
@@ -131,6 +136,22 @@ class CloudTestCase(test.TestCase):
db.instance_destroy(self.context, inst['id'])
db.floating_ip_destroy(self.context, address)
def test_describe_security_groups(self):
"""Makes sure describe_security_groups works and filters results."""
sec = db.security_group_create(self.context,
{'project_id': self.context.project_id,
'name': 'test'})
result = self.cloud.describe_security_groups(self.context)
# NOTE(vish): should have the default group as well
self.assertEqual(len(result['securityGroupInfo']), 2)
result = self.cloud.describe_security_groups(self.context,
group_name=[sec['name']])
self.assertEqual(len(result['securityGroupInfo']), 1)
self.assertEqual(
result['securityGroupInfo'][0]['groupName'],
sec['name'])
db.security_group_destroy(self.context, sec['id'])
def test_describe_volumes(self):
"""Makes sure describe_volumes works and filters results."""
vol1 = db.volume_create(self.context, {})
@@ -201,27 +222,32 @@ class CloudTestCase(test.TestCase):
'instance_type': instance_type,
'max_count': max_count}
rv = self.cloud.run_instances(self.context, **kwargs)
greenthread.sleep(0.3)
instance_id = rv['instancesSet'][0]['instanceId']
output = self.cloud.get_console_output(context=self.context,
instance_id=[instance_id])
instance_id=[instance_id])
self.assertEquals(b64decode(output['output']), 'FAKE CONSOLE OUTPUT')
# TODO(soren): We need this until we can stop polling in the rpc code
# for unit tests.
greenthread.sleep(0.3)
rv = self.cloud.terminate_instances(self.context, [instance_id])
greenthread.sleep(0.3)
def test_ajax_console(self):
image_id = FLAGS.default_image
kwargs = {'image_id': image_id}
rv = yield self.cloud.run_instances(self.context, **kwargs)
rv = self.cloud.run_instances(self.context, **kwargs)
instance_id = rv['instancesSet'][0]['instanceId']
output = yield self.cloud.get_console_output(context=self.context,
instance_id=[instance_id])
self.assertEquals(b64decode(output['output']),
'http://fakeajaxconsole.com/?token=FAKETOKEN')
greenthread.sleep(0.3)
output = self.cloud.get_ajax_console(context=self.context,
instance_id=[instance_id])
self.assertEquals(output['url'],
'%s/?token=FAKETOKEN' % FLAGS.ajax_console_proxy_url)
# TODO(soren): We need this until we can stop polling in the rpc code
# for unit tests.
greenthread.sleep(0.3)
rv = yield self.cloud.terminate_instances(self.context, [instance_id])
rv = self.cloud.terminate_instances(self.context, [instance_id])
greenthread.sleep(0.3)
def test_key_generation(self):
result = self._create_key('test')
@@ -241,7 +267,7 @@ class CloudTestCase(test.TestCase):
self._create_key('test1')
self._create_key('test2')
result = self.cloud.describe_key_pairs(self.context)
keys = result["keypairsSet"]
keys = result["keySet"]
self.assertTrue(filter(lambda k: k['keyName'] == 'test1', keys))
self.assertTrue(filter(lambda k: k['keyName'] == 'test2', keys))
@@ -284,70 +310,6 @@ class CloudTestCase(test.TestCase):
LOG.debug(_("Terminating instance %s"), instance_id)
rv = self.compute.terminate_instance(instance_id)
def test_describe_instances(self):
"""Makes sure describe_instances works."""
instance1 = db.instance_create(self.context, {'host': 'host2'})
comp1 = db.service_create(self.context, {'host': 'host2',
'availability_zone': 'zone1',
'topic': "compute"})
result = self.cloud.describe_instances(self.context)
self.assertEqual(result['reservationSet'][0]
['instancesSet'][0]
['placement']['availabilityZone'], 'zone1')
db.instance_destroy(self.context, instance1['id'])
db.service_destroy(self.context, comp1['id'])
def test_instance_update_state(self):
# TODO(termie): what is this code even testing?
def instance(num):
return {
'reservation_id': 'r-1',
'instance_id': 'i-%s' % num,
'image_id': 'ami-%s' % num,
'private_dns_name': '10.0.0.%s' % num,
'dns_name': '10.0.0%s' % num,
'ami_launch_index': str(num),
'instance_type': 'fake',
'availability_zone': 'fake',
'key_name': None,
'kernel_id': 'fake',
'ramdisk_id': 'fake',
'groups': ['default'],
'product_codes': None,
'state': 0x01,
'user_data': ''}
rv = self.cloud._format_describe_instances(self.context)
logging.error(str(rv))
self.assertEqual(len(rv['reservationSet']), 0)
# simulate launch of 5 instances
# self.cloud.instances['pending'] = {}
#for i in xrange(5):
# inst = instance(i)
# self.cloud.instances['pending'][inst['instance_id']] = inst
#rv = self.cloud._format_instances(self.admin)
#self.assert_(len(rv['reservationSet']) == 1)
#self.assert_(len(rv['reservationSet'][0]['instances_set']) == 5)
# report 4 nodes each having 1 of the instances
#for i in xrange(4):
# self.cloud.update_state('instances',
# {('node-%s' % i): {('i-%s' % i):
# instance(i)}})
# one instance should be pending still
#self.assert_(len(self.cloud.instances['pending'].keys()) == 1)
# check that the reservations collapse
#rv = self.cloud._format_instances(self.admin)
#self.assert_(len(rv['reservationSet']) == 1)
#self.assert_(len(rv['reservationSet'][0]['instances_set']) == 5)
# check that we can get metadata for each instance
#for i in xrange(4):
# data = self.cloud.get_metadata(instance(i)['private_dns_name'])
# self.assert_(data['meta-data']['ami-id'] == 'ami-%s' % i)
@staticmethod
def _fake_set_image_description(ctxt, image_id, description):
from nova.objectstore import handler

View File

@@ -30,6 +30,7 @@ from nova import log as logging
from nova import test
from nova import utils
from nova.auth import manager
from nova.compute import instance_types
LOG = logging.getLogger('nova.tests.compute')
@@ -56,7 +57,7 @@ class ComputeTestCase(test.TestCase):
self.manager.delete_project(self.project)
super(ComputeTestCase, self).tearDown()
def _create_instance(self):
def _create_instance(self, params={}):
"""Create a test instance"""
inst = {}
inst['image_id'] = 'ami-test'
@@ -67,6 +68,7 @@ class ComputeTestCase(test.TestCase):
inst['instance_type'] = 'm1.tiny'
inst['mac_address'] = utils.generate_mac()
inst['ami_launch_index'] = 0
inst.update(params)
return db.instance_create(self.context, inst)['id']
def _create_group(self):
@@ -266,3 +268,31 @@ class ComputeTestCase(test.TestCase):
self.assertEqual(ret_val, None)
self.compute.terminate_instance(self.context, instance_id)
def test_resize_instance(self):
"""Ensure instance can be migrated/resized"""
instance_id = self._create_instance()
context = self.context.elevated()
self.compute.run_instance(self.context, instance_id)
db.instance_update(self.context, instance_id, {'host': 'foo'})
self.compute.prep_resize(context, instance_id)
migration_ref = db.migration_get_by_instance_and_status(context,
instance_id, 'pre-migrating')
self.compute.resize_instance(context, instance_id,
migration_ref['id'])
self.compute.terminate_instance(context, instance_id)
def test_get_by_flavor_id(self):
type = instance_types.get_by_flavor_id(1)
self.assertEqual(type, 'm1.tiny')
def test_resize_same_source_fails(self):
"""Ensure instance fails to migrate when source and destination are
the same host"""
instance_id = self._create_instance()
self.compute.run_instance(self.context, instance_id)
self.assertRaises(exception.Error, self.compute.prep_resize,
self.context, instance_id)
self.compute.terminate_instance(self.context, instance_id)
type = instance_types.get_by_flavor_id("1")
self.assertEqual(type, 'm1.tiny')

View File

@@ -52,6 +52,7 @@ class DirectTestCase(test.TestCase):
def tearDown(self):
direct.ROUTES = {}
super(DirectTestCase, self).tearDown()
def test_delegated_auth(self):
req = webob.Request.blank('/fake/context')

View File

@@ -0,0 +1,86 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 Ken Pepple
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Unit Tests for instance types code
"""
import time
from nova import context
from nova import db
from nova import exception
from nova import flags
from nova import log as logging
from nova import test
from nova import utils
from nova.compute import instance_types
from nova.db.sqlalchemy.session import get_session
from nova.db.sqlalchemy import models
FLAGS = flags.FLAGS
LOG = logging.getLogger('nova.tests.compute')
class InstanceTypeTestCase(test.TestCase):
"""Test cases for instance type code"""
def setUp(self):
super(InstanceTypeTestCase, self).setUp()
session = get_session()
max_flavorid = session.query(models.InstanceTypes).\
order_by("flavorid desc").\
first()
self.flavorid = max_flavorid["flavorid"] + 1
self.name = str(int(time.time()))
def test_instance_type_create_then_delete(self):
"""Ensure instance types can be created"""
starting_inst_list = instance_types.get_all_types()
instance_types.create(self.name, 256, 1, 120, self.flavorid)
new = instance_types.get_all_types()
self.assertNotEqual(len(starting_inst_list),
len(new),
'instance type was not created')
instance_types.destroy(self.name)
self.assertEqual(1,
instance_types.get_instance_type(self.name)["deleted"])
self.assertEqual(starting_inst_list, instance_types.get_all_types())
instance_types.purge(self.name)
self.assertEqual(len(starting_inst_list),
len(instance_types.get_all_types()),
'instance type not purged')
def test_get_all_instance_types(self):
"""Ensures that all instance types can be retrieved"""
session = get_session()
total_instance_types = session.query(models.InstanceTypes).\
count()
inst_types = instance_types.get_all_types()
self.assertEqual(total_instance_types, len(inst_types))
def test_invalid_create_args_should_fail(self):
"""Ensures that instance type creation fails with invalid args"""
self.assertRaises(
exception.InvalidInputException,
instance_types.create, self.name, 0, 1, 120, self.flavorid)
self.assertRaises(
exception.InvalidInputException,
instance_types.create, self.name, 256, -1, 120, self.flavorid)
self.assertRaises(
exception.InvalidInputException,
instance_types.create, self.name, 256, 1, "aa", self.flavorid)
def test_non_existant_inst_type_shouldnt_delete(self):
"""Ensures that instance type creation fails with invalid args"""
self.assertRaises(exception.ApiError,
instance_types.destroy, "sfsfsdfdfs")

View File

@@ -42,15 +42,13 @@ class NetworkTestCase(test.TestCase):
# flags in the corresponding section in nova-dhcpbridge
self.flags(connection_type='fake',
fake_call=True,
fake_network=True,
network_size=16,
num_networks=5)
fake_network=True)
self.manager = manager.AuthManager()
self.user = self.manager.create_user('netuser', 'netuser', 'netuser')
self.projects = []
self.network = utils.import_object(FLAGS.network_manager)
self.context = context.RequestContext(project=None, user=self.user)
for i in range(5):
for i in range(FLAGS.num_networks):
name = 'project%s' % i
project = self.manager.create_project(name, 'netuser', name)
self.projects.append(project)
@@ -117,6 +115,9 @@ class NetworkTestCase(test.TestCase):
utils.to_global_ipv6(
network_ref['cidr_v6'],
instance_ref['mac_address']))
self._deallocate_address(0, address)
db.instance_destroy(context.get_admin_context(),
instance_ref['id'])
def test_public_network_association(self):
"""Makes sure that we can allocaate a public ip"""
@@ -192,7 +193,7 @@ class NetworkTestCase(test.TestCase):
first = self._create_address(0)
lease_ip(first)
instance_ids = []
for i in range(1, 5):
for i in range(1, FLAGS.num_networks):
instance_ref = self._create_instance(i, mac=utils.generate_mac())
instance_ids.append(instance_ref['id'])
address = self._create_address(i, instance_ref['id'])

View File

@@ -150,6 +150,7 @@ class SimpleDriverTestCase(test.TestCase):
def tearDown(self):
self.manager.delete_user(self.user)
self.manager.delete_project(self.project)
super(SimpleDriverTestCase, self).tearDown()
def _create_instance(self, **kwargs):
"""Create a test instance"""
@@ -270,6 +271,7 @@ class SimpleDriverTestCase(test.TestCase):
self.scheduler.driver.schedule_run_instance,
self.context,
instance_id)
db.instance_destroy(self.context, instance_id)
for instance_id in instance_ids1:
compute1.terminate_instance(self.context, instance_id)
for instance_id in instance_ids2:

View File

@@ -204,11 +204,12 @@ class LibvirtConnTestCase(test.TestCase):
conn = libvirt_conn.LibvirtConnection(True)
uri = conn.get_uri()
self.assertEquals(uri, testuri)
db.instance_destroy(user_context, instance_ref['id'])
def tearDown(self):
super(LibvirtConnTestCase, self).tearDown()
self.manager.delete_project(self.project)
self.manager.delete_user(self.user)
super(LibvirtConnTestCase, self).tearDown()
class IptablesFirewallTestCase(test.TestCase):
@@ -365,6 +366,7 @@ class IptablesFirewallTestCase(test.TestCase):
'--dports 80:81 -j ACCEPT' % security_group_chain \
in self.out_rules,
"TCP port 80/81 acceptance rule wasn't added")
db.instance_destroy(admin_ctxt, instance_ref['id'])
class NWFilterTestCase(test.TestCase):
@@ -388,6 +390,7 @@ class NWFilterTestCase(test.TestCase):
def tearDown(self):
self.manager.delete_project(self.project)
self.manager.delete_user(self.user)
super(NWFilterTestCase, self).tearDown()
def test_cidr_rule_nwfilter_xml(self):
cloud_controller = cloud.CloudController()
@@ -514,3 +517,4 @@ class NWFilterTestCase(test.TestCase):
self.fw.apply_instance_filter(instance)
_ensure_all_called()
self.teardown_security_group()
db.instance_destroy(admin_ctxt, instance_ref['id'])

View File

@@ -31,6 +31,7 @@ from nova.compute import power_state
from nova.virt import xenapi_conn
from nova.virt.xenapi import fake as xenapi_fake
from nova.virt.xenapi import volume_utils
from nova.virt.xenapi import vm_utils
from nova.virt.xenapi.vmops import SimpleDH
from nova.virt.xenapi.vmops import VMOps
from nova.tests.db import fakes as db_fakes
@@ -167,6 +168,7 @@ class XenAPIVMTestCase(test.TestCase):
stubs.stubout_session(self.stubs, stubs.FakeSessionForVMTests)
stubs.stubout_get_this_vm_uuid(self.stubs)
stubs.stubout_stream_disk(self.stubs)
stubs.stubout_is_vdi_pv(self.stubs)
self.stubs.Set(VMOps, 'reset_network', reset_network)
glance_stubs.stubout_glance_client(self.stubs,
glance_stubs.FakeGlance)
@@ -231,7 +233,7 @@ class XenAPIVMTestCase(test.TestCase):
vm = vms[0]
# Check that m1.large above turned into the right thing.
instance_type = instance_types.INSTANCE_TYPES['m1.large']
instance_type = db.instance_type_get_by_name(conn, 'm1.large')
mem_kib = long(instance_type['memory_mb']) << 10
mem_bytes = str(mem_kib << 10)
vcpus = instance_type['vcpus']
@@ -283,11 +285,17 @@ class XenAPIVMTestCase(test.TestCase):
def test_spawn_raw_glance(self):
FLAGS.xenapi_image_service = 'glance'
self._test_spawn(1, None, None)
self._test_spawn(glance_stubs.FakeGlance.IMAGE_RAW, None, None)
def test_spawn_vhd_glance(self):
FLAGS.xenapi_image_service = 'glance'
self._test_spawn(glance_stubs.FakeGlance.IMAGE_VHD, None, None)
def test_spawn_glance(self):
FLAGS.xenapi_image_service = 'glance'
self._test_spawn(1, 2, 3)
self._test_spawn(glance_stubs.FakeGlance.IMAGE_MACHINE,
glance_stubs.FakeGlance.IMAGE_KERNEL,
glance_stubs.FakeGlance.IMAGE_RAMDISK)
def tearDown(self):
super(XenAPIVMTestCase, self).tearDown()
@@ -336,3 +344,101 @@ class XenAPIDiffieHellmanTestCase(test.TestCase):
def tearDown(self):
super(XenAPIDiffieHellmanTestCase, self).tearDown()
class XenAPIMigrateInstance(test.TestCase):
"""
Unit test for verifying migration-related actions
"""
def setUp(self):
super(XenAPIMigrateInstance, self).setUp()
self.stubs = stubout.StubOutForTesting()
FLAGS.target_host = '127.0.0.1'
FLAGS.xenapi_connection_url = 'test_url'
FLAGS.xenapi_connection_password = 'test_pass'
db_fakes.stub_out_db_instance_api(self.stubs)
stubs.stub_out_get_target(self.stubs)
xenapi_fake.reset()
self.values = {'name': 1, 'id': 1,
'project_id': 'fake',
'user_id': 'fake',
'image_id': 1,
'kernel_id': 2,
'ramdisk_id': 3,
'instance_type': 'm1.large',
'mac_address': 'aa:bb:cc:dd:ee:ff',
}
stubs.stub_out_migration_methods(self.stubs)
def test_migrate_disk_and_power_off(self):
instance = db.instance_create(self.values)
stubs.stubout_session(self.stubs, stubs.FakeSessionForMigrationTests)
conn = xenapi_conn.get_connection(False)
conn.migrate_disk_and_power_off(instance, '127.0.0.1')
def test_attach_disk(self):
instance = db.instance_create(self.values)
stubs.stubout_session(self.stubs, stubs.FakeSessionForMigrationTests)
conn = xenapi_conn.get_connection(False)
conn.attach_disk(instance, {'base_copy': 'hurr', 'cow': 'durr'})
class XenAPIDetermineDiskImageTestCase(test.TestCase):
"""
Unit tests for code that detects the ImageType
"""
def setUp(self):
super(XenAPIDetermineDiskImageTestCase, self).setUp()
glance_stubs.stubout_glance_client(self.stubs,
glance_stubs.FakeGlance)
class FakeInstance(object):
pass
self.fake_instance = FakeInstance()
self.fake_instance.id = 42
def assert_disk_type(self, disk_type):
dt = vm_utils.VMHelper.determine_disk_image_type(
self.fake_instance)
self.assertEqual(disk_type, dt)
def test_instance_disk(self):
"""
If a kernel is specified then the image type is DISK (aka machine)
"""
FLAGS.xenapi_image_service = 'objectstore'
self.fake_instance.image_id = glance_stubs.FakeGlance.IMAGE_MACHINE
self.fake_instance.kernel_id = glance_stubs.FakeGlance.IMAGE_KERNEL
self.assert_disk_type(vm_utils.ImageType.DISK)
def test_instance_disk_raw(self):
"""
If the kernel isn't specified, and we're not using Glance, then
DISK_RAW is assumed.
"""
FLAGS.xenapi_image_service = 'objectstore'
self.fake_instance.image_id = glance_stubs.FakeGlance.IMAGE_RAW
self.fake_instance.kernel_id = None
self.assert_disk_type(vm_utils.ImageType.DISK_RAW)
def test_glance_disk_raw(self):
"""
If we're using Glance, then defer to the image_type field, which in
this case will be 'raw'.
"""
FLAGS.xenapi_image_service = 'glance'
self.fake_instance.image_id = glance_stubs.FakeGlance.IMAGE_RAW
self.fake_instance.kernel_id = None
self.assert_disk_type(vm_utils.ImageType.DISK_RAW)
def test_glance_disk_vhd(self):
"""
If we're using Glance, then defer to the image_type field, which in
this case will be 'vhd'.
"""
FLAGS.xenapi_image_service = 'glance'
self.fake_instance.image_id = glance_stubs.FakeGlance.IMAGE_VHD
self.fake_instance.kernel_id = None
self.assert_disk_type(vm_utils.ImageType.DISK_VHD)

View File

@@ -20,6 +20,7 @@ from nova.virt import xenapi_conn
from nova.virt.xenapi import fake
from nova.virt.xenapi import volume_utils
from nova.virt.xenapi import vm_utils
from nova.virt.xenapi import vmops
def stubout_instance_snapshot(stubs):
@@ -27,7 +28,7 @@ def stubout_instance_snapshot(stubs):
def fake_fetch_image(cls, session, instance_id, image, user, project,
type):
# Stubout wait_for_task
def fake_wait_for_task(self, id, task):
def fake_wait_for_task(self, task, id):
class FakeEvent:
def send(self, value):
@@ -130,6 +131,12 @@ def stubout_stream_disk(stubs):
stubs.Set(vm_utils, '_stream_disk', f)
def stubout_is_vdi_pv(stubs):
def f(_1):
return False
stubs.Set(vm_utils, '_is_vdi_pv', f)
class FakeSessionForVMTests(fake.SessionBase):
""" Stubs out a XenAPISession for VM tests """
def __init__(self, uri):
@@ -171,6 +178,12 @@ class FakeSessionForVMTests(fake.SessionBase):
def VM_destroy(self, session_ref, vm_ref):
fake.destroy_vm(vm_ref)
def SR_scan(self, session_ref, sr_ref):
pass
def VDI_set_name_label(self, session_ref, vdi_ref, name_label):
pass
class FakeSessionForVolumeTests(fake.SessionBase):
""" Stubs out a XenAPISession for Volume tests """
@@ -205,3 +218,44 @@ class FakeSessionForVolumeFailedTests(FakeSessionForVolumeTests):
def SR_forget(self, _1, ref):
pass
class FakeSessionForMigrationTests(fake.SessionBase):
"""Stubs out a XenAPISession for Migration tests"""
def __init__(self, uri):
super(FakeSessionForMigrationTests, self).__init__(uri)
def stub_out_migration_methods(stubs):
def fake_get_snapshot(self, instance):
return 'foo', 'bar'
@classmethod
def fake_get_vdi(cls, session, vm_ref):
vdi_ref = fake.create_vdi(name_label='derp', read_only=False,
sr_ref='herp', sharable=False)
vdi_rec = session.get_xenapi().VDI.get_record(vdi_ref)
return vdi_ref, {'uuid': vdi_rec['uuid'], }
def fake_shutdown(self, inst, vm, method='clean'):
pass
@classmethod
def fake_sr(cls, session, *args):
pass
@classmethod
def fake_get_sr_path(cls, *args):
return "fake"
def fake_destroy(*args, **kwargs):
pass
stubs.Set(vmops.VMOps, '_destroy', fake_destroy)
stubs.Set(vm_utils.VMHelper, 'scan_default_sr', fake_sr)
stubs.Set(vm_utils.VMHelper, 'scan_sr', fake_sr)
stubs.Set(vmops.VMOps, '_get_snapshot', fake_get_snapshot)
stubs.Set(vm_utils.VMHelper, 'get_vdi_for_vm_safely', fake_get_vdi)
stubs.Set(xenapi_conn.XenAPISession, 'wait_for_task', lambda x, y, z: None)
stubs.Set(vm_utils.VMHelper, 'get_sr_path', fake_get_sr_path)
stubs.Set(vmops.VMOps, '_shutdown', fake_shutdown)

View File

@@ -17,27 +17,245 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# Colorizer Code is borrowed from Twisted:
# Copyright (c) 2001-2010 Twisted Matrix Laboratories.
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""Unittest runner for Nova.
To run all tests
python run_tests.py
To run a single test:
python run_tests.py test_compute:ComputeTestCase.test_run_terminate
To run a single test module:
python run_tests.py test_compute
or
python run_tests.py api.test_wsgi
"""
import gettext
import os
import unittest
import sys
from nose import config
from nose import result
from nose import core
from nose import result
from nova import log as logging
from nova.tests import fake_flags
class _AnsiColorizer(object):
"""
A colorizer is an object that loosely wraps around a stream, allowing
callers to write text to the stream in a particular color.
Colorizer classes must implement C{supported()} and C{write(text, color)}.
"""
_colors = dict(black=30, red=31, green=32, yellow=33,
blue=34, magenta=35, cyan=36, white=37)
def __init__(self, stream):
self.stream = stream
def supported(cls, stream=sys.stdout):
"""
A class method that returns True if the current platform supports
coloring terminal output using this method. Returns False otherwise.
"""
if not stream.isatty():
return False # auto color only on TTYs
try:
import curses
except ImportError:
return False
else:
try:
try:
return curses.tigetnum("colors") > 2
except curses.error:
curses.setupterm()
return curses.tigetnum("colors") > 2
except:
raise
# guess false in case of error
return False
supported = classmethod(supported)
def write(self, text, color):
"""
Write the given text to the stream in the given color.
@param text: Text to be written to the stream.
@param color: A string label for a color. e.g. 'red', 'white'.
"""
color = self._colors[color]
self.stream.write('\x1b[%s;1m%s\x1b[0m' % (color, text))
class _Win32Colorizer(object):
"""
See _AnsiColorizer docstring.
"""
def __init__(self, stream):
from win32console import GetStdHandle, STD_OUT_HANDLE, \
FOREGROUND_RED, FOREGROUND_BLUE, FOREGROUND_GREEN, \
FOREGROUND_INTENSITY
red, green, blue, bold = (FOREGROUND_RED, FOREGROUND_GREEN,
FOREGROUND_BLUE, FOREGROUND_INTENSITY)
self.stream = stream
self.screenBuffer = GetStdHandle(STD_OUT_HANDLE)
self._colors = {
'normal': red | green | blue,
'red': red | bold,
'green': green | bold,
'blue': blue | bold,
'yellow': red | green | bold,
'magenta': red | blue | bold,
'cyan': green | blue | bold,
'white': red | green | blue | bold
}
def supported(cls, stream=sys.stdout):
try:
import win32console
screenBuffer = win32console.GetStdHandle(
win32console.STD_OUT_HANDLE)
except ImportError:
return False
import pywintypes
try:
screenBuffer.SetConsoleTextAttribute(
win32console.FOREGROUND_RED |
win32console.FOREGROUND_GREEN |
win32console.FOREGROUND_BLUE)
except pywintypes.error:
return False
else:
return True
supported = classmethod(supported)
def write(self, text, color):
color = self._colors[color]
self.screenBuffer.SetConsoleTextAttribute(color)
self.stream.write(text)
self.screenBuffer.SetConsoleTextAttribute(self._colors['normal'])
class _NullColorizer(object):
"""
See _AnsiColorizer docstring.
"""
def __init__(self, stream):
self.stream = stream
def supported(cls, stream=sys.stdout):
return True
supported = classmethod(supported)
def write(self, text, color):
self.stream.write(text)
class NovaTestResult(result.TextTestResult):
def __init__(self, *args, **kw):
result.TextTestResult.__init__(self, *args, **kw)
self._last_case = None
self.colorizer = None
# NOTE(vish): reset stdout for the terminal check
stdout = sys.stdout
sys.stdout = sys.__stdout__
for colorizer in [_Win32Colorizer, _AnsiColorizer, _NullColorizer]:
if colorizer.supported():
self.colorizer = colorizer(self.stream)
break
sys.stdout = stdout
def getDescription(self, test):
return str(test)
# NOTE(vish): copied from unittest with edit to add color
def addSuccess(self, test):
unittest.TestResult.addSuccess(self, test)
if self.showAll:
self.colorizer.write("OK", 'green')
self.stream.writeln()
elif self.dots:
self.stream.write('.')
self.stream.flush()
# NOTE(vish): copied from unittest with edit to add color
def addFailure(self, test, err):
unittest.TestResult.addFailure(self, test, err)
if self.showAll:
self.colorizer.write("FAIL", 'red')
self.stream.writeln()
elif self.dots:
self.stream.write('F')
self.stream.flush()
# NOTE(vish): copied from nose with edit to add color
def addError(self, test, err):
"""Overrides normal addError to add support for
errorClasses. If the exception is a registered class, the
error will be added to the list for that class, not errors.
"""
stream = getattr(self, 'stream', None)
ec, ev, tb = err
try:
exc_info = self._exc_info_to_string(err, test)
except TypeError:
# 2.3 compat
exc_info = self._exc_info_to_string(err)
for cls, (storage, label, isfail) in self.errorClasses.items():
if result.isclass(ec) and issubclass(ec, cls):
if isfail:
test.passed = False
storage.append((test, exc_info))
# Might get patched into a streamless result
if stream is not None:
if self.showAll:
message = [label]
detail = result._exception_detail(err[1])
if detail:
message.append(detail)
stream.writeln(": ".join(message))
elif self.dots:
stream.write(label[:1])
return
self.errors.append((test, exc_info))
test.passed = False
if stream is not None:
if self.showAll:
self.colorizer.write("ERROR", 'red')
self.stream.writeln()
elif self.dots:
stream.write('E')
def startTest(self, test):
unittest.TestResult.startTest(self, test)
current_case = test.test.__class__.__name__
@@ -62,12 +280,23 @@ class NovaTestRunner(core.TextTestRunner):
if __name__ == '__main__':
logging.setup()
# If any argument looks like a test name but doesn't have "nova.tests" in
# front of it, automatically add that so we don't have to type as much
argv = []
for x in sys.argv:
if x.startswith('test_'):
argv.append('nova.tests.%s' % x)
else:
argv.append(x)
testdir = os.path.abspath(os.path.join("nova", "tests"))
c = config.Config(stream=sys.stdout,
env=os.environ,
verbosity=3,
workingDir=testdir,
plugins=core.DefaultPluginManager())
runner = NovaTestRunner(stream=c.stream,
verbosity=c.verbosity,
config=c)
sys.exit(not core.run(config=c, testRunner=runner))
sys.exit(not core.run(config=c, testRunner=runner, argv=argv))