Merge "Fix bugs in resource tracker and cleanup"
This commit is contained in:
commit
59ad151a29
@@ -251,8 +251,7 @@ class ComputeManager(manager.SchedulerDependentManager):

(old_ref, instance_ref) = self.db.instance_update_and_get_original(
context, instance_uuid, kwargs)
self.resource_tracker.update_load_stats_for_instance(context,
instance_ref)
self.resource_tracker.update_usage(context, instance_ref)
notifications.send_update(context, old_ref, instance_ref)

return instance_ref

@@ -480,10 +479,14 @@ class ComputeManager(manager.SchedulerDependentManager):
network_info = self._allocate_network(context, instance,
requested_networks)
try:
memory_mb_limit = filter_properties.get('memory_mb_limit',
None)
with self.resource_tracker.instance_resource_claim(context,
instance, memory_mb_limit=memory_mb_limit):
limits = filter_properties.get('limits', {})
with self.resource_tracker.resource_claim(context, instance,
limits):
# Resources are available to build this instance here,
# mark it as belonging to this host:
self._instance_update(context, instance['uuid'],
host=self.host, launched_on=self.host)

block_device_info = self._prep_block_device(context,
instance)
instance = self._spawn(context, instance, image_meta,
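The claim is used as a context manager around the build: if the spawn succeeds the claim is kept/finished, and if the build raises the claim is aborted so the usage is rolled back. A minimal illustrative sketch of that pattern (not the Nova implementation; the class and method names here are hypothetical):

    # Illustrative sketch only -- hypothetical names, not Nova's ResourceContextManager.
    class ResourceClaimContext(object):
        def __init__(self, tracker, claim):
            self.tracker = tracker
            self.claim = claim

        def __enter__(self):
            return self.claim

        def __exit__(self, exc_type, exc_val, exc_tb):
            if exc_type is None:
                self.tracker.finish(self.claim)   # build worked: keep usage accounted
            else:
                self.tracker.abort(self.claim)    # build failed: roll usage back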
@@ -684,7 +687,6 @@ class ComputeManager(manager.SchedulerDependentManager):
LOG.audit(_('Starting instance...'), context=context,
instance=instance)
self._instance_update(context, instance['uuid'],
host=self.host, launched_on=self.host,
vm_state=vm_states.BUILDING,
task_state=None,
expected_task_state=(task_states.SCHEDULING,
@@ -889,8 +891,6 @@ class ComputeManager(manager.SchedulerDependentManager):
self.db.instance_destroy(context, instance_uuid)
system_meta = self.db.instance_system_metadata_get(context,
instance_uuid)
# mark resources free
self.resource_tracker.free_resources(context)
self._notify_about_instance_usage(context, instance, "delete.end",
system_metadata=system_meta)
@@ -19,17 +19,22 @@ scheduler with useful information about availability through the ComputeNode
model.
"""

from nova import context
from nova.compute import vm_states
from nova import db
from nova import exception
from nova import flags
from nova.openstack.common import cfg
from nova.openstack.common import importutils
from nova.openstack.common import jsonutils
from nova.openstack.common import log as logging
from nova.openstack.common import timeutils
from nova import utils

resource_tracker_opts = [
cfg.IntOpt('reserved_host_disk_mb', default=0,
help='Amount of disk in MB to reserve for the host'),
cfg.IntOpt('reserved_host_memory_mb', default=512,
help='Amount of memory in MB to reserve for the host'),
cfg.IntOpt('claim_timeout_seconds', default=600,
help='How long, in seconds, before a resource claim times out'),
cfg.StrOpt('compute_stats_class',
@@ -52,46 +57,36 @@ class Claim(object):
correct decisions with respect to host selection.
"""

def __init__(self, claim_id, memory_mb, disk_gb, timeout, *args, **kwargs):
self.claim_id = claim_id
self.memory_mb = memory_mb
self.disk_gb = disk_gb
def __init__(self, instance, timeout):
self.instance = jsonutils.to_primitive(instance)
self.timeout = timeout
self.expire_ts = timeutils.utcnow_ts() + timeout

def apply_claim(self, resources):
"""Adjust the resources required from available resources.

:param resources: Should be a dictionary-like object that
has fields like a compute node
"""
return self._apply(resources)

def undo_claim(self, resources):
return self._apply(resources, sign=-1)

def is_expired(self):
"""Determine if this adjustment is old enough that we can assume it's
no longer needed.
"""
return timeutils.utcnow_ts() > self.expire_ts

def _apply(self, resources, sign=1):
values = {}
values['memory_mb_used'] = (resources['memory_mb_used'] + sign *
self.memory_mb)
values['free_ram_mb'] = (resources['free_ram_mb'] - sign *
self.memory_mb)
values['local_gb_used'] = (resources['local_gb_used'] + sign *
self.disk_gb)
values['free_disk_gb'] = (resources['free_disk_gb'] - sign *
self.disk_gb)
@property
def claim_id(self):
return self.instance['uuid']

return values
@property
def disk_gb(self):
return self.instance['root_gb'] + self.instance['ephemeral_gb']

@property
def memory_mb(self):
return self.instance['memory_mb']

@property
def vcpus(self):
return self.instance['vcpus']

def __str__(self):
return "[Claim %d: %d MB memory, %d GB disk]" % (self.claim_id,
self.memory_mb, self.disk_gb)
return "[Claim %s: %d MB memory, %d GB disk, %d VCPUS]" % \
(self.claim_id, self.memory_mb, self.disk_gb, self.vcpus)


class ResourceContextManager(object):
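With this change a Claim no longer stores explicit memory/disk numbers; it wraps the instance record and derives its footprint from it, using the instance UUID as the claim id. A small self-contained sketch of the derived values (illustrative data only):

    # Illustrative sketch of the derived-property idea, not the Claim class itself.
    instance = {'uuid': 'abc', 'memory_mb': 512, 'root_gb': 10,
                'ephemeral_gb': 5, 'vcpus': 2}

    disk_gb = instance['root_gb'] + instance['ephemeral_gb']   # 15 GB claimed
    memory_mb = instance['memory_mb']                          # 512 MB claimed
    vcpus = instance['vcpus']                                  # 2 VCPUs claimed
    print("[Claim %s: %d MB memory, %d GB disk, %d VCPUS]"
          % (instance['uuid'], memory_mb, disk_gb, vcpus))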
@@ -127,46 +122,25 @@ class ResourceTracker(object):
self.next_claim_id = 1
self.claims = {}
self.stats = importutils.import_object(FLAGS.compute_stats_class)
self.tracked_instances = {}

def resource_claim(self, context, *args, **kwargs):
claim = self.begin_resource_claim(context, *args, **kwargs)
return ResourceContextManager(context, claim, self)

def instance_resource_claim(self, context, instance_ref, *args, **kwargs):
claim = self.begin_instance_resource_claim(context, instance_ref,
*args, **kwargs)
def resource_claim(self, context, instance_ref, limits=None):
claim = self.begin_resource_claim(context, instance_ref, limits)
return ResourceContextManager(context, claim, self)

@utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE)
def begin_instance_resource_claim(self, context, instance_ref, *args,
**kwargs):
"""Method to begin a resource claim for a new instance."""
memory_mb = instance_ref['memory_mb']
disk_gb = instance_ref['root_gb'] + instance_ref['ephemeral_gb']

claim = self._do_begin_resource_claim(context, memory_mb, disk_gb,
*args, **kwargs)
if claim:
# also update load stats related to new instances firing up -
values = self._create_load_stats(context, instance_ref)
self.compute_node = self._update(context, values)
return claim

@utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE)
def begin_resource_claim(self, context, memory_mb, disk_gb,
memory_mb_limit=None, timeout=None, *args, **kwargs):
def begin_resource_claim(self, context, instance_ref, limits=None,
timeout=None):
"""Indicate that some resources are needed for an upcoming compute
host operation.
instance build operation.

This should be called any time the compute node is about to perform
an operation that will consume resources.
This should be called before the compute node is about to perform
an instance build operation that will consume additional resources.

:param memory_mb: security context
:param memory_mb: Memory in MB to be claimed
:param root_gb: Disk in GB to be claimed
:param memory_mb_limit: Memory in MB that is the maximum to allocate on
this node. May exceed installed physical memory if
oversubscription is the desired behavior.
:param context: security context
:param instance_ref: instance to reserve resources for
:param limits: Dict of oversubscription limits for memory, disk,
and CPUs.
:param timeout: How long, in seconds, the operation that requires
these resources should take to actually allocate what
it needs from the hypervisor. If the timeout is
@@ -177,71 +151,142 @@ class ResourceTracker(object):
compute operation is finished. Returns None if the claim
failed.
"""

return self._do_begin_resource_claim(context, memory_mb, disk_gb,
memory_mb_limit, timeout, *args, **kwargs)

def _do_begin_resource_claim(self, context, memory_mb, disk_gb,
memory_mb_limit=None, timeout=None, *args, **kwargs):

if self.disabled:
return

if not limits:
limits = {}

if not timeout:
timeout = FLAGS.claim_timeout_seconds

memory_mb = abs(memory_mb)
disk_gb = abs(disk_gb)
# If an individual limit is None, the resource will be considered
# unlimited:
memory_mb_limit = limits.get('memory_mb')
disk_gb_limit = limits.get('disk_gb')
vcpu_limit = limits.get('vcpu')

memory_mb = instance_ref['memory_mb']
disk_gb = instance_ref['root_gb'] + instance_ref['ephemeral_gb']
vcpus = instance_ref['vcpus']

msg = _("Attempting claim: memory %(memory_mb)d MB, disk %(disk_gb)d "
"GB, mem limit %(memory_mb_limit)s") % locals()
"GB, VCPUs %(vcpus)d") % locals()
LOG.audit(msg)

if not memory_mb_limit:
# default to total memory:
memory_mb_limit = self.compute_node['memory_mb']
# Test for resources:
if not self._can_claim_memory(memory_mb, memory_mb_limit):
return

free_ram_mb = memory_mb_limit - self.compute_node['memory_mb_used']
if not self._can_claim_disk(disk_gb, disk_gb_limit):
return

if not self._can_claim_cpu(vcpus, vcpu_limit):
return

# keep track of this claim until we know whether the compute operation
# was successful/completed:
claim = Claim(instance_ref, timeout)
self.claims[claim.claim_id] = claim

# Mark resources in-use and update stats
self._update_usage_from_instance(self.compute_node, instance_ref)

# persist changes to the compute node:
self._update(context, self.compute_node)
return claim
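The new begin_resource_claim() reads per-resource limits from the scheduler-supplied dict, treats a missing or None limit as unlimited, and tests memory, then disk, then CPU before registering the claim. A simplified, self-contained sketch of that limit test (illustrative only; the real checks read current usage from the compute_node record and log each step):

    # Simplified sketch of the "limit minus used" test, not the Nova methods.
    def can_claim(requested, used, limit):
        if limit is None:          # no limit supplied -> resource is unlimited
            return True
        return requested <= (limit - used)

    assert can_claim(2048, 1024, None)        # unlimited memory always passes
    assert can_claim(2048, 1024, 4096)        # fits under the 4096 MB limit
    assert not can_claim(4096, 1024, 4096)    # would exceed the limit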
def _can_claim_memory(self, memory_mb, memory_mb_limit):
"""Test if memory needed for a claim can be safely allocated"""
# Installed memory and usage info:
msg = _("Total memory: %(total_mem)d MB, used: %(used_mem)d MB, free: "
"%(free_mem)d") % dict(
"%(free_mem)d MB") % dict(
total_mem=self.compute_node['memory_mb'],
used_mem=self.compute_node['memory_mb_used'],
free_mem=self.compute_node['local_gb_used'])
LOG.audit(msg)

if memory_mb_limit is None:
# treat memory as unlimited:
LOG.audit(_("Memory limit not specified, defaulting to unlimited"))
return True

free_ram_mb = memory_mb_limit - self.compute_node['memory_mb_used']

# Oversubscribed memory policy info:
msg = _("Limit: %(memory_mb_limit)d MB, free: %(free_ram_mb)d") % \
locals()
msg = _("Memory limit: %(memory_mb_limit)d MB, free: "
"%(free_ram_mb)d MB") % locals()
LOG.audit(msg)

if memory_mb > free_ram_mb:
can_claim_mem = memory_mb <= free_ram_mb

if not can_claim_mem:
msg = _("Unable to claim resources. Free memory %(free_ram_mb)d "
"MB < requested memory %(memory_mb)d MB") % locals()
LOG.info(msg)
return None

if disk_gb > self.compute_node['free_disk_gb']:
return can_claim_mem

def _can_claim_disk(self, disk_gb, disk_gb_limit):
"""Test if disk space needed can be safely allocated"""
# Installed disk and usage info:
msg = _("Total disk: %(total_disk)d GB, used: %(used_disk)d GB, free: "
"%(free_disk)d GB") % dict(
total_disk=self.compute_node['local_gb'],
used_disk=self.compute_node['local_gb_used'],
free_disk=self.compute_node['free_disk_gb'])
LOG.audit(msg)

if disk_gb_limit is None:
# treat disk as unlimited:
LOG.audit(_("Disk limit not specified, defaulting to unlimited"))
return True

free_disk_gb = disk_gb_limit - self.compute_node['local_gb_used']

# Oversubscribed disk policy info:
msg = _("Disk limit: %(disk_gb_limit)d GB, free: "
"%(free_disk_gb)d GB") % locals()
LOG.audit(msg)

can_claim_disk = disk_gb <= free_disk_gb
if not can_claim_disk:
msg = _("Unable to claim resources. Free disk %(free_disk_gb)d GB"
" < requested disk %(disk_gb)d GB") % dict(
free_disk_gb=self.compute_node['free_disk_gb'],
disk_gb=disk_gb)
LOG.info(msg)
return None

claim_id = self._get_next_id()
c = Claim(claim_id, memory_mb, disk_gb, timeout, *args, **kwargs)
return can_claim_disk

# adjust compute node usage values and save so scheduler will see it:
values = c.apply_claim(self.compute_node)
self.compute_node = self._update(context, values)
def _can_claim_cpu(self, vcpus, vcpu_limit):
"""Test if CPUs can be safely allocated according to given policy."""

# keep track of this claim until we know whether the compute operation
# was successful/completed:
self.claims[claim_id] = c
return c
msg = _("Total VCPUs: %(total_vcpus)d, used: %(used_vcpus)d") \
% dict(total_vcpus=self.compute_node['vcpus'],
used_vcpus=self.compute_node['vcpus_used'])
LOG.audit(msg)

if vcpu_limit is None:
# treat cpu as unlimited:
LOG.audit(_("VCPU limit not specified, defaulting to unlimited"))
return True

# Oversubscribed disk policy info:
msg = _("CPU limit: %(vcpu_limit)d") % locals()
LOG.audit(msg)

free_vcpus = vcpu_limit - self.compute_node['vcpus_used']
can_claim_cpu = vcpus <= free_vcpus

if not can_claim_cpu:
msg = _("Unable to claim resources. Free CPU %(free_vcpus)d < "
"requested CPU %(vcpus)d") % locals()
LOG.info(msg)

return can_claim_cpu

@utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE)
def finish_resource_claim(self, claim):
"""Indicate that the compute operation that previously claimed the
resources identified by 'claim' has now completed and the resources
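Because a limit may exceed the physically installed capacity, the free amount is computed against the limit rather than against the hypervisor totals. A worked example with illustrative numbers:

    # Worked example of oversubscribed capacity (illustrative numbers only).
    memory_mb = 1024 * 8                 # physical RAM reported by the hypervisor
    memory_mb_used = 1024 * 7            # already claimed by instances
    memory_mb_limit = memory_mb * 1.5    # scheduler allows 1.5x oversubscription

    free_ram_mb = memory_mb_limit - memory_mb_used
    print(free_ram_mb)                   # 5120.0 MB still claimable, although only
                                         # 1024 MB is physically free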
@@ -260,7 +305,7 @@ class ResourceTracker(object):
if self.claims.pop(claim.claim_id, None):
LOG.info(_("Finishing claim: %s") % claim)
else:
LOG.info(_("Can't find claim %d. It may have been 'finished' "
LOG.info(_("Can't find claim %s. It may have been 'finished' "
"twice, or it has already timed out."), claim.claim_id)

@utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE)
@@ -278,27 +323,37 @@ class ResourceTracker(object):
# un-claim the resources:
if self.claims.pop(claim.claim_id, None):
LOG.info(_("Aborting claim: %s") % claim)
values = claim.undo_claim(self.compute_node)
self.compute_node = self._update(context, values)
# flag the instance as deleted to revert the resource usage
# and associated stats:
claim.instance['vm_state'] = vm_states.DELETED
self._update_usage_from_instance(self.compute_node, claim.instance)
self._update(context, self.compute_node)

else:
# can't find the claim. this may mean the claim already timed
# out or it was already explicitly finished/aborted.
LOG.info(_("Claim %d not found. It either timed out or was "
LOG.audit(_("Claim %s not found. It either timed out or was "
"already explicitly finished/aborted"), claim.claim_id)

@utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE)
def update_usage(self, context, instance):
"""Update the resource usage and stats after a change in an
instance
"""
if self.disabled:
return

# don't update usage for this instance unless it submitted a resource
# claim first:
uuid = instance['uuid']
if uuid in self.tracked_instances:
self._update_usage_from_instance(self.compute_node, instance)
self._update(context.elevated(), self.compute_node)

@property
def disabled(self):
return self.compute_node is None

def free_resources(self, context):
"""A compute operation finished freeing up resources. Update compute
model to reflect updated resource usage.

(The hypervisor may not immediately 'GC' all resources, so ask directly
to see what's available to update the compute node model.)
"""
self.update_available_resource(context.elevated())

@utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE)
def update_available_resource(self, context):
"""Override in-memory calculations of compute node resource usage based
@@ -308,36 +363,33 @@ class ResourceTracker(object):
declared a need for resources, but not necessarily retrieved them from
the hypervisor layer yet.
"""
# ask hypervisor for its view of resource availability &
# usage:
resources = self.driver.get_available_resource()
if not resources:
# The virt driver does not support this function
LOG.warn(_("Virt driver does not support "
LOG.audit(_("Virt driver does not support "
"'get_available_resource' Compute tracking is disabled."))
self.compute_node = None
self.claims = {}
return

# Confirm resources dictionary contains expected keys:
self._verify_resources(resources)

resources['free_ram_mb'] = resources['memory_mb'] - \
resources['memory_mb_used']
resources['free_disk_gb'] = resources['local_gb'] - \
resources['local_gb_used']
self._report_hypervisor_resource_view(resources)

LOG.audit(_("free_ram_mb: %s") % resources['free_ram_mb'])
LOG.audit(_("free_disk_gb: %s") % resources['free_disk_gb'])
# Apply resource claims representing in-progress operations to
# 'resources'. This may over-estimate the amount of resources in use,
# at least until the next time 'update_available_resource' runs.
self._apply_claims(resources)
self._purge_expired_claims()

# also generate all load stats:
values = self._create_load_stats(context)
resources.update(values)
# Grab all instances assigned to this host:
filters = {'host': self.host, 'deleted': False}
instances = db.instance_get_all_by_filters(context, filters)

# Now calculate usage based on instance utilization:
self._update_usage_from_instances(resources, instances)
self._report_final_resource_view(resources)

self._sync_compute_node(context, resources)

def _sync_compute_node(self, context, resources):
"""Create or update the compute node DB record"""
if not self.compute_node:
# we need a copy of the ComputeNode record:
service = self._get_service(context)
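The audit now rebuilds usage from the instances assigned to this host instead of replaying in-memory claim arithmetic, then reports both the hypervisor view and the final view before syncing the DB record. A rough, self-contained sketch of that shape (illustrative only; keys, states, and the reservation handling are simplified):

    # Illustrative shape of the periodic audit, not the Nova method.
    def audit(hypervisor_view, instances, reserved_memory_mb=512):
        usage = dict(hypervisor_view)              # start from the virt driver's view
        usage['memory_mb_used'] = reserved_memory_mb
        for inst in instances:                     # rebuild usage from instances
            if inst['vm_state'] != 'deleted':
                usage['memory_mb_used'] += inst['memory_mb']
        usage['free_ram_mb'] = usage['memory_mb'] - usage['memory_mb_used']
        return usage                               # then persisted to the DB record

    print(audit({'memory_mb': 4096}, [{'vm_state': 'active', 'memory_mb': 1024}]))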
@@ -352,80 +404,30 @@ class ResourceTracker(object):
if not self.compute_node:
# Need to create the ComputeNode record:
resources['service_id'] = service['id']
self.compute_node = self._create(context, resources)
self._create(context, resources)
LOG.info(_('Compute_service record created for %s ') % self.host)

else:
# just update the record:
self.compute_node = self._update(context, resources,
prune_stats=True)
self._update(context, resources, prune_stats=True)
LOG.info(_('Compute_service record updated for %s ') % self.host)

@utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE)
def update_load_stats_for_instance(self, context, instance_ref):
"""Update workload stats for the local compute host."""

if self.disabled:
return

values = {}
self.stats.update_stats_for_instance(instance_ref)
values['stats'] = self.stats

values['current_workload'] = self.stats.calculate_workload()
values['running_vms'] = self.stats.num_instances
values['vcpus_used'] = self.stats.num_vcpus_used

self.compute_node = self._update(context.elevated(), values)

def _apply_claims(self, resources):
"""Apply in-progress resource claims to the 'resources' dict from the
virt layer
"""
def _purge_expired_claims(self):
"""Purge expired resource claims"""
for claim_id in self.claims.keys():
c = self.claims[claim_id]
if c.is_expired():
# if this claim is expired, just expunge it
LOG.info(_("Expiring resource claim %d"), claim_id)
# if this claim is expired, just expunge it.
# it is assumed that the instance will eventually get built
# successfully.
LOG.audit(_("Expiring resource claim %s"), claim_id)
self.claims.pop(claim_id)
else:
values = c.apply_claim(resources)
resources.update(values)

def _create(self, context, values):
"""Create the compute node in the DB"""
# initialize load stats from existing instances:
compute_node = db.compute_node_create(context, values)
return compute_node

def _create_load_stats(self, context, instance=None):
"""For each existing instance generate load stats for the compute
node record.
"""
values = {}

if instance:
instances = [instance]
else:
self.stats.clear()  # re-generating all, so clear old stats

# grab all instances that are not yet DELETED
filters = {'host': self.host, 'deleted': False}
instances = db.instance_get_all_by_filters(context, filters)

for instance in instances:
self.stats.update_stats_for_instance(instance)

values['current_workload'] = self.stats.calculate_workload()
values['running_vms'] = self.stats.num_instances
values['vcpus_used'] = self.stats.num_vcpus_used
values['stats'] = self.stats
return values

def _get_next_id(self):
next_id = self.next_claim_id
self.next_claim_id += 1
return next_id
self.compute_node = dict(compute_node)

def _get_service(self, context):
try:
@@ -434,10 +436,105 @@ class ResourceTracker(object):
except exception.NotFound:
LOG.warn(_("No service record for host %s"), self.host)

def _report_hypervisor_resource_view(self, resources):
"""Log the hypervisor's view of free memory in and free disk.
This is just a snapshot of resource usage recorded by the
virt driver.
"""
free_ram_mb = resources['memory_mb'] - resources['memory_mb_used']
free_disk_gb = resources['local_gb'] - resources['local_gb_used']

LOG.debug(_("Hypervisor: free ram (MB): %s") % free_ram_mb)
LOG.debug(_("Hypervisor: free disk (GB): %s") % free_disk_gb)

vcpus = resources['vcpus']
if vcpus:
free_vcpus = vcpus - resources['vcpus_used']
LOG.debug(_("Hypervisor: free VCPUs: %s") % free_vcpus)
else:
LOG.debug(_("Hypervisor: VCPU information unavailable"))

def _report_final_resource_view(self, resources):
"""Report final calculate of free memory and free disk including
instance calculations and in-progress resource claims. These
values will be exposed via the compute node table to the scheduler.
"""
LOG.audit(_("Free ram (MB): %s") % resources['free_ram_mb'])
LOG.audit(_("Free disk (GB): %s") % resources['free_disk_gb'])

vcpus = resources['vcpus']
if vcpus:
free_vcpus = vcpus - resources['vcpus_used']
LOG.audit(_("Free VCPUS: %s") % free_vcpus)
else:
LOG.audit(_("Free VCPU information unavailable"))

def _update(self, context, values, prune_stats=False):
"""Persist the compute node updates to the DB"""
return db.compute_node_update(context, self.compute_node['id'],
values, prune_stats)
compute_node = db.compute_node_update(context,
self.compute_node['id'], values, prune_stats)
self.compute_node = dict(compute_node)
def _update_usage_from_instance(self, resources, instance):
"""Update usage for a single instance."""

uuid = instance['uuid']
is_new_instance = uuid not in self.tracked_instances
is_deleted_instance = instance['vm_state'] == vm_states.DELETED

if is_new_instance:
self.tracked_instances[uuid] = 1
sign = 1

if instance['vm_state'] == vm_states.DELETED:
self.tracked_instances.pop(uuid)
sign = -1

self.stats.update_stats_for_instance(instance)

# if it's a new or deleted instance:
if is_new_instance or is_deleted_instance:
# new instance, update compute node resource usage:
resources['memory_mb_used'] += sign * instance['memory_mb']
resources['local_gb_used'] += sign * instance['root_gb']
resources['local_gb_used'] += sign * instance['ephemeral_gb']

# free ram and disk may be negative, depending on policy:
resources['free_ram_mb'] = (resources['memory_mb'] -
resources['memory_mb_used'])
resources['free_disk_gb'] = (resources['local_gb'] -
resources['local_gb_used'])

resources['running_vms'] = self.stats.num_instances
resources['vcpus_used'] = self.stats.num_vcpus_used

resources['current_workload'] = self.stats.calculate_workload()
resources['stats'] = self.stats

def _update_usage_from_instances(self, resources, instances):
"""Calculate resource usage based on instance utilization. This is
different than the hypervisor's view as it will account for all
instances assigned to the local compute host, even if they are not
currently powered on.
"""
self.tracked_instances.clear()

# purge old stats
self.stats.clear()

# set some intiial values, reserve room for host/hypervisor:
resources['local_gb_used'] = FLAGS.reserved_host_disk_mb / 1024
resources['memory_mb_used'] = FLAGS.reserved_host_memory_mb
resources['vcpus_used'] = 0
resources['free_ram_mb'] = (resources['memory_mb'] -
resources['memory_mb_used'])
resources['free_disk_gb'] = (resources['local_gb'] -
resources['local_gb_used'])
resources['current_workload'] = 0
resources['running_vms'] = 0

for instance in instances:
self._update_usage_from_instance(resources, instance)

def _verify_resources(self, resources):
resource_keys = ["vcpus", "memory_mb", "local_gb", "cpu_info",
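_update_usage_from_instance() applies an instance's footprint with a +1 sign the first time the instance is seen and a -1 sign when a tracked instance shows up in the DELETED state, so deletions and aborted claims roll usage back. A simplified stand-alone sketch of that sign-based accounting (illustrative names and states, not the Nova code; the real method also updates workload stats for already-tracked instances):

    # Illustrative sketch of the +/- accounting.
    def apply_usage(resources, instance, tracked):
        uuid = instance['uuid']
        if instance['vm_state'] == 'deleted' and uuid in tracked:
            sign = -1                      # release the instance's footprint
            tracked.discard(uuid)
        elif uuid not in tracked:
            sign = 1                       # first sighting: add its footprint
            tracked.add(uuid)
        else:
            return                         # already tracked; nothing to add here
        resources['memory_mb_used'] += sign * instance['memory_mb']
        resources['local_gb_used'] += sign * (instance['root_gb'] +
                                              instance['ephemeral_gb'])

    tracked = set()
    res = {'memory_mb_used': 0, 'local_gb_used': 0}
    apply_usage(res, {'uuid': 'a', 'vm_state': 'active', 'memory_mb': 512,
                      'root_gb': 10, 'ephemeral_gb': 0}, tracked)
    print(res)    # {'memory_mb_used': 512, 'local_gb_used': 10}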
@@ -47,8 +47,6 @@ from sqlalchemy.sql.expression import literal_column
from sqlalchemy.sql import func

FLAGS = flags.FLAGS
flags.DECLARE('reserved_host_disk_mb', 'nova.scheduler.host_manager')
flags.DECLARE('reserved_host_memory_mb', 'nova.scheduler.host_manager')

LOG = logging.getLogger(__name__)
@@ -132,6 +132,9 @@ class FilterScheduler(driver.Scheduler):
# Add a retry entry for the selected compute host:
self._add_retry_host(filter_properties, weighted_host.host_state.host)

self._add_oversubscription_policy(filter_properties,
weighted_host.host_state)

payload = dict(request_spec=request_spec,
weighted_host=weighted_host.to_dict(),
instance_id=instance_uuid)
@@ -160,6 +163,9 @@ class FilterScheduler(driver.Scheduler):
hosts = retry['hosts']
hosts.append(host)

def _add_oversubscription_policy(self, filter_properties, host_state):
filter_properties['limits'] = host_state.limits

def _get_configuration_options(self):
"""Fetch options dictionary. Broken out for testing."""
return self.options.get_configuration()
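The scheduler copies the chosen host's limits dict into filter_properties, and the compute manager reads the same dict when it begins the resource claim. Illustrative flow with made-up numbers:

    # Illustrative flow of the limits dict from scheduler to compute.
    host_limits = {'memory_mb': 6144, 'disk_gb': 150, 'vcpu': 4}   # set by filters
    filter_properties = {}
    filter_properties['limits'] = host_limits      # _add_oversubscription_policy
    # The compute manager later reads the same dict:
    limits = filter_properties.get('limits', {})
    print(limits.get('memory_mb'))                 # 6144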
@@ -47,4 +47,10 @@ class CoreFilter(filters.BaseHostFilter):

instance_vcpus = instance_type['vcpus']
vcpus_total = host_state.vcpus_total * FLAGS.cpu_allocation_ratio

# Only provide a VCPU limit to compute if the virt driver is reporting
# an accurate count of installed VCPUs. (XenServer driver does not)
if vcpus_total > 0:
host_state.limits['vcpu'] = vcpus_total

return (vcpus_total - host_state.vcpus_used) >= instance_vcpus
nova/scheduler/filters/disk_filter.py (new file, 54 lines)
@@ -0,0 +1,54 @@
# Copyright (c) 2012 OpenStack, LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

from nova import flags
from nova.openstack.common import cfg
from nova.openstack.common import log as logging
from nova.scheduler import filters

LOG = logging.getLogger(__name__)

disk_allocation_ratio_opt = cfg.FloatOpt("disk_allocation_ratio", default=1.0,
help="virtual disk to physical disk allocation ratio")

FLAGS = flags.FLAGS
FLAGS.register_opt(disk_allocation_ratio_opt)


class DiskFilter(filters.BaseHostFilter):
"""Disk Filter with over subscription flag"""

def host_passes(self, host_state, filter_properties):
"""Filter based on disk usage"""
instance_type = filter_properties.get('instance_type')
requested_disk = 1024 * (instance_type['root_gb'] +
instance_type['ephemeral_gb'])

free_disk_mb = host_state.free_disk_mb
total_usable_disk_mb = host_state.total_usable_disk_gb * 1024

disk_mb_limit = total_usable_disk_mb * FLAGS.disk_allocation_ratio
used_disk_mb = total_usable_disk_mb - free_disk_mb
usable_disk_mb = disk_mb_limit - used_disk_mb

if not usable_disk_mb >= requested_disk:
LOG.debug(_("%(host_state)s does not have %(requested_disk)s MB "
"usable disk, it only has %(usable_disk_mb)s MB usable "
"disk."), locals())
return False

disk_gb_limit = disk_mb_limit / 1024
host_state.limits['disk_gb'] = disk_gb_limit
return True
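Worked example of the DiskFilter arithmetic above, using illustrative numbers (the shipped default for disk_allocation_ratio is 1.0):

    # Illustrative numbers only.
    total_usable_disk_gb = 100
    free_disk_mb = 40 * 1024
    disk_allocation_ratio = 1.5

    total_usable_disk_mb = total_usable_disk_gb * 1024
    disk_mb_limit = total_usable_disk_mb * disk_allocation_ratio   # 150 GB limit
    used_disk_mb = total_usable_disk_mb - free_disk_mb             # 60 GB used
    usable_disk_mb = disk_mb_limit - used_disk_mb                  # 90 GB usable
    print(usable_disk_mb / 1024)   # 90.0 -> a 90 GB request would still pass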
@@ -39,17 +39,15 @@ class RamFilter(filters.BaseHostFilter):
free_ram_mb = host_state.free_ram_mb
total_usable_ram_mb = host_state.total_usable_ram_mb

oversubscribed_ram_limit_mb = (total_usable_ram_mb *
FLAGS.ram_allocation_ratio)
memory_mb_limit = total_usable_ram_mb * FLAGS.ram_allocation_ratio
used_ram_mb = total_usable_ram_mb - free_ram_mb
usable_ram = oversubscribed_ram_limit_mb - used_ram_mb
usable_ram = memory_mb_limit - used_ram_mb
if not usable_ram >= requested_ram:
LOG.debug(_("%(host_state)s does not have %(requested_ram)s MB "
"usable ram, it only has %(usable_ram)s MB usable ram."),
locals())
return False

# save oversubscribe ram limit so the compute host can verify
# memory availability on builds:
filter_properties['memory_mb_limit'] = oversubscribed_ram_limit_mb
# save oversubscription limit for compute node to test against:
host_state.limits['memory_mb'] = memory_mb_limit
return True
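RamFilter now publishes its oversubscription limit on host_state.limits instead of writing memory_mb_limit into filter_properties directly. Illustrative before/after with made-up numbers:

    # old: filter_properties['memory_mb_limit'] = total_usable_ram_mb * ratio
    # new: host_state.limits['memory_mb'] = total_usable_ram_mb * ratio
    total_usable_ram_mb = 4096
    ram_allocation_ratio = 1.5
    memory_mb_limit = total_usable_ram_mb * ram_allocation_ratio
    limits = {}                      # stands in for host_state.limits
    limits['memory_mb'] = memory_mb_limit
    print(limits)                    # {'memory_mb': 6144.0}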
@@ -27,14 +27,7 @@ from nova.openstack.common import log as logging
from nova.openstack.common import timeutils
from nova.scheduler import filters

host_manager_opts = [
cfg.IntOpt('reserved_host_disk_mb',
default=0,
help='Amount of disk in MB to reserve for host/dom0'),
cfg.IntOpt('reserved_host_memory_mb',
default=512,
help='Amount of memory in MB to reserve for host/dom0'),
cfg.MultiStrOpt('scheduler_available_filters',
default=['nova.scheduler.filters.standard_filters'],
help='Filter classes available to the scheduler which may '
@@ -112,32 +105,31 @@ class HostState(object):
self.service = ReadOnlyDict(service)
# Mutable available resources.
# These will change as resources are virtually "consumed".
self.total_usable_disk_gb = 0
self.disk_mb_used = 0
self.free_ram_mb = 0
self.free_disk_mb = 0
self.vcpus_total = 0
self.vcpus_used = 0

# Resource oversubscription values for the compute host:
self.limits = {}

def update_from_compute_node(self, compute):
"""Update information about a host from its compute_node info."""
all_disk_mb = compute['local_gb'] * 1024
all_ram_mb = compute['memory_mb']

# Assume virtual size is all consumed by instances if use qcow2 disk.
least = compute.get('disk_available_least')
free_disk_mb = least if least is not None else compute['free_disk_gb']
free_disk_mb *= 1024
free_ram_mb = compute['free_ram_mb']

if FLAGS.reserved_host_disk_mb > 0:
all_disk_mb -= FLAGS.reserved_host_disk_mb
free_disk_mb -= FLAGS.reserved_host_disk_mb
if FLAGS.reserved_host_memory_mb > 0:
all_ram_mb -= FLAGS.reserved_host_memory_mb
free_ram_mb -= FLAGS.reserved_host_memory_mb
self.disk_mb_used = compute['local_gb_used'] * 1024

#NOTE(jogo) free_ram_mb can be negative
self.free_ram_mb = free_ram_mb
self.free_ram_mb = compute['free_ram_mb']
self.total_usable_ram_mb = all_ram_mb
self.total_usable_disk_gb = compute['local_gb']
self.free_disk_mb = free_disk_mb
self.vcpus_total = compute['vcpus']
self.vcpus_used = compute['vcpus_used']
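HostState now carries a limits dict that filters populate and the scheduler forwards to the chosen compute host, and free_ram_mb is taken directly from the compute node record, where it may legitimately be negative under oversubscription. A tiny illustrative stand-in (not the Nova class):

    # Illustrative stand-in for HostState's new fields.
    class FakeHostState(object):
        def __init__(self):
            self.free_ram_mb = -512      # may be negative under oversubscription
            self.limits = {}             # filters add 'memory_mb', 'disk_gb', 'vcpu'

    state = FakeHostState()
    state.limits['vcpu'] = 4
    print(state.limits)                  # {'vcpu': 4}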
@@ -22,11 +22,10 @@ class FakeResourceTracker(resource_tracker.ResourceTracker):
"""Version without a DB requirement"""

def _create(self, context, values):
return values
self.compute_node = values

def _update(self, context, values, prune_stats=False):
self.compute_node.update(values)
return self.compute_node

def _get_service(self, context):
return {
@@ -275,23 +275,36 @@ class ComputeTestCase(BaseTestCase):
finally:
db.instance_destroy(self.context, instance['uuid'])

def test_create_instance_insufficient_memory(self):
def test_create_instance_unlimited_memory(self):
"""Default of memory limit=None is unlimited"""
self.flags(reserved_host_disk_mb=0, reserved_host_memory_mb=0)
self.compute.resource_tracker.update_available_resource(self.context)
params = {"memory_mb": 999999999999}
filter_properties = {'limits': {'memory_mb': None}}
instance = self._create_fake_instance(params)
self.assertRaises(exception.ComputeResourcesUnavailable,
self.compute.run_instance, self.context, instance=instance)
self.compute.run_instance(self.context, instance=instance,
filter_properties=filter_properties)
self.assertEqual(999999999999,
self.compute.resource_tracker.compute_node['memory_mb_used'])

def test_create_instance_insufficient_disk(self):
def test_create_instance_unlimited_disk(self):
self.flags(reserved_host_disk_mb=0, reserved_host_memory_mb=0)
self.compute.resource_tracker.update_available_resource(self.context)
params = {"root_gb": 999999999999,
"ephemeral_gb": 99999999999}
filter_properties = {'limits': {'disk_gb': None}}
instance = self._create_fake_instance(params)
self.assertRaises(exception.ComputeResourcesUnavailable,
self.compute.run_instance, self.context, instance=instance)
self.compute.run_instance(self.context, instance=instance,
filter_properties=filter_properties)

def test_create_multiple_instances_then_starve(self):
self.flags(reserved_host_disk_mb=0, reserved_host_memory_mb=0)
self.compute.resource_tracker.update_available_resource(self.context)
filter_properties = {'limits': {'memory_mb': 4096, 'disk_gb': 1000}}
params = {"memory_mb": 1024, "root_gb": 128, "ephemeral_gb": 128}
instance = self._create_fake_instance(params)
self.compute.run_instance(self.context, instance=instance)
self.compute.run_instance(self.context, instance=instance,
filter_properties=filter_properties)
self.assertEquals(1024,
self.compute.resource_tracker.compute_node['memory_mb_used'])
self.assertEquals(256,
@@ -299,7 +312,8 @@ class ComputeTestCase(BaseTestCase):

params = {"memory_mb": 2048, "root_gb": 256, "ephemeral_gb": 256}
instance = self._create_fake_instance(params)
self.compute.run_instance(self.context, instance=instance)
self.compute.run_instance(self.context, instance=instance,
filter_properties=filter_properties)
self.assertEquals(3072,
self.compute.resource_tracker.compute_node['memory_mb_used'])
self.assertEquals(768,
@@ -308,11 +322,15 @@ class ComputeTestCase(BaseTestCase):
params = {"memory_mb": 8192, "root_gb": 8192, "ephemeral_gb": 8192}
instance = self._create_fake_instance(params)
self.assertRaises(exception.ComputeResourcesUnavailable,
self.compute.run_instance, self.context, instance=instance)
self.compute.run_instance, self.context, instance=instance,
filter_properties=filter_properties)
def test_create_instance_with_oversubscribed_ram(self):
"""Test passing of oversubscribed ram policy from the scheduler."""

self.flags(reserved_host_disk_mb=0, reserved_host_memory_mb=0)
self.compute.resource_tracker.update_available_resource(self.context)

# get total memory as reported by virt driver:
resources = self.compute.driver.get_available_resource()
total_mem_mb = resources['memory_mb']
@@ -326,7 +344,8 @@ class ComputeTestCase(BaseTestCase):
"ephemeral_gb": 128}
instance = self._create_fake_instance(params)

filter_properties = dict(memory_mb_limit=oversub_limit_mb)
limits = {'memory_mb': oversub_limit_mb}
filter_properties = {'limits': limits}
self.compute.run_instance(self.context, instance=instance,
filter_properties=filter_properties)

@@ -337,6 +356,9 @@ class ComputeTestCase(BaseTestCase):
"""Test passing of oversubscribed ram policy from the scheduler, but
with insufficient memory.
"""
self.flags(reserved_host_disk_mb=0, reserved_host_memory_mb=0)
self.compute.resource_tracker.update_available_resource(self.context)

# get total memory as reported by virt driver:
resources = self.compute.driver.get_available_resource()
total_mem_mb = resources['memory_mb']
@@ -350,12 +372,115 @@ class ComputeTestCase(BaseTestCase):
"ephemeral_gb": 128}
instance = self._create_fake_instance(params)

filter_properties = dict(memory_mb_limit=oversub_limit_mb)
filter_properties = {'limits': {'memory_mb': oversub_limit_mb}}

self.assertRaises(exception.ComputeResourcesUnavailable,
self.compute.run_instance, self.context, instance=instance,
filter_properties=filter_properties)

def test_create_instance_with_oversubscribed_cpu(self):
"""Test passing of oversubscribed cpu policy from the scheduler."""

self.flags(reserved_host_disk_mb=0, reserved_host_memory_mb=0)
self.compute.resource_tracker.update_available_resource(self.context)
limits = {'vcpu': 3}
filter_properties = {'limits': limits}

# get total memory as reported by virt driver:
resources = self.compute.driver.get_available_resource()
self.assertEqual(1, resources['vcpus'])

# build an instance, specifying an amount of memory that exceeds
# total_mem_mb, but is less than the oversubscribed limit:
params = {"memory_mb": 10, "root_gb": 1,
"ephemeral_gb": 1, "vcpus": 2}
instance = self._create_fake_instance(params)
self.compute.run_instance(self.context, instance=instance,
filter_properties=filter_properties)

self.assertEqual(2,
self.compute.resource_tracker.compute_node['vcpus_used'])

# create one more instance:
params = {"memory_mb": 10, "root_gb": 1,
"ephemeral_gb": 1, "vcpus": 1}
instance = self._create_fake_instance(params)
self.compute.run_instance(self.context, instance=instance,
filter_properties=filter_properties)

self.assertEqual(3,
self.compute.resource_tracker.compute_node['vcpus_used'])

# delete the instance:
instance['vm_state'] = vm_states.DELETED
self.compute.resource_tracker.update_usage(self.context,
instance=instance)

self.assertEqual(2,
self.compute.resource_tracker.compute_node['vcpus_used'])

# now oversubscribe vcpus and fail:
params = {"memory_mb": 10, "root_gb": 1,
"ephemeral_gb": 1, "vcpus": 2}
instance = self._create_fake_instance(params)

limits = {'vcpu': 3}
filter_properties = {'limits': limits}
self.assertRaises(exception.ComputeResourcesUnavailable,
self.compute.run_instance, self.context, instance=instance,
filter_properties=filter_properties)
def test_create_instance_with_oversubscribed_disk(self):
"""Test passing of oversubscribed disk policy from the scheduler."""

self.flags(reserved_host_disk_mb=0, reserved_host_memory_mb=0)
self.compute.resource_tracker.update_available_resource(self.context)

# get total memory as reported by virt driver:
resources = self.compute.driver.get_available_resource()
total_disk_gb = resources['local_gb']

oversub_limit_gb = total_disk_gb * 1.5
instance_gb = int(total_disk_gb * 1.45)

# build an instance, specifying an amount of disk that exceeds
# total_disk_gb, but is less than the oversubscribed limit:
params = {"root_gb": instance_gb, "memory_mb": 10}
instance = self._create_fake_instance(params)

limits = {'disk_gb': oversub_limit_gb}
filter_properties = {'limits': limits}
self.compute.run_instance(self.context, instance=instance,
filter_properties=filter_properties)

self.assertEqual(instance_gb,
self.compute.resource_tracker.compute_node['local_gb_used'])

def test_create_instance_with_oversubscribed_disk_fail(self):
"""Test passing of oversubscribed disk policy from the scheduler, but
with insufficient disk.
"""
self.flags(reserved_host_disk_mb=0, reserved_host_memory_mb=0)
self.compute.resource_tracker.update_available_resource(self.context)

# get total memory as reported by virt driver:
resources = self.compute.driver.get_available_resource()
total_disk_gb = resources['local_gb']

oversub_limit_gb = total_disk_gb * 1.5
instance_gb = int(total_disk_gb * 1.55)

# build an instance, specifying an amount of disk that exceeds
# total_disk_gb, but is less than the oversubscribed limit:
params = {"root_gb": instance_gb, "memory_mb": 10}
instance = self._create_fake_instance(params)

limits = {'disk_gb': oversub_limit_gb}
filter_properties = {'limits': limits}
self.assertRaises(exception.ComputeResourcesUnavailable,
self.compute.run_instance, self.context, instance=instance,
filter_properties=filter_properties)

def test_default_access_ip(self):
self.flags(default_access_ip_network_name='test1')
fake_network.unset_stub_network_methods(self.stubs)
@@ -17,17 +17,20 @@

"""Tests for compute resource tracking"""

import copy
import uuid

from nova.compute import resource_tracker
from nova.compute import task_states
from nova.compute import vm_states
from nova import db
from nova import exception
from nova.openstack.common import log as logging
from nova.openstack.common import timeutils
from nova import test
from nova.virt import driver

LOG = logging.getLogger(__name__)


class FakeContext(object):
def __init__(self, is_admin=False):
@@ -75,20 +78,12 @@ class BaseTestCase(test.TestCase):
def setUp(self):
super(BaseTestCase, self).setUp()

self.flags(reserved_host_disk_mb=0,
reserved_host_memory_mb=0)

self.context = FakeContext()

self.instance_ref = {
"memory_mb": 1,
"root_gb": 1,
"ephemeral_gb": 1,
"vm_state": vm_states.BUILDING,
"task_state": None,
"os_type": "Linux",
"project_id": "1234",
"vcpus": 1,
"uuid": "12-34-56-78-90",
}

self._instances = []
self.stubs.Set(db, 'instance_get_all_by_filters',
self._fake_instance_get_all_by_filters)
@@ -126,8 +121,26 @@ class BaseTestCase(test.TestCase):
}
return service

def _fake_instance(self, *args, **kwargs):
instance = {
'uuid': str(uuid.uuid1()),
'vm_state': vm_states.BUILDING,
'task_state': None,
'memory_mb': 2,
'root_gb': 3,
'ephemeral_gb': 1,
'os_type': 'Linux',
'project_id': '123456',
'vcpus': 1,
'host': None,
}
instance.update(kwargs)

self._instances.append(instance)
return instance

def _fake_instance_get_all_by_filters(self, ctx, filters, **kwargs):
return []
return self._instances

def _tracker(self, unsupported=False):
host = "fakehost"
@@ -161,22 +174,16 @@ class UnsupportedDriverTestCase(BaseTestCase):
claim = self.tracker.begin_resource_claim(self.context, 1, 1)
self.assertEqual(None, claim)

def testDisabledContextClaim(self):
# basic context manager variation:
with self.tracker.resource_claim(self.context, 1, 1):
pass
self.assertEqual(0, len(self.tracker.claims))

def testDisabledInstanceClaim(self):
# instance variation:
claim = self.tracker.begin_instance_resource_claim(self.context,
self.instance_ref)
instance = self._fake_instance()
claim = self.tracker.begin_resource_claim(self.context, instance)
self.assertEqual(None, claim)

def testDisabledInstanceContextClaim(self):
# instance context manager variation:
with self.tracker.instance_resource_claim(self.context,
self.instance_ref):
instance = self._fake_instance()
with self.tracker.resource_claim(self.context, instance):
pass
self.assertEqual(0, len(self.tracker.claims))

@@ -187,10 +194,10 @@ class UnsupportedDriverTestCase(BaseTestCase):
self.assertEqual(None, self.tracker.abort_resource_claim(self.context,
None))

def testDisabledFreeResources(self):
self.tracker.free_resources(self.context)
self.assertTrue(self.tracker.disabled)
self.assertEqual(None, self.tracker.compute_node)
def testDisabledUpdateUsage(self):
instance = self._fake_instance(host='fakehost', memory_mb=5,
root_gb=10)
self.tracker.update_usage(self.context, instance)
class MissingServiceTestCase(BaseTestCase):
@@ -253,9 +260,35 @@ class ResourceTestCase(BaseTestCase):
prune_stats=False):
self.updated = True
values['stats'] = [{"key": "num_instances", "value": "1"}]

self.compute.update(values)
return self.compute

def testUpdateUseOnlyForTracked(self):
"""Only update usage is a previous claim has added instance to
list of tracked instances.
"""
instance = self._fake_instance(memory_mb=3, root_gb=1, ephemeral_gb=1,
task_state=None)
self.tracker.update_usage(self.context, instance)

self.assertEqual(0, self.tracker.compute_node['memory_mb_used'])
self.assertEqual(0, self.tracker.compute_node['local_gb_used'])
self.assertEqual(0, self.tracker.compute_node['current_workload'])

claim = self.tracker.begin_resource_claim(self.context, instance)
self.assertNotEqual(None, claim)
self.assertEqual(3, self.tracker.compute_node['memory_mb_used'])
self.assertEqual(2, self.tracker.compute_node['local_gb_used'])

# now update should actually take effect
instance['task_state'] = task_states.SCHEDULING
self.tracker.update_usage(self.context, instance)

self.assertEqual(3, self.tracker.compute_node['memory_mb_used'])
self.assertEqual(2, self.tracker.compute_node['local_gb_used'])
self.assertEqual(1, self.tracker.compute_node['current_workload'])

def testFreeRamResourceValue(self):
driver = FakeVirtDriver()
mem_free = driver.memory_mb - driver.memory_mb_used
@@ -270,42 +303,109 @@ class ResourceTestCase(BaseTestCase):
self.assertFalse(self.tracker.disabled)
self.assertTrue(self.updated)
def testInsufficientMemoryClaim(self):
"""Exceed memory limit of 5MB"""
claim = self.tracker.begin_resource_claim(self.context, memory_mb=2,
disk_gb=0)
def testCpuUnlimited(self):
"""Test default of unlimited CPU"""
self.assertEqual(0, self.tracker.compute_node['vcpus_used'])
instance = self._fake_instance(memory_mb=1, root_gb=1, ephemeral_gb=1,
vcpus=100000)
claim = self.tracker.begin_resource_claim(self.context, instance)
self.assertNotEqual(None, claim)
self.assertEqual(100000, self.tracker.compute_node['vcpus_used'])

claim = self.tracker.begin_resource_claim(self.context, memory_mb=3,
disk_gb=0)
def testCpuOversubscription(self):
"""Test client-supplied oversubscription of CPU"""
self.assertEqual(1, self.tracker.compute_node['vcpus'])

instance = self._fake_instance(memory_mb=1, root_gb=1, ephemeral_gb=1,
vcpus=3)
limits = {'vcpu': 5}
claim = self.tracker.begin_resource_claim(self.context, instance,
limits)
self.assertNotEqual(None, claim)
self.assertEqual(3, self.tracker.compute_node['vcpus_used'])

claim = self.tracker.begin_resource_claim(self.context, memory_mb=1,
disk_gb=0)
self.assertEqual(None, claim)
def testMemoryOversubscription(self):
"""Test client-supplied oversubscription of memory"""
instance = self._fake_instance(memory_mb=8, root_gb=1, ephemeral_gb=1)
limits = {'memory_mb': 8}
claim = self.tracker.begin_resource_claim(self.context, instance,
limits)
self.assertNotEqual(None, claim)
self.assertEqual(8, self.tracker.compute_node['memory_mb_used'])
self.assertEqual(2, self.tracker.compute_node['local_gb_used'])

def testDiskOversubscription(self):
"""Test client-supplied oversubscription of disk space"""
instance = self._fake_instance(memory_mb=1, root_gb=10, ephemeral_gb=1)
limits = {'disk_gb': 12}
claim = self.tracker.begin_resource_claim(self.context, instance,
limits)
self.assertNotEqual(None, claim)
self.assertEqual(1, self.tracker.compute_node['memory_mb_used'])
self.assertEqual(11, self.tracker.compute_node['local_gb_used'])

def testUnlimitedMemoryClaim(self):
"""Test default of unlimited memory"""
instance = self._fake_instance(memory_mb=200000000000, root_gb=1,
ephemeral_gb=1)
claim = self.tracker.begin_resource_claim(self.context, instance)
self.assertNotEqual(None, claim)
self.assertEqual(200000000000,
self.tracker.compute_node['memory_mb_used'])

def testInsufficientMemoryClaimWithOversubscription(self):
"""Exceed oversubscribed memory limit of 10MB"""
claim = self.tracker.begin_resource_claim(self.context, memory_mb=10,
disk_gb=0, memory_mb_limit=10)
instance = self._fake_instance(memory_mb=10, root_gb=0,
ephemeral_gb=0)
limits = {'memory_mb': 10}
claim = self.tracker.begin_resource_claim(self.context, instance,
limits)
self.assertNotEqual(None, claim)

claim = self.tracker.begin_resource_claim(self.context, memory_mb=1,
disk_gb=0, memory_mb_limit=10)
instance = self._fake_instance(memory_mb=1, root_gb=0,
ephemeral_gb=0)
limits = {'memory_mb': 10}
claim = self.tracker.begin_resource_claim(self.context, instance,
limits)
self.assertEqual(None, claim)
def testInsufficientDiskClaim(self):
"""Exceed disk limit of 5GB"""
claim = self.tracker.begin_resource_claim(self.context, memory_mb=0,
disk_gb=2)
def testUnlimitDiskClaim(self):
"""Test default of unlimited disk space"""
instance = self._fake_instance(memory_mb=0, root_gb=200000000,
ephemeral_gb=0)
claim = self.tracker.begin_resource_claim(self.context, instance)
self.assertNotEqual(None, claim)
self.assertEqual(200000000, self.tracker.compute_node['local_gb_used'])

def testInsufficientDiskClaimWithOversubscription(self):
"""Exceed oversubscribed disk limit of 10GB"""
instance = self._fake_instance(memory_mb=1, root_gb=4,
ephemeral_gb=5)  # 9 GB
limits = {'disk_gb': 10}
claim = self.tracker.begin_resource_claim(self.context, instance,
limits)
self.assertNotEqual(None, claim)

claim = self.tracker.begin_resource_claim(self.context, memory_mb=0,
disk_gb=3)
self.assertNotEqual(None, claim)
instance = self._fake_instance(memory_mb=1, root_gb=1,
ephemeral_gb=1)  # 2 GB
limits = {'disk_gb': 10}
claim = self.tracker.begin_resource_claim(self.context, instance,
limits)
self.assertEqual(None, claim)

claim = self.tracker.begin_resource_claim(self.context, memory_mb=0,
disk_gb=5)
def testInsufficientCpuClaim(self):
instance = self._fake_instance(memory_mb=0, root_gb=0,
ephemeral_gb=0, vcpus=1)
claim = self.tracker.begin_resource_claim(self.context, instance)
self.assertNotEqual(None, claim)
self.assertEqual(1, self.tracker.compute_node['vcpus_used'])

instance = self._fake_instance(memory_mb=0, root_gb=0,
ephemeral_gb=0, vcpus=1)

limits = {'vcpu': 1}
claim = self.tracker.begin_resource_claim(self.context, instance,
limits)
self.assertEqual(None, claim)
def testClaimAndFinish(self):
|
||||
@ -317,8 +417,9 @@ class ResourceTestCase(BaseTestCase):
|
||||
|
||||
claim_mem = 3
|
||||
claim_disk = 2
|
||||
claim = self.tracker.begin_resource_claim(self.context, claim_mem,
|
||||
claim_disk)
|
||||
instance = self._fake_instance(memory_mb=claim_mem, root_gb=claim_disk,
|
||||
ephemeral_gb=0)
|
||||
claim = self.tracker.begin_resource_claim(self.context, instance)
|
||||
|
||||
self.assertEqual(5, self.compute["memory_mb"])
|
||||
self.assertEqual(claim_mem, self.compute["memory_mb_used"])
|
||||
@ -334,35 +435,26 @@ class ResourceTestCase(BaseTestCase):
|
||||
driver.memory_mb_used = claim_mem
|
||||
driver.local_gb_used = claim_disk
|
||||
|
||||
# 2nd update compute node from the virt layer. because the claim is
|
||||
# in-progress (unfinished), the audit will actually mark the resources
|
||||
# as unsubscribed:
|
||||
self.tracker.update_available_resource(self.context)
|
||||
|
||||
self.assertEqual(2 * claim_mem,
|
||||
self.compute['memory_mb_used'])
|
||||
self.assertEqual(5 - (2 * claim_mem),
|
||||
self.compute['free_ram_mb'])
|
||||
# confirm that resource usage is derived from instance usages,
|
||||
# not virt layer:
|
||||
self.assertEqual(claim_mem, self.compute['memory_mb_used'])
|
||||
self.assertEqual(5 - claim_mem, self.compute['free_ram_mb'])
|
||||
|
||||
self.assertEqual(2 * claim_disk,
|
||||
self.compute['local_gb_used'])
|
||||
self.assertEqual(6 - (2 * claim_disk),
|
||||
self.compute['free_disk_gb'])
|
||||
self.assertEqual(claim_disk, self.compute['local_gb_used'])
|
||||
self.assertEqual(6 - claim_disk, self.compute['free_disk_gb'])
|
||||
|
||||
# Finally, finish the claimm and update from the virt layer again.
|
||||
# Resource usage will be consistent again:
|
||||
self.tracker.finish_resource_claim(claim)
|
||||
self.tracker.update_available_resource(self.context)
|
||||
|
||||
self.assertEqual(claim_mem,
|
||||
self.compute['memory_mb_used'])
|
||||
self.assertEqual(5 - claim_mem,
|
||||
self.compute['free_ram_mb'])
|
||||
self.assertEqual(claim_mem, self.compute['memory_mb_used'])
|
||||
self.assertEqual(5 - claim_mem, self.compute['free_ram_mb'])
|
||||
|
||||
self.assertEqual(claim_disk,
|
||||
self.compute['local_gb_used'])
|
||||
self.assertEqual(6 - claim_disk,
|
||||
self.compute['free_disk_gb'])
|
||||
self.assertEqual(claim_disk, self.compute['local_gb_used'])
|
||||
self.assertEqual(6 - claim_disk, self.compute['free_disk_gb'])
|
||||
|
||||
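The updated assertions in testClaimAndFinish capture the behavioural change: after update_available_resource(), usage is derived from the instances the tracker is tracking, not from the virt driver's memory_mb_used/local_gb_used figures, so an in-progress claim is no longer double counted. A rough sketch of that aggregation, assuming a simple list-of-dicts shape for the tracked instances (illustrative only):

def usage_from_instances(tracked_instances):
    # Sum usage across tracked instances; the virt layer's own
    # "used" figures are deliberately ignored.
    memory_mb_used = sum(i['memory_mb'] for i in tracked_instances)
    local_gb_used = sum(i['root_gb'] + i['ephemeral_gb']
                        for i in tracked_instances)
    return memory_mb_used, local_gb_used

# one tracked instance: 3 MB RAM and 2 GB root disk, matching claim_mem/claim_disk
assert usage_from_instances(
        [{'memory_mb': 3, 'root_gb': 2, 'ephemeral_gb': 0}]) == (3, 2)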
    def testClaimAndAbort(self):
        self.assertEqual(5, self.tracker.compute_node['memory_mb'])
@ -373,8 +465,10 @@ class ResourceTestCase(BaseTestCase):

        claim_mem = 3
        claim_disk = 2
        claim = self.tracker.begin_resource_claim(self.context, claim_mem,
                claim_disk)
        instance = self._fake_instance(memory_mb=claim_mem,
                root_gb=claim_disk, ephemeral_gb=0)
        claim = self.tracker.begin_resource_claim(self.context, instance)
        self.assertNotEqual(None, claim)

        self.assertEqual(5, self.compute["memory_mb"])
        self.assertEqual(claim_mem, self.compute["memory_mb_used"])
@ -398,14 +492,14 @@ class ResourceTestCase(BaseTestCase):
        """Test that old claims get cleaned up automatically if not finished
        or aborted explicitly.
        """
        claim = self.tracker.begin_resource_claim(self.context, memory_mb=2,
                disk_gb=2)
        instance = self._fake_instance(memory_mb=2, root_gb=2, ephemeral_gb=0)
        claim = self.tracker.begin_resource_claim(self.context, instance)
        claim.expire_ts = timeutils.utcnow_ts() - 1
        self.assertTrue(claim.is_expired())

        # and an unexpired claim
        claim2 = self.tracker.begin_resource_claim(self.context, memory_mb=1,
                disk_gb=1)
        instance2 = self._fake_instance(memory_mb=1, root_gb=1, ephemeral_gb=0)
        claim2 = self.tracker.begin_resource_claim(self.context, instance2)

        self.assertEqual(2, len(self.tracker.claims))
        self.assertEqual(2 + 1, self.tracker.compute_node['memory_mb_used'])
@ -415,34 +509,30 @@ class ResourceTestCase(BaseTestCase):
        self.tracker.update_available_resource(self.context)

        self.assertEqual(1, len(self.tracker.claims))
        self.assertEqual(1, self.tracker.compute_node['memory_mb_used'])
        self.assertEqual(1, self.tracker.compute_node['local_gb_used'])
        self.assertEqual(2, len(self.tracker.tracked_instances))

        # and just call finish & abort to ensure expired claims do not cause
        # any other explosions:
        # the expired claim's instance is assumed to still exist, so the
        # resources should be counted:
        self.assertEqual(2 + 1, self.tracker.compute_node['memory_mb_used'])
        self.assertEqual(2 + 1, self.tracker.compute_node['local_gb_used'])

        # this abort should do nothing because the claim was purged due to
        # expiration:
        self.tracker.abort_resource_claim(self.context, claim)
        self.tracker.finish_resource_claim(claim)

        # call finish on claim2:
        self.tracker.finish_resource_claim(claim2)

        # should have usage from both instances:
        self.assertEqual(1 + 2, self.tracker.compute_node['memory_mb_used'])
        self.assertEqual(1 + 2, self.tracker.compute_node['local_gb_used'])

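The expired-claims test forces expiry by rewinding expire_ts below the current timestamp. A hedged sketch of the timeout mechanics, using time.time() as a stand-in for nova's timeutils.utcnow_ts() and the claim_timeout_seconds default of 600 from the new config option (SketchClaim is illustrative, not the real Claim class):

import time  # stand-in for nova.openstack.common.timeutils.utcnow_ts()

class SketchClaim(object):
    def __init__(self, timeout=600):          # claim_timeout_seconds default
        self.expire_ts = time.time() + timeout

    def is_expired(self):
        return time.time() > self.expire_ts

claim = SketchClaim()
claim.expire_ts = time.time() - 1             # same trick as the test above
assert claim.is_expired()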
    def testInstanceClaim(self):
        self.tracker.begin_instance_resource_claim(self.context,
                self.instance_ref)
        instance = self._fake_instance(memory_mb=1, root_gb=0, ephemeral_gb=2)
        self.tracker.begin_resource_claim(self.context, instance)
        self.assertEqual(1, self.tracker.compute_node['memory_mb_used'])
        self.assertEqual(2, self.tracker.compute_node['local_gb_used'])

    def testContextClaim(self):
        with self.tracker.resource_claim(self.context, memory_mb=1, disk_gb=1):
            # <insert exciting things that utilize resources>
            self.assertEqual(1, self.tracker.compute_node['memory_mb_used'])
            self.assertEqual(1, self.tracker.compute_node['local_gb_used'])
            self.assertEqual(1, self.compute['memory_mb_used'])
            self.assertEqual(1, self.compute['local_gb_used'])

        self.tracker.update_available_resource(self.context)
        self.assertEqual(0, self.tracker.compute_node['memory_mb_used'])
        self.assertEqual(0, self.tracker.compute_node['local_gb_used'])
        self.assertEqual(0, self.compute['memory_mb_used'])
        self.assertEqual(0, self.compute['local_gb_used'])

    def testContextClaimWithException(self):
        try:
            with self.tracker.resource_claim(self.context, memory_mb=1,
@ -459,34 +549,63 @@ class ResourceTestCase(BaseTestCase):
        self.assertEqual(0, self.compute['local_gb_used'])

    def testInstanceContextClaim(self):
        with self.tracker.instance_resource_claim(self.context,
                self.instance_ref):
        instance = self._fake_instance(memory_mb=1, root_gb=1, ephemeral_gb=1)
        with self.tracker.resource_claim(self.context, instance):
            # <insert exciting things that utilize resources>
            self.assertEqual(1, self.tracker.compute_node['memory_mb_used'])
            self.assertEqual(2, self.tracker.compute_node['local_gb_used'])
            self.assertEqual(1, self.compute['memory_mb_used'])
            self.assertEqual(2, self.compute['local_gb_used'])

        # after exiting claim context, build is marked as finished. usage
        # totals should be same:
        self.tracker.update_available_resource(self.context)
        self.assertEqual(0, self.tracker.compute_node['memory_mb_used'])
        self.assertEqual(0, self.tracker.compute_node['local_gb_used'])
        self.assertEqual(0, self.compute['memory_mb_used'])
        self.assertEqual(0, self.compute['local_gb_used'])
        self.assertEqual(1, self.tracker.compute_node['memory_mb_used'])
        self.assertEqual(2, self.tracker.compute_node['local_gb_used'])
        self.assertEqual(1, self.compute['memory_mb_used'])
        self.assertEqual(2, self.compute['local_gb_used'])

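The context-manager tests rely on resource_claim() bumping usage up front and keeping it after a successful build, while an exception inside the block rolls the claim back (testContextClaimWithException). A minimal contextlib sketch of that pattern; MiniTracker is purely illustrative and far simpler than the real ResourceTracker:

from contextlib import contextmanager

class MiniTracker(object):
    def __init__(self):
        self.memory_mb_used = 0

    @contextmanager
    def resource_claim(self, instance):
        # Claim before the (potentially slow) build so the scheduler
        # sees the memory as spoken for.
        self.memory_mb_used += instance['memory_mb']
        try:
            yield
        except Exception:
            # Build failed: give the claimed memory back.
            self.memory_mb_used -= instance['memory_mb']
            raise

tracker = MiniTracker()
with tracker.resource_claim({'memory_mb': 1}):
    assert tracker.memory_mb_used == 1
assert tracker.memory_mb_used == 1   # usage sticks after a successful build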
    def testUpdateLoadStatsForInstance(self):
        self.assertFalse(self.tracker.disabled)
        self.assertEqual(0, self.tracker.compute_node['current_workload'])

        self.instance_ref['task_state'] = task_states.SCHEDULING
        with self.tracker.instance_resource_claim(self.context,
                self.instance_ref):
        instance = self._fake_instance(task_state=task_states.SCHEDULING)
        with self.tracker.resource_claim(self.context, instance):
            pass

        self.assertEqual(1, self.tracker.compute_node['current_workload'])

        self.instance_ref['vm_state'] = vm_states.ACTIVE
        self.instance_ref['task_state'] = None
        instance['vm_state'] = vm_states.ACTIVE
        instance['task_state'] = None
        instance['host'] = 'fakehost'

        self.tracker.update_load_stats_for_instance(self.context,
                self.instance_ref)
        self.tracker.update_usage(self.context, instance)
        self.assertEqual(0, self.tracker.compute_node['current_workload'])

    def testCpuStats(self):
        limits = {'disk_gb': 100, 'memory_mb': 100}
        self.assertEqual(0, self.tracker.compute_node['vcpus_used'])

        instance = self._fake_instance(vcpus=1)

        # should not do anything until a claim is made:
        self.tracker.update_usage(self.context, instance)
        self.assertEqual(0, self.tracker.compute_node['vcpus_used'])

        with self.tracker.resource_claim(self.context, instance, limits):
            pass
        self.assertEqual(1, self.tracker.compute_node['vcpus_used'])

        # instance state can change without modifying vcpus in use:
        instance['task_state'] = task_states.SCHEDULING
        self.tracker.update_usage(self.context, instance)
        self.assertEqual(1, self.tracker.compute_node['vcpus_used'])

        instance = self._fake_instance(vcpus=10)
        with self.tracker.resource_claim(self.context, instance, limits):
            pass
        self.assertEqual(11, self.tracker.compute_node['vcpus_used'])

        instance['vm_state'] = vm_states.DELETED
        self.tracker.update_usage(self.context, instance)
        self.assertEqual(1, self.tracker.compute_node['vcpus_used'])

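testCpuStats implies the vcpu accounting rules: update_usage() is a no-op for an instance the tracker never claimed, state changes alone do not move the counter, and a DELETED instance gives its vcpus back. A toy model of those rules (VcpuTracker is illustrative only, not the tracker's real bookkeeping):

DELETED = 'deleted'

class VcpuTracker(object):
    def __init__(self):
        self.vcpus_used = 0
        self.tracked = {}                 # uuid -> vcpus

    def claim(self, uuid, vcpus):
        self.tracked[uuid] = vcpus
        self.vcpus_used += vcpus

    def update_usage(self, uuid, vm_state):
        # Only instances already claimed affect usage; DELETED frees them.
        if uuid in self.tracked and vm_state == DELETED:
            self.vcpus_used -= self.tracked.pop(uuid)

t = VcpuTracker()
t.claim('a', 1)
t.claim('b', 10)                          # 1 + 10 = 11, as asserted above
t.update_usage('b', DELETED)
assert t.vcpus_used == 1                  # back to 1 after the delete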
@ -28,16 +28,20 @@ from nova.scheduler import host_manager
COMPUTE_NODES = [
    dict(id=1, local_gb=1024, memory_mb=1024, vcpus=1,
         disk_available_least=512, free_ram_mb=512, vcpus_used=1,
         free_disk_mb=512, service=dict(host='host1', disabled=False)),
         free_disk_mb=512, local_gb_used=0,
         service=dict(host='host1', disabled=False)),
    dict(id=2, local_gb=2048, memory_mb=2048, vcpus=2,
         disk_available_least=1024, free_ram_mb=1024, vcpus_used=2,
         free_disk_mb=1024, service=dict(host='host2', disabled=True)),
         free_disk_mb=1024, local_gb_used=0,
         service=dict(host='host2', disabled=True)),
    dict(id=3, local_gb=4096, memory_mb=4096, vcpus=4,
         disk_available_least=3072, free_ram_mb=3072, vcpus_used=1,
         free_disk_mb=3072, service=dict(host='host3', disabled=False)),
         free_disk_mb=3072, local_gb_used=0,
         service=dict(host='host3', disabled=False)),
    dict(id=4, local_gb=8192, memory_mb=8192, vcpus=8,
         disk_available_least=8192, free_ram_mb=8192, vcpus_used=0,
         free_disk_mb=8192, service=dict(host='host4', disabled=False)),
         free_disk_mb=8192, local_gb_used=0,
         service=dict(host='host4', disabled=False)),
    # Broken entry
    dict(id=5, local_gb=1024, memory_mb=1024, vcpus=1, service=None),
]

@ -216,7 +216,6 @@ class FilterSchedulerTestCase(test_scheduler.SchedulerTestCase):
        self.assertEqual(info['called'], 0)

    def test_get_cost_functions(self):
        self.flags(reserved_host_memory_mb=128)
        fixture = fakes.FakeFilterScheduler()
        fns = fixture.get_cost_functions()
        self.assertEquals(len(fns), 1)
@ -225,8 +224,9 @@ class FilterSchedulerTestCase(test_scheduler.SchedulerTestCase):
        hostinfo = host_manager.HostState('host', 'compute')
        hostinfo.update_from_compute_node(dict(memory_mb=1000,
                local_gb=0, vcpus=1, disk_available_least=1000,
                free_disk_mb=1000, free_ram_mb=1000, vcpus_used=0))
        self.assertEquals(1000 - 128, fn(hostinfo, {}))
                free_disk_mb=1000, free_ram_mb=872, vcpus_used=0,
                local_gb_used=0))
        self.assertEquals(872, fn(hostinfo, {}))

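The cost-function hunk follows the same theme as the rest of the change: the 128 MB reserved_host_memory_mb no longer appears to be subtracted inside the weighing code, so the fixture hands the function a node that already reports 872 MB free and expects that value back unchanged. A quick arithmetic check (illustrative only):

total_ram_mb = 1000
reserved_host_memory_mb = 128
free_ram_mb = total_ram_mb - reserved_host_memory_mb   # reservation applied at the source
assert free_ram_mb == 872                               # the value fed to fn() above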
    def test_max_attempts(self):
        self.flags(scheduler_max_attempts=4)

@ -514,6 +514,18 @@ class HostFiltersTestCase(test.TestCase):
                 'capabilities': capabilities, 'service': service})
        self.assertFalse(filt_cls.host_passes(host, filter_properties))

    def test_ram_filter_passes(self):
        self._stub_service_is_up(True)
        filt_cls = self.class_map['RamFilter']()
        self.flags(ram_allocation_ratio=1.0)
        filter_properties = {'instance_type': {'memory_mb': 1024}}
        capabilities = {'enabled': True}
        service = {'disabled': False}
        host = fakes.FakeHostState('host1', 'compute',
                {'free_ram_mb': 1024, 'total_usable_ram_mb': 1024,
                 'capabilities': capabilities, 'service': service})
        self.assertTrue(filt_cls.host_passes(host, filter_properties))

    def test_ram_filter_oversubscribe(self):
        self._stub_service_is_up(True)
        filt_cls = self.class_map['RamFilter']()
@ -525,24 +537,62 @@ class HostFiltersTestCase(test.TestCase):
                {'free_ram_mb': -1024, 'total_usable_ram_mb': 2048,
                 'capabilities': capabilities, 'service': service})
        self.assertTrue(filt_cls.host_passes(host, filter_properties))
        self.assertEqual(2048 * 2.0, host.limits['memory_mb'])

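test_ram_filter_oversubscribe now expects the memory ceiling to be recorded on the host itself (host.limits['memory_mb']) rather than written into a memory_mb_limit filter property. The arithmetic behind the assertions, sketched as a plain function (ram_filter_passes is an illustrative stand-in; the field names come from the test fixture):

def ram_filter_passes(free_ram_mb, total_usable_ram_mb, requested_mb,
                      ram_allocation_ratio):
    memory_mb_limit = total_usable_ram_mb * ram_allocation_ratio
    used_ram_mb = total_usable_ram_mb - free_ram_mb
    usable_ram_mb = memory_mb_limit - used_ram_mb
    return usable_ram_mb >= requested_mb, memory_mb_limit

# -1024 MB free out of 2048 MB total, ratio 2.0, requesting 1024 MB:
passes, limit = ram_filter_passes(-1024, 2048, 1024, 2.0)
assert passes and limit == 2048 * 2.0     # the ceiling the filter records on the host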
    def test_ram_filter_sets_memory_limit(self):
        """Test that ram filter sets a filter_property denoting the memory
        ceiling.
        """
    def test_disk_filter_passes(self):
        self._stub_service_is_up(True)
        filt_cls = self.class_map['RamFilter']()
        self.flags(ram_allocation_ratio=2.0)
        filter_properties = {'instance_type': {'memory_mb': 1024}}
        filt_cls = self.class_map['DiskFilter']()
        self.flags(disk_allocation_ratio=1.0)
        filter_properties = {'instance_type': {'root_gb': 1,
                'ephemeral_gb': 1}}
        capabilities = {'enabled': True}
        service = {'disabled': False}
        host = fakes.FakeHostState('host1', 'compute',
                {'free_ram_mb': -1024, 'total_usable_ram_mb': 2048,
                {'free_disk_mb': 11 * 1024, 'total_usable_disk_gb': 13,
                 'capabilities': capabilities, 'service': service})
        filt_cls.host_passes(host, filter_properties)
        self.assertTrue(filt_cls.host_passes(host, filter_properties))

        self.assertEqual(host.total_usable_ram_mb * 2.0,
                filter_properties['memory_mb_limit'])
    def test_disk_filter_fails(self):
        self._stub_service_is_up(True)
        filt_cls = self.class_map['DiskFilter']()
        self.flags(disk_allocation_ratio=1.0)
        filter_properties = {'instance_type': {'root_gb': 2,
                'ephemeral_gb': 1}}
        capabilities = {'enabled': True}
        service = {'disabled': False}
        host = fakes.FakeHostState('host1', 'compute',
                {'free_disk_mb': 11 * 1024, 'total_usable_disk_gb': 13,
                 'capabilities': capabilities, 'service': service})
        self.assertTrue(filt_cls.host_passes(host, filter_properties))

    def test_disk_filter_oversubscribe(self):
        self._stub_service_is_up(True)
        filt_cls = self.class_map['DiskFilter']()
        self.flags(disk_allocation_ratio=10.0)
        filter_properties = {'instance_type': {'root_gb': 100,
                'ephemeral_gb': 19}}
        capabilities = {'enabled': True}
        service = {'disabled': False}
        # 1GB used... so 119GB allowed...
        host = fakes.FakeHostState('host1', 'compute',
                {'free_disk_mb': 11 * 1024, 'total_usable_disk_gb': 12,
                 'capabilities': capabilities, 'service': service})
        self.assertTrue(filt_cls.host_passes(host, filter_properties))
        self.assertEqual(12 * 10.0, host.limits['disk_gb'])

    def test_disk_filter_oversubscribe_fail(self):
        self._stub_service_is_up(True)
        filt_cls = self.class_map['DiskFilter']()
        self.flags(disk_allocation_ratio=10.0)
        filter_properties = {'instance_type': {'root_gb': 100,
                'ephemeral_gb': 20}}
        capabilities = {'enabled': True}
        service = {'disabled': False}
        # 1GB used... so 119GB allowed...
        host = fakes.FakeHostState('host1', 'compute',
                {'free_disk_mb': 11 * 1024, 'total_usable_disk_gb': 12,
                 'capabilities': capabilities, 'service': service})
        self.assertFalse(filt_cls.host_passes(host, filter_properties))

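The new DiskFilter tests work the same way for disk, with the ceiling recorded in host.limits['disk_gb']. The oversubscription pair above reduces to this arithmetic (disk_filter_passes is an illustrative stand-in, not the filter's actual code):

def disk_filter_passes(free_disk_mb, total_usable_disk_gb, requested_gb,
                       disk_allocation_ratio):
    free_disk_gb = free_disk_mb / 1024.0
    disk_gb_limit = total_usable_disk_gb * disk_allocation_ratio
    used_disk_gb = total_usable_disk_gb - free_disk_gb
    usable_disk_gb = disk_gb_limit - used_disk_gb
    return usable_disk_gb >= requested_gb

# 12 GB total, 11 GB free (1 GB used), ratio 10.0 -> 120 GB limit, 119 GB usable
assert disk_filter_passes(11 * 1024, 12, 100 + 19, 10.0)       # 119 GB fits
assert not disk_filter_passes(11 * 1024, 12, 100 + 20, 10.0)   # 120 GB does not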
    def test_compute_filter_fails_on_service_disabled(self):
        self._stub_service_is_up(True)

@ -122,8 +122,6 @@ class HostManagerTestCase(test.TestCase):
        self.assertDictMatch(service_states, expected)

    def test_get_all_host_states(self):
        self.flags(reserved_host_memory_mb=512,
                reserved_host_disk_mb=1024)

        context = 'fake_context'
        topic = 'compute'
@ -145,18 +143,18 @@ class HostManagerTestCase(test.TestCase):
        host = compute_node['service']['host']
        self.assertEqual(host_states[host].service,
                compute_node['service'])
        self.assertEqual(host_states['host1'].free_ram_mb, 0)
        self.assertEqual(host_states['host1'].free_ram_mb, 512)
        # 511GB
        self.assertEqual(host_states['host1'].free_disk_mb, 523264)
        self.assertEqual(host_states['host2'].free_ram_mb, 512)
        self.assertEqual(host_states['host1'].free_disk_mb, 524288)
        self.assertEqual(host_states['host2'].free_ram_mb, 1024)
        # 1023GB
        self.assertEqual(host_states['host2'].free_disk_mb, 1047552)
        self.assertEqual(host_states['host3'].free_ram_mb, 2560)
        self.assertEqual(host_states['host2'].free_disk_mb, 1048576)
        self.assertEqual(host_states['host3'].free_ram_mb, 3072)
        # 3071GB
        self.assertEqual(host_states['host3'].free_disk_mb, 3144704)
        self.assertEqual(host_states['host4'].free_ram_mb, 7680)
        self.assertEqual(host_states['host3'].free_disk_mb, 3145728)
        self.assertEqual(host_states['host4'].free_ram_mb, 8192)
        # 8191GB
        self.assertEqual(host_states['host4'].free_disk_mb, 8387584)
        self.assertEqual(host_states['host4'].free_disk_mb, 8388608)

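The revised host_manager expectations drop the reserved-host adjustments: the reserved_host_memory_mb/reserved_host_disk_mb flags disappear from the test setup above, presumably because the reservation is applied at a different layer now, so free_ram_mb and free_disk_mb pass through untouched. For host1 the old and new numbers differ by exactly those reservations (a quick check, values taken from the assertions above):

reserved_host_memory_mb = 512
reserved_host_disk_mb = 1024

# host1: the old expectation subtracted the reservations, the new one does not.
assert 512 - reserved_host_memory_mb == 0              # old free_ram_mb
assert 524288 - reserved_host_disk_mb == 523264        # old free_disk_mb ("511GB")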
class HostStateTestCase(test.TestCase):

@ -231,7 +231,7 @@ class FakeDriver(driver.ComputeDriver):
        """

        dic = {'vcpus': 1,
               'memory_mb': 4096,
               'memory_mb': 8192,
               'local_gb': 1028,
               'vcpus_used': 0,
               'memory_mb_used': 0,