ComputeNode Capacity support
The table represents the current state of compute nodes and will be used by the scheduler when selecting a host. Re: https://blueprints.launchpad.net/nova/+spec/scaling-zones This is just the db & notification portion of the branch. The scheduler portion is being deferring until comstuds branch gets merged since it conflicts heavily. NOTE: Compute notifications are now two-part. There is a compute.instance.XXX.start event and a compute.instance.XXX.end message instead of the previous compute.instance.XXX event (which is the same as the .end message) Change-Id: Ia8e68680cb0924c59df84f2eec858febf4926d65
This commit is contained in:
parent
b3a41b7229
commit
317286832f
|
@ -370,6 +370,7 @@ class ComputeManager(manager.SchedulerDependentManager):
|
|||
self._check_instance_not_already_created(context, instance)
|
||||
image_meta = self._check_image_size(context, instance)
|
||||
self._start_building(context, instance)
|
||||
self._notify_about_instance_usage(instance, "create.start")
|
||||
network_info = self._allocate_network(context, instance,
|
||||
requested_networks)
|
||||
try:
|
||||
|
@ -380,7 +381,10 @@ class ComputeManager(manager.SchedulerDependentManager):
|
|||
except Exception:
|
||||
with utils.save_and_reraise_exception():
|
||||
self._deallocate_network(context, instance)
|
||||
self._notify_about_instance_usage(instance, network_info)
|
||||
|
||||
self._notify_about_instance_usage(instance, "create.end",
|
||||
network_info=network_info)
|
||||
|
||||
if self._is_instance_terminated(instance_uuid):
|
||||
raise exception.InstanceNotFound
|
||||
except exception.InstanceNotFound:
|
||||
|
@ -522,10 +526,13 @@ class ComputeManager(manager.SchedulerDependentManager):
|
|||
task_state=None,
|
||||
launched_at=utils.utcnow())
|
||||
|
||||
def _notify_about_instance_usage(self, instance, network_info=None):
|
||||
usage_info = utils.usage_from_instance(instance, network_info)
|
||||
def _notify_about_instance_usage(self, instance, event_suffix,
|
||||
usage_info=None, network_info=None):
|
||||
if not usage_info:
|
||||
usage_info = utils.usage_from_instance(instance,
|
||||
network_info=network_info)
|
||||
notifier.notify('compute.%s' % self.host,
|
||||
'compute.instance.create',
|
||||
'compute.instance.%s' % event_suffix,
|
||||
notifier.INFO, usage_info)
|
||||
|
||||
def _deallocate_network(self, context, instance):
|
||||
|
@ -623,6 +630,7 @@ class ComputeManager(manager.SchedulerDependentManager):
|
|||
def _delete_instance(self, context, instance):
|
||||
"""Delete an instance on this host."""
|
||||
instance_id = instance['id']
|
||||
self._notify_about_instance_usage(instance, "delete.start")
|
||||
self._shutdown_instance(context, instance, 'Terminating')
|
||||
self._cleanup_volumes(context, instance_id)
|
||||
self._instance_update(context,
|
||||
|
@ -632,11 +640,7 @@ class ComputeManager(manager.SchedulerDependentManager):
|
|||
terminated_at=utils.utcnow())
|
||||
|
||||
self.db.instance_destroy(context, instance_id)
|
||||
|
||||
usage_info = utils.usage_from_instance(instance)
|
||||
notifier.notify('compute.%s' % self.host,
|
||||
'compute.instance.delete',
|
||||
notifier.INFO, usage_info)
|
||||
self._notify_about_instance_usage(instance, "delete.end")
|
||||
|
||||
@exception.wrap_exception(notifier=notifier, publisher_id=publisher_id())
|
||||
@checks_instance_lock
|
||||
|
@ -705,6 +709,7 @@ class ComputeManager(manager.SchedulerDependentManager):
|
|||
LOG.audit(_("Rebuilding instance %s"), instance_uuid, context=context)
|
||||
|
||||
instance = self.db.instance_get_by_uuid(context, instance_uuid)
|
||||
self._notify_about_instance_usage(instance, "rebuild.start")
|
||||
current_power_state = self._get_power_state(context, instance)
|
||||
self._instance_update(context,
|
||||
instance_uuid,
|
||||
|
@ -746,11 +751,8 @@ class ComputeManager(manager.SchedulerDependentManager):
|
|||
task_state=None,
|
||||
launched_at=utils.utcnow())
|
||||
|
||||
usage_info = utils.usage_from_instance(instance, network_info)
|
||||
notifier.notify('compute.%s' % self.host,
|
||||
'compute.instance.rebuild',
|
||||
notifier.INFO,
|
||||
usage_info)
|
||||
self._notify_about_instance_usage(instance, "rebuild.end",
|
||||
network_info=network_info)
|
||||
|
||||
@exception.wrap_exception(notifier=notifier, publisher_id=publisher_id())
|
||||
@checks_instance_lock
|
||||
|
@ -761,6 +763,8 @@ class ComputeManager(manager.SchedulerDependentManager):
|
|||
context = context.elevated()
|
||||
instance = self.db.instance_get_by_uuid(context, instance_uuid)
|
||||
|
||||
self._notify_about_instance_usage(instance, "reboot.start")
|
||||
|
||||
current_power_state = self._get_power_state(context, instance)
|
||||
self._instance_update(context,
|
||||
instance_uuid,
|
||||
|
@ -785,6 +789,8 @@ class ComputeManager(manager.SchedulerDependentManager):
|
|||
vm_state=vm_states.ACTIVE,
|
||||
task_state=None)
|
||||
|
||||
self._notify_about_instance_usage(instance, "reboot.end")
|
||||
|
||||
@exception.wrap_exception(notifier=notifier, publisher_id=publisher_id())
|
||||
@wrap_instance_fault
|
||||
def snapshot_instance(self, context, instance_uuid, image_id,
|
||||
|
@ -1045,15 +1051,15 @@ class ComputeManager(manager.SchedulerDependentManager):
|
|||
instance_ref = self.db.instance_get_by_uuid(context,
|
||||
migration_ref.instance_uuid)
|
||||
|
||||
self._notify_about_instance_usage(instance_ref,
|
||||
"resize.confirm.start")
|
||||
|
||||
network_info = self._get_instance_nw_info(context, instance_ref)
|
||||
self.driver.confirm_migration(
|
||||
migration_ref, instance_ref, network_info)
|
||||
|
||||
usage_info = utils.usage_from_instance(instance_ref, network_info)
|
||||
notifier.notify('compute.%s' % self.host,
|
||||
'compute.instance.resize.confirm',
|
||||
notifier.INFO,
|
||||
usage_info)
|
||||
self._notify_about_instance_usage(instance_ref, "resize.confirm.end",
|
||||
network_info=network_info)
|
||||
|
||||
@exception.wrap_exception(notifier=notifier, publisher_id=publisher_id())
|
||||
@checks_instance_lock
|
||||
|
@ -1093,6 +1099,8 @@ class ComputeManager(manager.SchedulerDependentManager):
|
|||
instance_ref = self.db.instance_get_by_uuid(context,
|
||||
migration_ref.instance_uuid)
|
||||
|
||||
self._notify_about_instance_usage(instance_ref, "resize.revert.start")
|
||||
|
||||
old_instance_type = migration_ref['old_instance_type_id']
|
||||
instance_type = instance_types.get_instance_type(old_instance_type)
|
||||
|
||||
|
@ -1109,11 +1117,8 @@ class ComputeManager(manager.SchedulerDependentManager):
|
|||
self.driver.finish_revert_migration(instance_ref)
|
||||
self.db.migration_update(context, migration_id,
|
||||
{'status': 'reverted'})
|
||||
usage_info = utils.usage_from_instance(instance_ref)
|
||||
notifier.notify('compute.%s' % self.host,
|
||||
'compute.instance.resize.revert',
|
||||
notifier.INFO,
|
||||
usage_info)
|
||||
|
||||
self._notify_about_instance_usage(instance_ref, "resize.revert.end")
|
||||
|
||||
@exception.wrap_exception(notifier=notifier, publisher_id=publisher_id())
|
||||
@checks_instance_lock
|
||||
|
@ -1128,6 +1133,8 @@ class ComputeManager(manager.SchedulerDependentManager):
|
|||
|
||||
instance_ref = self.db.instance_get_by_uuid(context, instance_uuid)
|
||||
|
||||
self._notify_about_instance_usage(instance_ref, "resize.prep.start")
|
||||
|
||||
same_host = instance_ref['host'] == FLAGS.host
|
||||
if same_host and not FLAGS.allow_resize_to_same_host:
|
||||
self._instance_update(context,
|
||||
|
@ -1162,10 +1169,8 @@ class ComputeManager(manager.SchedulerDependentManager):
|
|||
usage_info = utils.usage_from_instance(instance_ref,
|
||||
new_instance_type=new_instance_type['name'],
|
||||
new_instance_type_id=new_instance_type['id'])
|
||||
notifier.notify('compute.%s' % self.host,
|
||||
'compute.instance.resize.prep',
|
||||
notifier.INFO,
|
||||
usage_info)
|
||||
self._notify_about_instance_usage(instance_ref, "resize.prep.end",
|
||||
usage_info=usage_info)
|
||||
|
||||
@exception.wrap_exception(notifier=notifier, publisher_id=publisher_id())
|
||||
@checks_instance_lock
|
||||
|
@ -1274,6 +1279,8 @@ class ComputeManager(manager.SchedulerDependentManager):
|
|||
|
||||
"""
|
||||
instance_ref = self.db.instance_get_by_uuid(context, instance_uuid)
|
||||
self._notify_about_instance_usage(instance_ref, "create_ip.start")
|
||||
|
||||
instance_id = instance_ref['id']
|
||||
self.network_api.add_fixed_ip_to_instance(context, instance_id,
|
||||
self.host, network_id)
|
||||
|
@ -1282,10 +1289,8 @@ class ComputeManager(manager.SchedulerDependentManager):
|
|||
instance_ref['uuid'])
|
||||
self.reset_network(context, instance_ref['uuid'])
|
||||
|
||||
usage = utils.usage_from_instance(instance_ref, network_info)
|
||||
notifier.notify('compute.%s' % self.host,
|
||||
'compute.instance.create_ip',
|
||||
notifier.INFO, usage)
|
||||
self._notify_about_instance_usage(instance_ref, "create_ip.end",
|
||||
network_info=network_info)
|
||||
|
||||
@exception.wrap_exception(notifier=notifier, publisher_id=publisher_id())
|
||||
@checks_instance_lock
|
||||
|
@ -1296,6 +1301,8 @@ class ComputeManager(manager.SchedulerDependentManager):
|
|||
instance networking.
|
||||
"""
|
||||
instance_ref = self.db.instance_get_by_uuid(context, instance_uuid)
|
||||
self._notify_about_instance_usage(instance_ref, "delete_ip.start")
|
||||
|
||||
instance_id = instance_ref['id']
|
||||
self.network_api.remove_fixed_ip_from_instance(context, instance_id,
|
||||
address)
|
||||
|
@ -1304,10 +1311,8 @@ class ComputeManager(manager.SchedulerDependentManager):
|
|||
instance_ref['uuid'])
|
||||
self.reset_network(context, instance_ref['uuid'])
|
||||
|
||||
usage = utils.usage_from_instance(instance_ref, network_info)
|
||||
notifier.notify('compute.%s' % self.host,
|
||||
'compute.instance.delete_ip',
|
||||
notifier.INFO, usage)
|
||||
self._notify_about_instance_usage(instance_ref, "delete_ip.end",
|
||||
network_info=network_info)
|
||||
|
||||
@exception.wrap_exception(notifier=notifier, publisher_id=publisher_id())
|
||||
@checks_instance_lock
|
||||
|
@ -1697,7 +1702,7 @@ class ComputeManager(manager.SchedulerDependentManager):
|
|||
:returns: See driver.update_available_resource()
|
||||
|
||||
"""
|
||||
return self.driver.update_available_resource(context, self.host)
|
||||
self.driver.update_available_resource(context, self.host)
|
||||
|
||||
def get_instance_disk_info(self, context, instance_name):
|
||||
"""Getting infomation of instance's current disk.
|
||||
|
|
|
@ -187,12 +187,31 @@ def compute_node_update(context, compute_id, values):
|
|||
"""Set the given properties on an computeNode and update it.
|
||||
|
||||
Raises NotFound if computeNode does not exist.
|
||||
|
||||
"""
|
||||
|
||||
return IMPL.compute_node_update(context, compute_id, values)
|
||||
|
||||
|
||||
def compute_node_get_by_host(context, host):
|
||||
return IMPL.compute_node_get_by_host(context, host)
|
||||
|
||||
|
||||
def compute_node_capacity_find(context, minimum_ram_mb, minimum_disk_gb):
|
||||
return IMPL.compute_node_capacity_find(context, minimum_ram_mb,
|
||||
minimum_disk_gb)
|
||||
|
||||
|
||||
def compute_node_utilization_update(context, host, free_ram_mb_delta=0,
|
||||
free_disk_gb_delta=0, work_delta=0, vm_delta=0):
|
||||
return IMPL.compute_node_utilization_update(context, host,
|
||||
free_ram_mb_delta, free_disk_gb_delta, work_delta,
|
||||
vm_delta)
|
||||
|
||||
|
||||
def compute_node_utilization_set(context, host, free_ram_mb=None,
|
||||
free_disk_gb=None, work=None, vms=None):
|
||||
return IMPL.compute_node_utilization_set(context, host, free_ram_mb,
|
||||
free_disk_gb, work, vms)
|
||||
|
||||
###################
|
||||
|
||||
|
||||
|
|
|
@ -44,6 +44,9 @@ from sqlalchemy.sql.expression import desc
|
|||
from sqlalchemy.sql.expression import literal_column
|
||||
|
||||
FLAGS = flags.FLAGS
|
||||
flags.DECLARE('reserved_host_disk_mb', 'nova.scheduler.host_manager')
|
||||
flags.DECLARE('reserved_host_memory_mb', 'nova.scheduler.host_manager')
|
||||
|
||||
LOG = logging.getLogger("nova.db.sqlalchemy")
|
||||
|
||||
|
||||
|
@ -387,6 +390,21 @@ def compute_node_get(context, compute_id, session=None):
|
|||
return result
|
||||
|
||||
|
||||
@require_admin_context
|
||||
def compute_node_get_by_service(context, service_id, session=None):
|
||||
if not session:
|
||||
session = get_session()
|
||||
|
||||
result = model_query(context, models.ComputeNode, session=session).\
|
||||
filter_by(service_id=service_id).\
|
||||
first()
|
||||
|
||||
if not result:
|
||||
raise exception.ComputeHostNotFound(host="ServiceID=%s" % service_id)
|
||||
|
||||
return result
|
||||
|
||||
|
||||
@require_admin_context
|
||||
def compute_node_get_all(context, session=None):
|
||||
return model_query(context, models.ComputeNode, session=session).\
|
||||
|
@ -402,23 +420,155 @@ def compute_node_get_for_service(context, service_id):
|
|||
first()
|
||||
|
||||
|
||||
def _get_host_utilization(context, host, ram_mb, disk_gb):
|
||||
"""Compute the current utilization of a given host."""
|
||||
instances = instance_get_all_by_host(context, host)
|
||||
vms = len(instances)
|
||||
free_ram_mb = ram_mb - FLAGS.reserved_host_memory_mb
|
||||
free_disk_gb = disk_gb - (FLAGS.reserved_host_disk_mb * 1024)
|
||||
|
||||
work = 0
|
||||
for instance in instances:
|
||||
free_ram_mb -= instance.memory_mb
|
||||
free_disk_gb -= instance.local_gb
|
||||
if instance.vm_state in [vm_states.BUILDING, vm_states.REBUILDING,
|
||||
vm_states.MIGRATING, vm_states.RESIZING]:
|
||||
work += 1
|
||||
return dict(free_ram_mb=free_ram_mb,
|
||||
free_disk_gb=free_disk_gb,
|
||||
current_workload=work,
|
||||
running_vms=vms)
|
||||
|
||||
|
||||
def _adjust_compute_node_values_for_utilization(context, values, session):
|
||||
service_ref = service_get(context, values['service_id'], session=session)
|
||||
host = service_ref['host']
|
||||
ram_mb = values['memory_mb']
|
||||
disk_gb = values['local_gb']
|
||||
values.update(_get_host_utilization(context, host, ram_mb, disk_gb))
|
||||
|
||||
|
||||
@require_admin_context
|
||||
def compute_node_create(context, values):
|
||||
compute_node_ref = models.ComputeNode()
|
||||
compute_node_ref.update(values)
|
||||
compute_node_ref.save()
|
||||
def compute_node_create(context, values, session=None):
|
||||
"""Creates a new ComputeNode and populates the capacity fields
|
||||
with the most recent data."""
|
||||
if not session:
|
||||
session = get_session()
|
||||
|
||||
_adjust_compute_node_values_for_utilization(context, values, session)
|
||||
with session.begin(subtransactions=True):
|
||||
compute_node_ref = models.ComputeNode()
|
||||
session.add(compute_node_ref)
|
||||
compute_node_ref.update(values)
|
||||
return compute_node_ref
|
||||
|
||||
|
||||
@require_admin_context
|
||||
def compute_node_update(context, compute_id, values):
|
||||
"""Creates a new ComputeNode and populates the capacity fields
|
||||
with the most recent data."""
|
||||
session = get_session()
|
||||
with session.begin():
|
||||
_adjust_compute_node_values_for_utilization(context, values, session)
|
||||
with session.begin(subtransactions=True):
|
||||
compute_ref = compute_node_get(context, compute_id, session=session)
|
||||
compute_ref.update(values)
|
||||
compute_ref.save(session=session)
|
||||
|
||||
|
||||
# Note: these operations use with_lockmode() ... so this will only work
|
||||
# reliably with engines that support row-level locking
|
||||
# (postgres, mysql+innodb and above).
|
||||
|
||||
def compute_node_get_by_host(context, host):
|
||||
"""Get all capacity entries for the given host."""
|
||||
session = get_session()
|
||||
with session.begin():
|
||||
node = session.query(models.ComputeNode).\
|
||||
options(joinedload('service')).\
|
||||
filter(models.Service.host == host).\
|
||||
filter_by(deleted=False).\
|
||||
with_lockmode('update')
|
||||
return node.first()
|
||||
|
||||
|
||||
def compute_node_capacity_find(context, minimum_ram_mb, minimum_disk_gb):
|
||||
"""Get all enabled hosts with enough ram and disk."""
|
||||
session = get_session()
|
||||
with session.begin():
|
||||
return session.query(models.ComputeNode).\
|
||||
options(joinedload('service')).\
|
||||
filter(models.ComputeNode.free_ram_mb >= minimum_ram_mb).\
|
||||
filter(models.ComputeNode.free_disk_gb >= minimum_disk_gb).\
|
||||
filter(models.Service.disabled == False).\
|
||||
filter_by(deleted=False).\
|
||||
with_lockmode('update').all()
|
||||
|
||||
|
||||
def compute_node_utilization_update(context, host, free_ram_mb_delta=0,
|
||||
free_disk_gb_delta=0, work_delta=0, vm_delta=0):
|
||||
"""Update a specific ComputeNode entry by a series of deltas.
|
||||
Do this as a single atomic action and lock the row for the
|
||||
duration of the operation. Requires that ComputeNode record exist."""
|
||||
session = get_session()
|
||||
compute_node = None
|
||||
with session.begin(subtransactions=True):
|
||||
compute_node = session.query(models.ComputeNode).\
|
||||
options(joinedload('service')).\
|
||||
filter(models.Service.host == host).\
|
||||
filter_by(deleted=False).\
|
||||
with_lockmode('update').\
|
||||
first()
|
||||
if compute_node is None:
|
||||
raise exception.NotFound(_("No ComputeNode for %(host)s" %
|
||||
locals()))
|
||||
|
||||
# This table thingy is how we get atomic UPDATE x = x + 1
|
||||
# semantics.
|
||||
table = models.ComputeNode.__table__
|
||||
if free_ram_mb_delta != 0:
|
||||
compute_node.free_ram_mb = table.c.free_ram_mb + free_ram_mb_delta
|
||||
if free_disk_gb_delta != 0:
|
||||
compute_node.free_disk_gb = table.c.free_disk_gb + \
|
||||
free_disk_gb_delta
|
||||
if work_delta != 0:
|
||||
compute_node.current_workload = table.c.current_workload + \
|
||||
work_delta
|
||||
if vm_delta != 0:
|
||||
compute_node.running_vms = table.c.running_vms + vm_delta
|
||||
return compute_node
|
||||
|
||||
|
||||
def compute_node_utilization_set(context, host, free_ram_mb=None,
|
||||
free_disk_gb=None, work=None, vms=None):
|
||||
"""Like compute_node_utilization_update() modify a specific host
|
||||
entry. But this function will set the metrics absolutely
|
||||
(vs. a delta update).
|
||||
"""
|
||||
session = get_session()
|
||||
compute_node = None
|
||||
with session.begin(subtransactions=True):
|
||||
compute_node = session.query(models.ComputeNode).\
|
||||
options(joinedload('service')).\
|
||||
filter(models.Service.host == host).\
|
||||
filter_by(deleted=False).\
|
||||
with_lockmode('update').\
|
||||
first()
|
||||
if compute_node is None:
|
||||
raise exception.NotFound(_("No ComputeNode for %(host)s" %
|
||||
locals()))
|
||||
|
||||
if free_ram_mb != None:
|
||||
compute_node.free_ram_mb = free_ram_mb
|
||||
if free_disk_gb != None:
|
||||
compute_node.free_disk_gb = free_disk_gb
|
||||
if work != None:
|
||||
compute_node.current_workload = work
|
||||
if vms != None:
|
||||
compute_node.running_vms = vms
|
||||
|
||||
return compute_node
|
||||
|
||||
|
||||
###################
|
||||
|
||||
|
||||
|
|
|
@ -0,0 +1,45 @@
|
|||
# Copyright 2011 OpenStack LLC.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from sqlalchemy import Table, Column, MetaData, Integer
|
||||
|
||||
from nova import log as logging
|
||||
|
||||
|
||||
new_columns = [
|
||||
Column('free_ram_mb', Integer()),
|
||||
Column('free_disk_gb', Integer()),
|
||||
Column('current_workload', Integer()),
|
||||
Column('running_vms', Integer()),
|
||||
]
|
||||
|
||||
|
||||
def upgrade(migrate_engine):
|
||||
# Upgrade operations go here. Don't create your own engine;
|
||||
# bind migrate_engine to your metadata
|
||||
meta = MetaData()
|
||||
meta.bind = migrate_engine
|
||||
compute_nodes = Table('compute_nodes', meta, autoload=True)
|
||||
|
||||
for column in new_columns:
|
||||
compute_nodes.create_column(column)
|
||||
|
||||
|
||||
def downgrade(migrate_engine):
|
||||
meta = MetaData()
|
||||
meta.bind = migrate_engine
|
||||
compute_nodes = Table('compute_nodes', meta, autoload=True)
|
||||
|
||||
for column in new_columns:
|
||||
compute_notes.drop_column(column)
|
|
@ -136,6 +136,16 @@ class ComputeNode(BASE, NovaBase):
|
|||
hypervisor_type = Column(Text)
|
||||
hypervisor_version = Column(Integer)
|
||||
|
||||
# Free Ram, amount of activity (resize, migration, boot, etc) and
|
||||
# the number of running VM's are a good starting point for what's
|
||||
# important when making scheduling decisions.
|
||||
#
|
||||
# NOTE(sandy): We'll need to make this extensible for other schedulers.
|
||||
free_ram_mb = Column(Integer)
|
||||
free_disk_gb = Column(Integer)
|
||||
current_workload = Column(Integer)
|
||||
running_vms = Column(Integer)
|
||||
|
||||
# Note(masumotok): Expected Strings example:
|
||||
#
|
||||
# '{"arch":"x86_64",
|
||||
|
|
|
@ -0,0 +1,81 @@
|
|||
# Copyright 2011 OpenStack LLC.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from nova import context
|
||||
from nova import db
|
||||
from nova import log as logging
|
||||
|
||||
|
||||
LOG = logging.getLogger('nova.notifier.capacity_notifier')
|
||||
|
||||
|
||||
def notify(message):
|
||||
"""Look for specific compute manager events and interprete them
|
||||
so as to keep the Capacity table up to date.
|
||||
|
||||
NOTE: the True/False return codes are only for testing.
|
||||
"""
|
||||
|
||||
# The event_type must start with 'compute.instance.'
|
||||
event_type = message.get('event_type', None)
|
||||
preamble = 'compute.instance.'
|
||||
if not event_type or not event_type.startswith(preamble):
|
||||
return False
|
||||
|
||||
# Events we're interested in end with .start and .end
|
||||
event = event_type[len(preamble):]
|
||||
parts = event.split('.')
|
||||
suffix = parts[-1].lower()
|
||||
event = event[:(-len(suffix) - 1)]
|
||||
|
||||
if suffix not in ['start', 'end']:
|
||||
return False
|
||||
started = suffix == 'start'
|
||||
ended = suffix == 'end'
|
||||
|
||||
if started and event == 'create':
|
||||
# We've already updated this stuff in the scheduler. Don't redo the
|
||||
# work here.
|
||||
return False
|
||||
|
||||
work = 1 if started else -1
|
||||
|
||||
# Extract the host name from the publisher id ...
|
||||
publisher_preamble = 'compute.'
|
||||
publisher = message.get('publisher_id', None)
|
||||
if not publisher or not publisher.startswith(publisher_preamble):
|
||||
return False
|
||||
host = publisher[len(publisher_preamble):]
|
||||
|
||||
# If we deleted an instance, make sure we reclaim the resources.
|
||||
# We may need to do something explicit for rebuild/migrate.
|
||||
free_ram_mb = 0
|
||||
free_disk_gb = 0
|
||||
vms = 0
|
||||
if ended and event == 'delete':
|
||||
vms = -1
|
||||
payload = message.get('payload', {})
|
||||
free_ram_mb = payload.get('memory_mb', 0)
|
||||
free_disk_gb = payload.get('disk_gb', 0)
|
||||
|
||||
LOG.debug("EventType=%(event_type)s -> host %(host)s: "
|
||||
"ram %(free_ram_mb)d, disk %(free_disk_gb)d, "
|
||||
"work %(work)d, vms%(vms)d" % locals())
|
||||
|
||||
db.api.compute_node_utilization_update(context.get_admin_context(), host,
|
||||
free_ram_mb_delta=free_ram_mb, free_disk_gb_delta=free_disk_gb,
|
||||
work_delta=work, vm_delta=vms)
|
||||
|
||||
return True
|
|
@ -0,0 +1,59 @@
|
|||
# Copyright 2011 OpenStack LLC.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import nova.db.api
|
||||
from nova.notifier import capacity_notifier as cn
|
||||
from nova import test
|
||||
from nova import utils
|
||||
|
||||
|
||||
class CapacityNotifierTestCase(test.TestCase):
|
||||
"""Test case for the Capacity updating notifier."""
|
||||
|
||||
def _make_msg(self, host, event):
|
||||
usage_info = dict(memory_mb=123, disk_gb=456)
|
||||
payload = utils.to_primitive(usage_info, convert_instances=True)
|
||||
return dict(
|
||||
publisher_id="compute.%s" % host,
|
||||
event_type="compute.instance.%s" % event,
|
||||
payload=payload
|
||||
)
|
||||
|
||||
def test_event_type(self):
|
||||
msg = self._make_msg("myhost", "mymethod")
|
||||
msg['event_type'] = 'random'
|
||||
self.assertFalse(cn.notify(msg))
|
||||
|
||||
def test_bad_event_suffix(self):
|
||||
msg = self._make_msg("myhost", "mymethod.badsuffix")
|
||||
self.assertFalse(cn.notify(msg))
|
||||
|
||||
def test_bad_publisher_id(self):
|
||||
msg = self._make_msg("myhost", "mymethod.start")
|
||||
msg['publisher_id'] = 'badpublisher'
|
||||
self.assertFalse(cn.notify(msg))
|
||||
|
||||
def test_update_called(self):
|
||||
def _verify_called(host, context, free_ram_mb_delta,
|
||||
free_disk_gb_delta, work_delta, vm_delta):
|
||||
self.assertEquals(free_ram_mb_delta, 123)
|
||||
self.assertEquals(free_disk_gb_delta, 456)
|
||||
self.assertEquals(vm_delta, -1)
|
||||
self.assertEquals(work_delta, -1)
|
||||
|
||||
self.stubs.Set(nova.db.api, "compute_node_utilization_update",
|
||||
_verify_called)
|
||||
msg = self._make_msg("myhost", "delete.end")
|
||||
self.assertTrue(cn.notify(msg))
|
|
@ -774,7 +774,7 @@ class ComputeTestCase(BaseTestCase):
|
|||
self.assertEquals(len(test_notifier.NOTIFICATIONS), 0)
|
||||
self.compute.add_fixed_ip_to_instance(self.context, instance_uuid, 1)
|
||||
|
||||
self.assertEquals(len(test_notifier.NOTIFICATIONS), 1)
|
||||
self.assertEquals(len(test_notifier.NOTIFICATIONS), 2)
|
||||
self.compute.terminate_instance(self.context, instance_uuid)
|
||||
|
||||
def test_remove_fixed_ip_usage_notification(self):
|
||||
|
@ -796,7 +796,7 @@ class ComputeTestCase(BaseTestCase):
|
|||
instance_uuid,
|
||||
1)
|
||||
|
||||
self.assertEquals(len(test_notifier.NOTIFICATIONS), 1)
|
||||
self.assertEquals(len(test_notifier.NOTIFICATIONS), 2)
|
||||
self.compute.terminate_instance(self.context, instance_uuid)
|
||||
|
||||
def test_run_instance_usage_notification(self):
|
||||
|
@ -804,11 +804,14 @@ class ComputeTestCase(BaseTestCase):
|
|||
inst_ref = self._create_fake_instance()
|
||||
instance_uuid = inst_ref['uuid']
|
||||
self.compute.run_instance(self.context, instance_uuid)
|
||||
self.assertEquals(len(test_notifier.NOTIFICATIONS), 1)
|
||||
self.assertEquals(len(test_notifier.NOTIFICATIONS), 2)
|
||||
inst_ref = db.instance_get_by_uuid(self.context, instance_uuid)
|
||||
msg = test_notifier.NOTIFICATIONS[0]
|
||||
self.assertEquals(msg['event_type'], 'compute.instance.create.start')
|
||||
# The last event is the one with the sugar in it.
|
||||
msg = test_notifier.NOTIFICATIONS[1]
|
||||
self.assertEquals(msg['priority'], 'INFO')
|
||||
self.assertEquals(msg['event_type'], 'compute.instance.create')
|
||||
self.assertEquals(msg['event_type'], 'compute.instance.create.end')
|
||||
payload = msg['payload']
|
||||
self.assertEquals(payload['tenant_id'], self.project_id)
|
||||
self.assertEquals(payload['user_id'], self.user_id)
|
||||
|
@ -832,14 +835,16 @@ class ComputeTestCase(BaseTestCase):
|
|||
test_notifier.NOTIFICATIONS = []
|
||||
self.compute.terminate_instance(self.context, inst_ref['uuid'])
|
||||
|
||||
self.assertEquals(len(test_notifier.NOTIFICATIONS), 2)
|
||||
self.assertEquals(len(test_notifier.NOTIFICATIONS), 3)
|
||||
msg = test_notifier.NOTIFICATIONS[0]
|
||||
self.assertEquals(msg['priority'], 'INFO')
|
||||
self.assertEquals(msg['event_type'], 'compute.instance.exists')
|
||||
|
||||
msg = test_notifier.NOTIFICATIONS[1]
|
||||
self.assertEquals(msg['priority'], 'INFO')
|
||||
self.assertEquals(msg['event_type'], 'compute.instance.delete')
|
||||
self.assertEquals(msg['event_type'], 'compute.instance.delete.start')
|
||||
msg1 = test_notifier.NOTIFICATIONS[2]
|
||||
self.assertEquals(msg1['event_type'], 'compute.instance.delete.end')
|
||||
payload = msg['payload']
|
||||
self.assertEquals(payload['tenant_id'], self.project_id)
|
||||
self.assertEquals(payload['user_id'], self.user_id)
|
||||
|
@ -1006,10 +1011,14 @@ class ComputeTestCase(BaseTestCase):
|
|||
instance_uuid,
|
||||
'pre-migrating')
|
||||
|
||||
self.assertEquals(len(test_notifier.NOTIFICATIONS), 1)
|
||||
self.assertEquals(len(test_notifier.NOTIFICATIONS), 2)
|
||||
msg = test_notifier.NOTIFICATIONS[0]
|
||||
self.assertEquals(msg['event_type'],
|
||||
'compute.instance.resize.prep.start')
|
||||
msg = test_notifier.NOTIFICATIONS[1]
|
||||
self.assertEquals(msg['event_type'],
|
||||
'compute.instance.resize.prep.end')
|
||||
self.assertEquals(msg['priority'], 'INFO')
|
||||
self.assertEquals(msg['event_type'], 'compute.instance.resize.prep')
|
||||
payload = msg['payload']
|
||||
self.assertEquals(payload['tenant_id'], self.project_id)
|
||||
self.assertEquals(payload['user_id'], self.user_id)
|
||||
|
|
|
@ -22,8 +22,8 @@ import datetime
|
|||
|
||||
from nova import test
|
||||
from nova import context
|
||||
from nova import exception
|
||||
from nova import db
|
||||
from nova import exception
|
||||
from nova import flags
|
||||
from nova import utils
|
||||
|
||||
|
@ -545,3 +545,100 @@ class AggregateDBApiTestCase(test.TestCase):
|
|||
|
||||
db.dnsdomain_unregister(ctxt, domain1)
|
||||
db.dnsdomain_unregister(ctxt, domain2)
|
||||
|
||||
|
||||
class CapacityTestCase(test.TestCase):
|
||||
def setUp(self):
|
||||
super(CapacityTestCase, self).setUp()
|
||||
|
||||
self.ctxt = context.get_admin_context()
|
||||
|
||||
service_dict = dict(host='host1', binary='binary1',
|
||||
topic='compute', report_count=1,
|
||||
disabled=False)
|
||||
self.service = db.service_create(self.ctxt, service_dict)
|
||||
|
||||
self.compute_node_dict = dict(vcpus=2, memory_mb=1024, local_gb=2048,
|
||||
vcpus_used=0, memory_mb_used=0,
|
||||
local_gb_used=0, hypervisor_type="xen",
|
||||
hypervisor_version=1, cpu_info="",
|
||||
service_id=self.service.id)
|
||||
|
||||
self.flags(reserved_host_memory_mb=0)
|
||||
self.flags(reserved_host_disk_mb=0)
|
||||
|
||||
def _create_helper(self, host):
|
||||
self.compute_node_dict['host'] = host
|
||||
return db.compute_node_create(self.ctxt, self.compute_node_dict)
|
||||
|
||||
def test_compute_node_create(self):
|
||||
item = self._create_helper('host1')
|
||||
self.assertEquals(item.free_ram_mb, 1024)
|
||||
self.assertEquals(item.free_disk_gb, 2048)
|
||||
self.assertEquals(item.running_vms, 0)
|
||||
self.assertEquals(item.current_workload, 0)
|
||||
|
||||
def test_compute_node_create_with_reservations(self):
|
||||
self.flags(reserved_host_memory_mb=256)
|
||||
item = self._create_helper('host1')
|
||||
self.assertEquals(item.free_ram_mb, 1024 - 256)
|
||||
|
||||
def test_compute_node_set(self):
|
||||
item = self._create_helper('host1')
|
||||
|
||||
x = db.compute_node_utilization_set(self.ctxt, 'host1',
|
||||
free_ram_mb=2048, free_disk_gb=4096)
|
||||
self.assertEquals(x.free_ram_mb, 2048)
|
||||
self.assertEquals(x.free_disk_gb, 4096)
|
||||
self.assertEquals(x.running_vms, 0)
|
||||
self.assertEquals(x.current_workload, 0)
|
||||
|
||||
x = db.compute_node_utilization_set(self.ctxt, 'host1', work=3)
|
||||
self.assertEquals(x.free_ram_mb, 2048)
|
||||
self.assertEquals(x.free_disk_gb, 4096)
|
||||
self.assertEquals(x.current_workload, 3)
|
||||
self.assertEquals(x.running_vms, 0)
|
||||
|
||||
x = db.compute_node_utilization_set(self.ctxt, 'host1', vms=5)
|
||||
self.assertEquals(x.free_ram_mb, 2048)
|
||||
self.assertEquals(x.free_disk_gb, 4096)
|
||||
self.assertEquals(x.current_workload, 3)
|
||||
self.assertEquals(x.running_vms, 5)
|
||||
|
||||
def test_compute_node_utilization_update(self):
|
||||
item = self._create_helper('host1')
|
||||
|
||||
x = db.compute_node_utilization_update(self.ctxt, 'host1',
|
||||
free_ram_mb_delta=-24)
|
||||
self.assertEquals(x.free_ram_mb, 1000)
|
||||
self.assertEquals(x.free_disk_gb, 2048)
|
||||
self.assertEquals(x.running_vms, 0)
|
||||
self.assertEquals(x.current_workload, 0)
|
||||
|
||||
x = db.compute_node_utilization_update(self.ctxt, 'host1',
|
||||
free_disk_gb_delta=-48)
|
||||
self.assertEquals(x.free_ram_mb, 1000)
|
||||
self.assertEquals(x.free_disk_gb, 2000)
|
||||
self.assertEquals(x.running_vms, 0)
|
||||
self.assertEquals(x.current_workload, 0)
|
||||
|
||||
x = db.compute_node_utilization_update(self.ctxt, 'host1',
|
||||
work_delta=3)
|
||||
self.assertEquals(x.free_ram_mb, 1000)
|
||||
self.assertEquals(x.free_disk_gb, 2000)
|
||||
self.assertEquals(x.current_workload, 3)
|
||||
self.assertEquals(x.running_vms, 0)
|
||||
|
||||
x = db.compute_node_utilization_update(self.ctxt, 'host1',
|
||||
work_delta=-1)
|
||||
self.assertEquals(x.free_ram_mb, 1000)
|
||||
self.assertEquals(x.free_disk_gb, 2000)
|
||||
self.assertEquals(x.current_workload, 2)
|
||||
self.assertEquals(x.running_vms, 0)
|
||||
|
||||
x = db.compute_node_utilization_update(self.ctxt, 'host1',
|
||||
vm_delta=5)
|
||||
self.assertEquals(x.free_ram_mb, 1000)
|
||||
self.assertEquals(x.free_disk_gb, 2000)
|
||||
self.assertEquals(x.current_workload, 2)
|
||||
self.assertEquals(x.running_vms, 5)
|
||||
|
|
|
@ -489,8 +489,8 @@ class ToPrimitiveTestCase(test.TestCase):
|
|||
ret = utils.to_primitive(x)
|
||||
self.assertEquals(len(ret), 3)
|
||||
self.assertTrue(ret[0].startswith(u"<module 'datetime' from "))
|
||||
self.assertTrue(ret[1].startswith(u'<function foo at 0x'))
|
||||
self.assertEquals(ret[2], u'<built-in function dir>')
|
||||
self.assertTrue(ret[1].startswith('<function foo at 0x'))
|
||||
self.assertEquals(ret[2], '<built-in function dir>')
|
||||
|
||||
|
||||
class MonkeyPatchTestCase(test.TestCase):
|
||||
|
|
|
@ -408,6 +408,8 @@ def usage_from_instance(instance_ref, network_info=None, **kw):
|
|||
instance_id=instance_ref['uuid'],
|
||||
instance_type=instance_ref['instance_type']['name'],
|
||||
instance_type_id=instance_ref['instance_type_id'],
|
||||
memory_mb=instance_ref['memory_mb'],
|
||||
disk_gb=instance_ref['local_gb'],
|
||||
display_name=instance_ref['display_name'],
|
||||
created_at=str(instance_ref['created_at']),
|
||||
launched_at=str(instance_ref['launched_at']) \
|
||||
|
|
|
@ -264,12 +264,12 @@ class FakeConnection(driver.ComputeDriver):
|
|||
'local_gb_used': 0,
|
||||
'hypervisor_type': 'fake',
|
||||
'hypervisor_version': '1.0',
|
||||
'service_id': service_ref['id'],
|
||||
'cpu_info': '?'}
|
||||
|
||||
compute_node_ref = service_ref['compute_node']
|
||||
if not compute_node_ref:
|
||||
LOG.info(_('Compute_service record created for %s ') % host)
|
||||
dic['service_id'] = service_ref['id']
|
||||
db.compute_node_create(ctxt, dic)
|
||||
else:
|
||||
LOG.info(_('Compute_service record updated for %s ') % host)
|
||||
|
@ -292,6 +292,13 @@ class FakeConnection(driver.ComputeDriver):
|
|||
"""This method is supported only by libvirt."""
|
||||
return
|
||||
|
||||
def finish_migration(self, context, migration, instance, disk_info,
|
||||
network_info, image_meta, resize_instance):
|
||||
return
|
||||
|
||||
def confirm_migration(self, migration, instance, network_info):
|
||||
return
|
||||
|
||||
def pre_live_migration(self, block_device_info):
|
||||
"""This method is supported only by libvirt."""
|
||||
return
|
||||
|
|
|
@ -1564,12 +1564,12 @@ class LibvirtConnection(driver.ComputeDriver):
|
|||
'hypervisor_type': self.get_hypervisor_type(),
|
||||
'hypervisor_version': self.get_hypervisor_version(),
|
||||
'cpu_info': self.get_cpu_info(),
|
||||
'service_id': service_ref['id'],
|
||||
'disk_available_least': self.get_disk_available_least()}
|
||||
|
||||
compute_node_ref = service_ref['compute_node']
|
||||
if not compute_node_ref:
|
||||
LOG.info(_('Compute_service record created for %s ') % host)
|
||||
dic['service_id'] = service_ref['id']
|
||||
db.compute_node_create(ctxt, dic)
|
||||
else:
|
||||
LOG.info(_('Compute_service record updated for %s ') % host)
|
||||
|
|
|
@ -397,12 +397,12 @@ class XenAPIConnection(driver.ComputeDriver):
|
|||
'local_gb_used': used_disk_gb,
|
||||
'hypervisor_type': 'xen',
|
||||
'hypervisor_version': 0,
|
||||
'service_id': service_ref['id'],
|
||||
'cpu_info': host_stats['host_cpu_info']['cpu_count']}
|
||||
|
||||
compute_node_ref = service_ref['compute_node']
|
||||
if not compute_node_ref:
|
||||
LOG.info(_('Compute_service record created for %s ') % host)
|
||||
dic['service_id'] = service_ref['id']
|
||||
db.compute_node_create(ctxt, dic)
|
||||
else:
|
||||
LOG.info(_('Compute_service record updated for %s ') % host)
|
||||
|
|
Loading…
Reference in New Issue