Add support for resizes to resource tracker.

Keep track of additional resources required to resize an instance
to a new host.  Also hold resources for a revert resize to the original
host.

This fixes race conditions where the destination host (or, in the event
of a revert, the source host) could become overscheduled.

bug 1065267

Change-Id: Ic565d4e2ab9bee40f25fe9f198e1217cdd92ca1b
Brian Elliott 2012-11-02 19:41:15 +00:00
parent 16266a4afb
commit 5bc0ff6354
7 changed files with 676 additions and 104 deletions
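
For orientation before the diff: the fix relies on resource claims acting as context managers, so a resize that fails after resources were claimed on the destination releases them again instead of leaving the host overscheduled. Below is a minimal, self-contained sketch of that behavior using hypothetical names (FakeTracker, FakeClaim); it mirrors the semantics exercised by the tests in this commit, not the actual nova classes.

class FakeTracker(object):
    """Toy stand-in for the resource tracker on the destination host."""
    def __init__(self):
        self.memory_mb_used = 0

    def resize_claim(self, memory_mb):
        self.memory_mb_used += memory_mb          # reserve resources up front
        return FakeClaim(self, memory_mb)


class FakeClaim(object):
    """Toy stand-in for a resize claim; aborts if the operation fails."""
    def __init__(self, tracker, memory_mb):
        self.tracker = tracker
        self.memory_mb = memory_mb

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is not None:
            self.abort()                          # release only on failure

    def abort(self):
        self.tracker.memory_mb_used -= self.memory_mb


tracker = FakeTracker()
try:
    with tracker.resize_claim(memory_mb=512):
        raise RuntimeError("resize failed after resources were claimed")
except RuntimeError:
    pass
assert tracker.memory_mb_used == 0   # destination is not left overscheduled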

@ -1564,13 +1564,13 @@ class API(base.Base):
task_state=task_states.RESIZE_REVERTING,
expected_task_state=None)
self.db.migration_update(elevated, migration_ref['id'],
{'status': 'reverting'})
self.compute_rpcapi.revert_resize(context,
instance=instance, migration=migration_ref,
host=migration_ref['dest_compute'], reservations=reservations)
self.db.migration_update(elevated, migration_ref['id'],
{'status': 'reverted'})
@wrap_check_policy
@check_instance_lock
@check_instance_state(vm_state=[vm_states.RESIZED])
@ -1588,14 +1588,14 @@ class API(base.Base):
task_state=None,
expected_task_state=None)
self.db.migration_update(elevated, migration_ref['id'],
{'status': 'confirming'})
self.compute_rpcapi.confirm_resize(context,
instance=instance, migration=migration_ref,
host=migration_ref['source_compute'],
reservations=reservations)
self.db.migration_update(elevated, migration_ref['id'],
{'status': 'confirmed'})
@staticmethod
def _resize_quota_delta(context, new_instance_type,
old_instance_type, sense, compare):

@ -29,6 +29,9 @@ COMPUTE_RESOURCE_SEMAPHORE = "compute_resources"
class NopClaim(object):
"""For use with compute drivers that do not support resource tracking"""
def __init__(self, migration=None):
self.migration = migration
@property
def disk_gb(self):
return 0
@ -184,3 +187,35 @@ class Claim(NopClaim):
LOG.info(msg, instance=self.instance)
return can_claim
class ResizeClaim(Claim):
"""Claim used for holding resources for an incoming resize/migration
operation.
"""
def __init__(self, instance, instance_type, tracker):
super(ResizeClaim, self).__init__(instance, tracker)
self.instance_type = instance_type
self.migration = None
@property
def disk_gb(self):
return (self.instance_type['root_gb'] +
self.instance_type['ephemeral_gb'])
@property
def memory_mb(self):
return self.instance_type['memory_mb']
@property
def vcpus(self):
return self.instance_type['vcpus']
@lockutils.synchronized(COMPUTE_RESOURCE_SEMAPHORE, 'nova-')
def abort(self):
"""Compute operation requiring claimed resources has failed or
been aborted.
"""
LOG.debug(_("Aborting claim: %s") % self, instance=self.instance)
self.tracker.abort_resize_claim(self.instance['uuid'],
self.instance_type)
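
To make the properties above concrete: a ResizeClaim is sized by the new instance type, with disk counted as root plus ephemeral. A tiny standalone sketch with made-up flavor values and plain dicts in place of the nova objects:

new_flavor = {'memory_mb': 512, 'vcpus': 2, 'root_gb': 10, 'ephemeral_gb': 20}

def resize_claim_usage(instance_type):
    # mirrors ResizeClaim.memory_mb, vcpus and disk_gb above
    return {
        'memory_mb': instance_type['memory_mb'],
        'vcpus': instance_type['vcpus'],
        'disk_gb': instance_type['root_gb'] + instance_type['ephemeral_gb'],
    }

assert resize_claim_usage(new_flavor) == {
    'memory_mb': 512, 'vcpus': 2, 'disk_gb': 30}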

@ -1593,6 +1593,9 @@ class ComputeManager(manager.SchedulerDependentManager):
self.driver.confirm_migration(migration, instance,
self._legacy_nw_info(network_info))
rt = self._get_resource_tracker(instance.get('node'))
rt.confirm_resize(context, migration)
self._notify_about_instance_usage(
context, instance, "resize.confirm.end",
network_info=network_info)
@ -1637,6 +1640,9 @@ class ComputeManager(manager.SchedulerDependentManager):
self._terminate_volume_connections(context, instance)
rt = self._get_resource_tracker(instance.get('node'))
rt.revert_resize(context, migration, status='reverted_dest')
self.compute_rpcapi.finish_revert_resize(context, instance,
migration, migration['source_compute'],
reservations)
@ -1707,8 +1713,8 @@ class ComputeManager(manager.SchedulerDependentManager):
vm_state=vm_states.ACTIVE,
task_state=None)
self.db.migration_update(elevated, migration['id'],
{'status': 'reverted'})
rt = self._get_resource_tracker(instance.get('node'))
rt.revert_resize(context, migration)
self._notify_about_instance_usage(
context, instance, "resize.revert.end")
@ -1725,6 +1731,29 @@ class ComputeManager(manager.SchedulerDependentManager):
if reservations:
QUOTAS.rollback(context, reservations)
def _prep_resize(self, context, image, instance, instance_type,
reservations, request_spec, filter_properties):
if not filter_properties:
filter_properties = {}
same_host = instance['host'] == self.host
if same_host and not CONF.allow_resize_to_same_host:
self._set_instance_error_state(context, instance['uuid'])
msg = _('destination same as source!')
raise exception.MigrationError(msg)
limits = filter_properties.get('limits', {})
rt = self._get_resource_tracker(instance.get('node'))
with rt.resize_claim(context, instance, instance_type, limits=limits) \
as claim:
migration_ref = claim.migration
LOG.audit(_('Migrating'), context=context,
instance=instance)
self.compute_rpcapi.resize_instance(context, instance,
migration_ref, image, instance_type, reservations)
@exception.wrap_exception(notifier=notifier, publisher_id=publisher_id())
@reverts_task_state
@wrap_instance_fault
@ -1742,30 +1771,9 @@ class ComputeManager(manager.SchedulerDependentManager):
context, instance, current_period=True)
self._notify_about_instance_usage(
context, instance, "resize.prep.start")
try:
same_host = instance['host'] == self.host
if same_host and not CONF.allow_resize_to_same_host:
self._set_instance_error_state(context, instance['uuid'])
msg = _('destination same as source!')
raise exception.MigrationError(msg)
old_instance_type = instance['instance_type']
migration_ref = self.db.migration_create(context.elevated(),
{'instance_uuid': instance['uuid'],
'source_compute': instance['host'],
'dest_compute': self.host,
'dest_host': self.driver.get_host_ip_addr(),
'old_instance_type_id': old_instance_type['id'],
'new_instance_type_id': instance_type['id'],
'status': 'pre-migrating'})
LOG.audit(_('Migrating'), context=context,
instance=instance)
self.compute_rpcapi.resize_instance(context, instance,
migration_ref, image, instance_type, reservations)
self._prep_resize(context, image, instance, instance_type,
reservations, request_spec, filter_properties)
except Exception:
# try to re-schedule the resize elsewhere:
self._reschedule_resize_or_reraise(context, image, instance,

@ -20,6 +20,8 @@ model.
"""
from nova.compute import claims
from nova.compute import instance_types
from nova.compute import task_states
from nova.compute import vm_states
from nova import config
from nova import context
@ -29,6 +31,7 @@ from nova import flags
from nova import notifications
from nova.openstack.common import cfg
from nova.openstack.common import importutils
from nova.openstack.common import jsonutils
from nova.openstack.common import lockutils
from nova.openstack.common import log as logging
from nova import utils
@ -62,6 +65,7 @@ class ResourceTracker(object):
self.compute_node = None
self.stats = importutils.import_object(CONF.compute_stats_class)
self.tracked_instances = {}
self.tracked_migrations = {}
@lockutils.synchronized(COMPUTE_RESOURCE_SEMAPHORE, 'nova-')
def instance_claim(self, context, instance_ref, limits=None):
@ -110,10 +114,69 @@ class ResourceTracker(object):
else:
raise exception.ComputeResourcesUnavailable()
@lockutils.synchronized(COMPUTE_RESOURCE_SEMAPHORE, 'nova-')
def resize_claim(self, context, instance_ref, instance_type, limits=None):
"""Indicate that resources are needed for a resize operation to this
compute host.
:param context: security context
:param instance_ref: instance to reserve resources for
:param instance_type: new instance_type being resized to
:param limits: Dict of oversubscription limits for memory, disk,
and CPUs.
:returns: A Claim ticket representing the reserved resources. This
should be used to finalize the resource claim or free the
resources after the compute operation is finished.
"""
if self.disabled:
# compute_driver doesn't support resource tracking, just
# generate the migration record and continue the resize:
migration_ref = self._create_migration(context, instance_ref,
instance_type)
return claims.NopClaim(migration=migration_ref)
claim = claims.ResizeClaim(instance_ref, instance_type, self)
if claim.test(self.compute_node, limits):
migration_ref = self._create_migration(context, instance_ref,
instance_type)
claim.migration = migration_ref
# Mark the resources in-use for the resize landing on this
# compute host:
self._update_usage_from_migration(self.compute_node, migration_ref)
self._update(context, self.compute_node)
return claim
else:
raise exception.ComputeResourcesUnavailable()
def _create_migration(self, context, instance, instance_type):
"""Create a migration record for the upcoming resize. This should
be done while the COMPUTE_RESOURCE_SEMAPHORE is held so the resource
claim will not be lost if the audit process starts.
"""
# TODO(russellb): no-db-compute: Send the old instance type
# info that is needed via rpc so db access isn't required
# here.
old_instance_type_id = instance['instance_type_id']
old_instance_type = instance_types.get_instance_type(
old_instance_type_id)
return db.migration_create(context.elevated(),
{'instance_uuid': instance['uuid'],
'source_compute': instance['host'],
'dest_compute': self.host,
'dest_host': self.driver.get_host_ip_addr(),
'old_instance_type_id': old_instance_type['id'],
'new_instance_type_id': instance_type['id'],
'status': 'pre-migrating'})
def _set_instance_host(self, context, instance_uuid):
"""Tag the instance as belonging to this host. This should be done
while the COMPUTE_RESOURCES_SEMPAHORE is being held so the resource
claim will not be lost if the audit process starts.
while the COMPUTE_RESOURCE_SEMAPHORE is held so the resource claim
will not be lost if the audit process starts.
"""
values = {'host': self.host, 'launched_on': self.host}
(old_ref, instance_ref) = db.instance_update_and_get_original(context,
@ -131,6 +194,18 @@ class ResourceTracker(object):
ctxt = context.get_admin_context()
self._update(ctxt, self.compute_node)
def abort_resize_claim(self, instance_uuid, instance_type):
"""Remove usage for an incoming migration"""
if instance_uuid in self.tracked_migrations:
migration, itype = self.tracked_migrations.pop(instance_uuid)
if instance_type['id'] == migration['new_instance_type_id']:
self.stats.update_stats_for_migration(itype, sign=-1)
self._update_usage(self.compute_node, itype, sign=-1)
ctxt = context.get_admin_context()
self._update(ctxt, self.compute_node)
@lockutils.synchronized(COMPUTE_RESOURCE_SEMAPHORE, 'nova-')
def update_usage(self, context, instance):
"""Update the resource usage and stats after a change in an
@ -139,9 +214,10 @@ class ResourceTracker(object):
if self.disabled:
return
uuid = instance['uuid']
# don't update usage for this instance unless it submitted a resource
# claim first:
uuid = instance['uuid']
if uuid in self.tracked_instances:
self._update_usage_from_instance(self.compute_node, instance)
self._update(context.elevated(), self.compute_node)
@ -159,6 +235,7 @@ class ResourceTracker(object):
declared a need for resources, but not necessarily retrieved them from
the hypervisor layer yet.
"""
LOG.audit(_("Auditing locally available compute resources"))
resources = self.driver.get_available_resource(self.nodename)
if not resources:
# The virt driver does not support this function
@ -177,6 +254,12 @@ class ResourceTracker(object):
# Now calculate usage based on instance utilization:
self._update_usage_from_instances(resources, instances)
# Grab all in-progress migrations:
migrations = db.migration_get_in_progress_by_host(context, self.host)
self._update_usage_from_migrations(resources, migrations)
self._report_final_resource_view(resources)
self._sync_compute_node(context, resources)
@ -260,6 +343,104 @@ class ResourceTracker(object):
self.compute_node['id'], values, prune_stats)
self.compute_node = dict(compute_node)
def confirm_resize(self, context, migration, status='confirmed'):
"""Cleanup usage for a confirmed resize"""
elevated = context.elevated()
db.migration_update(elevated, migration['id'],
{'status': status})
self.update_available_resource(elevated)
def revert_resize(self, context, migration, status='reverted'):
"""Cleanup usage for a reverted resize"""
self.confirm_resize(context, migration, status)
def _update_usage(self, resources, usage, sign=1):
resources['memory_mb_used'] += sign * usage['memory_mb']
resources['local_gb_used'] += sign * usage['root_gb']
resources['local_gb_used'] += sign * usage['ephemeral_gb']
# free ram and disk may be negative, depending on policy:
resources['free_ram_mb'] = (resources['memory_mb'] -
resources['memory_mb_used'])
resources['free_disk_gb'] = (resources['local_gb'] -
resources['local_gb_used'])
resources['running_vms'] = self.stats.num_instances
resources['vcpus_used'] = self.stats.num_vcpus_used
def _update_usage_from_migration(self, resources, migration):
"""Update usage for a single migration. The record may
represent an incoming or outbound migration.
"""
uuid = migration['instance_uuid']
LOG.audit("Updating from migration %s" % uuid)
incoming = (migration['dest_compute'] == self.host)
outbound = (migration['source_compute'] == self.host)
same_host = (incoming and outbound)
instance = self.tracked_instances.get(uuid, None)
itype = None
if same_host:
# same host resize. record usage for whichever instance type the
# instance is *not* in:
if (instance['instance_type_id'] ==
migration['old_instance_type_id']):
itype = migration['new_instance_type_id']
else:
# instance record already has new flavor, hold space for a
# possible revert to the old instance type:
itype = migration['old_instance_type_id']
elif incoming and not instance:
# instance has not yet migrated here:
itype = migration['new_instance_type_id']
elif outbound and not instance:
# instance migrated, but record usage for a possible revert:
itype = migration['old_instance_type_id']
if itype:
instance_type = instance_types.get_instance_type(itype)
self.stats.update_stats_for_migration(instance_type)
self._update_usage(resources, instance_type)
resources['stats'] = self.stats
self.tracked_migrations[uuid] = (migration, instance_type)
def _update_usage_from_migrations(self, resources, migrations):
self.tracked_migrations.clear()
filtered = {}
# do some defensive filtering against bad migration records in the
# database:
for migration in migrations:
instance = migration['instance']
if not instance:
# migration referencing deleted instance
continue
uuid = instance['uuid']
# skip migration if instance isn't in a resize state:
if not self._instance_in_resize_state(instance):
LOG.warn(_("Instance not resizing, skipping migration."),
instance_uuid=uuid)
continue
# filter to most recently updated migration for each instance:
m = filtered.get(uuid, None)
if not m or migration['updated_at'] >= m['updated_at']:
filtered[uuid] = migration
for migration in filtered.values():
self._update_usage_from_migration(resources, migration)
def _update_usage_from_instance(self, resources, instance):
"""Update usage for a single instance."""
@ -268,7 +449,7 @@ class ResourceTracker(object):
is_deleted_instance = instance['vm_state'] == vm_states.DELETED
if is_new_instance:
self.tracked_instances[uuid] = 1
self.tracked_instances[uuid] = jsonutils.to_primitive(instance)
sign = 1
if is_deleted_instance:
@ -280,18 +461,7 @@ class ResourceTracker(object):
# if it's a new or deleted instance:
if is_new_instance or is_deleted_instance:
# new instance, update compute node resource usage:
resources['memory_mb_used'] += sign * instance['memory_mb']
resources['local_gb_used'] += sign * instance['root_gb']
resources['local_gb_used'] += sign * instance['ephemeral_gb']
# free ram and disk may be negative, depending on policy:
resources['free_ram_mb'] = (resources['memory_mb'] -
resources['memory_mb_used'])
resources['free_disk_gb'] = (resources['local_gb'] -
resources['local_gb_used'])
resources['running_vms'] = self.stats.num_instances
resources['vcpus_used'] = self.stats.num_vcpus_used
self._update_usage(resources, instance, sign=sign)
resources['current_workload'] = self.stats.calculate_workload()
resources['stats'] = self.stats
@ -329,3 +499,17 @@ class ResourceTracker(object):
if missing_keys:
reason = _("Missing keys: %s") % missing_keys
raise exception.InvalidInput(reason=reason)
def _instance_in_resize_state(self, instance):
vm = instance['vm_state']
task = instance['task_state']
if vm == vm_states.RESIZED:
return True
if (vm == vm_states.ACTIVE and task in [task_states.RESIZE_PREP,
task_states.RESIZE_MIGRATING, task_states.RESIZE_MIGRATED,
task_states.RESIZE_FINISH]):
return True
return False
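
To illustrate the defensive filtering in _update_usage_from_migrations, here is a standalone sketch with fabricated records (datetime objects standing in for the database's updated_at column) showing duplicate in-progress migrations for one instance collapsing to the most recently updated record:

from datetime import datetime

migrations = [
    {'instance_uuid': 'abc', 'updated_at': datetime(2012, 11, 1)},
    {'instance_uuid': 'abc', 'updated_at': datetime(2012, 11, 2)},
    {'instance_uuid': 'def', 'updated_at': datetime(2012, 11, 1)},
]

filtered = {}
for migration in migrations:
    uuid = migration['instance_uuid']
    m = filtered.get(uuid)
    if not m or migration['updated_at'] >= m['updated_at']:
        filtered[uuid] = migration        # keep the newest record per instance

assert len(filtered) == 2
assert filtered['abc']['updated_at'] == datetime(2012, 11, 2)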

@ -114,6 +114,10 @@ class Stats(dict):
# save updated I/O workload in stats:
self["io_workload"] = self.io_workload
def update_stats_for_migration(self, instance_type, sign=1):
x = self.get("num_vcpus_used", 0)
self["num_vcpus_used"] = x + (sign * instance_type['vcpus'])
def _decrement(self, key):
x = self.get(key, 0)
self[key] = x - 1
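
The stats hook above only adjusts the vcpu count; memory and disk for a migration are applied through the tracker's _update_usage. A trivial sketch of the vcpu bookkeeping, using a plain dict in place of the Stats class, showing a claim followed by an abort or revert netting out to zero:

stats = {'num_vcpus_used': 4}
itype = {'vcpus': 2}

def update_stats_for_migration(stats, instance_type, sign=1):
    current = stats.get('num_vcpus_used', 0)
    stats['num_vcpus_used'] = current + sign * instance_type['vcpus']

update_stats_for_migration(stats, itype)            # resize claim: +2 vcpus
update_stats_for_migration(stats, itype, sign=-1)   # abort or revert: -2 vcpus
assert stats['num_vcpus_used'] == 4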

@ -26,15 +26,27 @@ from nova import test
LOG = logging.getLogger(__name__)
class DummyTracker(object):
icalled = False
rcalled = False
def abort_instance_claim(self, *args, **kwargs):
self.icalled = True
def abort_resize_claim(self, *args, **kwargs):
self.rcalled = True
class ClaimTestCase(test.TestCase):
def setUp(self):
super(ClaimTestCase, self).setUp()
self.resources = self._fake_resources()
self.tracker = DummyTracker()
def _claim(self, **kwargs):
instance = self._fake_instance(**kwargs)
return claims.Claim(instance, None)
return claims.Claim(instance, self.tracker)
def _fake_instance(self, **kwargs):
instance = {
@ -47,6 +59,18 @@ class ClaimTestCase(test.TestCase):
instance.update(**kwargs)
return instance
def _fake_instance_type(self, **kwargs):
instance_type = {
'id': 1,
'name': 'fakeitype',
'memory_mb': 1,
'vcpus': 1,
'root_gb': 1,
'ephemeral_gb': 2
}
instance_type.update(**kwargs)
return instance_type
def _fake_resources(self, values=None):
resources = {
'memory_mb': 2048,
@ -109,17 +133,30 @@ class ClaimTestCase(test.TestCase):
self.assertFalse(claim.test(self.resources, limits))
def test_abort(self):
instance = self._fake_instance(root_gb=10, ephemeral_gb=40)
claim = self._abort()
self.assertTrue(claim.tracker.icalled)
def fake_abort(self):
self._called = True
self.stubs.Set(claims.Claim, 'abort', fake_abort)
def _abort(self):
claim = None
try:
with claims.Claim(instance, None) as claim:
with self._claim(memory_mb=4096) as claim:
raise test.TestingException("abort")
except test.TestingException:
pass
self.assertTrue(claim._called)
return claim
class ResizeClaimTestCase(ClaimTestCase):
def setUp(self):
super(ResizeClaimTestCase, self).setUp()
self.instance = self._fake_instance()
def _claim(self, **kwargs):
instance_type = self._fake_instance_type(**kwargs)
return claims.ResizeClaim(self.instance, instance_type, self.tracker)
def test_abort(self):
claim = self._abort()
self.assertTrue(claim.tracker.rcalled)

@ -19,6 +19,8 @@
import uuid
from nova.compute import claims
from nova.compute import instance_types
from nova.compute import resource_tracker
from nova.compute import task_states
from nova.compute import vm_states
@ -40,9 +42,13 @@ FAKE_VIRT_VCPUS = 1
class UnsupportedVirtDriver(driver.ComputeDriver):
"""Pretend version of a lame virt driver"""
def __init__(self):
super(UnsupportedVirtDriver, self).__init__(None)
def get_host_ip_addr(self):
return '127.0.0.1'
def get_available_resource(self, nodename):
# no support for getting resource usage info
return {}
@ -59,6 +65,9 @@ class FakeVirtDriver(driver.ComputeDriver):
self.memory_mb_used = 0
self.local_gb_used = 0
def get_host_ip_addr(self):
return '127.0.0.1'
def get_available_resource(self, nodename):
d = {
'vcpus': self.vcpus,
@ -83,13 +92,18 @@ class BaseTestCase(test.TestCase):
self.flags(reserved_host_disk_mb=0,
reserved_host_memory_mb=0)
self.context = context.RequestContext('fake', 'fake')
self.context = context.get_admin_context()
self._instances = {}
self._instance_types = {}
self.stubs.Set(db, 'instance_get_all_by_host_and_node',
lambda c, h, n: self._instances.values())
self._fake_instance_get_all_by_host_and_node)
self.stubs.Set(db, 'instance_update_and_get_original',
self._fake_instance_update_and_get_original)
self.stubs.Set(db, 'instance_type_get', self._fake_instance_type_get)
self.host = 'fakehost'
def _create_compute_node(self, values=None):
compute = {
@ -131,7 +145,7 @@ class BaseTestCase(test.TestCase):
instance_uuid = str(uuid.uuid1())
instance = {
'uuid': instance_uuid,
'vm_state': vm_states.BUILDING,
'vm_state': vm_states.RESIZED,
'task_state': None,
'memory_mb': 2,
'root_gb': 3,
@ -140,12 +154,35 @@ class BaseTestCase(test.TestCase):
'project_id': '123456',
'vcpus': 1,
'host': None,
'instance_type_id': 1,
}
instance.update(kwargs)
self._instances[instance_uuid] = instance
return instance
def _fake_instance_type_create(self, **kwargs):
instance_type = {
'id': 1,
'name': 'fakeitype',
'memory_mb': FAKE_VIRT_MEMORY_MB,
'vcpus': FAKE_VIRT_VCPUS,
'root_gb': FAKE_VIRT_LOCAL_GB / 2,
'ephemeral_gb': FAKE_VIRT_LOCAL_GB / 2,
'flavorid': 'fakeflavor'
}
instance_type.update(**kwargs)
id_ = instance_type['id']
self._instance_types[id_] = instance_type
return instance_type
def _fake_instance_get_all_by_host_and_node(self, context, host, nodename):
return [i for i in self._instances.values() if i['host'] == host]
def _fake_instance_type_get(self, ctxt, id_):
return self._instance_types[id_]
def _fake_instance_update_and_get_original(self, context, instance_uuid,
values):
instance = self._instances[instance_uuid]
@ -154,8 +191,11 @@ class BaseTestCase(test.TestCase):
# only used in the subsequent notification:
return (instance, instance)
def _tracker(self, unsupported=False):
host = "fakehost"
def _tracker(self, host=None, unsupported=False):
if host is None:
host = self.host
node = "fakenode"
if unsupported:
@ -206,6 +246,23 @@ class UnsupportedDriverTestCase(BaseTestCase):
root_gb=10)
self.tracker.update_usage(self.context, instance)
def testDisabledResizeClaim(self):
instance = self._fake_instance()
instance_type = self._fake_instance_type_create()
claim = self.tracker.resize_claim(self.context, instance,
instance_type)
self.assertEqual(0, claim.memory_mb)
self.assertEqual(instance['uuid'], claim.migration['instance_uuid'])
self.assertEqual(instance_type['id'],
claim.migration['new_instance_type_id'])
def testDisabledResizeContextClaim(self):
instance = self._fake_instance()
instance_type = self._fake_instance_type_create()
with self.tracker.resize_claim(self.context, instance, instance_type) \
as claim:
self.assertEqual(0, claim.memory_mb)
class MissingServiceTestCase(BaseTestCase):
def setUp(self):
@ -246,17 +303,39 @@ class MissingComputeNodeTestCase(BaseTestCase):
self.assertFalse(self.tracker.disabled)
class ResourceTestCase(BaseTestCase):
class BaseTrackerTestCase(BaseTestCase):
def setUp(self):
super(ResourceTestCase, self).setUp()
# setup plumbing for a working resource tracker with required
# database models and a compatible compute driver:
super(BaseTrackerTestCase, self).setUp()
self.tracker = self._tracker()
self._migrations = {}
self.stubs.Set(db, 'service_get_all_compute_by_host',
self._fake_service_get_all_compute_by_host)
self.stubs.Set(db, 'compute_node_update',
self._fake_compute_node_update)
self.stubs.Set(db, 'migration_update',
self._fake_migration_update)
self.stubs.Set(db, 'migration_get_in_progress_by_host',
self._fake_migration_get_in_progress_by_host)
self.tracker.update_available_resource(self.context)
self.limits = self._basic_limits()
self.limits = self._limits()
self._assert(FAKE_VIRT_MEMORY_MB, 'memory_mb')
self._assert(FAKE_VIRT_LOCAL_GB, 'local_gb')
self._assert(FAKE_VIRT_VCPUS, 'vcpus')
self._assert(0, 'memory_mb_used')
self._assert(0, 'local_gb_used')
self._assert(0, 'vcpus_used')
self._assert(0, 'running_vms')
self._assert(FAKE_VIRT_MEMORY_MB, 'free_ram_mb')
self._assert(FAKE_VIRT_LOCAL_GB, 'free_disk_gb')
self.assertFalse(self.tracker.disabled)
self.assertEqual(0, self.tracker.compute_node['current_workload'])
def _fake_service_get_all_compute_by_host(self, ctx, host):
self.compute = self._create_compute_node()
@ -271,36 +350,50 @@ class ResourceTestCase(BaseTestCase):
self.compute.update(values)
return self.compute
def _basic_limits(self):
"""Get basic limits, no oversubscription"""
def _fake_migration_get_in_progress_by_host(self, ctxt, host):
status = ['confirmed', 'reverted']
migrations = []
for migration in self._migrations.values():
if migration['status'] in status:
continue
uuid = migration['instance_uuid']
migration['instance'] = self._instances[uuid]
migrations.append(migration)
return migrations
def _fake_migration_update(self, ctxt, migration_id, values):
# cheat and assume there's only 1 migration present
migration = self._migrations.values()[0]
migration.update(values)
return migration
def _limits(self, memory_mb=FAKE_VIRT_MEMORY_MB,
disk_gb=FAKE_VIRT_LOCAL_GB, vcpus=FAKE_VIRT_VCPUS):
"""Create limits dictionary used for oversubscribing resources"""
return {
'memory_mb': FAKE_VIRT_MEMORY_MB * 2,
'disk_gb': FAKE_VIRT_LOCAL_GB,
'vcpu': FAKE_VIRT_VCPUS,
'memory_mb': memory_mb,
'disk_gb': disk_gb,
'vcpu': vcpus
}
def test_update_usage_only_for_tracked(self):
instance = self._fake_instance(memory_mb=3, root_gb=1, ephemeral_gb=1,
task_state=None)
self.tracker.update_usage(self.context, instance)
def _assert(self, value, field, tracker=None):
self.assertEqual(0, self.tracker.compute_node['memory_mb_used'])
self.assertEqual(0, self.tracker.compute_node['local_gb_used'])
self.assertEqual(0, self.tracker.compute_node['current_workload'])
if tracker is None:
tracker = self.tracker
claim = self.tracker.instance_claim(self.context, instance,
self.limits)
self.assertNotEqual(0, claim.memory_mb)
self.assertEqual(3, self.tracker.compute_node['memory_mb_used'])
self.assertEqual(2, self.tracker.compute_node['local_gb_used'])
if field not in tracker.compute_node:
raise test.TestingException(
"'%(field)s' not in compute node." % locals())
x = tracker.compute_node[field]
# now update should actually take effect
instance['task_state'] = task_states.SCHEDULING
self.tracker.update_usage(self.context, instance)
self.assertEqual(value, x)
self.assertEqual(3, self.tracker.compute_node['memory_mb_used'])
self.assertEqual(2, self.tracker.compute_node['local_gb_used'])
self.assertEqual(1, self.tracker.compute_node['current_workload'])
class TrackerTestCase(BaseTrackerTestCase):
def test_free_ram_resource_value(self):
driver = FakeVirtDriver()
@ -316,13 +409,33 @@ class ResourceTestCase(BaseTestCase):
self.assertFalse(self.tracker.disabled)
self.assertTrue(self.updated)
class InstanceClaimTestCase(BaseTrackerTestCase):
def test_update_usage_only_for_tracked(self):
instance = self._fake_instance(memory_mb=3, root_gb=1, ephemeral_gb=1,
task_state=None)
self.tracker.update_usage(self.context, instance)
self._assert(0, 'memory_mb_used')
self._assert(0, 'local_gb_used')
self._assert(0, 'current_workload')
claim = self.tracker.instance_claim(self.context, instance,
self.limits)
self.assertNotEqual(0, claim.memory_mb)
self._assert(3, 'memory_mb_used')
self._assert(2, 'local_gb_used')
# now update should actually take effect
instance['task_state'] = task_states.SCHEDULING
self.tracker.update_usage(self.context, instance)
self._assert(3, 'memory_mb_used')
self._assert(2, 'local_gb_used')
self._assert(1, 'current_workload')
def test_claim_and_audit(self):
self.assertEqual(5, self.tracker.compute_node['memory_mb'])
self.assertEqual(0, self.tracker.compute_node['memory_mb_used'])
self.assertEqual(6, self.tracker.compute_node['local_gb'])
self.assertEqual(0, self.tracker.compute_node['local_gb_used'])
claim_mem = 3
claim_disk = 2
instance = self._fake_instance(memory_mb=claim_mem, root_gb=claim_disk,
@ -356,12 +469,6 @@ class ResourceTestCase(BaseTestCase):
self.assertEqual(6 - claim_disk, self.compute['free_disk_gb'])
def test_claim_and_abort(self):
self.assertEqual(5, self.tracker.compute_node['memory_mb'])
self.assertEqual(0, self.tracker.compute_node['memory_mb_used'])
self.assertEqual(6, self.tracker.compute_node['local_gb'])
self.assertEqual(0, self.tracker.compute_node['local_gb_used'])
claim_mem = 3
claim_disk = 2
instance = self._fake_instance(memory_mb=claim_mem,
@ -370,21 +477,17 @@ class ResourceTestCase(BaseTestCase):
self.limits)
self.assertNotEqual(None, claim)
self.assertEqual(5, self.compute["memory_mb"])
self.assertEqual(claim_mem, self.compute["memory_mb_used"])
self.assertEqual(5 - claim_mem, self.compute["free_ram_mb"])
self.assertEqual(6, self.compute["local_gb"])
self.assertEqual(claim_disk, self.compute["local_gb_used"])
self.assertEqual(6 - claim_disk, self.compute["free_disk_gb"])
claim.abort()
self.assertEqual(5, self.compute["memory_mb"])
self.assertEqual(0, self.compute["memory_mb_used"])
self.assertEqual(5, self.compute["free_ram_mb"])
self.assertEqual(6, self.compute["local_gb"])
self.assertEqual(0, self.compute["local_gb_used"])
self.assertEqual(6, self.compute["free_disk_gb"])
@ -452,8 +555,6 @@ class ResourceTestCase(BaseTestCase):
self.assertEqual(2, self.compute['local_gb_used'])
def test_update_load_stats_for_instance(self):
self.assertFalse(self.tracker.disabled)
self.assertEqual(0, self.tracker.compute_node['current_workload'])
instance = self._fake_instance(task_state=task_states.SCHEDULING)
with self.tracker.instance_claim(self.context, instance):
@ -495,3 +596,206 @@ class ResourceTestCase(BaseTestCase):
instance['vm_state'] = vm_states.DELETED
self.tracker.update_usage(self.context, instance)
self.assertEqual(1, self.tracker.compute_node['vcpus_used'])
class ResizeClaimTestCase(BaseTrackerTestCase):
def setUp(self):
super(ResizeClaimTestCase, self).setUp()
self.stubs.Set(db, 'migration_create', self._fake_migration_create)
self.instance = self._fake_instance()
self.instance_type = self._fake_instance_type_create()
def _fake_migration_create(self, context, values=None):
instance_uuid = str(uuid.uuid1())
migration = {
'id': 1,
'source_compute': 'host1',
'dest_compute': 'host2',
'dest_host': '127.0.0.1',
'old_instance_type_id': 1,
'new_instance_type_id': 2,
'instance_uuid': instance_uuid,
'status': 'pre-migrating',
'updated_at': timeutils.utcnow()
}
if values:
migration.update(values)
self._migrations[instance_uuid] = migration
return migration
def test_claim(self):
self.tracker.resize_claim(self.context, self.instance,
self.instance_type, self.limits)
self._assert(FAKE_VIRT_MEMORY_MB, 'memory_mb_used')
self._assert(FAKE_VIRT_LOCAL_GB, 'local_gb_used')
self._assert(FAKE_VIRT_VCPUS, 'vcpus_used')
self.assertEqual(1, len(self.tracker.tracked_migrations))
def test_abort(self):
try:
with self.tracker.resize_claim(self.context, self.instance,
self.instance_type, self.limits):
raise test.TestingException("abort")
except test.TestingException:
pass
self._assert(0, 'memory_mb_used')
self._assert(0, 'local_gb_used')
self._assert(0, 'vcpus_used')
self.assertEqual(0, len(self.tracker.tracked_migrations))
def test_additive_claims(self):
limits = self._limits(FAKE_VIRT_MEMORY_MB * 2, FAKE_VIRT_LOCAL_GB * 2,
FAKE_VIRT_VCPUS * 2)
self.tracker.resize_claim(self.context, self.instance,
self.instance_type, limits)
instance2 = self._fake_instance()
self.tracker.resize_claim(self.context, instance2, self.instance_type,
limits)
self._assert(2 * FAKE_VIRT_MEMORY_MB, 'memory_mb_used')
self._assert(2 * FAKE_VIRT_LOCAL_GB, 'local_gb_used')
self._assert(2 * FAKE_VIRT_VCPUS, 'vcpus_used')
def test_claim_and_audit(self):
self.tracker.resize_claim(self.context, self.instance,
self.instance_type, self.limits)
self.tracker.update_available_resource(self.context)
self._assert(FAKE_VIRT_MEMORY_MB, 'memory_mb_used')
self._assert(FAKE_VIRT_LOCAL_GB, 'local_gb_used')
self._assert(FAKE_VIRT_VCPUS, 'vcpus_used')
def test_same_host(self):
self.limits['vcpu'] = 3
src_type = self._fake_instance_type_create(id=2, memory_mb=1,
root_gb=1, ephemeral_gb=0, vcpus=1)
dest_type = self._fake_instance_type_create(id=2, memory_mb=2,
root_gb=2, ephemeral_gb=1, vcpus=2)
# make an instance of src_type:
instance = self._fake_instance(memory_mb=1, root_gb=1, ephemeral_gb=0,
vcpus=1, instance_type_id=2)
self.tracker.instance_claim(self.context, instance, self.limits)
# resize to dest_type:
claim = self.tracker.resize_claim(self.context, self.instance,
dest_type, self.limits)
self._assert(3, 'memory_mb_used')
self._assert(4, 'local_gb_used')
self._assert(3, 'vcpus_used')
self.tracker.update_available_resource(self.context)
claim.abort()
# only the original instance should remain, not the migration:
self._assert(1, 'memory_mb_used')
self._assert(1, 'local_gb_used')
self._assert(1, 'vcpus_used')
self.assertEqual(1, len(self.tracker.tracked_instances))
self.assertEqual(0, len(self.tracker.tracked_migrations))
def test_revert(self):
self.tracker.resize_claim(self.context, self.instance,
self.instance_type, self.limits)
migration, itype = self.tracker.tracked_migrations[
self.instance['uuid']]
self.tracker.revert_resize(self.context, migration)
self.assertEqual(0, len(self.tracker.tracked_instances))
self.assertEqual(0, len(self.tracker.tracked_migrations))
self._assert(0, 'memory_mb_used')
self._assert(0, 'local_gb_used')
self._assert(0, 'vcpus_used')
def test_revert_reserve_source(self):
# if a revert has started at the API and audit runs on
# the source compute before the instance flips back to source,
# resources should still be held at the source based on the
# migration:
dest = "desthost"
dest_tracker = self._tracker(host=dest)
dest_tracker.update_available_resource(self.context)
self.instance = self._fake_instance(memory_mb=FAKE_VIRT_MEMORY_MB,
root_gb=FAKE_VIRT_LOCAL_GB, ephemeral_gb=0,
vcpus=FAKE_VIRT_VCPUS, instance_type_id=1)
values = {'source_compute': self.host, 'dest_compute': dest,
'old_instance_type_id': 1, 'new_instance_type_id': 1,
'status': 'post-migrating',
'instance_uuid': self.instance['uuid']}
migration = self._fake_migration_create(self.context, values)
# attach an instance to the destination host tracker:
dest_tracker.instance_claim(self.context, self.instance)
self._assert(FAKE_VIRT_MEMORY_MB, 'memory_mb_used',
tracker=dest_tracker)
self._assert(FAKE_VIRT_LOCAL_GB, 'local_gb_used',
tracker=dest_tracker)
self._assert(FAKE_VIRT_VCPUS, 'vcpus_used',
tracker=dest_tracker)
# audit and recheck to confirm migration doesn't get double counted
# on dest:
dest_tracker.update_available_resource(self.context)
self._assert(FAKE_VIRT_MEMORY_MB, 'memory_mb_used',
tracker=dest_tracker)
self._assert(FAKE_VIRT_LOCAL_GB, 'local_gb_used',
tracker=dest_tracker)
self._assert(FAKE_VIRT_VCPUS, 'vcpus_used',
tracker=dest_tracker)
# apply the migration to the source host tracker:
self.tracker.update_available_resource(self.context)
self._assert(FAKE_VIRT_MEMORY_MB, 'memory_mb_used')
self._assert(FAKE_VIRT_LOCAL_GB, 'local_gb_used')
self._assert(FAKE_VIRT_VCPUS, 'vcpus_used')
# flag the instance and migration as reverting and re-audit:
self.instance['vm_state'] = vm_states.RESIZED
self.instance['task_state'] = task_states.RESIZE_REVERTING
self.tracker.update_available_resource(self.context)
self._assert(FAKE_VIRT_MEMORY_MB, 'memory_mb_used')
self._assert(FAKE_VIRT_LOCAL_GB, 'local_gb_used')
self._assert(FAKE_VIRT_VCPUS, 'vcpus_used')
def test_resize_filter(self):
instance = self._fake_instance(vm_state=vm_states.ACTIVE,
task_state=task_states.SUSPENDING)
self.assertFalse(self.tracker._instance_in_resize_state(instance))
instance = self._fake_instance(vm_state=vm_states.RESIZED,
task_state=task_states.SUSPENDING)
self.assertTrue(self.tracker._instance_in_resize_state(instance))
instance = self._fake_instance(vm_state=vm_states.ACTIVE,
task_state=task_states.RESIZE_MIGRATING)
self.assertTrue(self.tracker._instance_in_resize_state(instance))
def test_dupe_filter(self):
self._fake_instance_type_create(id=2, memory_mb=1, root_gb=1,
ephemeral_gb=1, vcpus=1)
instance = self._fake_instance(host=self.host)
values = {'source_compute': self.host, 'dest_compute': self.host,
'instance_uuid': instance['uuid'], 'new_instance_type_id': 2}
self._fake_migration_create(self.context, values)
self._fake_migration_create(self.context, values)
self.tracker.update_available_resource(self.context)
self.assertEqual(1, len(self.tracker.tracked_migrations))