Merge trunk

Addressing Sandy's comments
Salvatore Orlando
2011-06-23 15:00:59 +01:00
16 changed files with 704 additions and 136 deletions

View File

@@ -272,7 +272,7 @@ DEFINE_string('aws_access_key_id', 'admin', 'AWS Access ID')
DEFINE_string('aws_secret_access_key', 'admin', 'AWS Access Key')
# NOTE(sirp): my_ip interpolation doesn't work within nested structures
DEFINE_list('glance_api_servers',
['127.0.0.1:9292'],
['%s:9292' % _get_my_ip()],
'list of glance api servers available to nova (host:port)')
DEFINE_integer('s3_port', 3333, 's3 port')
DEFINE_string('s3_host', '$my_ip', 's3 host (for infrastructure)')
@@ -364,7 +364,7 @@ DEFINE_string('scheduler_manager', 'nova.scheduler.manager.SchedulerManager',
'Manager for scheduler')
# The service to use for image search and retrieval
DEFINE_string('image_service', 'nova.image.local.LocalImageService',
DEFINE_string('image_service', 'nova.image.glance.GlanceImageService',
'The service to use for retrieving and searching for images.')
DEFINE_string('host', socket.gethostname(),
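For context, a minimal sketch (not nova's actual helper) of why the list default above is built from a computed value: '$my_ip' substitution works for plain string flags such as s3_host, but it is not expanded inside list elements, so _get_my_ip() has to be called eagerly when the default is defined. The helper below is an assumption about how such a lookup is typically done.

import socket

def _get_my_ip():
    # Best-effort local IP discovery; falls back to loopback on any error.
    try:
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        s.connect(('8.8.8.8', 80))   # UDP "connect" sends no packets
        addr = s.getsockname()[0]
        s.close()
        return addr
    except socket.error:
        return '127.0.0.1'

# The flag default is then a fully resolved host:port string.
glance_api_servers_default = ['%s:9292' % _get_my_ip()]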

View File

@@ -24,6 +24,7 @@ from nova import exception
from nova import flags
from nova import log as logging
from nova import rpc
from nova import utils
from eventlet import greenpool
@@ -106,12 +107,14 @@ def _wrap_method(function, self):
def _process(func, zone):
"""Worker stub for green thread pool. Give the worker
an authenticated nova client and zone info."""
nova = novaclient.OpenStack(zone.username, zone.password, zone.api_url)
nova = novaclient.OpenStack(zone.username, zone.password, None,
zone.api_url)
nova.authenticate()
return func(nova, zone)
def call_zone_method(context, method, errors_to_ignore=None, *args, **kwargs):
def call_zone_method(context, method_name, errors_to_ignore=None,
novaclient_collection_name='zones', *args, **kwargs):
"""Returns a list of (zone, call_result) objects."""
if not isinstance(errors_to_ignore, (list, tuple)):
# This will also handle the default None
@@ -121,7 +124,7 @@ def call_zone_method(context, method, errors_to_ignore=None, *args, **kwargs):
results = []
for zone in db.zone_get_all(context):
try:
nova = novaclient.OpenStack(zone.username, zone.password,
nova = novaclient.OpenStack(zone.username, zone.password, None,
zone.api_url)
nova.authenticate()
except novaclient.exceptions.BadRequest, e:
@@ -131,18 +134,16 @@ def call_zone_method(context, method, errors_to_ignore=None, *args, **kwargs):
#TODO (dabo) - add logic for failure counts per zone,
# with escalation after a given number of failures.
continue
zone_method = getattr(nova.zones, method)
novaclient_collection = getattr(nova, novaclient_collection_name)
collection_method = getattr(novaclient_collection, method_name)
def _error_trap(*args, **kwargs):
try:
return zone_method(*args, **kwargs)
return collection_method(*args, **kwargs)
except Exception as e:
if type(e) in errors_to_ignore:
return None
# TODO (dabo) - want to be able to re-raise here.
# Returning a string now; raising was causing issues.
# raise e
return "ERROR", "%s" % e
raise
res = pool.spawn(_error_trap, *args, **kwargs)
results.append((zone, res))
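A hedged, self-contained sketch of the dynamic dispatch introduced above: instead of hard-coding nova.zones, the collection is resolved by name on the client and the method is then resolved on that collection. The fake classes are illustrative stand-ins, not novaclient objects.

class _FakeCollection(object):
    def info(self):
        return {'status': 'ok'}

class _FakeClient(object):
    zones = _FakeCollection()

def call_collection_method(client, collection_name, method_name, *args, **kwargs):
    collection = getattr(client, collection_name)   # e.g. client.zones
    method = getattr(collection, method_name)       # e.g. client.zones.info
    return method(*args, **kwargs)

assert call_collection_method(_FakeClient(), 'zones', 'info') == {'status': 'ok'}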
@@ -201,38 +202,78 @@ class RedirectResult(exception.Error):
class reroute_compute(object):
"""Decorator used to indicate that the method should
delegate the call to the child zones if the db query
can't find anything."""
"""
reroute_compute is responsible for trying to look up a resource in the
current zone and, if it's not found there, delegating the call to the
child zones.
Since reroute_compute will be making 'cross-zone' calls, the ID for the
object must come in as a UUID; if we receive an integer ID, we bail.
The steps involved are:
1. Validate that item_id is UUID-like
2. Look up the item by UUID in the zone-local database
3. If the item was found, extract the integer ID and pass that to
the wrapped method. (This ensures that zone-local code can
continue to use integer IDs.)
4. If the item was not found, delegate the call to a child zone
using the UUID.
"""
def __init__(self, method_name):
self.method_name = method_name
def _route_to_child_zones(self, context, collection, item_uuid):
if not FLAGS.enable_zone_routing:
raise exception.InstanceNotFound(instance_id=item_uuid)
zones = db.zone_get_all(context)
if not zones:
raise exception.InstanceNotFound(instance_id=item_uuid)
# Ask the children to provide an answer ...
LOG.debug(_("Asking child zones ..."))
result = self._call_child_zones(zones,
wrap_novaclient_function(_issue_novaclient_command,
collection, self.method_name, item_uuid))
# Scrub the results and raise another exception
# so the API layers can bail out gracefully ...
raise RedirectResult(self.unmarshall_result(result))
def __call__(self, f):
def wrapped_f(*args, **kwargs):
collection, context, item_id = \
collection, context, item_id_or_uuid = \
self.get_collection_context_and_id(args, kwargs)
try:
# Call the original function ...
attempt_reroute = False
if utils.is_uuid_like(item_id_or_uuid):
item_uuid = item_id_or_uuid
try:
instance = db.instance_get_by_uuid(context, item_uuid)
except exception.InstanceNotFound, e:
# NOTE(sirp): since a UUID was passed in, we can attempt
# to reroute to a child zone
attempt_reroute = True
LOG.debug(_("Instance %(item_uuid)s not found "
"locally: '%(e)s'" % locals()))
else:
# NOTE(sirp): since we're not re-routing in this case, and
we were passed a UUID, we need to replace that UUID
# with an integer ID in the argument list so that the
# zone-local code can continue to use integer IDs.
item_id = instance['id']
args = list(args) # needs to be mutable to replace
self.replace_uuid_with_id(args, kwargs, item_id)
if attempt_reroute:
return self._route_to_child_zones(context, collection,
item_uuid)
else:
return f(*args, **kwargs)
except exception.InstanceNotFound, e:
LOG.debug(_("Instance %(item_id)s not found "
"locally: '%(e)s'" % locals()))
if not FLAGS.enable_zone_routing:
raise
zones = db.zone_get_all(context)
if not zones:
raise
# Ask the children to provide an answer ...
LOG.debug(_("Asking child zones ..."))
result = self._call_child_zones(zones,
wrap_novaclient_function(_issue_novaclient_command,
collection, self.method_name, item_id))
# Scrub the results and raise another exception
# so the API layers can bail out gracefully ...
raise RedirectResult(self.unmarshall_result(result))
return wrapped_f
def _call_child_zones(self, zones, function):
@@ -251,6 +292,18 @@ class reroute_compute(object):
instance_id = args[2]
return ("servers", context, instance_id)
@staticmethod
def replace_uuid_with_id(args, kwargs, replacement_id):
"""
Extracts the UUID parameter from the arg or kwarg list and replaces
it with an integer ID.
"""
if 'instance_id' in kwargs:
kwargs['instance_id'] = replacement_id
elif len(args) > 1:
args.pop(2)
args.insert(2, replacement_id)
def unmarshall_result(self, zone_responses):
"""Result is a list of responses from each child zone.
Each decorator derivation is responsible for turning this
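To make the UUID-versus-integer-ID decision above concrete, here is a hedged, self-contained sketch with hypothetical names (is_uuid_like is paraphrased from nova.utils, and the dict stands in for the zone-local database):

import re

_UUID_RE = re.compile(
    r'^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$')

def is_uuid_like(value):
    # Roughly what nova.utils.is_uuid_like checks: the canonical 8-4-4-4-12 form.
    return bool(_UUID_RE.match(str(value).lower()))

def lookup_or_reroute(local_db, item_id_or_uuid):
    """Return ('local', integer_id) or ('reroute', uuid)."""
    if not is_uuid_like(item_id_or_uuid):
        # Integer-style IDs are only meaningful in the local zone.
        return ('local', int(item_id_or_uuid))
    instance = local_db.get(item_id_or_uuid)
    if instance is None:
        return ('reroute', item_id_or_uuid)   # ask the child zones by UUID
    return ('local', instance['id'])          # swap the UUID for the local int ID

db = {'aaaaaaaa-bbbb-cccc-dddd-eeeeffff0000': {'id': 42}}
assert lookup_or_reroute(db, 7) == ('local', 7)
assert lookup_or_reroute(db, 'aaaaaaaa-bbbb-cccc-dddd-eeeeffff0000') == ('local', 42)
assert lookup_or_reroute(db, '11111111-2222-3333-4444-555566667777') == (
    'reroute', '11111111-2222-3333-4444-555566667777')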

View File

@@ -39,7 +39,7 @@ flags.DEFINE_integer("max_networks", 1000,
class SimpleScheduler(chance.ChanceScheduler):
"""Implements Naive Scheduler that tries to find least loaded host."""
def schedule_run_instance(self, context, instance_id, *_args, **_kwargs):
def _schedule_instance(self, context, instance_id, *_args, **_kwargs):
"""Picks a host that is up and has the fewest running instances."""
instance_ref = db.instance_get(context, instance_id)
if (instance_ref['availability_zone']
@@ -75,6 +75,12 @@ class SimpleScheduler(chance.ChanceScheduler):
" for this request. Is the appropriate"
" service running?"))
def schedule_run_instance(self, context, instance_id, *_args, **_kwargs):
return self._schedule_instance(context, instance_id, *_args, **_kwargs)
def schedule_start_instance(self, context, instance_id, *_args, **_kwargs):
return self._schedule_instance(context, instance_id, *_args, **_kwargs)
def schedule_create_volume(self, context, volume_id, *_args, **_kwargs):
"""Picks a host that is up and has the fewest volumes."""
volume_ref = db.volume_get(context, volume_id)
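The refactoring above lets schedule_start_instance reuse the same "fewest running instances" selection as schedule_run_instance. A hedged sketch of that selection, with hypothetical data shapes rather than nova's DB API:

def pick_least_loaded(hosts):
    """hosts: iterable of (instance_count, host_name) pairs."""
    return min(hosts, key=lambda pair: pair[0])[1]

assert pick_least_loaded([(3, 'node1'), (0, 'node2'), (5, 'node3')]) == 'node2'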

View File

@@ -88,9 +88,10 @@ class ZoneAwareScheduler(driver.Scheduler):
instance_properties = request_spec['instance_properties']
name = instance_properties['display_name']
image_id = instance_properties['image_id']
image_ref = instance_properties['image_ref']
meta = instance_properties['metadata']
flavor_id = instance_type['flavorid']
reservation_id = instance_properties['reservation_id']
files = kwargs['injected_files']
ipgroup = None # Not supported in OS API ... yet
@@ -99,18 +100,20 @@ class ZoneAwareScheduler(driver.Scheduler):
child_blob = zone_info['child_blob']
zone = db.zone_get(context, child_zone)
url = zone.api_url
LOG.debug(_("Forwarding instance create call to child zone %(url)s")
LOG.debug(_("Forwarding instance create call to child zone %(url)s"
". ReservationID=%(reservation_id)s")
% locals())
nova = None
try:
nova = novaclient.OpenStack(zone.username, zone.password, url)
nova = novaclient.OpenStack(zone.username, zone.password, None,
url)
nova.authenticate()
except novaclient.exceptions.BadRequest, e:
raise exception.NotAuthorized(_("Bad credentials attempting "
"to talk to zone at %(url)s.") % locals())
nova.servers.create(name, image_id, flavor_id, ipgroup, meta, files,
child_blob)
nova.servers.create(name, image_ref, flavor_id, ipgroup, meta, files,
child_blob, reservation_id=reservation_id)
def _provision_resource_from_blob(self, context, item, instance_id,
request_spec, kwargs):
@@ -182,7 +185,11 @@ class ZoneAwareScheduler(driver.Scheduler):
if not build_plan:
raise driver.NoValidHost(_('No hosts were available'))
for item in build_plan:
for num in xrange(request_spec['num_instances']):
if not build_plan:
break
item = build_plan.pop(0)
self._provision_resource(context, item, instance_id, request_spec,
kwargs)
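The loop change above switches from provisioning every build-plan entry to provisioning one entry per requested instance, stopping early if the plan runs out. A hedged sketch of that consumption pattern:

def provision_all(build_plan, num_instances, provision):
    # Pop one build-plan entry per requested instance; stop early if the
    # plan has fewer entries than instances requested.
    for _ in range(num_instances):
        if not build_plan:
            break
        provision(build_plan.pop(0))

launched = []
provision_all(['hostA', 'hostB'], 3, launched.append)
assert launched == ['hostA', 'hostB']   # only two hosts were available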

View File

@@ -89,7 +89,8 @@ class ZoneState(object):
def _call_novaclient(zone):
"""Call novaclient. Broken out for testing purposes."""
client = novaclient.OpenStack(zone.username, zone.password, zone.api_url)
client = novaclient.OpenStack(zone.username, zone.password, None,
zone.api_url)
return client.zones.info()._info

View File

@@ -32,7 +32,7 @@ flags.DECLARE('fake_network', 'nova.network.manager')
FLAGS['network_size'].SetDefault(8)
FLAGS['num_networks'].SetDefault(2)
FLAGS['fake_network'].SetDefault(True)
FLAGS['image_service'].SetDefault('nova.image.local.LocalImageService')
FLAGS['image_service'].SetDefault('nova.image.fake.FakeImageService')
flags.DECLARE('num_shelves', 'nova.volume.driver')
flags.DECLARE('blades_per_shelf', 'nova.volume.driver')
flags.DECLARE('iscsi_num_targets', 'nova.volume.driver')

View File

@@ -133,11 +133,11 @@ class HostFilterTestCase(test.TestCase):
raw = ['or',
['and',
['<', '$compute.host_memory_free', 30],
['<', '$compute.disk_available', 300]
['<', '$compute.disk_available', 300],
],
['and',
['>', '$compute.host_memory_free', 70],
['>', '$compute.disk_available', 700]
['>', '$compute.disk_available', 700],
]
]
cooked = json.dumps(raw)
@@ -183,12 +183,12 @@ class HostFilterTestCase(test.TestCase):
self.assertTrue(hf.filter_hosts(self.zone_manager, json.dumps([])))
self.assertTrue(hf.filter_hosts(self.zone_manager, json.dumps({})))
self.assertTrue(hf.filter_hosts(self.zone_manager, json.dumps(
['not', True, False, True, False]
['not', True, False, True, False],
)))
try:
hf.filter_hosts(self.zone_manager, json.dumps(
'not', True, False, True, False
'not', True, False, True, False,
))
self.fail("Should give KeyError")
except KeyError, e:
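The test data above uses the JSON query grammar the host filter accepts: nested ['or', ...], ['and', ...], ['not', ...] clauses with ['<', '$compute.key', constant] leaves. A hedged, illustrative evaluator (not nova's implementation) for that shape:

import operator

_OPS = {'<': operator.lt, '>': operator.gt,
        '=': operator.eq, '>=': operator.ge, '<=': operator.le}

def eval_query(query, caps):
    op = query[0]
    if op == 'or':
        return any(eval_query(sub, caps) for sub in query[1:])
    if op == 'and':
        return all(eval_query(sub, caps) for sub in query[1:])
    if op == 'not':
        return not eval_query(query[1], caps)
    # Leaf clause: [op, '$compute.key', constant]
    key = query[1].lstrip('$').split('.', 1)[1]
    return _OPS[op](caps[key], query[2])

caps = {'host_memory_free': 80, 'disk_available': 800}
query = ['or',
         ['and', ['<', '$compute.host_memory_free', 30],
                 ['<', '$compute.disk_available', 300]],
         ['and', ['>', '$compute.host_memory_free', 70],
                 ['>', '$compute.disk_available', 700]]]
assert eval_query(query, caps) is True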

View File

@@ -44,7 +44,7 @@ class WeightedSumTestCase(test.TestCase):
hosts = [
FakeHost(1, 512 * MB, 100),
FakeHost(2, 256 * MB, 400),
FakeHost(3, 512 * MB, 100)
FakeHost(3, 512 * MB, 100),
]
weighted_fns = [
@@ -96,7 +96,7 @@ class LeastCostSchedulerTestCase(test.TestCase):
def test_noop_cost_fn(self):
FLAGS.least_cost_scheduler_cost_functions = [
'nova.scheduler.least_cost.noop_cost_fn'
'nova.scheduler.least_cost.noop_cost_fn',
]
FLAGS.noop_cost_fn_weight = 1
@@ -110,7 +110,7 @@ class LeastCostSchedulerTestCase(test.TestCase):
def test_cost_fn_weights(self):
FLAGS.least_cost_scheduler_cost_functions = [
'nova.scheduler.least_cost.noop_cost_fn'
'nova.scheduler.least_cost.noop_cost_fn',
]
FLAGS.noop_cost_fn_weight = 2
@@ -124,7 +124,7 @@ class LeastCostSchedulerTestCase(test.TestCase):
def test_fill_first_cost_fn(self):
FLAGS.least_cost_scheduler_cost_functions = [
'nova.scheduler.least_cost.fill_first_cost_fn'
'nova.scheduler.least_cost.fill_first_cost_fn',
]
FLAGS.fill_first_cost_fn_weight = 1

View File

@@ -201,7 +201,7 @@ class ZoneAwareSchedulerTestCase(test.TestCase):
'instance_properties': {},
'instance_type': {},
'filter_driver': 'nova.scheduler.host_filter.AllHostsFilter',
'blob': "Non-None blob data"
'blob': "Non-None blob data",
}
result = sched.schedule_run_instance(None, 1, request_spec)

View File

@@ -89,7 +89,7 @@ class FakeHttplibConnection(object):
class XmlConversionTestCase(test.TestCase):
"""Unit test api xml conversion"""
def test_number_conversion(self):
conv = apirequest._try_convert
conv = ec2utils._try_convert
self.assertEqual(conv('None'), None)
self.assertEqual(conv('True'), True)
self.assertEqual(conv('False'), False)

View File

@@ -35,7 +35,7 @@ from nova import utils
from nova.auth import manager
from nova.api.ec2 import cloud
from nova.api.ec2 import ec2utils
from nova.image import local
from nova.image import fake
FLAGS = flags.FLAGS
@@ -56,6 +56,7 @@ class CloudTestCase(test.TestCase):
self.compute = self.start_service('compute')
self.scheduter = self.start_service('scheduler')
self.network = self.start_service('network')
self.volume = self.start_service('volume')
self.image_service = utils.import_object(FLAGS.image_service)
self.manager = manager.AuthManager()
@@ -69,8 +70,8 @@ class CloudTestCase(test.TestCase):
return {'id': 1, 'properties': {'kernel_id': 1, 'ramdisk_id': 1,
'type': 'machine', 'image_state': 'available'}}
self.stubs.Set(local.LocalImageService, 'show', fake_show)
self.stubs.Set(local.LocalImageService, 'show_by_name', fake_show)
self.stubs.Set(fake._FakeImageService, 'show', fake_show)
self.stubs.Set(fake._FakeImageService, 'show_by_name', fake_show)
# NOTE(vish): set up a manual wait so rpc.cast has a chance to finish
rpc_cast = rpc.cast
@@ -303,7 +304,7 @@ class CloudTestCase(test.TestCase):
def fake_show_none(meh, context, id):
raise exception.ImageNotFound(image_id='bad_image_id')
self.stubs.Set(local.LocalImageService, 'detail', fake_detail)
self.stubs.Set(fake._FakeImageService, 'detail', fake_detail)
# list all
result1 = describe_images(self.context)
result1 = result1['imagesSet'][0]
@@ -317,8 +318,8 @@ class CloudTestCase(test.TestCase):
self.assertEqual(2, len(result3['imagesSet']))
# provide an non-existing image_id
self.stubs.UnsetAll()
self.stubs.Set(local.LocalImageService, 'show', fake_show_none)
self.stubs.Set(local.LocalImageService, 'show_by_name', fake_show_none)
self.stubs.Set(fake._FakeImageService, 'show', fake_show_none)
self.stubs.Set(fake._FakeImageService, 'show_by_name', fake_show_none)
self.assertRaises(exception.ImageNotFound, describe_images,
self.context, ['ami-fake'])
@@ -329,8 +330,8 @@ class CloudTestCase(test.TestCase):
return {'id': 1, 'properties': {'kernel_id': 1, 'ramdisk_id': 1,
'type': 'machine'}, 'is_public': True}
self.stubs.Set(local.LocalImageService, 'show', fake_show)
self.stubs.Set(local.LocalImageService, 'show_by_name', fake_show)
self.stubs.Set(fake._FakeImageService, 'show', fake_show)
self.stubs.Set(fake._FakeImageService, 'show_by_name', fake_show)
result = describe_image_attribute(self.context, 'ami-00000001',
'launchPermission')
self.assertEqual([{'group': 'all'}], result['launchPermission'])
@@ -345,9 +346,9 @@ class CloudTestCase(test.TestCase):
def fake_update(meh, context, image_id, metadata, data=None):
return metadata
self.stubs.Set(local.LocalImageService, 'show', fake_show)
self.stubs.Set(local.LocalImageService, 'show_by_name', fake_show)
self.stubs.Set(local.LocalImageService, 'update', fake_update)
self.stubs.Set(fake._FakeImageService, 'show', fake_show)
self.stubs.Set(fake._FakeImageService, 'show_by_name', fake_show)
self.stubs.Set(fake._FakeImageService, 'update', fake_update)
result = modify_image_attribute(self.context, 'ami-00000001',
'launchPermission', 'add',
user_group=['all'])
@@ -359,7 +360,7 @@ class CloudTestCase(test.TestCase):
def fake_delete(self, context, id):
return None
self.stubs.Set(local.LocalImageService, 'delete', fake_delete)
self.stubs.Set(fake._FakeImageService, 'delete', fake_delete)
# valid image
result = deregister_image(self.context, 'ami-00000001')
self.assertEqual(result['imageId'], 'ami-00000001')
@@ -369,18 +370,25 @@ class CloudTestCase(test.TestCase):
def fake_detail_empty(self, context):
return []
self.stubs.Set(local.LocalImageService, 'detail', fake_detail_empty)
self.stubs.Set(fake._FakeImageService, 'detail', fake_detail_empty)
self.assertRaises(exception.ImageNotFound, deregister_image,
self.context, 'ami-bad001')
def test_console_output(self):
instance_type = FLAGS.default_instance_type
max_count = 1
kwargs = {'image_id': 'ami-1',
'instance_type': instance_type,
'max_count': max_count}
def _run_instance(self, **kwargs):
rv = self.cloud.run_instances(self.context, **kwargs)
instance_id = rv['instancesSet'][0]['instanceId']
return instance_id
def _run_instance_wait(self, **kwargs):
ec2_instance_id = self._run_instance(**kwargs)
self._wait_for_running(ec2_instance_id)
return ec2_instance_id
def test_console_output(self):
instance_id = self._run_instance(
image_id='ami-1',
instance_type=FLAGS.default_instance_type,
max_count=1)
output = self.cloud.get_console_output(context=self.context,
instance_id=[instance_id])
self.assertEquals(b64decode(output['output']), 'FAKE CONSOLE?OUTPUT')
@@ -389,9 +397,7 @@ class CloudTestCase(test.TestCase):
rv = self.cloud.terminate_instances(self.context, [instance_id])
def test_ajax_console(self):
kwargs = {'image_id': 'ami-1'}
rv = self.cloud.run_instances(self.context, **kwargs)
instance_id = rv['instancesSet'][0]['instanceId']
instance_id = self._run_instance(image_id='ami-1')
output = self.cloud.get_ajax_console(context=self.context,
instance_id=[instance_id])
self.assertEquals(output['url'],
@@ -457,6 +463,12 @@ class CloudTestCase(test.TestCase):
self.cloud.delete_key_pair(self.context, 'test')
def test_run_instances(self):
# stub out the rpc call
def stub_cast(*args, **kwargs):
pass
self.stubs.Set(rpc, 'cast', stub_cast)
kwargs = {'image_id': FLAGS.default_image,
'instance_type': FLAGS.default_instance_type,
'max_count': 1}
@@ -466,7 +478,7 @@ class CloudTestCase(test.TestCase):
self.assertEqual(instance['imageId'], 'ami-00000001')
self.assertEqual(instance['displayName'], 'Server 1')
self.assertEqual(instance['instanceId'], 'i-00000001')
self.assertEqual(instance['instanceState']['name'], 'networking')
self.assertEqual(instance['instanceState']['name'], 'scheduling')
self.assertEqual(instance['instanceType'], 'm1.small')
def test_run_instances_image_state_none(self):
@@ -480,7 +492,7 @@ class CloudTestCase(test.TestCase):
'type': 'machine'}}
self.stubs.UnsetAll()
self.stubs.Set(local.LocalImageService, 'show', fake_show_no_state)
self.stubs.Set(fake._FakeImageService, 'show', fake_show_no_state)
self.assertRaises(exception.ApiError, run_instances,
self.context, **kwargs)
@@ -495,7 +507,7 @@ class CloudTestCase(test.TestCase):
'type': 'machine', 'image_state': 'decrypting'}}
self.stubs.UnsetAll()
self.stubs.Set(local.LocalImageService, 'show', fake_show_decrypt)
self.stubs.Set(fake._FakeImageService, 'show', fake_show_decrypt)
self.assertRaises(exception.ApiError, run_instances,
self.context, **kwargs)
@@ -509,7 +521,7 @@ class CloudTestCase(test.TestCase):
return {'id': 1, 'properties': {'kernel_id': 1, 'ramdisk_id': 1,
'type': 'machine'}, 'status': 'active'}
self.stubs.Set(local.LocalImageService, 'show', fake_show_stat_active)
self.stubs.Set(fake._FakeImageService, 'show', fake_show_stat_active)
result = run_instances(self.context, **kwargs)
self.assertEqual(len(result['instancesSet']), 1)
@@ -538,7 +550,9 @@ class CloudTestCase(test.TestCase):
def test_update_of_instance_wont_update_private_fields(self):
inst = db.instance_create(self.context, {})
self.cloud.update_instance(self.context, inst['id'],
ec2_id = ec2utils.id_to_ec2_id(inst['id'])
self.cloud.update_instance(self.context, ec2_id,
display_name='c00l 1m4g3',
mac_address='DE:AD:BE:EF')
inst = db.instance_get(self.context, inst['id'])
self.assertEqual(None, inst['mac_address'])
@@ -561,3 +575,299 @@ class CloudTestCase(test.TestCase):
vol = db.volume_get(self.context, vol['id'])
self.assertEqual(None, vol['mountpoint'])
db.volume_destroy(self.context, vol['id'])
def _restart_compute_service(self, periodic_interval=None):
"""restart compute service. NOTE: fake driver forgets all instances."""
self.compute.kill()
if periodic_interval:
self.compute = self.start_service(
'compute', periodic_interval=periodic_interval)
else:
self.compute = self.start_service('compute')
def _wait_for_state(self, ctxt, instance_id, predicate):
"""Wait for an stopping instance to be a given state"""
id = ec2utils.ec2_id_to_id(instance_id)
while True:
info = self.cloud.compute_api.get(context=ctxt, instance_id=id)
LOG.debug(info)
if predicate(info):
break
greenthread.sleep(1)
def _wait_for_running(self, instance_id):
def is_running(info):
return info['state_description'] == 'running'
self._wait_for_state(self.context, instance_id, is_running)
def _wait_for_stopped(self, instance_id):
def is_stopped(info):
return info['state_description'] == 'stopped'
self._wait_for_state(self.context, instance_id, is_stopped)
def _wait_for_terminate(self, instance_id):
def is_deleted(info):
return info['deleted']
elevated = self.context.elevated(read_deleted=True)
self._wait_for_state(elevated, instance_id, is_deleted)
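The helpers above poll the compute API until a predicate holds. A hedged, generic version of that pattern with an explicit timeout (the timeout is an assumption; the helpers above loop until the state is reached):

import time

def wait_for(fetch_state, predicate, timeout=60.0, interval=0.3):
    # Poll fetch_state() until predicate() is true, or fail after timeout.
    deadline = time.time() + timeout
    while time.time() < deadline:
        if predicate(fetch_state()):
            return True
        time.sleep(interval)
    raise AssertionError('condition not reached within %.1fs' % timeout)

states = iter(['scheduling', 'networking', 'running'])
assert wait_for(lambda: next(states), lambda s: s == 'running',
                timeout=5, interval=0.01)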
def test_stop_start_instance(self):
"""Makes sure stop/start instance works"""
# make periodic tasks run frequently so the test doesn't wait up to 60s.
self._restart_compute_service(periodic_interval=0.3)
kwargs = {'image_id': 'ami-1',
'instance_type': FLAGS.default_instance_type,
'max_count': 1, }
instance_id = self._run_instance_wait(**kwargs)
# a running instance can't be started. It is just ignored.
result = self.cloud.start_instances(self.context, [instance_id])
greenthread.sleep(0.3)
self.assertTrue(result)
result = self.cloud.stop_instances(self.context, [instance_id])
greenthread.sleep(0.3)
self.assertTrue(result)
self._wait_for_stopped(instance_id)
result = self.cloud.start_instances(self.context, [instance_id])
greenthread.sleep(0.3)
self.assertTrue(result)
self._wait_for_running(instance_id)
result = self.cloud.stop_instances(self.context, [instance_id])
greenthread.sleep(0.3)
self.assertTrue(result)
self._wait_for_stopped(instance_id)
result = self.cloud.terminate_instances(self.context, [instance_id])
greenthread.sleep(0.3)
self.assertTrue(result)
self._restart_compute_service()
def _volume_create(self):
kwargs = {'status': 'available',
'host': self.volume.host,
'size': 1,
'attach_status': 'detached', }
return db.volume_create(self.context, kwargs)
def _assert_volume_attached(self, vol, instance_id, mountpoint):
self.assertEqual(vol['instance_id'], instance_id)
self.assertEqual(vol['mountpoint'], mountpoint)
self.assertEqual(vol['status'], "in-use")
self.assertEqual(vol['attach_status'], "attached")
def _assert_volume_detached(self, vol):
self.assertEqual(vol['instance_id'], None)
self.assertEqual(vol['mountpoint'], None)
self.assertEqual(vol['status'], "available")
self.assertEqual(vol['attach_status'], "detached")
def test_stop_start_with_volume(self):
"""Make sure run instance with block device mapping works"""
# make periodic tasks run frequently so the test doesn't wait up to 60s.
self._restart_compute_service(periodic_interval=0.3)
vol1 = self._volume_create()
vol2 = self._volume_create()
kwargs = {'image_id': 'ami-1',
'instance_type': FLAGS.default_instance_type,
'max_count': 1,
'block_device_mapping': [{'device_name': '/dev/vdb',
'volume_id': vol1['id'],
'delete_on_termination': False, },
{'device_name': '/dev/vdc',
'volume_id': vol2['id'],
'delete_on_termination': True, },
]}
ec2_instance_id = self._run_instance_wait(**kwargs)
instance_id = ec2utils.ec2_id_to_id(ec2_instance_id)
vols = db.volume_get_all_by_instance(self.context, instance_id)
self.assertEqual(len(vols), 2)
for vol in vols:
self.assertTrue(vol['id'] == vol1['id'] or vol['id'] == vol2['id'])
vol = db.volume_get(self.context, vol1['id'])
self._assert_volume_attached(vol, instance_id, '/dev/vdb')
vol = db.volume_get(self.context, vol2['id'])
self._assert_volume_attached(vol, instance_id, '/dev/vdc')
result = self.cloud.stop_instances(self.context, [ec2_instance_id])
self.assertTrue(result)
self._wait_for_stopped(ec2_instance_id)
vol = db.volume_get(self.context, vol1['id'])
self._assert_volume_detached(vol)
vol = db.volume_get(self.context, vol2['id'])
self._assert_volume_detached(vol)
self.cloud.start_instances(self.context, [ec2_instance_id])
self._wait_for_running(ec2_instance_id)
vols = db.volume_get_all_by_instance(self.context, instance_id)
self.assertEqual(len(vols), 2)
for vol in vols:
self.assertTrue(vol['id'] == vol1['id'] or vol['id'] == vol2['id'])
self.assertTrue(vol['mountpoint'] == '/dev/vdb' or
vol['mountpoint'] == '/dev/vdc')
self.assertEqual(vol['instance_id'], instance_id)
self.assertEqual(vol['status'], "in-use")
self.assertEqual(vol['attach_status'], "attached")
self.cloud.terminate_instances(self.context, [ec2_instance_id])
greenthread.sleep(0.3)
admin_ctxt = context.get_admin_context(read_deleted=False)
vol = db.volume_get(admin_ctxt, vol1['id'])
self.assertFalse(vol['deleted'])
db.volume_destroy(self.context, vol1['id'])
greenthread.sleep(0.3)
admin_ctxt = context.get_admin_context(read_deleted=True)
vol = db.volume_get(admin_ctxt, vol2['id'])
self.assertTrue(vol['deleted'])
self._restart_compute_service()
def test_stop_with_attached_volume(self):
"""Make sure attach info is reflected to block device mapping"""
# make periodic tasks run frequently so the test doesn't wait up to 60s.
self._restart_compute_service(periodic_interval=0.3)
vol1 = self._volume_create()
vol2 = self._volume_create()
kwargs = {'image_id': 'ami-1',
'instance_type': FLAGS.default_instance_type,
'max_count': 1,
'block_device_mapping': [{'device_name': '/dev/vdb',
'volume_id': vol1['id'],
'delete_on_termination': True}]}
ec2_instance_id = self._run_instance_wait(**kwargs)
instance_id = ec2utils.ec2_id_to_id(ec2_instance_id)
vols = db.volume_get_all_by_instance(self.context, instance_id)
self.assertEqual(len(vols), 1)
for vol in vols:
self.assertEqual(vol['id'], vol1['id'])
self._assert_volume_attached(vol, instance_id, '/dev/vdb')
vol = db.volume_get(self.context, vol2['id'])
self._assert_volume_detached(vol)
self.cloud.compute_api.attach_volume(self.context,
instance_id=instance_id,
volume_id=vol2['id'],
device='/dev/vdc')
greenthread.sleep(0.3)
vol = db.volume_get(self.context, vol2['id'])
self._assert_volume_attached(vol, instance_id, '/dev/vdc')
self.cloud.compute_api.detach_volume(self.context,
volume_id=vol1['id'])
greenthread.sleep(0.3)
vol = db.volume_get(self.context, vol1['id'])
self._assert_volume_detached(vol)
result = self.cloud.stop_instances(self.context, [ec2_instance_id])
self.assertTrue(result)
self._wait_for_stopped(ec2_instance_id)
for vol_id in (vol1['id'], vol2['id']):
vol = db.volume_get(self.context, vol_id)
self._assert_volume_detached(vol)
self.cloud.start_instances(self.context, [ec2_instance_id])
self._wait_for_running(ec2_instance_id)
vols = db.volume_get_all_by_instance(self.context, instance_id)
self.assertEqual(len(vols), 1)
for vol in vols:
self.assertEqual(vol['id'], vol2['id'])
self._assert_volume_attached(vol, instance_id, '/dev/vdc')
vol = db.volume_get(self.context, vol1['id'])
self._assert_volume_detached(vol)
self.cloud.terminate_instances(self.context, [ec2_instance_id])
greenthread.sleep(0.3)
for vol_id in (vol1['id'], vol2['id']):
vol = db.volume_get(self.context, vol_id)
self.assertEqual(vol['id'], vol_id)
self._assert_volume_detached(vol)
db.volume_destroy(self.context, vol_id)
self._restart_compute_service()
def _create_snapshot(self, ec2_volume_id):
result = self.cloud.create_snapshot(self.context,
volume_id=ec2_volume_id)
greenthread.sleep(0.3)
return result['snapshotId']
def test_run_with_snapshot(self):
"""Makes sure run/stop/start instance with snapshot works."""
vol = self._volume_create()
ec2_volume_id = ec2utils.id_to_ec2_id(vol['id'], 'vol-%08x')
ec2_snapshot1_id = self._create_snapshot(ec2_volume_id)
snapshot1_id = ec2utils.ec2_id_to_id(ec2_snapshot1_id)
ec2_snapshot2_id = self._create_snapshot(ec2_volume_id)
snapshot2_id = ec2utils.ec2_id_to_id(ec2_snapshot2_id)
kwargs = {'image_id': 'ami-1',
'instance_type': FLAGS.default_instance_type,
'max_count': 1,
'block_device_mapping': [{'device_name': '/dev/vdb',
'snapshot_id': snapshot1_id,
'delete_on_termination': False, },
{'device_name': '/dev/vdc',
'snapshot_id': snapshot2_id,
'delete_on_termination': True}]}
ec2_instance_id = self._run_instance_wait(**kwargs)
instance_id = ec2utils.ec2_id_to_id(ec2_instance_id)
vols = db.volume_get_all_by_instance(self.context, instance_id)
self.assertEqual(len(vols), 2)
vol1_id = None
vol2_id = None
for vol in vols:
snapshot_id = vol['snapshot_id']
if snapshot_id == snapshot1_id:
vol1_id = vol['id']
mountpoint = '/dev/vdb'
elif snapshot_id == snapshot2_id:
vol2_id = vol['id']
mountpoint = '/dev/vdc'
else:
self.fail()
self._assert_volume_attached(vol, instance_id, mountpoint)
self.assertTrue(vol1_id)
self.assertTrue(vol2_id)
self.cloud.terminate_instances(self.context, [ec2_instance_id])
greenthread.sleep(0.3)
self._wait_for_terminate(ec2_instance_id)
greenthread.sleep(0.3)
admin_ctxt = context.get_admin_context(read_deleted=False)
vol = db.volume_get(admin_ctxt, vol1_id)
self._assert_volume_detached(vol)
self.assertFalse(vol['deleted'])
db.volume_destroy(self.context, vol1_id)
greenthread.sleep(0.3)
admin_ctxt = context.get_admin_context(read_deleted=True)
vol = db.volume_get(admin_ctxt, vol2_id)
self.assertTrue(vol['deleted'])
for snapshot_id in (ec2_snapshot1_id, ec2_snapshot2_id):
self.cloud.delete_snapshot(self.context, snapshot_id)
greenthread.sleep(0.3)
db.volume_destroy(self.context, vol['id'])
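The tests above convert between integer database IDs and EC2-style identifiers (the 'vol-%08x' template hints at the format). A hedged sketch of that conversion, which may differ from nova's ec2utils in detail:

def id_to_ec2_id(internal_id, template='i-%08x'):
    # Render an integer ID as a zero-padded hexadecimal EC2-style identifier.
    return template % int(internal_id)

def ec2_id_to_id(ec2_id):
    # Parse the hexadecimal suffix back into an integer ID.
    return int(ec2_id.split('-')[-1], 16)

assert id_to_ec2_id(1) == 'i-00000001'
assert id_to_ec2_id(10, 'vol-%08x') == 'vol-0000000a'
assert ec2_id_to_id('i-00000001') == 1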

View File

@@ -22,21 +22,21 @@ Tests For Compute
import mox
import stubout
from nova.auth import manager
from nova import compute
from nova.compute import instance_types
from nova.compute import manager as compute_manager
from nova.compute import power_state
from nova import context
from nova import db
from nova.db.sqlalchemy import models
from nova import exception
from nova import flags
import nova.image.fake
from nova import log as logging
from nova import rpc
from nova import test
from nova import utils
from nova.auth import manager
from nova.compute import instance_types
from nova.compute import manager as compute_manager
from nova.compute import power_state
from nova.db.sqlalchemy import models
from nova.image import local
LOG = logging.getLogger('nova.tests.compute')
FLAGS = flags.FLAGS
@@ -73,7 +73,7 @@ class ComputeTestCase(test.TestCase):
def fake_show(meh, context, id):
return {'id': 1, 'properties': {'kernel_id': 1, 'ramdisk_id': 1}}
self.stubs.Set(local.LocalImageService, 'show', fake_show)
self.stubs.Set(nova.image.fake._FakeImageService, 'show', fake_show)
def tearDown(self):
self.manager.delete_user(self.user)
@@ -228,6 +228,21 @@ class ComputeTestCase(test.TestCase):
self.assert_(instance_ref['launched_at'] < terminate)
self.assert_(instance_ref['deleted_at'] > terminate)
def test_stop(self):
"""Ensure instance can be stopped"""
instance_id = self._create_instance()
self.compute.run_instance(self.context, instance_id)
self.compute.stop_instance(self.context, instance_id)
self.compute.terminate_instance(self.context, instance_id)
def test_start(self):
"""Ensure instance can be started"""
instance_id = self._create_instance()
self.compute.run_instance(self.context, instance_id)
self.compute.stop_instance(self.context, instance_id)
self.compute.start_instance(self.context, instance_id)
self.compute.terminate_instance(self.context, instance_id)
def test_pause(self):
"""Ensure instance can be paused"""
instance_id = self._create_instance()
@@ -266,6 +281,14 @@ class ComputeTestCase(test.TestCase):
"File Contents")
self.compute.terminate_instance(self.context, instance_id)
def test_agent_update(self):
"""Ensure instance can have its agent updated"""
instance_id = self._create_instance()
self.compute.run_instance(self.context, instance_id)
self.compute.agent_update(self.context, instance_id,
'http://127.0.0.1/agent', '00112233445566778899aabbccddeeff')
self.compute.terminate_instance(self.context, instance_id)
def test_snapshot(self):
"""Ensure instance can be snapshotted"""
instance_id = self._create_instance()

View File

@@ -33,12 +33,12 @@ from nova import utils
from nova.auth import manager
from nova.compute import instance_types
from nova.compute import power_state
from nova import exception
from nova.virt import xenapi_conn
from nova.virt.xenapi import fake as xenapi_fake
from nova.virt.xenapi import volume_utils
from nova.virt.xenapi import vmops
from nova.virt.xenapi import vm_utils
from nova.virt.xenapi.vmops import SimpleDH
from nova.virt.xenapi.vmops import VMOps
from nova.tests.db import fakes as db_fakes
from nova.tests.xenapi import stubs
from nova.tests.glance import stubs as glance_stubs
@@ -84,7 +84,8 @@ class XenAPIVolumeTestCase(test.TestCase):
'ramdisk_id': 3,
'instance_type_id': '3', # m1.large
'mac_address': 'aa:bb:cc:dd:ee:ff',
'os_type': 'linux'}
'os_type': 'linux',
'architecture': 'x86-64'}
def _create_volume(self, size='0'):
"""Create a volume object."""
@@ -190,8 +191,8 @@ class XenAPIVMTestCase(test.TestCase):
stubs.stubout_session(self.stubs, stubs.FakeSessionForVMTests)
stubs.stubout_get_this_vm_uuid(self.stubs)
stubs.stubout_stream_disk(self.stubs)
stubs.stubout_determine_is_pv_objectstore(self.stubs)
self.stubs.Set(VMOps, 'reset_network', reset_network)
stubs.stubout_is_vdi_pv(self.stubs)
self.stubs.Set(vmops.VMOps, 'reset_network', reset_network)
stubs.stub_out_vm_methods(self.stubs)
glance_stubs.stubout_glance_client(self.stubs)
fake_utils.stub_out_utils_execute(self.stubs)
@@ -211,7 +212,8 @@ class XenAPIVMTestCase(test.TestCase):
'ramdisk_id': 3,
'instance_type_id': '3', # m1.large
'mac_address': 'aa:bb:cc:dd:ee:ff',
'os_type': 'linux'}
'os_type': 'linux',
'architecture': 'x86-64'}
instance = db.instance_create(self.context, values)
self.conn.spawn(instance)
@@ -228,6 +230,23 @@ class XenAPIVMTestCase(test.TestCase):
instance = self._create_instance()
self.conn.get_diagnostics(instance)
def test_instance_snapshot_fails_with_no_primary_vdi(self):
def create_bad_vbd(vm_ref, vdi_ref):
vbd_rec = {'VM': vm_ref,
'VDI': vdi_ref,
'userdevice': 'fake',
'currently_attached': False}
vbd_ref = xenapi_fake._create_object('VBD', vbd_rec)
xenapi_fake.after_VBD_create(vbd_ref, vbd_rec)
return vbd_ref
self.stubs.Set(xenapi_fake, 'create_vbd', create_bad_vbd)
stubs.stubout_instance_snapshot(self.stubs)
instance = self._create_instance()
name = "MySnapshot"
self.assertRaises(exception.Error, self.conn.snapshot, instance, name)
def test_instance_snapshot(self):
stubs.stubout_instance_snapshot(self.stubs)
instance = self._create_instance()
@@ -331,7 +350,7 @@ class XenAPIVMTestCase(test.TestCase):
def check_vm_params_for_linux(self):
self.assertEquals(self.vm['platform']['nx'], 'false')
self.assertEquals(self.vm['PV_args'], 'clocksource=jiffies')
self.assertEquals(self.vm['PV_args'], '')
self.assertEquals(self.vm['PV_bootloader'], 'pygrub')
# check that these are not set
@@ -364,7 +383,8 @@ class XenAPIVMTestCase(test.TestCase):
def _test_spawn(self, image_ref, kernel_id, ramdisk_id,
instance_type_id="3", os_type="linux",
instance_id=1, check_injection=False):
architecture="x86-64", instance_id=1,
check_injection=False):
stubs.stubout_loopingcall_start(self.stubs)
values = {'id': instance_id,
'project_id': self.project.id,
@@ -374,11 +394,14 @@ class XenAPIVMTestCase(test.TestCase):
'ramdisk_id': ramdisk_id,
'instance_type_id': instance_type_id,
'mac_address': 'aa:bb:cc:dd:ee:ff',
'os_type': os_type}
'os_type': os_type,
'architecture': architecture}
instance = db.instance_create(self.context, values)
self.conn.spawn(instance)
self.create_vm_record(self.conn, os_type, instance_id)
self.check_vm_record(self.conn, check_injection)
self.assertTrue(instance.os_type)
self.assertTrue(instance.architecture)
def test_spawn_not_enough_memory(self):
FLAGS.xenapi_image_service = 'glance'
@@ -396,7 +419,7 @@ class XenAPIVMTestCase(test.TestCase):
stubs.stubout_fetch_image_glance_disk(self.stubs)
self.assertRaises(xenapi_fake.Failure,
self._test_spawn, 1, 2, 3)
# no new VDI should be found
# No additional VDI should be found.
vdi_recs_end = self._list_vdis()
self._check_vdis(vdi_recs_start, vdi_recs_end)
@@ -410,7 +433,7 @@ class XenAPIVMTestCase(test.TestCase):
stubs.stubout_create_vm(self.stubs)
self.assertRaises(xenapi_fake.Failure,
self._test_spawn, 1, 2, 3)
# no new VDI should be found
# No additional VDI should be found.
vdi_recs_end = self._list_vdis()
self._check_vdis(vdi_recs_start, vdi_recs_end)
@@ -431,7 +454,7 @@ class XenAPIVMTestCase(test.TestCase):
def test_spawn_vhd_glance_linux(self):
FLAGS.xenapi_image_service = 'glance'
self._test_spawn(glance_stubs.FakeGlance.IMAGE_VHD, None, None,
os_type="linux")
os_type="linux", architecture="x86-64")
self.check_vm_params_for_linux()
def test_spawn_vhd_glance_swapdisk(self):
@@ -460,7 +483,7 @@ class XenAPIVMTestCase(test.TestCase):
def test_spawn_vhd_glance_windows(self):
FLAGS.xenapi_image_service = 'glance'
self._test_spawn(glance_stubs.FakeGlance.IMAGE_VHD, None, None,
os_type="windows")
os_type="windows", architecture="i386")
self.check_vm_params_for_windows()
def test_spawn_glance(self):
@@ -611,7 +634,8 @@ class XenAPIVMTestCase(test.TestCase):
'ramdisk_id': 3,
'instance_type_id': '3', # m1.large
'mac_address': 'aa:bb:cc:dd:ee:ff',
'os_type': 'linux'}
'os_type': 'linux',
'architecture': 'x86-64'}
instance = db.instance_create(self.context, values)
self.conn.spawn(instance)
return instance
@@ -621,8 +645,8 @@ class XenAPIDiffieHellmanTestCase(test.TestCase):
"""Unit tests for Diffie-Hellman code."""
def setUp(self):
super(XenAPIDiffieHellmanTestCase, self).setUp()
self.alice = SimpleDH()
self.bob = SimpleDH()
self.alice = vmops.SimpleDH()
self.bob = vmops.SimpleDH()
def test_shared(self):
alice_pub = self.alice.get_public()
@@ -686,7 +710,8 @@ class XenAPIMigrateInstance(test.TestCase):
'local_gb': 5,
'instance_type_id': '3', # m1.large
'mac_address': 'aa:bb:cc:dd:ee:ff',
'os_type': 'linux'}
'os_type': 'linux',
'architecture': 'x86-64'}
fake_utils.stub_out_utils_execute(self.stubs)
stubs.stub_out_migration_methods(self.stubs)
@@ -725,6 +750,7 @@ class XenAPIDetermineDiskImageTestCase(test.TestCase):
self.fake_instance = FakeInstance()
self.fake_instance.id = 42
self.fake_instance.os_type = 'linux'
self.fake_instance.architecture = 'x86-64'
def assert_disk_type(self, disk_type):
dt = vm_utils.VMHelper.determine_disk_image_type(
@@ -769,6 +795,28 @@ class XenAPIDetermineDiskImageTestCase(test.TestCase):
self.assert_disk_type(vm_utils.ImageType.DISK_VHD)
class CompareVersionTestCase(test.TestCase):
def test_less_than(self):
"""Test that cmp_version compares a as less than b"""
self.assertTrue(vmops.cmp_version('1.2.3.4', '1.2.3.5') < 0)
def test_greater_than(self):
"""Test that cmp_version compares a as greater than b"""
self.assertTrue(vmops.cmp_version('1.2.3.5', '1.2.3.4') > 0)
def test_equal(self):
"""Test that cmp_version compares a as equal to b"""
self.assertTrue(vmops.cmp_version('1.2.3.4', '1.2.3.4') == 0)
def test_non_lexical(self):
"""Test that cmp_version compares non-lexically"""
self.assertTrue(vmops.cmp_version('1.2.3.10', '1.2.3.4') > 0)
def test_length(self):
"""Test that cmp_version compares by length as last resort"""
self.assertTrue(vmops.cmp_version('1.2.3', '1.2.3.4') < 0)
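A hedged sketch of a version comparison consistent with the behaviour these tests describe: components are compared numerically, and length is the tiebreak when all shared components match (not necessarily nova's exact implementation):

def cmp_version(a, b):
    a_parts = [int(p) for p in a.split('.')]
    b_parts = [int(p) for p in b.split('.')]
    for x, y in zip(a_parts, b_parts):
        if x != y:
            return (x > y) - (x < y)   # numeric, component-wise comparison
    # All shared components equal: the shorter version sorts first.
    return (len(a_parts) > len(b_parts)) - (len(a_parts) < len(b_parts))

assert cmp_version('1.2.3.10', '1.2.3.4') > 0   # numeric, not lexical
assert cmp_version('1.2.3', '1.2.3.4') < 0      # shorter sorts first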
class FakeXenApi(object):
"""Fake XenApi for testing HostState."""

View File

@@ -92,6 +92,12 @@ def stubout_stream_disk(stubs):
stubs.Set(vm_utils, '_stream_disk', f)
def stubout_is_vdi_pv(stubs):
def f(_1):
return False
stubs.Set(vm_utils, '_is_vdi_pv', f)
def stubout_determine_is_pv_objectstore(stubs):
"""Assumes VMs never have PV kernels"""
@@ -150,7 +156,7 @@ class FakeSessionForVMTests(fake.SessionBase):
super(FakeSessionForVMTests, self).__init__(uri)
def host_call_plugin(self, _1, _2, plugin, method, _5):
# copy_kernel_vdi returns nothing
# If the call is for 'copy_kernel_vdi' return None.
if method == 'copy_kernel_vdi':
return
sr_ref = fake.get_all('SR')[0]