Merge "Add lock to scheduler host state updating"
This commit is contained in:
commit
7825cc7a2d
|
@ -106,9 +106,10 @@ class HostState(object):
|
|||
previously used and lock down access.
|
||||
"""
|
||||
|
||||
def __init__(self, host, node, compute=None):
|
||||
def __init__(self, host, node):
|
||||
self.host = host
|
||||
self.nodename = node
|
||||
self._lock_name = (host, node)
|
||||
|
||||
# Mutable available resources.
|
||||
# These will change as resources are virtually "consumed".
|
||||
|
@ -151,13 +152,29 @@ class HostState(object):
|
|||
self.cpu_allocation_ratio = None
|
||||
|
||||
self.updated = None
|
||||
if compute:
|
||||
self.update_from_compute_node(compute)
|
||||
|
||||
def update_service(self, service):
|
||||
self.service = ReadOnlyDict(service)
|
||||
def update(self, compute=None, service=None, aggregates=None,
|
||||
inst_dict=None):
|
||||
"""Update all information about a host."""
|
||||
|
||||
def update_from_compute_node(self, compute):
|
||||
@utils.synchronized(self._lock_name)
|
||||
def _locked_update(self, compute, service, aggregates, inst_dict):
|
||||
if compute is not None:
|
||||
LOG.debug("Update host state from compute node: %s", compute)
|
||||
self._update_from_compute_node(compute)
|
||||
if aggregates is not None:
|
||||
LOG.debug("Update host state with aggregates: %s", aggregates)
|
||||
self.aggregates = aggregates
|
||||
if service is not None:
|
||||
LOG.debug("Update host state with service dict: %s", service)
|
||||
self.service = ReadOnlyDict(service)
|
||||
if inst_dict is not None:
|
||||
LOG.debug("Update host state with instances: %s", inst_dict)
|
||||
self.instances = inst_dict
|
||||
|
||||
return _locked_update(self, compute, service, aggregates, inst_dict)
|
||||
|
||||
def _update_from_compute_node(self, compute):
|
||||
"""Update information about a host from a ComputeNode object."""
|
||||
if (self.updated and compute.updated_at
|
||||
and self.updated > compute.updated_at):
|
||||
|
@ -285,7 +302,7 @@ class HostManager(object):
|
|||
|
||||
# Can be overridden in a subclass
|
||||
def host_state_cls(self, host, node, **kwargs):
|
||||
return HostState(host, node, **kwargs)
|
||||
return HostState(host, node)
|
||||
|
||||
def __init__(self):
|
||||
self.host_state_map = {}
|
||||
|
@ -527,19 +544,17 @@ class HostManager(object):
|
|||
node = compute.hypervisor_hostname
|
||||
state_key = (host, node)
|
||||
host_state = self.host_state_map.get(state_key)
|
||||
if host_state:
|
||||
host_state.update_from_compute_node(compute)
|
||||
else:
|
||||
if not host_state:
|
||||
host_state = self.host_state_cls(host, node, compute=compute)
|
||||
self.host_state_map[state_key] = host_state
|
||||
# We force to update the aggregates info each time a new request
|
||||
# comes in, because some changes on the aggregates could have been
|
||||
# happening after setting this field for the first time
|
||||
host_state.aggregates = [self.aggs_by_id[agg_id] for agg_id in
|
||||
self.host_aggregates_map[
|
||||
host_state.host]]
|
||||
host_state.update_service(dict(service))
|
||||
self._add_instance_info(context, compute, host_state)
|
||||
host_state.update(compute,
|
||||
dict(service),
|
||||
self._get_aggregates_info(host),
|
||||
self._get_instance_info(context, compute))
|
||||
|
||||
seen_nodes.add(state_key)
|
||||
|
||||
# remove compute nodes from host_state_map if they are not active
|
||||
|
@ -552,8 +567,12 @@ class HostManager(object):
|
|||
|
||||
return six.itervalues(self.host_state_map)
|
||||
|
||||
def _add_instance_info(self, context, compute, host_state):
|
||||
"""Adds the host instance info to the host_state object.
|
||||
def _get_aggregates_info(self, host):
|
||||
return [self.aggs_by_id[agg_id] for agg_id in
|
||||
self.host_aggregates_map[host]]
|
||||
|
||||
def _get_instance_info(self, context, compute):
|
||||
"""Gets the host instance info from the compute host.
|
||||
|
||||
Some older compute nodes may not be sending instance change updates to
|
||||
the Scheduler; other sites may disable this feature for performance
|
||||
|
@ -571,7 +590,7 @@ class HostManager(object):
|
|||
inst_list = objects.InstanceList.get_by_host(context, host_name)
|
||||
inst_dict = {instance.uuid: instance
|
||||
for instance in inst_list.objects}
|
||||
host_state.instances = inst_dict
|
||||
return inst_dict
|
||||
|
||||
def _recreate_instance_info(self, context, host_name):
|
||||
"""Get the InstanceList for the specified host, and store it in the
|
||||
|
|
|
@ -34,7 +34,7 @@ class IronicNodeState(host_manager.HostState):
|
|||
previously used and lock down access.
|
||||
"""
|
||||
|
||||
def update_from_compute_node(self, compute):
|
||||
def _update_from_compute_node(self, compute):
|
||||
"""Update information about a host from a ComputeNode object."""
|
||||
self.vcpus_total = compute.vcpus
|
||||
self.vcpus_used = compute.vcpus_used
|
||||
|
@ -82,14 +82,14 @@ class IronicHostManager(host_manager.HostManager):
|
|||
"""Factory function/property to create a new HostState."""
|
||||
compute = kwargs.get('compute')
|
||||
if compute and compute.get('hypervisor_type') == hv_type.IRONIC:
|
||||
return IronicNodeState(host, node, **kwargs)
|
||||
return IronicNodeState(host, node)
|
||||
else:
|
||||
return host_manager.HostState(host, node, **kwargs)
|
||||
return host_manager.HostState(host, node)
|
||||
|
||||
def _init_instance_info(self):
|
||||
"""Ironic hosts should not pass instance info."""
|
||||
pass
|
||||
|
||||
def _add_instance_info(self, context, compute, host_state):
|
||||
def _get_instance_info(self, context, compute):
|
||||
"""Ironic hosts should not pass instance info."""
|
||||
host_state.instances = {}
|
||||
return {}
|
||||
|
|
|
@ -188,7 +188,7 @@ class FilterSchedulerTestCase(test_scheduler.SchedulerTestCase):
|
|||
# one host should be chose
|
||||
self.assertEqual(len(hosts), 1)
|
||||
|
||||
@mock.patch('nova.scheduler.host_manager.HostManager._add_instance_info')
|
||||
@mock.patch('nova.scheduler.host_manager.HostManager._get_instance_info')
|
||||
@mock.patch('nova.objects.ServiceList.get_by_binary',
|
||||
return_value=fakes.SERVICES)
|
||||
@mock.patch('nova.objects.ComputeNodeList.get_all',
|
||||
|
@ -198,7 +198,7 @@ class FilterSchedulerTestCase(test_scheduler.SchedulerTestCase):
|
|||
'pci_requests': None})
|
||||
def test_schedule_chooses_best_host(self, mock_get_extra, mock_cn_get_all,
|
||||
mock_get_by_binary,
|
||||
mock_add_inst_info):
|
||||
mock_get_inst_info):
|
||||
"""If scheduler_host_subset_size is 1, the largest host with greatest
|
||||
weight should be returned.
|
||||
"""
|
||||
|
|
|
@ -467,7 +467,7 @@ class HostManagerTestCase(test.NoDBTestCase):
|
|||
8388608)
|
||||
|
||||
@mock.patch.object(nova.objects.InstanceList, 'get_by_host')
|
||||
@mock.patch.object(host_manager.HostState, 'update_from_compute_node')
|
||||
@mock.patch.object(host_manager.HostState, '_update_from_compute_node')
|
||||
@mock.patch.object(objects.ComputeNodeList, 'get_all')
|
||||
@mock.patch.object(objects.ServiceList, 'get_by_binary')
|
||||
def test_get_all_host_states_with_no_aggs(self, svc_get_by_binary,
|
||||
|
@ -484,7 +484,7 @@ class HostManagerTestCase(test.NoDBTestCase):
|
|||
self.assertEqual([], host_state.aggregates)
|
||||
|
||||
@mock.patch.object(nova.objects.InstanceList, 'get_by_host')
|
||||
@mock.patch.object(host_manager.HostState, 'update_from_compute_node')
|
||||
@mock.patch.object(host_manager.HostState, '_update_from_compute_node')
|
||||
@mock.patch.object(objects.ComputeNodeList, 'get_all')
|
||||
@mock.patch.object(objects.ServiceList, 'get_by_binary')
|
||||
def test_get_all_host_states_with_matching_aggs(self, svc_get_by_binary,
|
||||
|
@ -505,7 +505,7 @@ class HostManagerTestCase(test.NoDBTestCase):
|
|||
self.assertEqual([fake_agg], host_state.aggregates)
|
||||
|
||||
@mock.patch.object(nova.objects.InstanceList, 'get_by_host')
|
||||
@mock.patch.object(host_manager.HostState, 'update_from_compute_node')
|
||||
@mock.patch.object(host_manager.HostState, '_update_from_compute_node')
|
||||
@mock.patch.object(objects.ComputeNodeList, 'get_all')
|
||||
@mock.patch.object(objects.ServiceList, 'get_by_binary')
|
||||
def test_get_all_host_states_with_not_matching_aggs(self,
|
||||
|
@ -545,7 +545,8 @@ class HostManagerTestCase(test.NoDBTestCase):
|
|||
host_state = host_manager.HostState('host1', cn1)
|
||||
self.assertFalse(host_state.instances)
|
||||
mock_get_by_host.return_value = None
|
||||
hm._add_instance_info(context, cn1, host_state)
|
||||
host_state.update(
|
||||
inst_dict=hm._get_instance_info(context, cn1))
|
||||
self.assertFalse(mock_get_by_host.called)
|
||||
self.assertTrue(host_state.instances)
|
||||
self.assertEqual(host_state.instances['uuid1'], inst1)
|
||||
|
@ -567,7 +568,8 @@ class HostManagerTestCase(test.NoDBTestCase):
|
|||
host_state = host_manager.HostState('host1', cn1)
|
||||
self.assertFalse(host_state.instances)
|
||||
mock_get_by_host.return_value = objects.InstanceList(objects=[inst1])
|
||||
hm._add_instance_info(context, cn1, host_state)
|
||||
host_state.update(
|
||||
inst_dict=hm._get_instance_info(context, cn1))
|
||||
mock_get_by_host.assert_called_once_with(context, cn1.host)
|
||||
self.assertTrue(host_state.instances)
|
||||
self.assertEqual(host_state.instances['uuid1'], inst1)
|
||||
|
@ -803,7 +805,9 @@ class HostStateTestCase(test.NoDBTestCase):
|
|||
# update_from_compute_node() and consume_from_request() are tested
|
||||
# in HostManagerTestCase.test_get_all_host_states()
|
||||
|
||||
def test_stat_consumption_from_compute_node(self):
|
||||
@mock.patch('nova.utils.synchronized',
|
||||
side_effect=lambda a: lambda f: lambda *args: f(*args))
|
||||
def test_stat_consumption_from_compute_node(self, sync_mock):
|
||||
stats = {
|
||||
'num_instances': '5',
|
||||
'num_proj_12345': '3',
|
||||
|
@ -831,8 +835,9 @@ class HostStateTestCase(test.NoDBTestCase):
|
|||
cpu_allocation_ratio=16.0, ram_allocation_ratio=1.5)
|
||||
|
||||
host = host_manager.HostState("fakehost", "fakenode")
|
||||
host.update_from_compute_node(compute)
|
||||
host.update(compute=compute)
|
||||
|
||||
sync_mock.assert_called_once_with(("fakehost", "fakenode"))
|
||||
self.assertEqual(5, host.num_instances)
|
||||
self.assertEqual(42, host.num_io_ops)
|
||||
self.assertEqual(10, len(host.stats))
|
||||
|
@ -872,7 +877,7 @@ class HostStateTestCase(test.NoDBTestCase):
|
|||
cpu_allocation_ratio=16.0, ram_allocation_ratio=1.5)
|
||||
|
||||
host = host_manager.HostState("fakehost", "fakenode")
|
||||
host.update_from_compute_node(compute)
|
||||
host.update(compute=compute)
|
||||
self.assertEqual([], host.pci_stats.pools)
|
||||
self.assertEqual(hyper_ver_int, host.hypervisor_version)
|
||||
|
||||
|
@ -904,7 +909,7 @@ class HostStateTestCase(test.NoDBTestCase):
|
|||
cpu_allocation_ratio=16.0, ram_allocation_ratio=1.5)
|
||||
|
||||
host = host_manager.HostState("fakehost", "fakenode")
|
||||
host.update_from_compute_node(compute)
|
||||
host.update(compute=compute)
|
||||
|
||||
self.assertEqual(5, host.num_instances)
|
||||
self.assertEqual(42, host.num_io_ops)
|
||||
|
@ -1059,7 +1064,7 @@ class HostStateTestCase(test.NoDBTestCase):
|
|||
stats=None, pci_device_pools=None,
|
||||
cpu_allocation_ratio=16.0, ram_allocation_ratio=1.5)
|
||||
host = host_manager.HostState("fakehost", "fakenode")
|
||||
host.update_from_compute_node(compute)
|
||||
host.update(compute=compute)
|
||||
|
||||
self.assertEqual(len(host.metrics), 2)
|
||||
self.assertEqual(1.0, host.metrics.to_list()[0]['value'])
|
||||
|
|
|
@ -187,7 +187,7 @@ class IronicHostManagerChangedNodesTestCase(test.NoDBTestCase):
|
|||
|
||||
def test_update_from_compute_node(self):
|
||||
host = ironic_host_manager.IronicNodeState("fakehost", "fakenode")
|
||||
host.update_from_compute_node(self.compute_node)
|
||||
host.update(compute=self.compute_node)
|
||||
|
||||
self.assertEqual(1024, host.free_ram_mb)
|
||||
self.assertEqual(1024, host.total_usable_ram_mb)
|
||||
|
@ -201,7 +201,7 @@ class IronicHostManagerChangedNodesTestCase(test.NoDBTestCase):
|
|||
|
||||
def test_consume_identical_instance_from_compute(self):
|
||||
host = ironic_host_manager.IronicNodeState("fakehost", "fakenode")
|
||||
host.update_from_compute_node(self.compute_node)
|
||||
host.update(compute=self.compute_node)
|
||||
|
||||
self.assertIsNone(host.updated)
|
||||
spec_obj = objects.RequestSpec(
|
||||
|
@ -217,7 +217,7 @@ class IronicHostManagerChangedNodesTestCase(test.NoDBTestCase):
|
|||
|
||||
def test_consume_larger_instance_from_compute(self):
|
||||
host = ironic_host_manager.IronicNodeState("fakehost", "fakenode")
|
||||
host.update_from_compute_node(self.compute_node)
|
||||
host.update(compute=self.compute_node)
|
||||
|
||||
self.assertIsNone(host.updated)
|
||||
spec_obj = objects.RequestSpec(
|
||||
|
@ -232,7 +232,7 @@ class IronicHostManagerChangedNodesTestCase(test.NoDBTestCase):
|
|||
|
||||
def test_consume_smaller_instance_from_compute(self):
|
||||
host = ironic_host_manager.IronicNodeState("fakehost", "fakenode")
|
||||
host.update_from_compute_node(self.compute_node)
|
||||
host.update(compute=self.compute_node)
|
||||
|
||||
self.assertIsNone(host.updated)
|
||||
spec_obj = objects.RequestSpec(
|
||||
|
|
Loading…
Reference in New Issue