Make compute_capabilities_filter use information from DB not RPC
Make compute_capabilities_filter directly use the data in host_state and not the data capabilities data that is currently broadcast from the schedulers over RPC, as this will be removed since it doesn't scale. The filter name, compute_capabilities_filter, is kept the same to keep backwards compatibility. This is a second attempt at this patch the first one broke the ability to filter an arbitrary key value pairs, this time that is supported via host_state.stats, along with tests to confirm Once all the capabilities usage in scheduler filters is removed, the capabilities RPC broadcast can be removed. Part of bp no-compute-fanout-to-scheduler. Change-Id: I0b9b05a25a8c1841093913c4a2fb19b48aff2ae1
This commit is contained in:
@@ -28,8 +28,8 @@ class ComputeCapabilitiesFilter(filters.BaseHostFilter):
|
|||||||
# Instance type and host capabilities do not change within a request
|
# Instance type and host capabilities do not change within a request
|
||||||
run_filter_once_per_request = True
|
run_filter_once_per_request = True
|
||||||
|
|
||||||
def _satisfies_extra_specs(self, capabilities, instance_type):
|
def _satisfies_extra_specs(self, host_state, instance_type):
|
||||||
"""Check that the capabilities provided by the compute service
|
"""Check that the host_state provided by the compute service
|
||||||
satisfy the extra specs associated with the instance type.
|
satisfy the extra specs associated with the instance type.
|
||||||
"""
|
"""
|
||||||
if 'extra_specs' not in instance_type:
|
if 'extra_specs' not in instance_type:
|
||||||
@@ -43,10 +43,17 @@ class ComputeCapabilitiesFilter(filters.BaseHostFilter):
|
|||||||
continue
|
continue
|
||||||
else:
|
else:
|
||||||
del scope[0]
|
del scope[0]
|
||||||
cap = capabilities
|
cap = host_state
|
||||||
for index in range(0, len(scope)):
|
for index in range(0, len(scope)):
|
||||||
try:
|
try:
|
||||||
cap = cap.get(scope[index], None)
|
if not isinstance(cap, dict):
|
||||||
|
if getattr(cap, scope[index], None) is None:
|
||||||
|
# If can't find, check stats dict
|
||||||
|
cap = cap.stats.get(scope[index], None)
|
||||||
|
else:
|
||||||
|
cap = getattr(cap, scope[index], None)
|
||||||
|
else:
|
||||||
|
cap = cap.get(scope[index], None)
|
||||||
except AttributeError:
|
except AttributeError:
|
||||||
return False
|
return False
|
||||||
if cap is None:
|
if cap is None:
|
||||||
@@ -58,7 +65,7 @@ class ComputeCapabilitiesFilter(filters.BaseHostFilter):
|
|||||||
def host_passes(self, host_state, filter_properties):
|
def host_passes(self, host_state, filter_properties):
|
||||||
"""Return a list of hosts that can create instance_type."""
|
"""Return a list of hosts that can create instance_type."""
|
||||||
instance_type = filter_properties.get('instance_type')
|
instance_type = filter_properties.get('instance_type')
|
||||||
if not self._satisfies_extra_specs(host_state.capabilities,
|
if not self._satisfies_extra_specs(host_state,
|
||||||
instance_type):
|
instance_type):
|
||||||
LOG.debug(_("%(host_state)s fails instance_type extra_specs "
|
LOG.debug(_("%(host_state)s fails instance_type extra_specs "
|
||||||
"requirements"), {'host_state': host_state})
|
"requirements"), {'host_state': host_state})
|
||||||
|
|||||||
@@ -26,6 +26,7 @@ from nova.compute import vm_states
|
|||||||
from nova import db
|
from nova import db
|
||||||
from nova import exception
|
from nova import exception
|
||||||
from nova.openstack.common.gettextutils import _
|
from nova.openstack.common.gettextutils import _
|
||||||
|
from nova.openstack.common import jsonutils
|
||||||
from nova.openstack.common import log as logging
|
from nova.openstack.common import log as logging
|
||||||
from nova.openstack.common import timeutils
|
from nova.openstack.common import timeutils
|
||||||
from nova.scheduler import filters
|
from nova.scheduler import filters
|
||||||
@@ -122,6 +123,11 @@ class HostState(object):
|
|||||||
|
|
||||||
# Other information
|
# Other information
|
||||||
self.host_ip = None
|
self.host_ip = None
|
||||||
|
self.hypervisor_type = None
|
||||||
|
self.hypervisor_version = None
|
||||||
|
self.hypervisor_hostname = None
|
||||||
|
self.cpu_info = None
|
||||||
|
self.supported_instances = None
|
||||||
|
|
||||||
# Resource oversubscription values for the compute host:
|
# Resource oversubscription values for the compute host:
|
||||||
self.limits = {}
|
self.limits = {}
|
||||||
@@ -161,41 +167,53 @@ class HostState(object):
|
|||||||
self.vcpus_used = compute['vcpus_used']
|
self.vcpus_used = compute['vcpus_used']
|
||||||
self.updated = compute['updated_at']
|
self.updated = compute['updated_at']
|
||||||
|
|
||||||
|
# All virt drivers report host_ip
|
||||||
self.host_ip = compute['host_ip']
|
self.host_ip = compute['host_ip']
|
||||||
|
self.hypervisor_type = compute.get('hypervisor_type')
|
||||||
|
self.hypervisor_version = compute.get('hypervisor_version')
|
||||||
|
self.hypervisor_hostname = compute.get('hypervisor_hostname')
|
||||||
|
self.cpu_info = compute.get('cpu_info')
|
||||||
|
if compute.get('supported_instances'):
|
||||||
|
self.supported_instances = jsonutils.loads(
|
||||||
|
compute.get('supported_instances'))
|
||||||
|
|
||||||
stats = compute.get('stats', [])
|
# Don't store stats directly in host_state to make sure these don't
|
||||||
statmap = self._statmap(stats)
|
# overwrite any values, or get overwritten themselves. Store in self so
|
||||||
|
# filters can schedule with them.
|
||||||
|
self.stats = self._statmap(compute.get('stats', []))
|
||||||
|
|
||||||
# Track number of instances on host
|
# Track number of instances on host
|
||||||
self.num_instances = int(statmap.get('num_instances', 0))
|
self.num_instances = int(self.stats.get('num_instances', 0))
|
||||||
|
|
||||||
# Track number of instances by project_id
|
# Track number of instances by project_id
|
||||||
project_id_keys = [k for k in statmap.keys() if
|
project_id_keys = [k for k in self.stats.keys() if
|
||||||
k.startswith("num_proj_")]
|
k.startswith("num_proj_")]
|
||||||
for key in project_id_keys:
|
for key in project_id_keys:
|
||||||
project_id = key[9:]
|
project_id = key[9:]
|
||||||
self.num_instances_by_project[project_id] = int(statmap[key])
|
self.num_instances_by_project[project_id] = int(self.stats[key])
|
||||||
|
|
||||||
# Track number of instances in certain vm_states
|
# Track number of instances in certain vm_states
|
||||||
vm_state_keys = [k for k in statmap.keys() if k.startswith("num_vm_")]
|
vm_state_keys = [k for k in self.stats.keys() if
|
||||||
|
k.startswith("num_vm_")]
|
||||||
for key in vm_state_keys:
|
for key in vm_state_keys:
|
||||||
vm_state = key[7:]
|
vm_state = key[7:]
|
||||||
self.vm_states[vm_state] = int(statmap[key])
|
self.vm_states[vm_state] = int(self.stats[key])
|
||||||
|
|
||||||
# Track number of instances in certain task_states
|
# Track number of instances in certain task_states
|
||||||
task_state_keys = [k for k in statmap.keys() if
|
task_state_keys = [k for k in self.stats.keys() if
|
||||||
k.startswith("num_task_")]
|
k.startswith("num_task_")]
|
||||||
for key in task_state_keys:
|
for key in task_state_keys:
|
||||||
task_state = key[9:]
|
task_state = key[9:]
|
||||||
self.task_states[task_state] = int(statmap[key])
|
self.task_states[task_state] = int(self.stats[key])
|
||||||
|
|
||||||
# Track number of instances by host_type
|
# Track number of instances by host_type
|
||||||
os_keys = [k for k in statmap.keys() if k.startswith("num_os_type_")]
|
os_keys = [k for k in self.stats.keys() if
|
||||||
|
k.startswith("num_os_type_")]
|
||||||
for key in os_keys:
|
for key in os_keys:
|
||||||
os = key[12:]
|
os = key[12:]
|
||||||
self.num_instances_by_os_type[os] = int(statmap[key])
|
self.num_instances_by_os_type[os] = int(self.stats[key])
|
||||||
|
|
||||||
self.num_io_ops = int(statmap.get('io_workload', 0))
|
self.num_io_ops = int(self.stats.get('io_workload', 0))
|
||||||
|
|
||||||
def consume_from_instance(self, instance):
|
def consume_from_instance(self, instance):
|
||||||
"""Incrementally update host state from an instance."""
|
"""Incrementally update host state from an instance."""
|
||||||
|
|||||||
@@ -732,27 +732,27 @@ class HostFiltersTestCase(test.NoDBTestCase):
|
|||||||
service = {'disabled': False}
|
service = {'disabled': False}
|
||||||
filter_properties = {'instance_type': {'memory_mb': 1024,
|
filter_properties = {'instance_type': {'memory_mb': 1024,
|
||||||
'extra_specs': especs}}
|
'extra_specs': especs}}
|
||||||
host = fakes.FakeHostState('host1', 'node1',
|
host_state = {'free_ram_mb': 1024, 'service': service}
|
||||||
{'free_ram_mb': 1024, 'capabilities': capabilities,
|
host_state.update(capabilities)
|
||||||
'service': service})
|
host = fakes.FakeHostState('host1', 'node1', host_state)
|
||||||
assertion = self.assertTrue if passes else self.assertFalse
|
assertion = self.assertTrue if passes else self.assertFalse
|
||||||
assertion(filt_cls.host_passes(host, filter_properties))
|
assertion(filt_cls.host_passes(host, filter_properties))
|
||||||
|
|
||||||
def test_compute_filter_passes_extra_specs_simple(self):
|
def test_compute_filter_passes_extra_specs_simple(self):
|
||||||
self._do_test_compute_filter_extra_specs(
|
self._do_test_compute_filter_extra_specs(
|
||||||
ecaps={'opt1': 1, 'opt2': 2},
|
ecaps={'stats': {'opt1': 1, 'opt2': 2}},
|
||||||
especs={'opt1': '1', 'opt2': '2', 'trust:trusted_host': 'true'},
|
especs={'opt1': '1', 'opt2': '2', 'trust:trusted_host': 'true'},
|
||||||
passes=True)
|
passes=True)
|
||||||
|
|
||||||
def test_compute_filter_fails_extra_specs_simple(self):
|
def test_compute_filter_fails_extra_specs_simple(self):
|
||||||
self._do_test_compute_filter_extra_specs(
|
self._do_test_compute_filter_extra_specs(
|
||||||
ecaps={'opt1': 1, 'opt2': 2},
|
ecaps={'stats': {'opt1': 1, 'opt2': 2}},
|
||||||
especs={'opt1': '1', 'opt2': '222', 'trust:trusted_host': 'true'},
|
especs={'opt1': '1', 'opt2': '222', 'trust:trusted_host': 'true'},
|
||||||
passes=False)
|
passes=False)
|
||||||
|
|
||||||
def test_compute_filter_pass_extra_specs_simple_with_scope(self):
|
def test_compute_filter_pass_extra_specs_simple_with_scope(self):
|
||||||
self._do_test_compute_filter_extra_specs(
|
self._do_test_compute_filter_extra_specs(
|
||||||
ecaps={'opt1': 1, 'opt2': 2},
|
ecaps={'stats': {'opt1': 1, 'opt2': 2}},
|
||||||
especs={'capabilities:opt1': '1',
|
especs={'capabilities:opt1': '1',
|
||||||
'trust:trusted_host': 'true'},
|
'trust:trusted_host': 'true'},
|
||||||
passes=True)
|
passes=True)
|
||||||
@@ -773,7 +773,7 @@ class HostFiltersTestCase(test.NoDBTestCase):
|
|||||||
|
|
||||||
def test_compute_filter_extra_specs_pass_multi_level_with_scope(self):
|
def test_compute_filter_extra_specs_pass_multi_level_with_scope(self):
|
||||||
self._do_test_compute_filter_extra_specs(
|
self._do_test_compute_filter_extra_specs(
|
||||||
ecaps={'opt1': {'a': 1, 'b': {'aa': 2}}, 'opt2': 2},
|
ecaps={'stats': {'opt1': {'a': 1, 'b': {'aa': 2}}, 'opt2': 2}},
|
||||||
especs={'opt1:a': '1', 'capabilities:opt1:b:aa': '2',
|
especs={'opt1:a': '1', 'capabilities:opt1:b:aa': '2',
|
||||||
'trust:trusted_host': 'true'},
|
'trust:trusted_host': 'true'},
|
||||||
passes=True)
|
passes=True)
|
||||||
|
|||||||
@@ -435,9 +435,12 @@ class HostStateTestCase(test.NoDBTestCase):
|
|||||||
dict(key='num_os_type_windoze', value='1'),
|
dict(key='num_os_type_windoze', value='1'),
|
||||||
dict(key='io_workload', value='42'),
|
dict(key='io_workload', value='42'),
|
||||||
]
|
]
|
||||||
compute = dict(stats=stats, memory_mb=0, free_disk_gb=0, local_gb=0,
|
compute = dict(stats=stats, memory_mb=1, free_disk_gb=0, local_gb=0,
|
||||||
local_gb_used=0, free_ram_mb=0, vcpus=0, vcpus_used=0,
|
local_gb_used=0, free_ram_mb=0, vcpus=0, vcpus_used=0,
|
||||||
updated_at=None, host_ip='127.0.0.1')
|
updated_at=None, host_ip='127.0.0.1',
|
||||||
|
hypervisor_type='htype', hypervisor_version='1.1',
|
||||||
|
hypervisor_hostname='hostname', cpu_info='cpu_info',
|
||||||
|
supported_instances='{}')
|
||||||
|
|
||||||
host = host_manager.HostState("fakehost", "fakenode")
|
host = host_manager.HostState("fakehost", "fakenode")
|
||||||
host.update_from_compute_node(compute)
|
host.update_from_compute_node(compute)
|
||||||
@@ -452,6 +455,14 @@ class HostStateTestCase(test.NoDBTestCase):
|
|||||||
self.assertEqual(4, host.num_instances_by_os_type['linux'])
|
self.assertEqual(4, host.num_instances_by_os_type['linux'])
|
||||||
self.assertEqual(1, host.num_instances_by_os_type['windoze'])
|
self.assertEqual(1, host.num_instances_by_os_type['windoze'])
|
||||||
self.assertEqual(42, host.num_io_ops)
|
self.assertEqual(42, host.num_io_ops)
|
||||||
|
self.assertEqual(10, len(host.stats))
|
||||||
|
|
||||||
|
self.assertEqual('127.0.0.1', host.host_ip)
|
||||||
|
self.assertEqual('htype', host.hypervisor_type)
|
||||||
|
self.assertEqual('1.1', host.hypervisor_version)
|
||||||
|
self.assertEqual('hostname', host.hypervisor_hostname)
|
||||||
|
self.assertEqual('cpu_info', host.cpu_info)
|
||||||
|
self.assertEqual({}, host.supported_instances)
|
||||||
|
|
||||||
def test_stat_consumption_from_instance(self):
|
def test_stat_consumption_from_instance(self):
|
||||||
host = host_manager.HostState("fakehost", "fakenode")
|
host = host_manager.HostState("fakehost", "fakenode")
|
||||||
|
|||||||
Reference in New Issue
Block a user