ComputeNode Capacity support

The table represents the current state of compute nodes and will be
used by the scheduler when selecting a host.

Re: https://blueprints.launchpad.net/nova/+spec/scaling-zones

This is just the db & notification portion of the branch. The scheduler
portion is being deferred until comstud's branch gets merged, since it
conflicts heavily.

NOTE: Compute notifications are now two-part.
There is a compute.instance.XXX.start event and a compute.instance.XXX.end
event instead of the previous single compute.instance.XXX event (the old
event corresponds to the new .end event).
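
As a rough sketch of the emitting side of the two-part pattern (the notify()
helper, _build_instance() and the payload below are placeholders, not the
exact Nova notifier API):

    # Hypothetical sketch of a compute operation wrapped in paired events.
    def run_instance(context, instance):
        payload = {'instance_id': instance['uuid'], 'state': 'building'}
        notify('compute.instance.create.start', payload)   # work is starting
        _build_instance(context, instance)                  # the actual work
        payload['state'] = 'active'
        notify('compute.instance.create.end', payload)      # work is finished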

Change-Id: Ia8e68680cb0924c59df84f2eec858febf4926d65
Sandy Walsh 2011-12-13 15:27:41 -08:00
parent b21b83a961
commit 72d710217e
3 changed files with 178 additions and 7 deletions

@@ -187,12 +187,31 @@ def compute_node_update(context, compute_id, values):
"""Set the given properties on an computeNode and update it.
Raises NotFound if computeNode does not exist.
"""
return IMPL.compute_node_update(context, compute_id, values)
def compute_node_get_by_host(context, host):
return IMPL.compute_node_get_by_host(context, host)
def compute_node_capacity_find(context, minimum_ram_mb, minimum_disk_gb):
return IMPL.compute_node_capacity_find(context, minimum_ram_mb,
minimum_disk_gb)
def compute_node_utilization_update(context, host, free_ram_mb_delta=0,
free_disk_gb_delta=0, work_delta=0, vm_delta=0):
return IMPL.compute_node_utilization_update(context, host,
free_ram_mb_delta, free_disk_gb_delta, work_delta,
vm_delta)
def compute_node_utilization_set(context, host, free_ram_mb=None,
free_disk_gb=None, work=None, vms=None):
return IMPL.compute_node_utilization_set(context, host, free_ram_mb,
free_disk_gb, work, vms)
###################
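
For context, a hedged sketch of how the deferred scheduler work might consume
the new capacity query; pick_host() and the selection policy below are
illustrative, not part of this change:

    from nova import db

    def pick_host(context, instance_type):
        # Illustrative only: return any enabled host with enough headroom.
        nodes = db.compute_node_capacity_find(context,
                                              instance_type['memory_mb'],
                                              instance_type['local_gb'])
        if not nodes:
            raise Exception('no hosts with sufficient capacity')
        return nodes[0].service['host']   # placeholder selection policy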

@@ -44,6 +44,9 @@ from sqlalchemy.sql.expression import desc
from sqlalchemy.sql.expression import literal_column
FLAGS = flags.FLAGS
flags.DECLARE('reserved_host_disk_mb', 'nova.scheduler.host_manager')
flags.DECLARE('reserved_host_memory_mb', 'nova.scheduler.host_manager')
LOG = logging.getLogger("nova.db.sqlalchemy")
@@ -387,6 +390,21 @@ def compute_node_get(context, compute_id, session=None):
return result
@require_admin_context
def compute_node_get_by_service(context, service_id, session=None):
if not session:
session = get_session()
result = model_query(context, models.ComputeNode, session=session).\
filter_by(service_id=service_id).\
first()
if not result:
raise exception.ComputeHostNotFound(host="ServiceID=%s" % service_id)
return result
@require_admin_context
def compute_node_get_all(context, session=None):
return model_query(context, models.ComputeNode, session=session).\
@@ -402,23 +420,155 @@ def compute_node_get_for_service(context, service_id):
first()
def _get_host_utilization(context, host, ram_mb, disk_gb):
"""Compute the current utilization of a given host."""
instances = instance_get_all_by_host(context, host)
vms = len(instances)
free_ram_mb = ram_mb - FLAGS.reserved_host_memory_mb
free_disk_gb = disk_gb - (FLAGS.reserved_host_disk_mb / 1024)  # flag is in MB
work = 0
for instance in instances:
free_ram_mb -= instance.memory_mb
free_disk_gb -= instance.local_gb
if instance.vm_state in [vm_states.BUILDING, vm_states.REBUILDING,
vm_states.MIGRATING, vm_states.RESIZING]:
work += 1
return dict(free_ram_mb=free_ram_mb,
free_disk_gb=free_disk_gb,
current_workload=work,
running_vms=vms)
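# Worked example for _get_host_utilization() with made-up numbers: a host
# reporting memory_mb=16384 and local_gb=500, with reserved_host_memory_mb=512
# and reserved_host_disk_mb=1024, running two 2048 MB / 20 GB instances (one
# of them BUILDING), would yield free_ram_mb = 16384 - 512 - 2*2048 = 11776,
# free_disk_gb = 500 - 1 - 2*20 = 459, current_workload = 1, running_vms = 2.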
def _adjust_compute_node_values_for_utilization(context, values, session):
service_ref = service_get(context, values['service_id'], session=session)
host = service_ref['host']
ram_mb = values['memory_mb']
disk_gb = values['local_gb']
values.update(_get_host_utilization(context, host, ram_mb, disk_gb))
@require_admin_context
def compute_node_create(context, values):
compute_node_ref = models.ComputeNode()
compute_node_ref.update(values)
compute_node_ref.save()
def compute_node_create(context, values, session=None):
"""Creates a new ComputeNode and populates the capacity fields
with the most recent data."""
if not session:
session = get_session()
_adjust_compute_node_values_for_utilization(context, values, session)
with session.begin(subtransactions=True):
compute_node_ref = models.ComputeNode()
session.add(compute_node_ref)
compute_node_ref.update(values)
return compute_node_ref
@require_admin_context
def compute_node_update(context, compute_id, values):
"""Creates a new ComputeNode and populates the capacity fields
with the most recent data."""
session = get_session()
with session.begin():
_adjust_compute_node_values_for_utilization(context, values, session)
with session.begin(subtransactions=True):
compute_ref = compute_node_get(context, compute_id, session=session)
compute_ref.update(values)
compute_ref.save(session=session)
# Note: these operations use with_lockmode('update'), so they will only work
# reliably with engines that support row-level locking
# (e.g. PostgreSQL, or MySQL with InnoDB).
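# In practice with_lockmode('update') makes SQLAlchemy append FOR UPDATE to
# the SELECT, so a concurrent writer blocks on the row until this transaction
# commits; SQLite simply ignores the clause, which is why it is not reliable
# here.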
def compute_node_get_by_host(context, host):
"""Get all capacity entries for the given host."""
session = get_session()
with session.begin():
node = session.query(models.ComputeNode).\
options(joinedload('service')).\
filter(models.Service.host == host).\
filter_by(deleted=False).\
with_lockmode('update')
return node.first()
def compute_node_capacity_find(context, minimum_ram_mb, minimum_disk_gb):
"""Get all enabled hosts with enough ram and disk."""
session = get_session()
with session.begin():
return session.query(models.ComputeNode).\
options(joinedload('service')).\
filter(models.ComputeNode.free_ram_mb >= minimum_ram_mb).\
filter(models.ComputeNode.free_disk_gb >= minimum_disk_gb).\
filter(models.Service.disabled == False).\
filter_by(deleted=False).\
with_lockmode('update').all()
def compute_node_utilization_update(context, host, free_ram_mb_delta=0,
free_disk_gb_delta=0, work_delta=0, vm_delta=0):
"""Update a specific ComputeNode entry by a series of deltas.
Do this as a single atomic action and lock the row for the
duration of the operation. Requires that ComputeNode record exist."""
session = get_session()
compute_node = None
with session.begin(subtransactions=True):
compute_node = session.query(models.ComputeNode).\
options(joinedload('service')).\
filter(models.Service.host == host).\
filter_by(deleted=False).\
with_lockmode('update').\
first()
if compute_node is None:
raise exception.NotFound(_("No ComputeNode for %(host)s" %
locals()))
# This table thingy is how we get atomic UPDATE x = x + 1
# semantics.
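# Assigning a column expression (table.c.x + delta) instead of a plain value
# means the flush emits UPDATE compute_nodes SET free_ram_mb = free_ram_mb + :d
# rather than writing back a stale value this process read earlier.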
table = models.ComputeNode.__table__
if free_ram_mb_delta != 0:
compute_node.free_ram_mb = table.c.free_ram_mb + free_ram_mb_delta
if free_disk_gb_delta != 0:
compute_node.free_disk_gb = table.c.free_disk_gb + \
free_disk_gb_delta
if work_delta != 0:
compute_node.current_workload = table.c.current_workload + \
work_delta
if vm_delta != 0:
compute_node.running_vms = table.c.running_vms + vm_delta
return compute_node
def compute_node_utilization_set(context, host, free_ram_mb=None,
free_disk_gb=None, work=None, vms=None):
"""Like compute_node_utilization_update() modify a specific host
entry. But this function will set the metrics absolutely
(vs. a delta update).
"""
session = get_session()
compute_node = None
with session.begin(subtransactions=True):
compute_node = session.query(models.ComputeNode).\
options(joinedload('service')).\
filter(models.Service.host == host).\
filter_by(deleted=False).\
with_lockmode('update').\
first()
if compute_node is None:
raise exception.NotFound(_("No ComputeNode for %(host)s" %
locals()))
if free_ram_mb != None:
compute_node.free_ram_mb = free_ram_mb
if free_disk_gb != None:
compute_node.free_disk_gb = free_disk_gb
if work != None:
compute_node.current_workload = work
if vms != None:
compute_node.running_vms = vms
return compute_node
###################
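
To tie the two write paths together, a hedged sketch of how a compute host
might use them: deltas as work starts and finishes, plus an absolute reset
(e.g. from a periodic audit) to correct drift. The helper names and the
totals dict are hypothetical:

    from nova import db

    def on_build_start(context, host, instance_type):
        # Claim capacity up front with atomic deltas.
        db.compute_node_utilization_update(
            context, host,
            free_ram_mb_delta=-instance_type['memory_mb'],
            free_disk_gb_delta=-instance_type['local_gb'],
            work_delta=1, vm_delta=1)

    def on_build_end(context, host):
        # The build is no longer work in progress; the VM keeps its resources.
        db.compute_node_utilization_update(context, host, work_delta=-1)

    def periodic_capacity_audit(context, host, totals):
        # totals is a locally recomputed dict (hypothetical); overwrite the row.
        db.compute_node_utilization_set(
            context, host,
            free_ram_mb=totals['free_ram_mb'],
            free_disk_gb=totals['free_disk_gb'],
            work=totals['current_workload'],
            vms=totals['running_vms'])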

@@ -408,6 +408,8 @@ def usage_from_instance(instance_ref, network_info=None, **kw):
instance_id=instance_ref['uuid'],
instance_type=instance_ref['instance_type']['name'],
instance_type_id=instance_ref['instance_type_id'],
memory_mb=instance_ref['memory_mb'],
disk_gb=instance_ref['local_gb'],
display_name=instance_ref['display_name'],
created_at=str(instance_ref['created_at']),
launched_at=str(instance_ref['launched_at']) \