Merge from trunk

This commit is contained in:
Cerberus
2011-05-11 10:41:06 -05:00
10 changed files with 199 additions and 24 deletions

View File

@@ -44,6 +44,7 @@ Josh Kearney <josh@jk0.org>
Josh Kleinpeter <josh@kleinpeter.org>
Joshua McKenty <jmckenty@gmail.com>
Justin Santa Barbara <justin@fathomdb.com>
Justin Shepherd <jshepher@rackspace.com>
Kei Masumoto <masumotok@nttdata.co.jp>
Ken Pepple <ken.pepple@gmail.com>
Kevin Bringard <kbringard@attinteractive.com>

View File

@@ -131,6 +131,7 @@ class ComputeManager(manager.SchedulerDependentManager):
self.network_manager = utils.import_object(FLAGS.network_manager)
self.volume_manager = utils.import_object(FLAGS.volume_manager)
self.network_api = network.API()
self._last_host_check = 0
super(ComputeManager, self).__init__(service_name="compute",
*args, **kwargs)
@@ -1093,6 +1094,13 @@ class ComputeManager(manager.SchedulerDependentManager):
unicode(ex))
error_list.append(ex)
try:
self._report_driver_status()
except Exception as ex:
LOG.warning(_("Error during report_driver_status(): %s"),
unicode(ex))
error_list.append(ex)
try:
self._poll_instance_states(context)
except Exception as ex:
@@ -1102,6 +1110,16 @@ class ComputeManager(manager.SchedulerDependentManager):
return error_list
def _report_driver_status(self):
curr_time = time.time()
if curr_time - self._last_host_check > FLAGS.host_state_interval:
self._last_host_check = curr_time
LOG.info(_("Updating host status"))
# This will grab info about the host and queue it
# to be sent to the Schedulers.
self.update_service_capabilities(
self.driver.get_host_stats(refresh=True))
def _poll_instance_states(self, context):
vm_instances = self.driver.list_instances_detail()
vm_instances = dict((vm.name, vm) for vm in vm_instances)

View File

@@ -96,8 +96,8 @@ class FlavorFilter(HostFilter):
selected_hosts = []
for host, services in zone_manager.service_states.iteritems():
capabilities = services.get('compute', {})
host_ram_mb = capabilities['host_memory']['free']
disk_bytes = capabilities['disk']['available']
host_ram_mb = capabilities['host_memory_free']
disk_bytes = capabilities['disk_available']
if host_ram_mb >= instance_type['memory_mb'] and \
disk_bytes >= instance_type['local_gb']:
selected_hosts.append((host, capabilities))
@@ -106,16 +106,16 @@ class FlavorFilter(HostFilter):
#host entries (currently) are like:
# {'host_name-description': 'Default install of XenServer',
# 'host_hostname': 'xs-mini',
# 'host_memory': {'total': 8244539392,
# 'overhead': 184225792,
# 'free': 3868327936,
# 'free-computed': 3840843776},
# 'host_memory_total': 8244539392,
# 'host_memory_overhead': 184225792,
# 'host_memory_free': 3868327936,
# 'host_memory_free_computed': 3840843776},
# 'host_other-config': {},
# 'host_ip_address': '192.168.1.109',
# 'host_cpu_info': {},
# 'disk': {'available': 32954957824,
# 'total': 50394562560,
# 'used': 17439604736},
# 'disk_available': 32954957824,
# 'disk_total': 50394562560,
# 'disk_used': 17439604736},
# 'host_uuid': 'cedb9b39-9388-41df-8891-c5c9a0c0fe5f',
# 'host_name-label': 'xs-mini'}
@@ -221,8 +221,8 @@ class JsonFilter(HostFilter):
required_ram = instance_type['memory_mb']
required_disk = instance_type['local_gb']
query = ['and',
['>=', '$compute.host_memory.free', required_ram],
['>=', '$compute.disk.available', required_disk]
['>=', '$compute.host_memory_free', required_ram],
['>=', '$compute.disk_available', required_disk]
]
return (self._full_name(), json.dumps(query))

View File

@@ -21,6 +21,7 @@ Tests For Compute
import datetime
import mox
import stubout
from nova import compute
from nova import context
@@ -52,6 +53,10 @@ class FakeTime(object):
self.counter += t
def nop_report_driver_status(self):
pass
class ComputeTestCase(test.TestCase):
"""Test case for compute"""
def setUp(self):
@@ -649,6 +654,10 @@ class ComputeTestCase(test.TestCase):
def test_run_kill_vm(self):
"""Detect when a vm is terminated behind the scenes"""
self.stubs = stubout.StubOutForTesting()
self.stubs.Set(compute_manager.ComputeManager,
'_report_driver_status', nop_report_driver_status)
instance_id = self._create_instance()
self.compute.run_instance(self.context, instance_id)

View File

@@ -43,16 +43,16 @@ class HostFilterTestCase(test.TestCase):
# which means ... don't go above 10 hosts.
return {'host_name-description': 'XenServer %s' % multiplier,
'host_hostname': 'xs-%s' % multiplier,
'host_memory': {'total': 100,
'overhead': 10,
'free': 10 + multiplier * 10,
'free-computed': 10 + multiplier * 10},
'host_memory_total': 100,
'host_memory_overhead': 10,
'host_memory_free': 10 + multiplier * 10,
'host_memory_free-computed': 10 + multiplier * 10,
'host_other-config': {},
'host_ip_address': '192.168.1.%d' % (100 + multiplier),
'host_cpu_info': {},
'disk': {'available': 100 + multiplier * 100,
'total': 1000,
'used': 0},
'disk_available': 100 + multiplier * 100,
'disk_total': 1000,
'disk_used': 0,
'host_uuid': 'xxx-%d' % multiplier,
'host_name-label': 'xs-%s' % multiplier}
@@ -131,12 +131,12 @@ class HostFilterTestCase(test.TestCase):
raw = ['or',
['and',
['<', '$compute.host_memory.free', 30],
['<', '$compute.disk.available', 300]
['<', '$compute.host_memory_free', 30],
['<', '$compute.disk_available', 300]
],
['and',
['>', '$compute.host_memory.free', 70],
['>', '$compute.disk.available', 700]
['>', '$compute.host_memory_free', 70],
['>', '$compute.disk_available', 700]
]
]
cooked = json.dumps(raw)
@@ -149,7 +149,7 @@ class HostFilterTestCase(test.TestCase):
self.assertEquals('host%02d' % index, host)
raw = ['not',
['=', '$compute.host_memory.free', 30],
['=', '$compute.host_memory_free', 30],
]
cooked = json.dumps(raw)
hosts = driver.filter_hosts(self.zone_manager, cooked)
@@ -160,7 +160,7 @@ class HostFilterTestCase(test.TestCase):
for index, host in zip([1, 2, 4, 5, 6, 7, 8, 9, 10], just_hosts):
self.assertEquals('host%02d' % index, host)
raw = ['in', '$compute.host_memory.free', 20, 40, 60, 80, 100]
raw = ['in', '$compute.host_memory_free', 20, 40, 60, 80, 100]
cooked = json.dumps(raw)
hosts = driver.filter_hosts(self.zone_manager, cooked)

View File

@@ -17,6 +17,7 @@
"""Test suite for XenAPI."""
import functools
import json
import os
import re
import stubout
@@ -665,3 +666,52 @@ class XenAPIDetermineDiskImageTestCase(test.TestCase):
self.fake_instance.image_id = glance_stubs.FakeGlance.IMAGE_VHD
self.fake_instance.kernel_id = None
self.assert_disk_type(vm_utils.ImageType.DISK_VHD)
class FakeXenApi(object):
"""Fake XenApi for testing HostState."""
class FakeSR(object):
def get_record(self, ref):
return {'virtual_allocation': 10000,
'physical_utilisation': 20000}
SR = FakeSR()
class FakeSession(object):
"""Fake Session class for HostState testing."""
def async_call_plugin(self, *args):
return None
def wait_for_task(self, *args):
vm = {'total': 10,
'overhead': 20,
'free': 30,
'free-computed': 40}
return json.dumps({'host_memory': vm})
def get_xenapi(self):
return FakeXenApi()
class HostStateTestCase(test.TestCase):
"""Tests HostState, which holds metrics from XenServer that get
reported back to the Schedulers."""
def _fake_safe_find_sr(self, session):
"""None SR ref since we're ignoring it in FakeSR."""
return None
def test_host_state(self):
self.stubs = stubout.StubOutForTesting()
self.stubs.Set(vm_utils, 'safe_find_sr', self._fake_safe_find_sr)
host_state = xenapi_conn.HostState(FakeSession())
stats = host_state._stats
self.assertEquals(stats['disk_total'], 10000)
self.assertEquals(stats['disk_used'], 20000)
self.assertEquals(stats['host_memory_total'], 10)
self.assertEquals(stats['host_memory_overhead'], 20)
self.assertEquals(stats['host_memory_free'], 30)
self.assertEquals(stats['host_memory_free_computed'], 40)

View File

@@ -486,3 +486,11 @@ class HyperVConnection(driver.ComputeDriver):
def update_available_resource(self, ctxt, host):
"""This method is supported only by libvirt."""
return
def update_host_status(self):
"""See xenapi_conn.py implementation."""
pass
def get_host_stats(self, refresh=False):
"""See xenapi_conn.py implementation."""
pass

View File

@@ -1582,6 +1582,14 @@ class LibvirtConnection(driver.ComputeDriver):
"""See comments of same method in firewall_driver."""
self.firewall_driver.unfilter_instance(instance_ref)
def update_host_status(self):
"""See xenapi_conn.py implementation."""
pass
def get_host_stats(self, refresh=False):
"""See xenapi_conn.py implementation."""
pass
class FirewallDriver(object):
def prepare_instance_filter(self, instance, network_info=None):

View File

@@ -57,6 +57,8 @@ reactor thread if the VM.get_by_name_label or VM.get_record calls block.
- suffix "_rec" for record objects
"""
import json
import random
import sys
import urlparse
import xmlrpclib
@@ -67,10 +69,12 @@ from eventlet import timeout
from nova import context
from nova import db
from nova import exception
from nova import utils
from nova import flags
from nova import log as logging
from nova.virt import driver
from nova.virt.xenapi import vm_utils
from nova.virt.xenapi.vmops import VMOps
from nova.virt.xenapi.volumeops import VolumeOps
@@ -168,6 +172,13 @@ class XenAPIConnection(driver.ComputeDriver):
session = XenAPISession(url, user, pw)
self._vmops = VMOps(session)
self._volumeops = VolumeOps(session)
self._host_state = None
@property
def HostState(self):
if not self._host_state:
self._host_state = HostState(self.session)
return self._host_state
def init_host(self, host):
#FIXME(armando): implement this
@@ -315,6 +326,16 @@ class XenAPIConnection(driver.ComputeDriver):
"""This method is supported only by libvirt."""
raise NotImplementedError('This method is supported only by libvirt.')
def update_host_status(self):
"""Update the status info of the host, and return those values
to the calling program."""
return self.HostState.update_status()
def get_host_stats(self, refresh=False):
"""Return the current state of the host. If 'refresh' is
True, run the update first."""
return self.HostState.get_host_stats(refresh=refresh)
class XenAPISession(object):
"""The session to invoke XenAPI SDK calls"""
@@ -436,6 +457,65 @@ class XenAPISession(object):
raise
class HostState(object):
"""Manages information about the XenServer host this compute
node is running on.
"""
def __init__(self, session):
super(HostState, self).__init__()
self._session = session
self._stats = {}
self.update_status()
def get_host_stats(self, refresh=False):
"""Return the current state of the host. If 'refresh' is
True, run the update first.
"""
if refresh:
self.update_status()
return self._stats
def update_status(self):
"""Since under Xenserver, a compute node runs on a given host,
we can get host status information using xenapi.
"""
LOG.debug(_("Updating host stats"))
# Make it something unlikely to match any actual instance ID
task_id = random.randint(-80000, -70000)
task = self._session.async_call_plugin("xenhost", "host_data", {})
task_result = self._session.wait_for_task(task, task_id)
if not task_result:
task_result = json.dumps("")
try:
data = json.loads(task_result)
except ValueError as e:
# Invalid JSON object
LOG.error(_("Unable to get updated status: %s") % e)
return
# Get the SR usage
try:
sr_ref = vm_utils.safe_find_sr(self._session)
except exception.NotFound as e:
# No SR configured
LOG.error(_("Unable to get SR for this host: %s") % e)
return
sr_rec = self._session.get_xenapi().SR.get_record(sr_ref)
total = int(sr_rec["virtual_allocation"])
used = int(sr_rec["physical_utilisation"])
data["disk_total"] = total
data["disk_used"] = used
data["disk_available"] = total - used
host_memory = data.get('host_memory', None)
if host_memory:
data["host_memory_total"] = host_memory.get('total', 0)
data["host_memory_overhead"] = host_memory.get('overhead', 0)
data["host_memory_free"] = host_memory.get('free', 0)
data["host_memory_free_computed"] = \
host_memory.get('free-computed', 0)
del data['host_memory']
self._stats = data
def _parse_xmlrpc_value(val):
"""Parse the given value as if it were an XML-RPC value. This is
sometimes used as the format for the task.result field."""

View File

@@ -33,3 +33,4 @@ nova-adminclient
suds==0.4
coverage
nosexcover
GitPython