Fix scheduler code to use monitor metric objects
Per bug 1468012 we changed the monitor metric reporting to use versioned Monitor Metric objects instead of plain old dictionaries. This fix addresses a code refactoring needed inside nova/scheduler/host_manager.py - in the _update_metrics_from_compute_node to use the monitor metric object instead of the current non object implementation. Further, it fixes the metrics_filter. The metrics_filter was expecting a dict of metrics but now it has to adapt to the MonitorMetricList object instead. Closes-bug: 1485082 Change-Id: I4dfbea27ce6c3eecc1a8658b1f9dc0feb2298705
This commit is contained in:
@@ -10,6 +10,7 @@
|
|||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
|
from oslo_serialization import jsonutils
|
||||||
from oslo_utils import timeutils
|
from oslo_utils import timeutils
|
||||||
|
|
||||||
from nova.objects import base
|
from nova.objects import base
|
||||||
@@ -74,6 +75,19 @@ class MonitorMetricList(base.ObjectListBase, base.NovaObject):
|
|||||||
'objects': [('1.0', '1.0'), ('1.1', '1.1')],
|
'objects': [('1.0', '1.0'), ('1.1', '1.1')],
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_json(cls, metrics):
|
||||||
|
"""Converts a legacy json object into a list of MonitorMetric objs
|
||||||
|
and finally returns of MonitorMetricList
|
||||||
|
|
||||||
|
:param metrics: a string of json serialized objects
|
||||||
|
:returns: a MonitorMetricList Object.
|
||||||
|
"""
|
||||||
|
metrics = jsonutils.loads(metrics) if metrics else []
|
||||||
|
metric_list = [
|
||||||
|
MonitorMetric(**metric) for metric in metrics]
|
||||||
|
return MonitorMetricList(objects=metric_list)
|
||||||
|
|
||||||
# NOTE(jaypipes): This method exists to convert the object to the
|
# NOTE(jaypipes): This method exists to convert the object to the
|
||||||
# format expected by the RPC notifier for metrics events.
|
# format expected by the RPC notifier for metrics events.
|
||||||
def to_list(self):
|
def to_list(self):
|
||||||
|
|||||||
@@ -41,13 +41,15 @@ class MetricsFilter(filters.BaseHostFilter):
|
|||||||
sep='=',
|
sep='=',
|
||||||
converter=float,
|
converter=float,
|
||||||
name="metrics.weight_setting")
|
name="metrics.weight_setting")
|
||||||
self.keys = [x[0] for x in opts]
|
self.keys = set([x[0] for x in opts])
|
||||||
|
|
||||||
def host_passes(self, host_state, filter_properties):
|
def host_passes(self, host_state, filter_properties):
|
||||||
unavail = [i for i in self.keys if i not in host_state.metrics]
|
metrics_on_host = set(m.name for m in host_state.metrics)
|
||||||
if unavail:
|
if not self.keys.issubset(metrics_on_host):
|
||||||
|
unavail = metrics_on_host - self.keys
|
||||||
LOG.debug("%(host_state)s does not have the following "
|
LOG.debug("%(host_state)s does not have the following "
|
||||||
"metrics: %(metrics)s",
|
"metrics: %(metrics)s",
|
||||||
{'host_state': host_state,
|
{'host_state': host_state,
|
||||||
'metrics': ', '.join(unavail)})
|
'metrics': ', '.join(unavail)})
|
||||||
return len(unavail) == 0
|
return False
|
||||||
|
return True
|
||||||
|
|||||||
@@ -28,7 +28,6 @@ except ImportError:
|
|||||||
import iso8601
|
import iso8601
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
from oslo_log import log as logging
|
from oslo_log import log as logging
|
||||||
from oslo_serialization import jsonutils
|
|
||||||
from oslo_utils import timeutils
|
from oslo_utils import timeutils
|
||||||
import six
|
import six
|
||||||
|
|
||||||
@@ -150,7 +149,7 @@ class HostState(object):
|
|||||||
self.limits = {}
|
self.limits = {}
|
||||||
|
|
||||||
# Generic metrics from compute nodes
|
# Generic metrics from compute nodes
|
||||||
self.metrics = {}
|
self.metrics = None
|
||||||
|
|
||||||
# List of aggregates the host belongs to
|
# List of aggregates the host belongs to
|
||||||
self.aggregates = []
|
self.aggregates = []
|
||||||
@@ -165,27 +164,6 @@ class HostState(object):
|
|||||||
def update_service(self, service):
|
def update_service(self, service):
|
||||||
self.service = ReadOnlyDict(service)
|
self.service = ReadOnlyDict(service)
|
||||||
|
|
||||||
def _update_metrics_from_compute_node(self, compute):
|
|
||||||
"""Update metrics from a ComputeNode object."""
|
|
||||||
# NOTE(llu): The 'or []' is to avoid json decode failure of None
|
|
||||||
# returned from compute.get, because DB schema allows
|
|
||||||
# NULL in the metrics column
|
|
||||||
metrics = compute.metrics or []
|
|
||||||
if metrics:
|
|
||||||
metrics = jsonutils.loads(metrics)
|
|
||||||
for metric in metrics:
|
|
||||||
# 'name', 'value', 'timestamp' and 'source' are all required
|
|
||||||
# to be valid keys, just let KeyError happen if any one of
|
|
||||||
# them is missing. But we also require 'name' to be True.
|
|
||||||
name = metric['name']
|
|
||||||
item = MetricItem(value=metric['value'],
|
|
||||||
timestamp=metric['timestamp'],
|
|
||||||
source=metric['source'])
|
|
||||||
if name:
|
|
||||||
self.metrics[name] = item
|
|
||||||
else:
|
|
||||||
LOG.warning(_LW("Metric name unknown of %r"), item)
|
|
||||||
|
|
||||||
def update_from_compute_node(self, compute):
|
def update_from_compute_node(self, compute):
|
||||||
"""Update information about a host from a ComputeNode object."""
|
"""Update information about a host from a ComputeNode object."""
|
||||||
if (self.updated and compute.updated_at
|
if (self.updated and compute.updated_at
|
||||||
@@ -244,7 +222,7 @@ class HostState(object):
|
|||||||
self.num_io_ops = int(self.stats.get('io_workload', 0))
|
self.num_io_ops = int(self.stats.get('io_workload', 0))
|
||||||
|
|
||||||
# update metrics
|
# update metrics
|
||||||
self._update_metrics_from_compute_node(compute)
|
self.metrics = objects.MonitorMetricList.from_json(compute.metrics)
|
||||||
|
|
||||||
def consume_from_instance(self, instance):
|
def consume_from_instance(self, instance):
|
||||||
"""Incrementally update host state from an instance."""
|
"""Incrementally update host state from an instance."""
|
||||||
|
|||||||
@@ -10,6 +10,8 @@
|
|||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
|
import datetime
|
||||||
|
from nova import objects
|
||||||
from nova.scheduler.filters import metrics_filter
|
from nova.scheduler.filters import metrics_filter
|
||||||
from nova import test
|
from nova import test
|
||||||
from nova.tests.unit.scheduler import fakes
|
from nova.tests.unit.scheduler import fakes
|
||||||
@@ -18,17 +20,32 @@ from nova.tests.unit.scheduler import fakes
|
|||||||
class TestMetricsFilter(test.NoDBTestCase):
|
class TestMetricsFilter(test.NoDBTestCase):
|
||||||
|
|
||||||
def test_metrics_filter_pass(self):
|
def test_metrics_filter_pass(self):
|
||||||
self.flags(weight_setting=['foo=1', 'bar=2'], group='metrics')
|
_ts_now = datetime.datetime(2015, 11, 11, 11, 0, 0)
|
||||||
|
obj1 = objects.MonitorMetric(name='cpu.frequency',
|
||||||
|
value=1000,
|
||||||
|
timestamp=_ts_now,
|
||||||
|
source='nova.virt.libvirt.driver')
|
||||||
|
obj2 = objects.MonitorMetric(name='numa.membw.current',
|
||||||
|
numa_membw_values={"0": 10, "1": 43},
|
||||||
|
timestamp=_ts_now,
|
||||||
|
source='nova.virt.libvirt.driver')
|
||||||
|
metrics_list = objects.MonitorMetricList(objects=[obj1, obj2])
|
||||||
|
self.flags(weight_setting=[
|
||||||
|
'cpu.frequency=1', 'numa.membw.current=2'], group='metrics')
|
||||||
filt_cls = metrics_filter.MetricsFilter()
|
filt_cls = metrics_filter.MetricsFilter()
|
||||||
metrics = dict(foo=1, bar=2)
|
|
||||||
host = fakes.FakeHostState('host1', 'node1',
|
host = fakes.FakeHostState('host1', 'node1',
|
||||||
attribute_dict={'metrics': metrics})
|
attribute_dict={'metrics': metrics_list})
|
||||||
self.assertTrue(filt_cls.host_passes(host, None))
|
self.assertTrue(filt_cls.host_passes(host, None))
|
||||||
|
|
||||||
def test_metrics_filter_missing_metrics(self):
|
def test_metrics_filter_missing_metrics(self):
|
||||||
|
_ts_now = datetime.datetime(2015, 11, 11, 11, 0, 0)
|
||||||
|
obj1 = objects.MonitorMetric(name='cpu.frequency',
|
||||||
|
value=1000,
|
||||||
|
timestamp=_ts_now,
|
||||||
|
source='nova.virt.libvirt.driver')
|
||||||
|
metrics_list = objects.MonitorMetricList(objects=[obj1])
|
||||||
self.flags(weight_setting=['foo=1', 'bar=2'], group='metrics')
|
self.flags(weight_setting=['foo=1', 'bar=2'], group='metrics')
|
||||||
filt_cls = metrics_filter.MetricsFilter()
|
filt_cls = metrics_filter.MetricsFilter()
|
||||||
metrics = dict(foo=1)
|
|
||||||
host = fakes.FakeHostState('host1', 'node1',
|
host = fakes.FakeHostState('host1', 'node1',
|
||||||
attribute_dict={'metrics': metrics})
|
attribute_dict={'metrics': metrics_list})
|
||||||
self.assertFalse(filt_cls.host_passes(host, None))
|
self.assertFalse(filt_cls.host_passes(host, None))
|
||||||
|
|||||||
@@ -17,6 +17,7 @@ Tests For HostManager
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
import collections
|
import collections
|
||||||
|
import datetime
|
||||||
|
|
||||||
import mock
|
import mock
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
@@ -956,15 +957,16 @@ class HostStateTestCase(test.NoDBTestCase):
|
|||||||
self.assertEqual(0, len(host.pci_stats.pools))
|
self.assertEqual(0, len(host.pci_stats.pools))
|
||||||
|
|
||||||
def test_resources_consumption_from_compute_node(self):
|
def test_resources_consumption_from_compute_node(self):
|
||||||
|
_ts_now = datetime.datetime(2015, 11, 11, 11, 0, 0)
|
||||||
metrics = [
|
metrics = [
|
||||||
dict(name='res1',
|
dict(name='cpu.frequency',
|
||||||
value=1.0,
|
value=1.0,
|
||||||
source='source1',
|
source='source1',
|
||||||
timestamp=None),
|
timestamp=_ts_now),
|
||||||
dict(name='res2',
|
dict(name='numa.membw.current',
|
||||||
value="string2",
|
numa_membw_values={"0": 10, "1": 43},
|
||||||
source='source2',
|
source='source2',
|
||||||
timestamp=None),
|
timestamp=_ts_now),
|
||||||
]
|
]
|
||||||
hyper_ver_int = utils.convert_version_to_int('6.0.0')
|
hyper_ver_int = utils.convert_version_to_int('6.0.0')
|
||||||
compute = objects.ComputeNode(
|
compute = objects.ComputeNode(
|
||||||
@@ -983,9 +985,11 @@ class HostStateTestCase(test.NoDBTestCase):
|
|||||||
host.update_from_compute_node(compute)
|
host.update_from_compute_node(compute)
|
||||||
|
|
||||||
self.assertEqual(len(host.metrics), 2)
|
self.assertEqual(len(host.metrics), 2)
|
||||||
self.assertEqual(set(['res1', 'res2']), set(host.metrics.keys()))
|
self.assertEqual(1.0, host.metrics.to_list()[0]['value'])
|
||||||
self.assertEqual(1.0, host.metrics['res1'].value)
|
self.assertEqual('source1', host.metrics[0].source)
|
||||||
self.assertEqual('source1', host.metrics['res1'].source)
|
self.assertEqual('cpu.frequency', host.metrics[0].name)
|
||||||
self.assertEqual('string2', host.metrics['res2'].value)
|
self.assertEqual('numa.membw.current', host.metrics[1].name)
|
||||||
self.assertEqual('source2', host.metrics['res2'].source)
|
self.assertEqual('source2', host.metrics.to_list()[1]['source'])
|
||||||
|
self.assertEqual({'0': 10, '1': 43},
|
||||||
|
host.metrics[1].numa_membw_values)
|
||||||
self.assertIsInstance(host.numa_topology, six.string_types)
|
self.assertIsInstance(host.numa_topology, six.string_types)
|
||||||
|
|||||||
Reference in New Issue
Block a user