ComputeNode Capacity support
The table represents the current state of compute nodes and will be used by the scheduler when selecting a host. Re: https://blueprints.launchpad.net/nova/+spec/scaling-zones This is just the db & notification portion of the branch. The scheduler portion is being deferred until comstud's branch gets merged, since it conflicts heavily. NOTE: Compute notifications are now two-part. There is a compute.instance.XXX.start event and a compute.instance.XXX.end message instead of the previous compute.instance.XXX event (which is the same as the .end message) Change-Id: Ia8e68680cb0924c59df84f2eec858febf4926d65
This commit is contained in:
81
nova/notifier/capacity_notifier.py
Normal file
81
nova/notifier/capacity_notifier.py
Normal file
@@ -0,0 +1,81 @@
|
||||
# Copyright 2011 OpenStack LLC.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from nova import context
|
||||
from nova import db
|
||||
from nova import log as logging
|
||||
|
||||
|
||||
LOG = logging.getLogger('nova.notifier.capacity_notifier')
|
||||
|
||||
|
||||
def notify(message):
    """Look for specific compute manager events and interpret them
    so as to keep the Capacity table up to date.

    :param message: a notification dict with 'event_type',
                    'publisher_id' and 'payload' keys.
    :returns: True if a capacity update was issued, False if the
              message was ignored.

    NOTE: the True/False return codes are only for testing.
    """
    # The event_type must start with 'compute.instance.'
    event_type = message.get('event_type', None)
    preamble = 'compute.instance.'
    if not event_type or not event_type.startswith(preamble):
        return False

    # Events we're interested in end with .start and .end
    event = event_type[len(preamble):]
    parts = event.split('.')
    suffix = parts[-1].lower()
    # Strip the '.<suffix>' tail to get the bare operation name.
    event = event[:(-len(suffix) - 1)]

    if suffix not in ['start', 'end']:
        return False
    started = suffix == 'start'
    ended = suffix == 'end'

    if started and event == 'create':
        # We've already updated this stuff in the scheduler. Don't redo the
        # work here.
        return False

    # Each .start event adds one unit of in-progress work; the matching
    # .end event removes it again.
    work = 1 if started else -1

    # Extract the host name from the publisher id ...
    publisher_preamble = 'compute.'
    publisher = message.get('publisher_id', None)
    if not publisher or not publisher.startswith(publisher_preamble):
        return False
    host = publisher[len(publisher_preamble):]

    # If we deleted an instance, make sure we reclaim the resources.
    # We may need to do something explicit for rebuild/migrate.
    free_ram_mb = 0
    free_disk_gb = 0
    vms = 0
    if ended and event == 'delete':
        vms = -1
        payload = message.get('payload', {})
        free_ram_mb = payload.get('memory_mb', 0)
        free_disk_gb = payload.get('disk_gb', 0)

    # Use lazy logging args (not `% locals()`) so interpolation only
    # happens when DEBUG is enabled; also fixes the missing space that
    # rendered as "vms%(vms)d".
    LOG.debug("EventType=%(event_type)s -> host %(host)s: "
              "ram %(free_ram_mb)d, disk %(free_disk_gb)d, "
              "work %(work)d, vms %(vms)d",
              {'event_type': event_type, 'host': host,
               'free_ram_mb': free_ram_mb, 'free_disk_gb': free_disk_gb,
               'work': work, 'vms': vms})

    db.api.compute_node_utilization_update(context.get_admin_context(), host,
            free_ram_mb_delta=free_ram_mb, free_disk_gb_delta=free_disk_gb,
            work_delta=work, vm_delta=vms)

    return True
|
59
nova/tests/notifier/test_capacity_notifier.py
Normal file
59
nova/tests/notifier/test_capacity_notifier.py
Normal file
@@ -0,0 +1,59 @@
|
||||
# Copyright 2011 OpenStack LLC.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import nova.db.api
|
||||
from nova.notifier import capacity_notifier as cn
|
||||
from nova import test
|
||||
from nova import utils
|
||||
|
||||
|
||||
class CapacityNotifierTestCase(test.TestCase):
    """Test case for the Capacity updating notifier."""

    def _make_msg(self, host, event):
        """Build a message shaped like a compute manager notification.

        publisher_id carries the host name ('compute.<host>') and
        event_type carries the 'compute.instance.' prefix.
        """
        usage_info = dict(memory_mb=123, disk_gb=456)
        payload = utils.to_primitive(usage_info, convert_instances=True)
        return dict(
            publisher_id="compute.%s" % host,
            event_type="compute.instance.%s" % event,
            payload=payload
        )

    def test_event_type(self):
        # Events outside the compute.instance. namespace are ignored.
        msg = self._make_msg("myhost", "mymethod")
        msg['event_type'] = 'random'
        self.assertFalse(cn.notify(msg))

    def test_bad_event_suffix(self):
        # Only events ending in .start or .end are processed.
        msg = self._make_msg("myhost", "mymethod.badsuffix")
        self.assertFalse(cn.notify(msg))

    def test_bad_publisher_id(self):
        # The publisher_id must start with 'compute.'.
        msg = self._make_msg("myhost", "mymethod.start")
        msg['publisher_id'] = 'badpublisher'
        self.assertFalse(cn.notify(msg))

    def test_update_called(self):
        # The notifier calls compute_node_utilization_update with
        # (context, host) positionally, so the stub signature must match
        # that order. (The original stub had host and context swapped,
        # silently binding the admin context to `host`.)
        def _verify_called(context, host, free_ram_mb_delta,
                           free_disk_gb_delta, work_delta, vm_delta):
            self.assertEquals(host, "myhost")
            self.assertEquals(free_ram_mb_delta, 123)
            self.assertEquals(free_disk_gb_delta, 456)
            self.assertEquals(vm_delta, -1)
            self.assertEquals(work_delta, -1)

        self.stubs.Set(nova.db.api, "compute_node_utilization_update",
                       _verify_called)
        msg = self._make_msg("myhost", "delete.end")
        self.assertTrue(cn.notify(msg))
|
@@ -774,7 +774,7 @@ class ComputeTestCase(BaseTestCase):
|
||||
self.assertEquals(len(test_notifier.NOTIFICATIONS), 0)
|
||||
self.compute.add_fixed_ip_to_instance(self.context, instance_uuid, 1)
|
||||
|
||||
self.assertEquals(len(test_notifier.NOTIFICATIONS), 1)
|
||||
self.assertEquals(len(test_notifier.NOTIFICATIONS), 2)
|
||||
self.compute.terminate_instance(self.context, instance_uuid)
|
||||
|
||||
def test_remove_fixed_ip_usage_notification(self):
|
||||
@@ -796,7 +796,7 @@ class ComputeTestCase(BaseTestCase):
|
||||
instance_uuid,
|
||||
1)
|
||||
|
||||
self.assertEquals(len(test_notifier.NOTIFICATIONS), 1)
|
||||
self.assertEquals(len(test_notifier.NOTIFICATIONS), 2)
|
||||
self.compute.terminate_instance(self.context, instance_uuid)
|
||||
|
||||
def test_run_instance_usage_notification(self):
|
||||
@@ -804,11 +804,14 @@ class ComputeTestCase(BaseTestCase):
|
||||
inst_ref = self._create_fake_instance()
|
||||
instance_uuid = inst_ref['uuid']
|
||||
self.compute.run_instance(self.context, instance_uuid)
|
||||
self.assertEquals(len(test_notifier.NOTIFICATIONS), 1)
|
||||
self.assertEquals(len(test_notifier.NOTIFICATIONS), 2)
|
||||
inst_ref = db.instance_get_by_uuid(self.context, instance_uuid)
|
||||
msg = test_notifier.NOTIFICATIONS[0]
|
||||
self.assertEquals(msg['event_type'], 'compute.instance.create.start')
|
||||
# The last event is the one with the sugar in it.
|
||||
msg = test_notifier.NOTIFICATIONS[1]
|
||||
self.assertEquals(msg['priority'], 'INFO')
|
||||
self.assertEquals(msg['event_type'], 'compute.instance.create')
|
||||
self.assertEquals(msg['event_type'], 'compute.instance.create.end')
|
||||
payload = msg['payload']
|
||||
self.assertEquals(payload['tenant_id'], self.project_id)
|
||||
self.assertEquals(payload['user_id'], self.user_id)
|
||||
@@ -832,14 +835,16 @@ class ComputeTestCase(BaseTestCase):
|
||||
test_notifier.NOTIFICATIONS = []
|
||||
self.compute.terminate_instance(self.context, inst_ref['uuid'])
|
||||
|
||||
self.assertEquals(len(test_notifier.NOTIFICATIONS), 2)
|
||||
self.assertEquals(len(test_notifier.NOTIFICATIONS), 3)
|
||||
msg = test_notifier.NOTIFICATIONS[0]
|
||||
self.assertEquals(msg['priority'], 'INFO')
|
||||
self.assertEquals(msg['event_type'], 'compute.instance.exists')
|
||||
|
||||
msg = test_notifier.NOTIFICATIONS[1]
|
||||
self.assertEquals(msg['priority'], 'INFO')
|
||||
self.assertEquals(msg['event_type'], 'compute.instance.delete')
|
||||
self.assertEquals(msg['event_type'], 'compute.instance.delete.start')
|
||||
msg1 = test_notifier.NOTIFICATIONS[2]
|
||||
self.assertEquals(msg1['event_type'], 'compute.instance.delete.end')
|
||||
payload = msg['payload']
|
||||
self.assertEquals(payload['tenant_id'], self.project_id)
|
||||
self.assertEquals(payload['user_id'], self.user_id)
|
||||
@@ -1006,10 +1011,14 @@ class ComputeTestCase(BaseTestCase):
|
||||
instance_uuid,
|
||||
'pre-migrating')
|
||||
|
||||
self.assertEquals(len(test_notifier.NOTIFICATIONS), 1)
|
||||
self.assertEquals(len(test_notifier.NOTIFICATIONS), 2)
|
||||
msg = test_notifier.NOTIFICATIONS[0]
|
||||
self.assertEquals(msg['event_type'],
|
||||
'compute.instance.resize.prep.start')
|
||||
msg = test_notifier.NOTIFICATIONS[1]
|
||||
self.assertEquals(msg['event_type'],
|
||||
'compute.instance.resize.prep.end')
|
||||
self.assertEquals(msg['priority'], 'INFO')
|
||||
self.assertEquals(msg['event_type'], 'compute.instance.resize.prep')
|
||||
payload = msg['payload']
|
||||
self.assertEquals(payload['tenant_id'], self.project_id)
|
||||
self.assertEquals(payload['user_id'], self.user_id)
|
||||
|
@@ -22,8 +22,8 @@ import datetime
|
||||
|
||||
from nova import test
|
||||
from nova import context
|
||||
from nova import exception
|
||||
from nova import db
|
||||
from nova import exception
|
||||
from nova import flags
|
||||
from nova import utils
|
||||
|
||||
@@ -545,3 +545,100 @@ class AggregateDBApiTestCase(test.TestCase):
|
||||
|
||||
db.dnsdomain_unregister(ctxt, domain1)
|
||||
db.dnsdomain_unregister(ctxt, domain2)
|
||||
|
||||
|
||||
class CapacityTestCase(test.TestCase):
    """Tests for the compute node capacity/utilization db API."""

    def setUp(self):
        super(CapacityTestCase, self).setUp()

        self.ctxt = context.get_admin_context()

        # A compute node record must hang off a service record.
        service_dict = dict(host='host1', binary='binary1',
                            topic='compute', report_count=1,
                            disabled=False)
        self.service = db.service_create(self.ctxt, service_dict)

        self.compute_node_dict = dict(vcpus=2, memory_mb=1024, local_gb=2048,
                                      vcpus_used=0, memory_mb_used=0,
                                      local_gb_used=0, hypervisor_type="xen",
                                      hypervisor_version=1, cpu_info="",
                                      service_id=self.service.id)

        # Zero out host reservations so the free_* figures are predictable.
        self.flags(reserved_host_memory_mb=0)
        self.flags(reserved_host_disk_mb=0)

    def _create_helper(self, host):
        """Create and return a compute node record for the given host."""
        self.compute_node_dict['host'] = host
        return db.compute_node_create(self.ctxt, self.compute_node_dict)

    def test_compute_node_create(self):
        # A fresh node starts with all capacity free and no load.
        item = self._create_helper('host1')
        self.assertEquals(item.free_ram_mb, 1024)
        self.assertEquals(item.free_disk_gb, 2048)
        self.assertEquals(item.running_vms, 0)
        self.assertEquals(item.current_workload, 0)

    def test_compute_node_create_with_reservations(self):
        # Reserved host memory is deducted from the free RAM figure.
        self.flags(reserved_host_memory_mb=256)
        item = self._create_helper('host1')
        self.assertEquals(item.free_ram_mb, 1024 - 256)

    def test_compute_node_set(self):
        # Each _set call overwrites only the fields passed in.
        self._create_helper('host1')

        x = db.compute_node_utilization_set(self.ctxt, 'host1',
                free_ram_mb=2048, free_disk_gb=4096)
        self.assertEquals(x.free_ram_mb, 2048)
        self.assertEquals(x.free_disk_gb, 4096)
        self.assertEquals(x.running_vms, 0)
        self.assertEquals(x.current_workload, 0)

        x = db.compute_node_utilization_set(self.ctxt, 'host1', work=3)
        self.assertEquals(x.free_ram_mb, 2048)
        self.assertEquals(x.free_disk_gb, 4096)
        self.assertEquals(x.current_workload, 3)
        self.assertEquals(x.running_vms, 0)

        x = db.compute_node_utilization_set(self.ctxt, 'host1', vms=5)
        self.assertEquals(x.free_ram_mb, 2048)
        self.assertEquals(x.free_disk_gb, 4096)
        self.assertEquals(x.current_workload, 3)
        self.assertEquals(x.running_vms, 5)

    def test_compute_node_utilization_update(self):
        # Each *_delta is applied on top of the current value; untouched
        # fields keep their previous value.
        self._create_helper('host1')

        x = db.compute_node_utilization_update(self.ctxt, 'host1',
                                               free_ram_mb_delta=-24)
        self.assertEquals(x.free_ram_mb, 1000)
        self.assertEquals(x.free_disk_gb, 2048)
        self.assertEquals(x.running_vms, 0)
        self.assertEquals(x.current_workload, 0)

        x = db.compute_node_utilization_update(self.ctxt, 'host1',
                                               free_disk_gb_delta=-48)
        self.assertEquals(x.free_ram_mb, 1000)
        self.assertEquals(x.free_disk_gb, 2000)
        self.assertEquals(x.running_vms, 0)
        self.assertEquals(x.current_workload, 0)

        x = db.compute_node_utilization_update(self.ctxt, 'host1',
                                               work_delta=3)
        self.assertEquals(x.free_ram_mb, 1000)
        self.assertEquals(x.free_disk_gb, 2000)
        self.assertEquals(x.current_workload, 3)
        self.assertEquals(x.running_vms, 0)

        x = db.compute_node_utilization_update(self.ctxt, 'host1',
                                               work_delta=-1)
        self.assertEquals(x.free_ram_mb, 1000)
        self.assertEquals(x.free_disk_gb, 2000)
        self.assertEquals(x.current_workload, 2)
        self.assertEquals(x.running_vms, 0)

        x = db.compute_node_utilization_update(self.ctxt, 'host1',
                                               vm_delta=5)
        self.assertEquals(x.free_ram_mb, 1000)
        self.assertEquals(x.free_disk_gb, 2000)
        self.assertEquals(x.current_workload, 2)
        self.assertEquals(x.running_vms, 5)
|
||||
|
Reference in New Issue
Block a user