Refactor resource tracker claims and test logic.

In preparation for adding resize support to resource tracker:

* Claim class has been factored out of resource tracker.
* Broke out claim testing logic for easier re-use

This patch is pre-work to ease adding resize claims to
resource tracker:

bug 1065267

Change-Id: Ib0b856376417f140dc9ed114913f47a8a790b8a2
This commit is contained in:
Brian Elliott
2012-11-01 20:35:47 +00:00
parent 562b3abc84
commit 6ded00b034
5 changed files with 433 additions and 419 deletions

186
nova/compute/claims.py Normal file
View File

@@ -0,0 +1,186 @@
# Copyright (c) 2012 OpenStack, LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Claim objects for use with resource tracking.
"""
from nova import context
from nova.openstack.common import jsonutils
from nova.openstack.common import lockutils
from nova.openstack.common import log as logging
LOG = logging.getLogger(__name__)
COMPUTE_RESOURCE_SEMAPHORE = "compute_resources"
class NopClaim(object):
"""For use with compute drivers that do not support resource tracking"""
@property
def disk_gb(self):
return 0
@property
def memory_mb(self):
return 0
@property
def vcpus(self):
return 0
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if exc_type is not None:
self.abort()
def abort(self):
pass
def __str__(self):
return "[Claim: %d MB memory, %d GB disk, %d VCPUS]" % (self.memory_mb,
self.disk_gb, self.vcpus)
class Claim(NopClaim):
"""A declaration that a compute host operation will require free resources.
Claims serve as marker objects that resources are being held until the
update_available_resource audit process runs to do a full reconciliation
of resource usage.
This information will be used to help keep the local compute hosts's
ComputeNode model in sync to aid the scheduler in making efficient / more
correct decisions with respect to host selection.
"""
def __init__(self, instance, tracker):
super(Claim, self).__init__()
self.instance = jsonutils.to_primitive(instance)
self.tracker = tracker
@property
def disk_gb(self):
return self.instance['root_gb'] + self.instance['ephemeral_gb']
@property
def memory_mb(self):
return self.instance['memory_mb']
@property
def vcpus(self):
return self.instance['vcpus']
@lockutils.synchronized(COMPUTE_RESOURCE_SEMAPHORE, 'nova-')
def abort(self):
"""Compute operation requiring claimed resources has failed or
been aborted.
"""
LOG.debug(_("Aborting claim: %s") % self, instance=self.instance)
self.tracker.abort_instance_claim(self.instance)
def test(self, resources, limits=None):
"""Test if this claim can be satisfied given available resources and
optional oversubscription limits
This should be called before the compute node actually consumes the
resources required to execute the claim.
:param resources: available local compute node resources
:returns: Return true if resources are available to claim.
"""
if not limits:
limits = {}
# If an individual limit is None, the resource will be considered
# unlimited:
memory_mb_limit = limits.get('memory_mb')
disk_gb_limit = limits.get('disk_gb')
vcpu_limit = limits.get('vcpu')
msg = _("Attempting claim: memory %(memory_mb)d MB, disk %(disk_gb)d "
"GB, VCPUs %(vcpus)d")
params = {'memory_mb': self.memory_mb, 'disk_gb': self.disk_gb,
'vcpus': self.vcpus}
LOG.audit(msg % params, instance=self.instance)
# Test for resources:
can_claim = (self._test_memory(resources, memory_mb_limit) and
self._test_disk(resources, disk_gb_limit) and
self._test_cpu(resources, vcpu_limit))
if can_claim:
LOG.audit(_("Claim successful"), instance=self.instance)
else:
LOG.audit(_("Claim failed"), instance=self.instance)
return can_claim
def _test_memory(self, resources, limit):
type_ = _("Memory")
unit = "MB"
total = resources['memory_mb']
used = resources['memory_mb_used']
requested = self.memory_mb
return self._test(type_, unit, total, used, requested, limit)
def _test_disk(self, resources, limit):
type_ = _("Disk")
unit = "GB"
total = resources['local_gb']
used = resources['local_gb_used']
requested = self.disk_gb
return self._test(type_, unit, total, used, requested, limit)
def _test_cpu(self, resources, limit):
type_ = _("CPU")
unit = "VCPUs"
total = resources['vcpus']
used = resources['vcpus_used']
requested = self.vcpus
return self._test(type_, unit, total, used, requested, limit)
def _test(self, type_, unit, total, used, requested, limit):
"""Test if the given type of resource needed for a claim can be safely
allocated.
"""
msg = _("Total %(type_)s: %(total)d %(unit)s, used: %(used)d %(unit)s")
LOG.audit(msg % locals(), instance=self.instance)
if limit is None:
# treat resource as unlimited:
LOG.audit(_("%(type_)s limit not specified, defaulting to "
"unlimited") % locals(), instance=self.instance)
return True
free = limit - used
# Oversubscribed resource policy info:
msg = _("%(type_)s limit: %(limit)d %(unit)s, free: %(free)d "
"%(unit)s") % locals()
LOG.audit(msg, instance=self.instance)
can_claim = requested <= free
if not can_claim:
msg = _("Unable to claim resources. Free %(type_)s %(free)d "
"%(unit)s < requested %(requested)d %(unit)s") % locals()
LOG.info(msg, instance=self.instance)
return can_claim

View File

@@ -522,7 +522,7 @@ class ComputeManager(manager.SchedulerDependentManager):
network_info = None
try:
limits = filter_properties.get('limits', {})
with self.resource_tracker.resource_claim(context, instance,
with self.resource_tracker.instance_claim(context, instance,
limits):
network_info = self._allocate_network(context, instance,

View File

@@ -19,14 +19,15 @@ scheduler with useful information about availability through the ComputeNode
model.
"""
from nova.compute import claims
from nova.compute import vm_states
from nova import context
from nova import db
from nova import exception
from nova import flags
from nova import notifications
from nova.openstack.common import cfg
from nova.openstack.common import importutils
from nova.openstack.common import jsonutils
from nova.openstack.common import lockutils
from nova.openstack.common import log as logging
from nova import utils
@@ -45,63 +46,7 @@ FLAGS = flags.FLAGS
FLAGS.register_opts(resource_tracker_opts)
LOG = logging.getLogger(__name__)
COMPUTE_RESOURCE_SEMAPHORE = "compute_resources"
class Claim(object):
"""A declaration that a compute host operation will require free resources.
Claims serve as marker objects that resources are being held until the
update_available_resource audit process runs to do a full reconciliation
of resource usage.
This information will be used to help keep the local compute hosts's
ComputeNode model in sync to aid the scheduler in making efficient / more
correct decisions with respect to host selection.
"""
def __init__(self, instance):
self.instance = jsonutils.to_primitive(instance)
@property
def claim_id(self):
return self.instance['uuid']
@property
def disk_gb(self):
return self.instance['root_gb'] + self.instance['ephemeral_gb']
@property
def memory_mb(self):
return self.instance['memory_mb']
@property
def vcpus(self):
return self.instance['vcpus']
def __str__(self):
return "[Claim %s: %d MB memory, %d GB disk, %d VCPUS]" % \
(self.claim_id, self.memory_mb, self.disk_gb, self.vcpus)
class ResourceContextManager(object):
def __init__(self, context, claim, tracker):
self.context = context
self.claim = claim
self.tracker = tracker
def __enter__(self):
if not self.claim and not self.tracker.disabled:
# insufficient resources to complete request
raise exception.ComputeResourcesUnavailable()
def __exit__(self, exc_type, exc_val, exc_tb):
if not self.claim:
return
if exc_type is None:
self.tracker.finish_resource_claim(self.claim)
else:
self.tracker.abort_resource_claim(self.context, self.claim)
COMPUTE_RESOURCE_SEMAPHORE = claims.COMPUTE_RESOURCE_SEMAPHORE
class ResourceTracker(object):
@@ -113,17 +58,11 @@ class ResourceTracker(object):
self.host = host
self.driver = driver
self.compute_node = None
self.next_claim_id = 1
self.claims = {}
self.stats = importutils.import_object(FLAGS.compute_stats_class)
self.tracked_instances = {}
def resource_claim(self, context, instance_ref, limits=None):
claim = self.begin_resource_claim(context, instance_ref, limits)
return ResourceContextManager(context, claim, self)
@lockutils.synchronized(COMPUTE_RESOURCE_SEMAPHORE, 'nova-')
def begin_resource_claim(self, context, instance_ref, limits=None):
def instance_claim(self, context, instance_ref, limits=None):
"""Indicate that some resources are needed for an upcoming compute
instance build operation.
@@ -134,17 +73,16 @@ class ResourceTracker(object):
:param instance_ref: instance to reserve resources for
:param limits: Dict of oversubscription limits for memory, disk,
and CPUs.
:returns: An integer 'claim ticket'. This should be turned into
finalize a resource claim or free resources after the
compute operation is finished. Returns None if the claim
failed.
:returns: A Claim ticket representing the reserved resources. It can
be used to revert the resource usage if an error occurs
during the instance build.
"""
if self.disabled:
# compute_driver doesn't support resource tracking, just
# set the 'host' field and continue the build:
instance_ref = self._set_instance_host(context,
instance_ref['uuid'])
return
instance_ref['uuid'])
return claims.NopClaim()
# sanity check:
if instance_ref['host']:
@@ -152,46 +90,23 @@ class ResourceTracker(object):
"until resources have been claimed."),
instance=instance_ref)
if not limits:
limits = {}
claim = claims.Claim(instance_ref, self)
# If an individual limit is None, the resource will be considered
# unlimited:
memory_mb_limit = limits.get('memory_mb')
disk_gb_limit = limits.get('disk_gb')
vcpu_limit = limits.get('vcpu')
if claim.test(self.compute_node, limits):
memory_mb = instance_ref['memory_mb']
disk_gb = instance_ref['root_gb'] + instance_ref['ephemeral_gb']
vcpus = instance_ref['vcpus']
instance_ref = self._set_instance_host(context,
instance_ref['uuid'])
msg = _("Attempting claim: memory %(memory_mb)d MB, disk %(disk_gb)d "
"GB, VCPUs %(vcpus)d") % locals()
LOG.audit(msg)
# Mark resources in-use and update stats
self._update_usage_from_instance(self.compute_node, instance_ref)
# Test for resources:
if not self._can_claim_memory(memory_mb, memory_mb_limit):
return
# persist changes to the compute node:
self._update(context, self.compute_node)
if not self._can_claim_disk(disk_gb, disk_gb_limit):
return
return claim
if not self._can_claim_cpu(vcpus, vcpu_limit):
return
instance_ref = self._set_instance_host(context, instance_ref['uuid'])
# keep track of this claim until we know whether the compute operation
# was successful/completed:
claim = Claim(instance_ref)
self.claims[claim.claim_id] = claim
# Mark resources in-use and update stats
self._update_usage_from_instance(self.compute_node, instance_ref)
# persist changes to the compute node:
self._update(context, self.compute_node)
return claim
else:
raise exception.ComputeResourcesUnavailable()
def _set_instance_host(self, context, instance_uuid):
"""Tag the instance as belonging to this host. This should be done
@@ -204,130 +119,15 @@ class ResourceTracker(object):
notifications.send_update(context, old_ref, instance_ref)
return instance_ref
def _can_claim_memory(self, memory_mb, memory_mb_limit):
"""Test if memory needed for a claim can be safely allocated"""
# Installed memory and usage info:
msg = _("Total memory: %(total_mem)d MB, used: %(used_mem)d MB, free: "
"%(free_mem)d MB") % dict(
total_mem=self.compute_node['memory_mb'],
used_mem=self.compute_node['memory_mb_used'],
free_mem=self.compute_node['local_gb_used'])
LOG.audit(msg)
def abort_instance_claim(self, instance):
"""Remove usage from the given instance"""
# flag the instance as deleted to revert the resource usage
# and associated stats:
instance['vm_state'] = vm_states.DELETED
self._update_usage_from_instance(self.compute_node, instance)
if memory_mb_limit is None:
# treat memory as unlimited:
LOG.audit(_("Memory limit not specified, defaulting to unlimited"))
return True
free_ram_mb = memory_mb_limit - self.compute_node['memory_mb_used']
# Oversubscribed memory policy info:
msg = _("Memory limit: %(memory_mb_limit)d MB, free: "
"%(free_ram_mb)d MB") % locals()
LOG.audit(msg)
can_claim_mem = memory_mb <= free_ram_mb
if not can_claim_mem:
msg = _("Unable to claim resources. Free memory %(free_ram_mb)d "
"MB < requested memory %(memory_mb)d MB") % locals()
LOG.info(msg)
return can_claim_mem
def _can_claim_disk(self, disk_gb, disk_gb_limit):
"""Test if disk space needed can be safely allocated"""
# Installed disk and usage info:
msg = _("Total disk: %(total_disk)d GB, used: %(used_disk)d GB, free: "
"%(free_disk)d GB") % dict(
total_disk=self.compute_node['local_gb'],
used_disk=self.compute_node['local_gb_used'],
free_disk=self.compute_node['free_disk_gb'])
LOG.audit(msg)
if disk_gb_limit is None:
# treat disk as unlimited:
LOG.audit(_("Disk limit not specified, defaulting to unlimited"))
return True
free_disk_gb = disk_gb_limit - self.compute_node['local_gb_used']
# Oversubscribed disk policy info:
msg = _("Disk limit: %(disk_gb_limit)d GB, free: "
"%(free_disk_gb)d GB") % locals()
LOG.audit(msg)
can_claim_disk = disk_gb <= free_disk_gb
if not can_claim_disk:
msg = _("Unable to claim resources. Free disk %(free_disk_gb)d GB"
" < requested disk %(disk_gb)d GB") % dict(
free_disk_gb=self.compute_node['free_disk_gb'],
disk_gb=disk_gb)
LOG.info(msg)
return can_claim_disk
def _can_claim_cpu(self, vcpus, vcpu_limit):
"""Test if CPUs can be safely allocated according to given policy."""
msg = _("Total VCPUs: %(total_vcpus)d, used: %(used_vcpus)d") \
% dict(total_vcpus=self.compute_node['vcpus'],
used_vcpus=self.compute_node['vcpus_used'])
LOG.audit(msg)
if vcpu_limit is None:
# treat cpu as unlimited:
LOG.audit(_("VCPU limit not specified, defaulting to unlimited"))
return True
# Oversubscribed disk policy info:
msg = _("CPU limit: %(vcpu_limit)d") % locals()
LOG.audit(msg)
free_vcpus = vcpu_limit - self.compute_node['vcpus_used']
can_claim_cpu = vcpus <= free_vcpus
if not can_claim_cpu:
msg = _("Unable to claim resources. Free CPU %(free_vcpus)d < "
"requested CPU %(vcpus)d") % locals()
LOG.info(msg)
return can_claim_cpu
@lockutils.synchronized(COMPUTE_RESOURCE_SEMAPHORE, 'nova-')
def finish_resource_claim(self, claim):
"""Indicate that the compute operation that previously claimed the
resources identified by 'claim' has now completed and the resources
have been allocated at the virt layer.
:param claim: A claim indicating a set of resources that were
previously claimed.
"""
if self.disabled:
return
if self.claims.pop(claim.claim_id, None):
LOG.debug(_("Finishing claim: %s") % claim)
@lockutils.synchronized(COMPUTE_RESOURCE_SEMAPHORE, 'nova-')
def abort_resource_claim(self, context, claim):
"""Indicate that the operation that claimed the resources identified by
'claim_id' has either failed or been aborted and the resources are no
longer needed.
:param claim: A claim ticket indicating a set of resources that were
previously claimed.
"""
if self.disabled:
return
if self.claims.pop(claim.claim_id, None):
LOG.debug(_("Aborting claim: %s") % claim)
# flag the instance as deleted to revert the resource usage
# and associated stats:
claim.instance['vm_state'] = vm_states.DELETED
self._update_usage_from_instance(self.compute_node, claim.instance)
self._update(context, self.compute_node)
ctxt = context.get_admin_context()
self._update(ctxt, self.compute_node)
@lockutils.synchronized(COMPUTE_RESOURCE_SEMAPHORE, 'nova-')
def update_usage(self, context, instance):
@@ -363,15 +163,12 @@ class ResourceTracker(object):
LOG.audit(_("Virt driver does not support "
"'get_available_resource' Compute tracking is disabled."))
self.compute_node = None
self.claims = {}
return
self._verify_resources(resources)
self._report_hypervisor_resource_view(resources)
self._purge_claims()
# Grab all instances assigned to this host:
instances = db.instance_get_all_by_host(context, self.host)
@@ -405,12 +202,6 @@ class ResourceTracker(object):
self._update(context, resources, prune_stats=True)
LOG.info(_('Compute_service record updated for %s ') % self.host)
def _purge_claims(self):
"""Purge claims. They are no longer needed once the audit process
reconciles usage values from the database.
"""
self.claims.clear()
def _create(self, context, values):
"""Create the compute node in the DB"""
# initialize load stats from existing instances:

View File

@@ -0,0 +1,125 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2012 OpenStack, LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Tests for resource tracker claims"""
import uuid
from nova.compute import claims
from nova.openstack.common import log as logging
from nova import test
LOG = logging.getLogger(__name__)
class ClaimTestCase(test.TestCase):
def setUp(self):
super(ClaimTestCase, self).setUp()
self.resources = self._fake_resources()
def _claim(self, **kwargs):
instance = self._fake_instance(**kwargs)
return claims.Claim(instance, None)
def _fake_instance(self, **kwargs):
instance = {
'uuid': str(uuid.uuid1()),
'memory_mb': 1024,
'root_gb': 10,
'ephemeral_gb': 5,
'vcpus': 1
}
instance.update(**kwargs)
return instance
def _fake_resources(self, values=None):
resources = {
'memory_mb': 2048,
'memory_mb_used': 0,
'free_ram_mb': 2048,
'local_gb': 20,
'local_gb_used': 0,
'free_disk_gb': 20,
'vcpus': 2,
'vcpus_used': 0
}
if values:
resources.update(values)
return resources
def test_cpu_unlimited(self):
claim = self._claim(vcpus=100000)
self.assertTrue(claim.test(self.resources))
def test_memory_unlimited(self):
claim = self._claim(memory_mb=99999999)
self.assertTrue(claim.test(self.resources))
def test_disk_unlimited_root(self):
claim = self._claim(root_gb=999999)
self.assertTrue(claim.test(self.resources))
def test_disk_unlimited_ephemeral(self):
claim = self._claim(ephemeral_gb=999999)
self.assertTrue(claim.test(self.resources))
def test_cpu_oversubscription(self):
claim = self._claim(vcpus=8)
limits = {'vcpu': 16}
self.assertTrue(claim.test(self.resources, limits))
def test_cpu_insufficient(self):
claim = self._claim(vcpus=17)
limits = {'vcpu': 16}
self.assertFalse(claim.test(self.resources, limits))
def test_memory_oversubscription(self):
claim = self._claim(memory_mb=4096)
limits = {'memory_mb': 8192}
self.assertTrue(claim.test(self.resources, limits))
def test_memory_insufficient(self):
claim = self._claim(memory_mb=16384)
limits = {'memory_mb': 8192}
self.assertFalse(claim.test(self.resources, limits))
def test_disk_oversubscription(self):
claim = self._claim(root_gb=10, ephemeral_gb=40)
limits = {'disk_gb': 60}
self.assertTrue(claim.test(self.resources, limits))
def test_disk_insufficient(self):
claim = self._claim(root_gb=10, ephemeral_gb=40)
limits = {'disk_gb': 45}
self.assertFalse(claim.test(self.resources, limits))
def test_abort(self):
instance = self._fake_instance(root_gb=10, ephemeral_gb=40)
def fake_abort(self):
self._called = True
self.stubs.Set(claims.Claim, 'abort', fake_abort)
claim = None
try:
with claims.Claim(instance, None) as claim:
raise test.TestingException("abort")
except test.TestingException:
pass
self.assertTrue(claim._called)

View File

@@ -33,8 +33,16 @@ from nova.virt import driver
LOG = logging.getLogger(__name__)
FAKE_VIRT_MEMORY_MB = 5
FAKE_VIRT_LOCAL_GB = 6
FAKE_VIRT_VCPUS = 1
class UnsupportedVirtDriver(driver.ComputeDriver):
"""Pretend version of a lame virt driver"""
def __init__(self):
super(UnsupportedVirtDriver, self).__init__(None)
def get_available_resource(self):
# no support for getting resource usage info
return {}
@@ -42,10 +50,11 @@ class UnsupportedVirtDriver(driver.ComputeDriver):
class FakeVirtDriver(driver.ComputeDriver):
def __init__(self, virtapi):
self.memory_mb = 5
self.local_gb = 6
self.vcpus = 1
def __init__(self):
super(FakeVirtDriver, self).__init__(None)
self.memory_mb = FAKE_VIRT_MEMORY_MB
self.local_gb = FAKE_VIRT_LOCAL_GB
self.vcpus = FAKE_VIRT_VCPUS
self.memory_mb_used = 0
self.local_gb_used = 0
@@ -148,9 +157,9 @@ class BaseTestCase(test.TestCase):
host = "fakehost"
if unsupported:
driver = UnsupportedVirtDriver(None)
driver = UnsupportedVirtDriver()
else:
driver = FakeVirtDriver(None)
driver = FakeVirtDriver()
tracker = resource_tracker.ResourceTracker(host, driver)
return tracker
@@ -166,38 +175,31 @@ class UnsupportedDriverTestCase(BaseTestCase):
# seed tracker with data:
self.tracker.update_available_resource(self.context)
def testDisabled(self):
def test_disabled(self):
# disabled = no compute node stats
self.assertTrue(self.tracker.disabled)
self.assertEqual(None, self.tracker.compute_node)
def testDisabledClaim(self):
def test_disabled_claim(self):
# basic claim:
instance = self._fake_instance()
claim = self.tracker.begin_resource_claim(self.context, instance)
self.assertEqual(None, claim)
claim = self.tracker.instance_claim(self.context, instance)
self.assertEqual(0, claim.memory_mb)
def testDisabledInstanceClaim(self):
def test_disabled_instance_claim(self):
# instance variation:
instance = self._fake_instance()
claim = self.tracker.begin_resource_claim(self.context, instance)
self.assertEqual(None, claim)
claim = self.tracker.instance_claim(self.context, instance)
self.assertEqual(0, claim.memory_mb)
def testDisabledInstanceContextClaim(self):
def test_disabled_instance_context_claim(self):
# instance context manager variation:
instance = self._fake_instance()
with self.tracker.resource_claim(self.context, instance):
pass
self.assertEqual(0, len(self.tracker.claims))
claim = self.tracker.instance_claim(self.context, instance)
with self.tracker.instance_claim(self.context, instance) as claim:
self.assertEqual(0, claim.memory_mb)
def testDisabledFinishClaim(self):
self.assertEqual(None, self.tracker.finish_resource_claim(None))
def testDisabledAbortClaim(self):
self.assertEqual(None, self.tracker.abort_resource_claim(self.context,
None))
def testDisabledUpdateUsage(self):
def test_disabled_updated_usage(self):
instance = self._fake_instance(host='fakehost', memory_mb=5,
root_gb=10)
self.tracker.update_usage(self.context, instance)
@@ -209,8 +211,7 @@ class MissingServiceTestCase(BaseTestCase):
self.context = context.get_admin_context()
self.tracker = self._tracker()
def testMissingService(self):
"""No service record in DB."""
def test_missing_service(self):
self.tracker.update_available_resource(self.context)
self.assertTrue(self.tracker.disabled)
@@ -234,11 +235,11 @@ class MissingComputeNodeTestCase(BaseTestCase):
service = self._create_service()
return [service]
def testCreatedComputeNode(self):
def test_create_compute_node(self):
self.tracker.update_available_resource(self.context)
self.assertTrue(self.created)
def testEnabled(self):
def test_enabled(self):
self.tracker.update_available_resource(self.context)
self.assertFalse(self.tracker.disabled)
@@ -253,6 +254,7 @@ class ResourceTestCase(BaseTestCase):
self._fake_compute_node_update)
self.tracker.update_available_resource(self.context)
self.limits = self._basic_limits()
def _fake_service_get_all_compute_by_host(self, ctx, host):
self.compute = self._create_compute_node()
@@ -267,10 +269,15 @@ class ResourceTestCase(BaseTestCase):
self.compute.update(values)
return self.compute
def testUpdateUseOnlyForTracked(self):
"""Only update usage is a previous claim has added instance to
list of tracked instances.
"""
def _basic_limits(self):
"""Get basic limits, no oversubscription"""
return {
'memory_mb': FAKE_VIRT_MEMORY_MB * 2,
'disk_gb': FAKE_VIRT_LOCAL_GB,
'vcpu': FAKE_VIRT_VCPUS,
}
def test_update_usage_only_for_tracked(self):
instance = self._fake_instance(memory_mb=3, root_gb=1, ephemeral_gb=1,
task_state=None)
self.tracker.update_usage(self.context, instance)
@@ -279,8 +286,9 @@ class ResourceTestCase(BaseTestCase):
self.assertEqual(0, self.tracker.compute_node['local_gb_used'])
self.assertEqual(0, self.tracker.compute_node['current_workload'])
claim = self.tracker.begin_resource_claim(self.context, instance)
self.assertNotEqual(None, claim)
claim = self.tracker.instance_claim(self.context, instance,
self.limits)
self.assertNotEqual(0, claim.memory_mb)
self.assertEqual(3, self.tracker.compute_node['memory_mb_used'])
self.assertEqual(2, self.tracker.compute_node['local_gb_used'])
@@ -292,126 +300,21 @@ class ResourceTestCase(BaseTestCase):
self.assertEqual(2, self.tracker.compute_node['local_gb_used'])
self.assertEqual(1, self.tracker.compute_node['current_workload'])
def testFreeRamResourceValue(self):
driver = FakeVirtDriver(None)
def test_free_ram_resource_value(self):
driver = FakeVirtDriver()
mem_free = driver.memory_mb - driver.memory_mb_used
self.assertEqual(mem_free, self.tracker.compute_node['free_ram_mb'])
def testFreeDiskResourceValue(self):
driver = FakeVirtDriver(None)
def test_free_disk_resource_value(self):
driver = FakeVirtDriver()
mem_free = driver.local_gb - driver.local_gb_used
self.assertEqual(mem_free, self.tracker.compute_node['free_disk_gb'])
def testUpdateComputeNode(self):
def test_update_compute_node(self):
self.assertFalse(self.tracker.disabled)
self.assertTrue(self.updated)
def testCpuUnlimited(self):
"""Test default of unlimited CPU"""
self.assertEqual(0, self.tracker.compute_node['vcpus_used'])
instance = self._fake_instance(memory_mb=1, root_gb=1, ephemeral_gb=1,
vcpus=100000)
claim = self.tracker.begin_resource_claim(self.context, instance)
self.assertNotEqual(None, claim)
self.assertEqual(100000, self.tracker.compute_node['vcpus_used'])
def testCpuOversubscription(self):
"""Test client-supplied oversubscription of CPU"""
self.assertEqual(1, self.tracker.compute_node['vcpus'])
instance = self._fake_instance(memory_mb=1, root_gb=1, ephemeral_gb=1,
vcpus=3)
limits = {'vcpu': 5}
claim = self.tracker.begin_resource_claim(self.context, instance,
limits)
self.assertNotEqual(None, claim)
self.assertEqual(3, self.tracker.compute_node['vcpus_used'])
def testMemoryOversubscription(self):
"""Test client-supplied oversubscription of memory"""
instance = self._fake_instance(memory_mb=8, root_gb=1, ephemeral_gb=1)
limits = {'memory_mb': 8}
claim = self.tracker.begin_resource_claim(self.context, instance,
limits)
self.assertNotEqual(None, claim)
self.assertEqual(8, self.tracker.compute_node['memory_mb_used'])
self.assertEqual(2, self.tracker.compute_node['local_gb_used'])
def testDiskOversubscription(self):
"""Test client-supplied oversubscription of disk space"""
instance = self._fake_instance(memory_mb=1, root_gb=10, ephemeral_gb=1)
limits = {'disk_gb': 12}
claim = self.tracker.begin_resource_claim(self.context, instance,
limits)
self.assertNotEqual(None, claim)
self.assertEqual(1, self.tracker.compute_node['memory_mb_used'])
self.assertEqual(11, self.tracker.compute_node['local_gb_used'])
def testUnlimitedMemoryClaim(self):
"""Test default of unlimited memory"""
instance = self._fake_instance(memory_mb=200000000000, root_gb=1,
ephemeral_gb=1)
claim = self.tracker.begin_resource_claim(self.context, instance)
self.assertNotEqual(None, claim)
self.assertEqual(200000000000,
self.tracker.compute_node['memory_mb_used'])
def testInsufficientMemoryClaimWithOversubscription(self):
"""Exceed oversubscribed memory limit of 10MB"""
instance = self._fake_instance(memory_mb=10, root_gb=0,
ephemeral_gb=0)
limits = {'memory_mb': 10}
claim = self.tracker.begin_resource_claim(self.context, instance,
limits)
self.assertNotEqual(None, claim)
instance = self._fake_instance(memory_mb=1, root_gb=0,
ephemeral_gb=0)
limits = {'memory_mb': 10}
claim = self.tracker.begin_resource_claim(self.context, instance,
limits)
self.assertEqual(None, claim)
def testUnlimitDiskClaim(self):
"""Test default of unlimited disk space"""
instance = self._fake_instance(memory_mb=0, root_gb=200000000,
ephemeral_gb=0)
claim = self.tracker.begin_resource_claim(self.context, instance)
self.assertNotEqual(None, claim)
self.assertEqual(200000000, self.tracker.compute_node['local_gb_used'])
def testInsufficientDiskClaimWithOversubscription(self):
"""Exceed oversubscribed disk limit of 10GB"""
instance = self._fake_instance(memory_mb=1, root_gb=4,
ephemeral_gb=5) # 9 GB
limits = {'disk_gb': 10}
claim = self.tracker.begin_resource_claim(self.context, instance,
limits)
self.assertNotEqual(None, claim)
instance = self._fake_instance(memory_mb=1, root_gb=1,
ephemeral_gb=1) # 2 GB
limits = {'disk_gb': 10}
claim = self.tracker.begin_resource_claim(self.context, instance,
limits)
self.assertEqual(None, claim)
def testInsufficientCpuClaim(self):
instance = self._fake_instance(memory_mb=0, root_gb=0,
ephemeral_gb=0, vcpus=1)
claim = self.tracker.begin_resource_claim(self.context, instance)
self.assertNotEqual(None, claim)
self.assertEqual(1, self.tracker.compute_node['vcpus_used'])
instance = self._fake_instance(memory_mb=0, root_gb=0,
ephemeral_gb=0, vcpus=1)
limits = {'vcpu': 1}
claim = self.tracker.begin_resource_claim(self.context, instance,
limits)
self.assertEqual(None, claim)
def testClaimAndFinish(self):
def test_claim_and_audit(self):
self.assertEqual(5, self.tracker.compute_node['memory_mb'])
self.assertEqual(0, self.tracker.compute_node['memory_mb_used'])
@@ -422,7 +325,9 @@ class ResourceTestCase(BaseTestCase):
claim_disk = 2
instance = self._fake_instance(memory_mb=claim_mem, root_gb=claim_disk,
ephemeral_gb=0)
claim = self.tracker.begin_resource_claim(self.context, instance)
claim = self.tracker.instance_claim(self.context, instance,
self.limits)
self.assertEqual(5, self.compute["memory_mb"])
self.assertEqual(claim_mem, self.compute["memory_mb_used"])
@@ -448,18 +353,7 @@ class ResourceTestCase(BaseTestCase):
self.assertEqual(claim_disk, self.compute['local_gb_used'])
self.assertEqual(6 - claim_disk, self.compute['free_disk_gb'])
# Finally, finish the claimm and update from the virt layer again.
# Resource usage will be consistent again:
self.tracker.finish_resource_claim(claim)
self.tracker.update_available_resource(self.context)
self.assertEqual(claim_mem, self.compute['memory_mb_used'])
self.assertEqual(5 - claim_mem, self.compute['free_ram_mb'])
self.assertEqual(claim_disk, self.compute['local_gb_used'])
self.assertEqual(6 - claim_disk, self.compute['free_disk_gb'])
def testClaimAndAbort(self):
def test_claim_and_abort(self):
self.assertEqual(5, self.tracker.compute_node['memory_mb'])
self.assertEqual(0, self.tracker.compute_node['memory_mb_used'])
@@ -470,7 +364,8 @@ class ResourceTestCase(BaseTestCase):
claim_disk = 2
instance = self._fake_instance(memory_mb=claim_mem,
root_gb=claim_disk, ephemeral_gb=0)
claim = self.tracker.begin_resource_claim(self.context, instance)
claim = self.tracker.instance_claim(self.context, instance,
self.limits)
self.assertNotEqual(None, claim)
self.assertEqual(5, self.compute["memory_mb"])
@@ -481,7 +376,7 @@ class ResourceTestCase(BaseTestCase):
self.assertEqual(claim_disk, self.compute["local_gb_used"])
self.assertEqual(6 - claim_disk, self.compute["free_disk_gb"])
self.tracker.abort_resource_claim(self.context, claim)
claim.abort()
self.assertEqual(5, self.compute["memory_mb"])
self.assertEqual(0, self.compute["memory_mb_used"])
@@ -491,25 +386,42 @@ class ResourceTestCase(BaseTestCase):
self.assertEqual(0, self.compute["local_gb_used"])
self.assertEqual(6, self.compute["free_disk_gb"])
def testClaimsPurge(self):
"""Test that claims get get purged when the audit process runs"""
def test_instance_claim_with_oversubscription(self):
memory_mb = FAKE_VIRT_MEMORY_MB * 2
root_gb = ephemeral_gb = FAKE_VIRT_LOCAL_GB
vcpus = FAKE_VIRT_VCPUS * 2
instance = self._fake_instance(memory_mb=2, root_gb=2, ephemeral_gb=0)
claim = self.tracker.begin_resource_claim(self.context, instance)
limits = {'memory_mb': memory_mb, 'disk_gb': root_gb * 2,
'vcpu': vcpus}
instance = self._fake_instance(memory_mb=memory_mb,
root_gb=root_gb, ephemeral_gb=ephemeral_gb)
self.tracker.update_available_resource(self.context)
self.assertEqual({}, self.tracker.claims)
self.tracker.instance_claim(self.context, instance, limits)
self.assertEqual(memory_mb,
self.tracker.compute_node['memory_mb_used'])
self.assertEqual(root_gb * 2,
self.tracker.compute_node['local_gb_used'])
def testInstanceClaim(self):
instance = self._fake_instance(memory_mb=1, root_gb=0, ephemeral_gb=2)
self.tracker.begin_resource_claim(self.context, instance)
self.assertEqual(1, self.tracker.compute_node['memory_mb_used'])
self.assertEqual(2, self.tracker.compute_node['local_gb_used'])
def test_additive_claims(self):
self.limits['vcpu'] = 2
def testContextClaimWithException(self):
instance = self._fake_instance(memory_mb=1, root_gb=1, ephemeral_gb=1,
vcpus=1)
with self.tracker.instance_claim(self.context, instance, self.limits):
pass
instance = self._fake_instance(memory_mb=1, root_gb=1, ephemeral_gb=1,
vcpus=1)
with self.tracker.instance_claim(self.context, instance, self.limits):
pass
self.assertEqual(2, self.tracker.compute_node['memory_mb_used'])
self.assertEqual(4, self.tracker.compute_node['local_gb_used'])
self.assertEqual(2, self.tracker.compute_node['vcpus_used'])
def test_context_claim_with_exception(self):
instance = self._fake_instance(memory_mb=1, root_gb=1, ephemeral_gb=1)
try:
with self.tracker.resource_claim(self.context, instance):
with self.tracker.instance_claim(self.context, instance):
# <insert exciting things that utilize resources>
raise test.TestingException()
except test.TestingException:
@@ -520,9 +432,9 @@ class ResourceTestCase(BaseTestCase):
self.assertEqual(0, self.compute['memory_mb_used'])
self.assertEqual(0, self.compute['local_gb_used'])
def testInstanceContextClaim(self):
def test_instance_context_claim(self):
instance = self._fake_instance(memory_mb=1, root_gb=1, ephemeral_gb=1)
with self.tracker.resource_claim(self.context, instance):
with self.tracker.instance_claim(self.context, instance):
# <insert exciting things that utilize resources>
self.assertEqual(1, self.tracker.compute_node['memory_mb_used'])
self.assertEqual(2, self.tracker.compute_node['local_gb_used'])
@@ -537,12 +449,12 @@ class ResourceTestCase(BaseTestCase):
self.assertEqual(1, self.compute['memory_mb_used'])
self.assertEqual(2, self.compute['local_gb_used'])
def testUpdateLoadStatsForInstance(self):
def test_update_load_stats_for_instance(self):
self.assertFalse(self.tracker.disabled)
self.assertEqual(0, self.tracker.compute_node['current_workload'])
instance = self._fake_instance(task_state=task_states.SCHEDULING)
with self.tracker.resource_claim(self.context, instance):
with self.tracker.instance_claim(self.context, instance):
pass
self.assertEqual(1, self.tracker.compute_node['current_workload'])
@@ -554,7 +466,7 @@ class ResourceTestCase(BaseTestCase):
self.tracker.update_usage(self.context, instance)
self.assertEqual(0, self.tracker.compute_node['current_workload'])
def testCpuStats(self):
def test_cpu_stats(self):
limits = {'disk_gb': 100, 'memory_mb': 100}
self.assertEqual(0, self.tracker.compute_node['vcpus_used'])
@@ -564,7 +476,7 @@ class ResourceTestCase(BaseTestCase):
self.tracker.update_usage(self.context, instance)
self.assertEqual(0, self.tracker.compute_node['vcpus_used'])
with self.tracker.resource_claim(self.context, instance, limits):
with self.tracker.instance_claim(self.context, instance, limits):
pass
self.assertEqual(1, self.tracker.compute_node['vcpus_used'])
@@ -574,7 +486,7 @@ class ResourceTestCase(BaseTestCase):
self.assertEqual(1, self.tracker.compute_node['vcpus_used'])
instance = self._fake_instance(vcpus=10)
with self.tracker.resource_claim(self.context, instance, limits):
with self.tracker.instance_claim(self.context, instance, limits):
pass
self.assertEqual(11, self.tracker.compute_node['vcpus_used'])