Merge "Add support for resizes to resource tracker."

Jenkins 2012-11-16 17:02:45 +00:00 committed by Gerrit Code Review
commit e3f4d63d74
7 changed files with 676 additions and 104 deletions
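In outline, the change routes resize and migration operations through the compute node's ResourceTracker, the same path instance builds already use: prep_resize asks the tracker for a ResizeClaim, the tracker creates the migration record and holds the destination flavor's memory, disk, and vCPUs, and that usage is later released by confirm_resize, revert_resize, or an abort. A minimal caller-side sketch of the pattern, based on the hunks below (the tracker instance and flavor names are illustrative, not part of this diff):

    # Illustrative sketch only; assumes a ResourceTracker `tracker` and a
    # destination flavor dict `new_flavor` with memory_mb/root_gb/etc. keys.
    from nova import exception

    def prep_resize_sketch(context, tracker, instance, new_flavor, limits):
        try:
            # resize_claim() tests the request against the scheduler limits,
            # creates the migration record, and marks the new flavor's
            # resources as in use on this host.
            with tracker.resize_claim(context, instance, new_flavor,
                                      limits=limits) as claim:
                # An exception inside this block aborts the claim and frees
                # the held resources; otherwise the resize continues.
                return claim.migration
        except exception.ComputeResourcesUnavailable:
            # The host could not hold the new flavor; nothing was recorded.
            raise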

View File: nova/compute/api.py

@@ -1567,13 +1567,13 @@ class API(base.Base):
task_state=task_states.RESIZE_REVERTING,
expected_task_state=None)
self.db.migration_update(elevated, migration_ref['id'],
{'status': 'reverting'})
self.compute_rpcapi.revert_resize(context,
instance=instance, migration=migration_ref,
host=migration_ref['dest_compute'], reservations=reservations)
self.db.migration_update(elevated, migration_ref['id'],
{'status': 'reverted'})
@wrap_check_policy
@check_instance_lock
@check_instance_state(vm_state=[vm_states.RESIZED])
@@ -1591,14 +1591,14 @@ class API(base.Base):
task_state=None,
expected_task_state=None)
self.db.migration_update(elevated, migration_ref['id'],
{'status': 'confirming'})
self.compute_rpcapi.confirm_resize(context,
instance=instance, migration=migration_ref,
host=migration_ref['source_compute'],
reservations=reservations)
self.db.migration_update(elevated, migration_ref['id'],
{'status': 'confirmed'})
@staticmethod
def _resize_quota_delta(context, new_instance_type,
old_instance_type, sense, compare):

View File: nova/compute/claims.py

@@ -28,6 +28,9 @@ COMPUTE_RESOURCE_SEMAPHORE = "compute_resources"
class NopClaim(object):
"""For use with compute drivers that do not support resource tracking"""
def __init__(self, migration=None):
self.migration = migration
@property
def disk_gb(self):
return 0
@@ -183,3 +186,35 @@ class Claim(NopClaim):
LOG.info(msg, instance=self.instance)
return can_claim
class ResizeClaim(Claim):
"""Claim used for holding resources for an incoming resize/migration
operation.
"""
def __init__(self, instance, instance_type, tracker):
super(ResizeClaim, self).__init__(instance, tracker)
self.instance_type = instance_type
self.migration = None
@property
def disk_gb(self):
return (self.instance_type['root_gb'] +
self.instance_type['ephemeral_gb'])
@property
def memory_mb(self):
return self.instance_type['memory_mb']
@property
def vcpus(self):
return self.instance_type['vcpus']
@lockutils.synchronized(COMPUTE_RESOURCE_SEMAPHORE, 'nova-')
def abort(self):
"""Compute operation requiring claimed resources has failed or
been aborted.
"""
LOG.debug(_("Aborting claim: %s") % self, instance=self.instance)
self.tracker.abort_resize_claim(self.instance['uuid'],
self.instance_type)
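ResizeClaim relies on the context-manager behavior it inherits from Claim, which is not part of this hunk: leaving a `with` block via an exception aborts the claim, and for a ResizeClaim that abort is routed to tracker.abort_resize_claim(). A sketch of that inherited behavior, written as an assumption based on the test cases further down rather than a quote of claims.py:

    # Assumed shape of the inherited context-manager protocol (not shown in
    # this diff); only the abort-on-exception behavior matters here.
    class ClaimContextSketch(object):
        def __enter__(self):
            return self

        def __exit__(self, exc_type, exc_val, exc_tb):
            # Any exception raised inside the `with` block aborts the claim;
            # ResizeClaim.abort() then calls tracker.abort_resize_claim().
            if exc_type is not None:
                self.abort()

        def abort(self):
            raise NotImplementedError()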

View File: nova/compute/manager.py

@@ -1596,6 +1596,9 @@ class ComputeManager(manager.SchedulerDependentManager):
self.driver.confirm_migration(migration, instance,
self._legacy_nw_info(network_info))
rt = self._get_resource_tracker(instance.get('node'))
rt.confirm_resize(context, migration)
self._notify_about_instance_usage(
context, instance, "resize.confirm.end",
network_info=network_info)
@@ -1640,6 +1643,9 @@ class ComputeManager(manager.SchedulerDependentManager):
self._terminate_volume_connections(context, instance)
rt = self._get_resource_tracker(instance.get('node'))
rt.revert_resize(context, migration, status='reverted_dest')
self.compute_rpcapi.finish_revert_resize(context, instance,
migration, migration['source_compute'],
reservations)
@@ -1710,8 +1716,8 @@ class ComputeManager(manager.SchedulerDependentManager):
vm_state=vm_states.ACTIVE,
task_state=None)
self.db.migration_update(elevated, migration['id'],
{'status': 'reverted'})
rt = self._get_resource_tracker(instance.get('node'))
rt.revert_resize(context, migration)
self._notify_about_instance_usage(
context, instance, "resize.revert.end")
@@ -1728,6 +1734,29 @@ class ComputeManager(manager.SchedulerDependentManager):
if reservations:
QUOTAS.rollback(context, reservations)
def _prep_resize(self, context, image, instance, instance_type,
reservations, request_spec, filter_properties):
if not filter_properties:
filter_properties = {}
same_host = instance['host'] == self.host
if same_host and not CONF.allow_resize_to_same_host:
self._set_instance_error_state(context, instance['uuid'])
msg = _('destination same as source!')
raise exception.MigrationError(msg)
limits = filter_properties.get('limits', {})
rt = self._get_resource_tracker(instance.get('node'))
with rt.resize_claim(context, instance, instance_type, limits=limits) \
as claim:
migration_ref = claim.migration
LOG.audit(_('Migrating'), context=context,
instance=instance)
self.compute_rpcapi.resize_instance(context, instance,
migration_ref, image, instance_type, reservations)
@exception.wrap_exception(notifier=notifier, publisher_id=publisher_id())
@reverts_task_state
@wrap_instance_fault
@@ -1745,30 +1774,9 @@ class ComputeManager(manager.SchedulerDependentManager):
context, instance, current_period=True)
self._notify_about_instance_usage(
context, instance, "resize.prep.start")
try:
same_host = instance['host'] == self.host
if same_host and not CONF.allow_resize_to_same_host:
self._set_instance_error_state(context, instance['uuid'])
msg = _('destination same as source!')
raise exception.MigrationError(msg)
old_instance_type = instance['instance_type']
migration_ref = self.db.migration_create(context.elevated(),
{'instance_uuid': instance['uuid'],
'source_compute': instance['host'],
'dest_compute': self.host,
'dest_host': self.driver.get_host_ip_addr(),
'old_instance_type_id': old_instance_type['id'],
'new_instance_type_id': instance_type['id'],
'status': 'pre-migrating'})
LOG.audit(_('Migrating'), context=context,
instance=instance)
self.compute_rpcapi.resize_instance(context, instance,
migration_ref, image, instance_type, reservations)
self._prep_resize(context, image, instance, instance_type,
reservations, request_spec, filter_properties)
except Exception:
# try to re-schedule the resize elsewhere:
self._reschedule_resize_or_reraise(context, image, instance,

View File: nova/compute/resource_tracker.py

@@ -20,6 +20,8 @@ model.
"""
from nova.compute import claims
from nova.compute import instance_types
from nova.compute import task_states
from nova.compute import vm_states
from nova import config
from nova import context
@@ -28,6 +30,7 @@ from nova import exception
from nova import notifications
from nova.openstack.common import cfg
from nova.openstack.common import importutils
from nova.openstack.common import jsonutils
from nova.openstack.common import lockutils
from nova.openstack.common import log as logging
@@ -60,6 +63,7 @@ class ResourceTracker(object):
self.compute_node = None
self.stats = importutils.import_object(CONF.compute_stats_class)
self.tracked_instances = {}
self.tracked_migrations = {}
@lockutils.synchronized(COMPUTE_RESOURCE_SEMAPHORE, 'nova-')
def instance_claim(self, context, instance_ref, limits=None):
@@ -108,10 +112,69 @@ class ResourceTracker(object):
else:
raise exception.ComputeResourcesUnavailable()
@lockutils.synchronized(COMPUTE_RESOURCE_SEMAPHORE, 'nova-')
def resize_claim(self, context, instance_ref, instance_type, limits=None):
"""Indicate that resources are needed for a resize operation to this
compute host.
:param context: security context
:param instance_ref: instance to reserve resources for
:param instance_type: new instance_type being resized to
:param limits: Dict of oversubscription limits for memory, disk,
and CPUs.
:returns: A Claim ticket representing the reserved resources. This
should be used to finalize the resource claim or free the
resources after the compute operation is finished.
"""
if self.disabled:
# compute_driver doesn't support resource tracking, just
# generate the migration record and continue the resize:
migration_ref = self._create_migration(context, instance_ref,
instance_type)
return claims.NopClaim(migration=migration_ref)
claim = claims.ResizeClaim(instance_ref, instance_type, self)
if claim.test(self.compute_node, limits):
migration_ref = self._create_migration(context, instance_ref,
instance_type)
claim.migration = migration_ref
# Mark the resources in-use for the resize landing on this
# compute host:
self._update_usage_from_migration(self.compute_node, migration_ref)
self._update(context, self.compute_node)
return claim
else:
raise exception.ComputeResourcesUnavailable()
def _create_migration(self, context, instance, instance_type):
"""Create a migration record for the upcoming resize. This should
be done while the COMPUTE_RESOURCES_SEMAPHORE is held so the resource
claim will not be lost if the audit process starts.
"""
# TODO(russellb): no-db-compute: Send the old instance type
# info that is needed via rpc so db access isn't required
# here.
old_instance_type_id = instance['instance_type_id']
old_instance_type = instance_types.get_instance_type(
old_instance_type_id)
return db.migration_create(context.elevated(),
{'instance_uuid': instance['uuid'],
'source_compute': instance['host'],
'dest_compute': self.host,
'dest_host': self.driver.get_host_ip_addr(),
'old_instance_type_id': old_instance_type['id'],
'new_instance_type_id': instance_type['id'],
'status': 'pre-migrating'})
def _set_instance_host(self, context, instance_uuid):
"""Tag the instance as belonging to this host. This should be done
while the COMPUTE_RESOURCES_SEMAPHORE is being held so the resource
claim will not be lost if the audit process starts.
while the COMPUTE_RESOURCES_SEMAPHORE is held so the resource claim
will not be lost if the audit process starts.
"""
values = {'host': self.host, 'launched_on': self.host}
(old_ref, instance_ref) = db.instance_update_and_get_original(context,
@@ -129,6 +192,18 @@ class ResourceTracker(object):
ctxt = context.get_admin_context()
self._update(ctxt, self.compute_node)
def abort_resize_claim(self, instance_uuid, instance_type):
"""Remove usage for an incoming migration"""
if instance_uuid in self.tracked_migrations:
migration, itype = self.tracked_migrations.pop(instance_uuid)
if instance_type['id'] == migration['new_instance_type_id']:
self.stats.update_stats_for_migration(itype, sign=-1)
self._update_usage(self.compute_node, itype, sign=-1)
ctxt = context.get_admin_context()
self._update(ctxt, self.compute_node)
@lockutils.synchronized(COMPUTE_RESOURCE_SEMAPHORE, 'nova-')
def update_usage(self, context, instance):
"""Update the resource usage and stats after a change in an
@@ -137,9 +212,10 @@ class ResourceTracker(object):
if self.disabled:
return
uuid = instance['uuid']
# don't update usage for this instance unless it submitted a resource
# claim first:
uuid = instance['uuid']
if uuid in self.tracked_instances:
self._update_usage_from_instance(self.compute_node, instance)
self._update(context.elevated(), self.compute_node)
@@ -157,6 +233,7 @@ class ResourceTracker(object):
declared a need for resources, but not necessarily retrieved them from
the hypervisor layer yet.
"""
LOG.audit(_("Auditing locally available compute resources"))
resources = self.driver.get_available_resource(self.nodename)
if not resources:
# The virt driver does not support this function
@@ -175,6 +252,12 @@ class ResourceTracker(object):
# Now calculate usage based on instance utilization:
self._update_usage_from_instances(resources, instances)
# Grab all in-progress migrations:
migrations = db.migration_get_in_progress_by_host(context, self.host)
self._update_usage_from_migrations(resources, migrations)
self._report_final_resource_view(resources)
self._sync_compute_node(context, resources)
@@ -258,6 +341,104 @@ class ResourceTracker(object):
self.compute_node['id'], values, prune_stats)
self.compute_node = dict(compute_node)
def confirm_resize(self, context, migration, status='confirmed'):
"""Cleanup usage for a confirmed resize"""
elevated = context.elevated()
db.migration_update(elevated, migration['id'],
{'status': status})
self.update_available_resource(elevated)
def revert_resize(self, context, migration, status='reverted'):
"""Cleanup usage for a reverted resize"""
self.confirm_resize(context, migration, status)
def _update_usage(self, resources, usage, sign=1):
resources['memory_mb_used'] += sign * usage['memory_mb']
resources['local_gb_used'] += sign * usage['root_gb']
resources['local_gb_used'] += sign * usage['ephemeral_gb']
# free ram and disk may be negative, depending on policy:
resources['free_ram_mb'] = (resources['memory_mb'] -
resources['memory_mb_used'])
resources['free_disk_gb'] = (resources['local_gb'] -
resources['local_gb_used'])
resources['running_vms'] = self.stats.num_instances
resources['vcpus_used'] = self.stats.num_vcpus_used
def _update_usage_from_migration(self, resources, migration):
"""Update usage for a single migration. The record may
represent an incoming or outbound migration.
"""
uuid = migration['instance_uuid']
LOG.audit("Updating from migration %s" % uuid)
incoming = (migration['dest_compute'] == self.host)
outbound = (migration['source_compute'] == self.host)
same_host = (incoming and outbound)
instance = self.tracked_instances.get(uuid, None)
itype = None
if same_host:
# same host resize. record usage for whichever instance type the
# instance is *not* in:
if (instance['instance_type_id'] ==
migration['old_instance_type_id']):
itype = migration['new_instance_type_id']
else:
# instance record already has new flavor, hold space for a
# possible revert to the old instance type:
itype = migration['old_instance_type_id']
elif incoming and not instance:
# instance has not yet migrated here:
itype = migration['new_instance_type_id']
elif outbound and not instance:
# instance migrated, but record usage for a possible revert:
itype = migration['old_instance_type_id']
if itype:
instance_type = instance_types.get_instance_type(itype)
self.stats.update_stats_for_migration(instance_type)
self._update_usage(resources, instance_type)
resources['stats'] = self.stats
self.tracked_migrations[uuid] = (migration, instance_type)
def _update_usage_from_migrations(self, resources, migrations):
self.tracked_migrations.clear()
filtered = {}
# do some defensive filtering against bad migration records in the
# database:
for migration in migrations:
instance = migration['instance']
if not instance:
# migration referencing deleted instance
continue
uuid = instance['uuid']
# skip migration if instance isn't in a resize state:
if not self._instance_in_resize_state(instance):
LOG.warn(_("Instance not resizing, skipping migration."),
instance_uuid=uuid)
continue
# filter to most recently updated migration for each instance:
m = filtered.get(uuid, None)
if not m or migration['updated_at'] >= m['updated_at']:
filtered[uuid] = migration
for migration in filtered.values():
self._update_usage_from_migration(resources, migration)
def _update_usage_from_instance(self, resources, instance):
"""Update usage for a single instance."""
@@ -266,7 +447,7 @@ class ResourceTracker(object):
is_deleted_instance = instance['vm_state'] == vm_states.DELETED
if is_new_instance:
self.tracked_instances[uuid] = 1
self.tracked_instances[uuid] = jsonutils.to_primitive(instance)
sign = 1
if is_deleted_instance:
@@ -278,18 +459,7 @@ class ResourceTracker(object):
# if it's a new or deleted instance:
if is_new_instance or is_deleted_instance:
# new instance, update compute node resource usage:
resources['memory_mb_used'] += sign * instance['memory_mb']
resources['local_gb_used'] += sign * instance['root_gb']
resources['local_gb_used'] += sign * instance['ephemeral_gb']
# free ram and disk may be negative, depending on policy:
resources['free_ram_mb'] = (resources['memory_mb'] -
resources['memory_mb_used'])
resources['free_disk_gb'] = (resources['local_gb'] -
resources['local_gb_used'])
resources['running_vms'] = self.stats.num_instances
resources['vcpus_used'] = self.stats.num_vcpus_used
self._update_usage(resources, instance, sign=sign)
resources['current_workload'] = self.stats.calculate_workload()
resources['stats'] = self.stats
@@ -327,3 +497,17 @@ class ResourceTracker(object):
if missing_keys:
reason = _("Missing keys: %s") % missing_keys
raise exception.InvalidInput(reason=reason)
def _instance_in_resize_state(self, instance):
vm = instance['vm_state']
task = instance['task_state']
if vm == vm_states.RESIZED:
return True
if (vm == vm_states.ACTIVE and task in [task_states.RESIZE_PREP,
task_states.RESIZE_MIGRATING, task_states.RESIZE_MIGRATED,
task_states.RESIZE_FINISH]):
return True
return False
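The same-host branch of _update_usage_from_migration is the subtle case: the instance itself is tracked under whichever flavor its record currently carries, while the migration holds the other flavor, so a later confirm or revert cannot fail for lack of room. A worked example with small numbers (they mirror test_same_host in the test file at the end of this change):

    # Worked accounting example, not tracker code. The instance record still
    # carries the old flavor; the migration holds the new one.
    old_flavor = {'memory_mb': 1, 'root_gb': 1, 'ephemeral_gb': 0, 'vcpus': 1}
    new_flavor = {'memory_mb': 2, 'root_gb': 2, 'ephemeral_gb': 1, 'vcpus': 2}

    memory_mb_used = old_flavor['memory_mb'] + new_flavor['memory_mb']      # 3
    local_gb_used = (old_flavor['root_gb'] + old_flavor['ephemeral_gb'] +
                     new_flavor['root_gb'] + new_flavor['ephemeral_gb'])    # 4
    vcpus_used = old_flavor['vcpus'] + new_flavor['vcpus']                  # 3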

View File: nova/compute/stats.py

@@ -114,6 +114,10 @@ class Stats(dict):
# save updated I/O workload in stats:
self["io_workload"] = self.io_workload
def update_stats_for_migration(self, instance_type, sign=1):
x = self.get("num_vcpus_used", 0)
self["num_vcpus_used"] = x + (sign * instance_type['vcpus'])
def _decrement(self, key):
x = self.get(key, 0)
self[key] = x - 1

View File: nova/tests/compute/test_claims.py

@@ -26,15 +26,27 @@ from nova import test
LOG = logging.getLogger(__name__)
class DummyTracker(object):
icalled = False
rcalled = False
def abort_instance_claim(self, *args, **kwargs):
self.icalled = True
def abort_resize_claim(self, *args, **kwargs):
self.rcalled = True
class ClaimTestCase(test.TestCase):
def setUp(self):
super(ClaimTestCase, self).setUp()
self.resources = self._fake_resources()
self.tracker = DummyTracker()
def _claim(self, **kwargs):
instance = self._fake_instance(**kwargs)
return claims.Claim(instance, None)
return claims.Claim(instance, self.tracker)
def _fake_instance(self, **kwargs):
instance = {
@@ -47,6 +59,18 @@ class ClaimTestCase(test.TestCase):
instance.update(**kwargs)
return instance
def _fake_instance_type(self, **kwargs):
instance_type = {
'id': 1,
'name': 'fakeitype',
'memory_mb': 1,
'vcpus': 1,
'root_gb': 1,
'ephemeral_gb': 2
}
instance_type.update(**kwargs)
return instance_type
def _fake_resources(self, values=None):
resources = {
'memory_mb': 2048,
@@ -109,17 +133,30 @@ class ClaimTestCase(test.TestCase):
self.assertFalse(claim.test(self.resources, limits))
def test_abort(self):
instance = self._fake_instance(root_gb=10, ephemeral_gb=40)
claim = self._abort()
self.assertTrue(claim.tracker.icalled)
def fake_abort(self):
self._called = True
self.stubs.Set(claims.Claim, 'abort', fake_abort)
def _abort(self):
claim = None
try:
with claims.Claim(instance, None) as claim:
with self._claim(memory_mb=4096) as claim:
raise test.TestingException("abort")
except test.TestingException:
pass
self.assertTrue(claim._called)
return claim
class ResizeClaimTestCase(ClaimTestCase):
def setUp(self):
super(ResizeClaimTestCase, self).setUp()
self.instance = self._fake_instance()
def _claim(self, **kwargs):
instance_type = self._fake_instance_type(**kwargs)
return claims.ResizeClaim(self.instance, instance_type, self.tracker)
def test_abort(self):
claim = self._abort()
self.assertTrue(claim.tracker.rcalled)

View File: nova/tests/compute/test_resource_tracker.py

@@ -19,6 +19,8 @@
import uuid
from nova.compute import claims
from nova.compute import instance_types
from nova.compute import resource_tracker
from nova.compute import task_states
from nova.compute import vm_states
@@ -40,9 +42,13 @@ FAKE_VIRT_VCPUS = 1
class UnsupportedVirtDriver(driver.ComputeDriver):
"""Pretend version of a lame virt driver"""
def __init__(self):
super(UnsupportedVirtDriver, self).__init__(None)
def get_host_ip_addr(self):
return '127.0.0.1'
def get_available_resource(self, nodename):
# no support for getting resource usage info
return {}
@@ -59,6 +65,9 @@ class FakeVirtDriver(driver.ComputeDriver):
self.memory_mb_used = 0
self.local_gb_used = 0
def get_host_ip_addr(self):
return '127.0.0.1'
def get_available_resource(self, nodename):
d = {
'vcpus': self.vcpus,
@@ -83,13 +92,18 @@ class BaseTestCase(test.TestCase):
self.flags(reserved_host_disk_mb=0,
reserved_host_memory_mb=0)
self.context = context.RequestContext('fake', 'fake')
self.context = context.get_admin_context()
self._instances = {}
self._instance_types = {}
self.stubs.Set(db, 'instance_get_all_by_host_and_node',
lambda c, h, n: self._instances.values())
self._fake_instance_get_all_by_host_and_node)
self.stubs.Set(db, 'instance_update_and_get_original',
self._fake_instance_update_and_get_original)
self.stubs.Set(db, 'instance_type_get', self._fake_instance_type_get)
self.host = 'fakehost'
def _create_compute_node(self, values=None):
compute = {
@@ -131,7 +145,7 @@ class BaseTestCase(test.TestCase):
instance_uuid = str(uuid.uuid1())
instance = {
'uuid': instance_uuid,
'vm_state': vm_states.BUILDING,
'vm_state': vm_states.RESIZED,
'task_state': None,
'memory_mb': 2,
'root_gb': 3,
@@ -140,12 +154,35 @@ class BaseTestCase(test.TestCase):
'project_id': '123456',
'vcpus': 1,
'host': None,
'instance_type_id': 1,
}
instance.update(kwargs)
self._instances[instance_uuid] = instance
return instance
def _fake_instance_type_create(self, **kwargs):
instance_type = {
'id': 1,
'name': 'fakeitype',
'memory_mb': FAKE_VIRT_MEMORY_MB,
'vcpus': FAKE_VIRT_VCPUS,
'root_gb': FAKE_VIRT_LOCAL_GB / 2,
'ephemeral_gb': FAKE_VIRT_LOCAL_GB / 2,
'flavorid': 'fakeflavor'
}
instance_type.update(**kwargs)
id_ = instance_type['id']
self._instance_types[id_] = instance_type
return instance_type
def _fake_instance_get_all_by_host_and_node(self, context, host, nodename):
return [i for i in self._instances.values() if i['host'] == host]
def _fake_instance_type_get(self, ctxt, id_):
return self._instance_types[id_]
def _fake_instance_update_and_get_original(self, context, instance_uuid,
values):
instance = self._instances[instance_uuid]
@@ -154,8 +191,11 @@ class BaseTestCase(test.TestCase):
# only used in the subsequent notification:
return (instance, instance)
def _tracker(self, unsupported=False):
host = "fakehost"
def _tracker(self, host=None, unsupported=False):
if host is None:
host = self.host
node = "fakenode"
if unsupported:
@@ -206,6 +246,23 @@ class UnsupportedDriverTestCase(BaseTestCase):
root_gb=10)
self.tracker.update_usage(self.context, instance)
def testDisabledResizeClaim(self):
instance = self._fake_instance()
instance_type = self._fake_instance_type_create()
claim = self.tracker.resize_claim(self.context, instance,
instance_type)
self.assertEqual(0, claim.memory_mb)
self.assertEqual(instance['uuid'], claim.migration['instance_uuid'])
self.assertEqual(instance_type['id'],
claim.migration['new_instance_type_id'])
def testDisabledResizeContextClaim(self):
instance = self._fake_instance()
instance_type = self._fake_instance_type_create()
with self.tracker.resize_claim(self.context, instance, instance_type) \
as claim:
self.assertEqual(0, claim.memory_mb)
class MissingServiceTestCase(BaseTestCase):
def setUp(self):
@@ -246,17 +303,39 @@ class MissingComputeNodeTestCase(BaseTestCase):
self.assertFalse(self.tracker.disabled)
class ResourceTestCase(BaseTestCase):
class BaseTrackerTestCase(BaseTestCase):
def setUp(self):
super(ResourceTestCase, self).setUp()
# setup plumbing for a working resource tracker with required
# database models and a compatible compute driver:
super(BaseTrackerTestCase, self).setUp()
self.tracker = self._tracker()
self._migrations = {}
self.stubs.Set(db, 'service_get_all_compute_by_host',
self._fake_service_get_all_compute_by_host)
self.stubs.Set(db, 'compute_node_update',
self._fake_compute_node_update)
self.stubs.Set(db, 'migration_update',
self._fake_migration_update)
self.stubs.Set(db, 'migration_get_in_progress_by_host',
self._fake_migration_get_in_progress_by_host)
self.tracker.update_available_resource(self.context)
self.limits = self._basic_limits()
self.limits = self._limits()
self._assert(FAKE_VIRT_MEMORY_MB, 'memory_mb')
self._assert(FAKE_VIRT_LOCAL_GB, 'local_gb')
self._assert(FAKE_VIRT_VCPUS, 'vcpus')
self._assert(0, 'memory_mb_used')
self._assert(0, 'local_gb_used')
self._assert(0, 'vcpus_used')
self._assert(0, 'running_vms')
self._assert(FAKE_VIRT_MEMORY_MB, 'free_ram_mb')
self._assert(FAKE_VIRT_LOCAL_GB, 'free_disk_gb')
self.assertFalse(self.tracker.disabled)
self.assertEqual(0, self.tracker.compute_node['current_workload'])
def _fake_service_get_all_compute_by_host(self, ctx, host):
self.compute = self._create_compute_node()
@@ -271,36 +350,50 @@ class ResourceTestCase(BaseTestCase):
self.compute.update(values)
return self.compute
def _basic_limits(self):
"""Get basic limits, no oversubscription"""
def _fake_migration_get_in_progress_by_host(self, ctxt, host):
status = ['confirmed', 'reverted']
migrations = []
for migration in self._migrations.values():
if migration['status'] in status:
continue
uuid = migration['instance_uuid']
migration['instance'] = self._instances[uuid]
migrations.append(migration)
return migrations
def _fake_migration_update(self, ctxt, migration_id, values):
# cheat and assume there's only 1 migration present
migration = self._migrations.values()[0]
migration.update(values)
return migration
def _limits(self, memory_mb=FAKE_VIRT_MEMORY_MB,
disk_gb=FAKE_VIRT_LOCAL_GB, vcpus=FAKE_VIRT_VCPUS):
"""Create limits dictionary used for oversubscribing resources"""
return {
'memory_mb': FAKE_VIRT_MEMORY_MB * 2,
'disk_gb': FAKE_VIRT_LOCAL_GB,
'vcpu': FAKE_VIRT_VCPUS,
'memory_mb': memory_mb,
'disk_gb': disk_gb,
'vcpu': vcpus
}
def test_update_usage_only_for_tracked(self):
instance = self._fake_instance(memory_mb=3, root_gb=1, ephemeral_gb=1,
task_state=None)
self.tracker.update_usage(self.context, instance)
def _assert(self, value, field, tracker=None):
self.assertEqual(0, self.tracker.compute_node['memory_mb_used'])
self.assertEqual(0, self.tracker.compute_node['local_gb_used'])
self.assertEqual(0, self.tracker.compute_node['current_workload'])
if tracker is None:
tracker = self.tracker
claim = self.tracker.instance_claim(self.context, instance,
self.limits)
self.assertNotEqual(0, claim.memory_mb)
self.assertEqual(3, self.tracker.compute_node['memory_mb_used'])
self.assertEqual(2, self.tracker.compute_node['local_gb_used'])
if not field in tracker.compute_node:
raise test.TestingException(
"'%(field)s' not in compute node." % locals())
x = tracker.compute_node[field]
# now update should actually take effect
instance['task_state'] = task_states.SCHEDULING
self.tracker.update_usage(self.context, instance)
self.assertEqual(value, x)
self.assertEqual(3, self.tracker.compute_node['memory_mb_used'])
self.assertEqual(2, self.tracker.compute_node['local_gb_used'])
self.assertEqual(1, self.tracker.compute_node['current_workload'])
class TrackerTestCase(BaseTrackerTestCase):
def test_free_ram_resource_value(self):
driver = FakeVirtDriver()
@@ -316,13 +409,33 @@ class ResourceTestCase(BaseTestCase):
self.assertFalse(self.tracker.disabled)
self.assertTrue(self.updated)
class InstanceClaimTestCase(BaseTrackerTestCase):
def test_update_usage_only_for_tracked(self):
instance = self._fake_instance(memory_mb=3, root_gb=1, ephemeral_gb=1,
task_state=None)
self.tracker.update_usage(self.context, instance)
self._assert(0, 'memory_mb_used')
self._assert(0, 'local_gb_used')
self._assert(0, 'current_workload')
claim = self.tracker.instance_claim(self.context, instance,
self.limits)
self.assertNotEqual(0, claim.memory_mb)
self._assert(3, 'memory_mb_used')
self._assert(2, 'local_gb_used')
# now update should actually take effect
instance['task_state'] = task_states.SCHEDULING
self.tracker.update_usage(self.context, instance)
self._assert(3, 'memory_mb_used')
self._assert(2, 'local_gb_used')
self._assert(1, 'current_workload')
def test_claim_and_audit(self):
self.assertEqual(5, self.tracker.compute_node['memory_mb'])
self.assertEqual(0, self.tracker.compute_node['memory_mb_used'])
self.assertEqual(6, self.tracker.compute_node['local_gb'])
self.assertEqual(0, self.tracker.compute_node['local_gb_used'])
claim_mem = 3
claim_disk = 2
instance = self._fake_instance(memory_mb=claim_mem, root_gb=claim_disk,
@@ -356,12 +469,6 @@ class ResourceTestCase(BaseTestCase):
self.assertEqual(6 - claim_disk, self.compute['free_disk_gb'])
def test_claim_and_abort(self):
self.assertEqual(5, self.tracker.compute_node['memory_mb'])
self.assertEqual(0, self.tracker.compute_node['memory_mb_used'])
self.assertEqual(6, self.tracker.compute_node['local_gb'])
self.assertEqual(0, self.tracker.compute_node['local_gb_used'])
claim_mem = 3
claim_disk = 2
instance = self._fake_instance(memory_mb=claim_mem,
@@ -370,21 +477,17 @@ class ResourceTestCase(BaseTestCase):
self.limits)
self.assertNotEqual(None, claim)
self.assertEqual(5, self.compute["memory_mb"])
self.assertEqual(claim_mem, self.compute["memory_mb_used"])
self.assertEqual(5 - claim_mem, self.compute["free_ram_mb"])
self.assertEqual(6, self.compute["local_gb"])
self.assertEqual(claim_disk, self.compute["local_gb_used"])
self.assertEqual(6 - claim_disk, self.compute["free_disk_gb"])
claim.abort()
self.assertEqual(5, self.compute["memory_mb"])
self.assertEqual(0, self.compute["memory_mb_used"])
self.assertEqual(5, self.compute["free_ram_mb"])
self.assertEqual(6, self.compute["local_gb"])
self.assertEqual(0, self.compute["local_gb_used"])
self.assertEqual(6, self.compute["free_disk_gb"])
@@ -452,8 +555,6 @@ class ResourceTestCase(BaseTestCase):
self.assertEqual(2, self.compute['local_gb_used'])
def test_update_load_stats_for_instance(self):
self.assertFalse(self.tracker.disabled)
self.assertEqual(0, self.tracker.compute_node['current_workload'])
instance = self._fake_instance(task_state=task_states.SCHEDULING)
with self.tracker.instance_claim(self.context, instance):
@@ -495,3 +596,206 @@ class ResourceTestCase(BaseTestCase):
instance['vm_state'] = vm_states.DELETED
self.tracker.update_usage(self.context, instance)
self.assertEqual(1, self.tracker.compute_node['vcpus_used'])
class ResizeClaimTestCase(BaseTrackerTestCase):
def setUp(self):
super(ResizeClaimTestCase, self).setUp()
self.stubs.Set(db, 'migration_create', self._fake_migration_create)
self.instance = self._fake_instance()
self.instance_type = self._fake_instance_type_create()
def _fake_migration_create(self, context, values=None):
instance_uuid = str(uuid.uuid1())
migration = {
'id': 1,
'source_compute': 'host1',
'dest_compute': 'host2',
'dest_host': '127.0.0.1',
'old_instance_type_id': 1,
'new_instance_type_id': 2,
'instance_uuid': instance_uuid,
'status': 'pre-migrating',
'updated_at': timeutils.utcnow()
}
if values:
migration.update(values)
self._migrations[instance_uuid] = migration
return migration
def test_claim(self):
self.tracker.resize_claim(self.context, self.instance,
self.instance_type, self.limits)
self._assert(FAKE_VIRT_MEMORY_MB, 'memory_mb_used')
self._assert(FAKE_VIRT_LOCAL_GB, 'local_gb_used')
self._assert(FAKE_VIRT_VCPUS, 'vcpus_used')
self.assertEqual(1, len(self.tracker.tracked_migrations))
def test_abort(self):
try:
with self.tracker.resize_claim(self.context, self.instance,
self.instance_type, self.limits):
raise test.TestingException("abort")
except test.TestingException:
pass
self._assert(0, 'memory_mb_used')
self._assert(0, 'local_gb_used')
self._assert(0, 'vcpus_used')
self.assertEqual(0, len(self.tracker.tracked_migrations))
def test_additive_claims(self):
limits = self._limits(FAKE_VIRT_MEMORY_MB * 2, FAKE_VIRT_LOCAL_GB * 2,
FAKE_VIRT_VCPUS * 2)
self.tracker.resize_claim(self.context, self.instance,
self.instance_type, limits)
instance2 = self._fake_instance()
self.tracker.resize_claim(self.context, instance2, self.instance_type,
limits)
self._assert(2 * FAKE_VIRT_MEMORY_MB, 'memory_mb_used')
self._assert(2 * FAKE_VIRT_LOCAL_GB, 'local_gb_used')
self._assert(2 * FAKE_VIRT_VCPUS, 'vcpus_used')
def test_claim_and_audit(self):
self.tracker.resize_claim(self.context, self.instance,
self.instance_type, self.limits)
self.tracker.update_available_resource(self.context)
self._assert(FAKE_VIRT_MEMORY_MB, 'memory_mb_used')
self._assert(FAKE_VIRT_LOCAL_GB, 'local_gb_used')
self._assert(FAKE_VIRT_VCPUS, 'vcpus_used')
def test_same_host(self):
self.limits['vcpu'] = 3
src_type = self._fake_instance_type_create(id=2, memory_mb=1,
root_gb=1, ephemeral_gb=0, vcpus=1)
dest_type = self._fake_instance_type_create(id=2, memory_mb=2,
root_gb=2, ephemeral_gb=1, vcpus=2)
# make an instance of src_type:
instance = self._fake_instance(memory_mb=1, root_gb=1, ephemeral_gb=0,
vcpus=1, instance_type_id=2)
self.tracker.instance_claim(self.context, instance, self.limits)
# resize to dest_type:
claim = self.tracker.resize_claim(self.context, self.instance,
dest_type, self.limits)
self._assert(3, 'memory_mb_used')
self._assert(4, 'local_gb_used')
self._assert(3, 'vcpus_used')
self.tracker.update_available_resource(self.context)
claim.abort()
# only the original instance should remain, not the migration:
self._assert(1, 'memory_mb_used')
self._assert(1, 'local_gb_used')
self._assert(1, 'vcpus_used')
self.assertEqual(1, len(self.tracker.tracked_instances))
self.assertEqual(0, len(self.tracker.tracked_migrations))
def test_revert(self):
self.tracker.resize_claim(self.context, self.instance,
self.instance_type, self.limits)
migration, itype = self.tracker.tracked_migrations[
self.instance['uuid']]
self.tracker.revert_resize(self.context, migration)
self.assertEqual(0, len(self.tracker.tracked_instances))
self.assertEqual(0, len(self.tracker.tracked_migrations))
self._assert(0, 'memory_mb_used')
self._assert(0, 'local_gb_used')
self._assert(0, 'vcpus_used')
def test_revert_reserve_source(self):
# if a revert has started at the API and audit runs on
# the source compute before the instance flips back to source,
# resources should still be held at the source based on the
# migration:
dest = "desthost"
dest_tracker = self._tracker(host=dest)
dest_tracker.update_available_resource(self.context)
self.instance = self._fake_instance(memory_mb=FAKE_VIRT_MEMORY_MB,
root_gb=FAKE_VIRT_LOCAL_GB, ephemeral_gb=0,
vcpus=FAKE_VIRT_VCPUS, instance_type_id=1)
values = {'source_compute': self.host, 'dest_compute': dest,
'old_instance_type_id': 1, 'new_instance_type_id': 1,
'status': 'post-migrating',
'instance_uuid': self.instance['uuid']}
migration = self._fake_migration_create(self.context, values)
# attach an instance to the destination host tracker:
dest_tracker.instance_claim(self.context, self.instance)
self._assert(FAKE_VIRT_MEMORY_MB, 'memory_mb_used',
tracker=dest_tracker)
self._assert(FAKE_VIRT_LOCAL_GB, 'local_gb_used',
tracker=dest_tracker)
self._assert(FAKE_VIRT_VCPUS, 'vcpus_used',
tracker=dest_tracker)
# audit and recheck to confirm migration doesn't get double counted
# on dest:
dest_tracker.update_available_resource(self.context)
self._assert(FAKE_VIRT_MEMORY_MB, 'memory_mb_used',
tracker=dest_tracker)
self._assert(FAKE_VIRT_LOCAL_GB, 'local_gb_used',
tracker=dest_tracker)
self._assert(FAKE_VIRT_VCPUS, 'vcpus_used',
tracker=dest_tracker)
# apply the migration to the source host tracker:
self.tracker.update_available_resource(self.context)
self._assert(FAKE_VIRT_MEMORY_MB, 'memory_mb_used')
self._assert(FAKE_VIRT_LOCAL_GB, 'local_gb_used')
self._assert(FAKE_VIRT_VCPUS, 'vcpus_used')
# flag the instance and migration as reverting and re-audit:
self.instance['vm_state'] = vm_states.RESIZED
self.instance['task_state'] = task_states.RESIZE_REVERTING
self.tracker.update_available_resource(self.context)
self._assert(FAKE_VIRT_MEMORY_MB, 'memory_mb_used')
self._assert(FAKE_VIRT_LOCAL_GB, 'local_gb_used')
self._assert(FAKE_VIRT_VCPUS, 'vcpus_used')
def test_resize_filter(self):
instance = self._fake_instance(vm_state=vm_states.ACTIVE,
task_state=task_states.SUSPENDING)
self.assertFalse(self.tracker._instance_in_resize_state(instance))
instance = self._fake_instance(vm_state=vm_states.RESIZED,
task_state=task_states.SUSPENDING)
self.assertTrue(self.tracker._instance_in_resize_state(instance))
instance = self._fake_instance(vm_state=vm_states.ACTIVE,
task_state=task_states.RESIZE_MIGRATING)
self.assertTrue(self.tracker._instance_in_resize_state(instance))
def test_dupe_filter(self):
self._fake_instance_type_create(id=2, memory_mb=1, root_gb=1,
ephemeral_gb=1, vcpus=1)
instance = self._fake_instance(host=self.host)
values = {'source_compute': self.host, 'dest_compute': self.host,
'instance_uuid': instance['uuid'], 'new_instance_type_id': 2}
self._fake_migration_create(self.context, values)
self._fake_migration_create(self.context, values)
self.tracker.update_available_resource(self.context)
self.assertEqual(1, len(self.tracker.tracked_migrations))