Use threadpool when building compute data model

Use the general-purpose threadpool when building the nova compute
data model. Additionally, add a thorough explanation of the theory of
operation.

Update the related test cases to better verify the correct operation
of _add_physical_layer.

Partially Implements: blueprint general-purpose-decision-engine-threadpool

Change-Id: I53ed32a4b2a089b05d1ffede629c9f4c5cb720c8
Dantali0n
2019-10-23 11:18:50 +02:00
parent 2b6ee38327
commit c644e23ca0
2 changed files with 191 additions and 29 deletions


@@ -16,6 +16,8 @@
import os_resource_classes as orc
from oslo_log import log
from futurist import waiters
from watcher.common import nova_helper
from watcher.common import placement_helper
from watcher.decision_engine.model.collector import base
@@ -23,6 +25,7 @@ from watcher.decision_engine.model import element
from watcher.decision_engine.model import model_root
from watcher.decision_engine.model.notification import nova
from watcher.decision_engine.scope import compute as compute_scope
from watcher.decision_engine import threading
LOG = log.getLogger(__name__)
@@ -212,8 +215,12 @@ class NovaModelBuilder(base.BaseModelBuilder):
self.nova = osc.nova()
self.nova_helper = nova_helper.NovaHelper(osc=self.osc)
self.placement_helper = placement_helper.PlacementHelper(osc=self.osc)
self.executor = threading.DecisionEngineThreadPool()
def _collect_aggregates(self, host_aggregates, _nodes):
if not host_aggregates:
return
aggregate_list = self.call_retry(f=self.nova_helper.get_aggregate_list)
aggregate_ids = [aggregate['id'] for aggregate
in host_aggregates if 'id' in aggregate]
@@ -229,6 +236,9 @@ class NovaModelBuilder(base.BaseModelBuilder):
_nodes.update(aggregate.hosts)
def _collect_zones(self, availability_zones, _nodes):
if not availability_zones:
return
service_list = self.call_retry(f=self.nova_helper.get_service_list)
zone_names = [zone['name'] for zone
in availability_zones]
@@ -239,20 +249,71 @@ class NovaModelBuilder(base.BaseModelBuilder):
if service.zone in zone_names or include_all_nodes:
_nodes.add(service.host)
def _add_physical_layer(self):
"""Add the physical layer of the graph.
def _compute_node_future(self, future, future_instances):
"""Add compute node information to model and schedule instance info job
This includes components which represent actual infrastructure
hardware.
:param future: the future of the finished execution
:type future: :py:class:`futurist.GreenFuture`
:param future_instances: list of futures for instance jobs
:type future_instances: list of :py:class:`futurist.GreenFuture`
"""
try:
node_info = future.result()[0]
# filter out baremetal node
if node_info.hypervisor_type == 'ironic':
LOG.debug("filtering out baremetal node: %s", node_info)
return
self.add_compute_node(node_info)
# node.servers is a list of server objects
# New in nova version 2.53
instances = getattr(node_info, "servers", None)
# Do not submit job if there are no instances on compute node
if instances is None:
LOG.info("No instances on compute_node: {0}".format(node_info))
return
future_instances.append(
self.executor.submit(
self.add_instance_node, node_info, instances)
)
except Exception:
LOG.error("compute node from aggregate / "
"availability_zone could not be found")
def _add_physical_layer(self):
"""Collects all information on compute nodes and instances
Will collect all required compute node and instance information based
on the host aggregates and availability zones. If aggregates and zones
do not specify any compute nodes all nodes are retrieved instead.
The collection of information happens concurrently using the
DecisionEngineThreadpool. The collection is parallelized in three steps
first information about aggregates and zones is gathered. Secondly,
for each of the compute nodes a tasks is submitted to get detailed
information about the compute node. Finally, Each of these submitted
tasks will submit an additional task if the compute node contains
instances. Before returning from this function all instance tasks are
waited upon to complete.
"""
compute_nodes = set()
host_aggregates = self.model_scope.get("host_aggregates")
availability_zones = self.model_scope.get("availability_zones")
if host_aggregates:
self._collect_aggregates(host_aggregates, compute_nodes)
if availability_zones:
self._collect_zones(availability_zones, compute_nodes)
"""Submit tasks to gather compute nodes from availability zones and
host aggregates. Each task adds compute nodes to the set, this set is
threadsafe under the assumption that CPython is used with the GIL
enabled."""
zone_aggregate_futures = {
self.executor.submit(
self._collect_aggregates, host_aggregates, compute_nodes),
self.executor.submit(
self._collect_zones, availability_zones, compute_nodes)
}
waiters.wait_for_all(zone_aggregate_futures)
# if zones and aggregates did not contain any nodes, get every node.
if not compute_nodes:
self.no_model_scope_flag = True
all_nodes = self.call_retry(
@@ -260,24 +321,20 @@ class NovaModelBuilder(base.BaseModelBuilder):
compute_nodes = set(
[node.hypervisor_hostname for node in all_nodes])
LOG.debug("compute nodes: %s", compute_nodes)
for node_name in compute_nodes:
cnode = self.call_retry(
self.nova_helper.get_compute_node_by_name,
node_name, servers=True, detailed=True)
if cnode:
node_info = cnode[0]
# filter out baremetal node
if node_info.hypervisor_type == 'ironic':
LOG.debug("filtering out baremetal node: %s", node_name)
continue
self.add_compute_node(node_info)
# node.servers is a list of server objects
# New in nova version 2.53
instances = getattr(node_info, "servers", None)
self.add_instance_node(node_info, instances)
else:
LOG.error("compute_node from aggregate / availability_zone "
"could not be found: {0}".format(node_name))
node_futures = [self.executor.submit(
self.nova_helper.get_compute_node_by_name,
node, servers=True, detailed=True)
for node in compute_nodes]
LOG.debug("submitted {0} jobs".format(len(compute_nodes)))
# Futures will be added concurrently; this is only safe with the CPython GIL
future_instances = []
self.executor.do_while_futures_modify(
node_futures, self._compute_node_future, future_instances)
# Wait for all instance jobs to finish
waiters.wait_for_all(future_instances)
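Putting it together, _add_physical_layer is a three-stage fan-out/fan-in. A condensed, self-contained sketch of that shape; collect_a, collect_b, get_details, and add_instances are hypothetical stand-ins for the nova_helper calls, and the sketch waits for all stage-2 futures at once, whereas the real code processes them as they complete via do_while_futures_modify.

from futurist import ThreadPoolExecutor, waiters

def collect_a(nodes):
    nodes.update({'hostone', 'hosttwo'})

def collect_b(nodes):
    nodes.add('hostone')

def get_details(name):
    return {'name': name, 'servers': ['vm'] if name == 'hostone' else []}

def add_instances(details):
    return len(details['servers'])

pool = ThreadPoolExecutor(max_workers=4)
nodes = set()
# Stage 1: gather node names from both sources concurrently.
waiters.wait_for_all([pool.submit(collect_a, nodes),
                      pool.submit(collect_b, nodes)])
# Stage 2: one detail lookup per node.
lookups = [pool.submit(get_details, name) for name in nodes]
# Stage 3: finished lookups spawn follow-up instance jobs as needed.
instance_jobs = []
for future in waiters.wait_for_all(lookups).done:
    details = future.result()
    if details['servers']:
        instance_jobs.append(pool.submit(add_instances, details))
waiters.wait_for_all(instance_jobs)
pool.shutdown()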
def add_compute_node(self, node):
# Build and add base node.


@@ -291,6 +291,15 @@ class TestNovaModelBuilder(base.TestCase):
self.assertEqual(set(['hostone', 'hosttwo']), result)
@mock.patch.object(nova_helper, 'NovaHelper')
def test_collect_aggregates_none(self, m_nova):
"""Test collect_aggregates with host_aggregates None"""
result = set()
t_nova_cluster = nova.NovaModelBuilder(mock.Mock())
t_nova_cluster._collect_aggregates(None, result)
self.assertEqual(set(), result)
@mock.patch.object(nova_helper, 'NovaHelper')
def test_collect_zones(self, m_nova):
""""""
@@ -310,8 +319,35 @@ class TestNovaModelBuilder(base.TestCase):
self.assertEqual(set(['hostone']), result)
@mock.patch.object(nova_helper, 'NovaHelper')
def test_add_physical_layer(self, m_nova):
""""""
def test_collect_zones_none(self, m_nova):
"""Test collect_zones with availability_zones None"""
result = set()
t_nova_cluster = nova.NovaModelBuilder(mock.Mock())
t_nova_cluster._collect_zones(None, result)
self.assertEqual(set(), result)
@mock.patch.object(placement_helper, 'PlacementHelper')
@mock.patch.object(nova_helper, 'NovaHelper')
def test_add_physical_layer(self, m_nova, m_placement):
"""Ensure all three steps of the physical layer are fully executed
First the return value for get_aggregate_list and get_service_list are
mocked. These return 3 hosts of which hostone is returned by both the
aggregate and service call. This will help verify the elimination of
duplicates. The scope is setup so that only hostone and hosttwo should
remain.
There will be 2 simulated compute nodes and 2 associated instances.
These will be returned by their matching calls in nova helper. The
calls to get_compute_node_by_name and get_instance_list are asserted
as to verify the correct operation of add_physical_layer.
"""
mock_placement = mock.Mock(name="placement_helper")
mock_placement.get_inventories.return_value = dict()
mock_placement.get_usages_for_resource_provider.return_value = None
m_placement.return_value = mock_placement
m_nova.return_value.get_aggregate_list.return_value = \
[mock.Mock(id=1, name='example'),
@@ -321,7 +357,69 @@ class TestNovaModelBuilder(base.TestCase):
[mock.Mock(zone='av_b', host='hostthree'),
mock.Mock(zone='av_a', host='hostone')]
m_nova.return_value.get_compute_node_by_name.return_value = False
compute_node_one = mock.Mock(
id='796fee99-65dd-4262-aa-fd2a1143faa6',
hypervisor_hostname='hostone',
hypervisor_type='QEMU',
state='TEST_STATE',
status='TEST_STATUS',
memory_mb=333,
memory_mb_used=100,
free_disk_gb=222,
local_gb=111,
local_gb_used=10,
vcpus=4,
vcpus_used=0,
servers=[
{'name': 'fake_instance',
'uuid': 'ef500f7e-dac8-470f-960c-169486fce71b'}
],
service={'id': 123, 'host': 'hostone',
'disabled_reason': ''},
)
compute_node_two = mock.Mock(
id='756fef99-65dd-4262-aa-fd2a1143faa6',
hypervisor_hostname='hosttwo',
hypervisor_type='QEMU',
state='TEST_STATE',
status='TEST_STATUS',
memory_mb=333,
memory_mb_used=100,
free_disk_gb=222,
local_gb=111,
local_gb_used=10,
vcpus=4,
vcpus_used=0,
servers=[
{'name': 'fake_instance2',
'uuid': 'ef500f7e-dac8-47f0-960c-169486fce71b'}
],
service={'id': 123, 'host': 'hosttwo',
'disabled_reason': ''},
)
m_nova.return_value.get_compute_node_by_name.side_effect = [
[compute_node_one], [compute_node_two]
]
fake_instance_one = mock.Mock(
id='796fee99-65dd-4262-aa-fd2a1143faa6',
name='fake_instance',
flavor={'ram': 333, 'disk': 222, 'vcpus': 4, 'id': 1},
metadata={'hi': 'hello'},
tenant_id='ff560f7e-dbc8-771f-960c-164482fce21b',
)
fake_instance_two = mock.Mock(
id='ef500f7e-dac8-47f0-960c-169486fce71b',
name='fake_instance2',
flavor={'ram': 333, 'disk': 222, 'vcpus': 4, 'id': 1},
metadata={'hi': 'hello'},
tenant_id='756fef99-65dd-4262-aa-fd2a1143faa6',
)
m_nova.return_value.get_instance_list.side_effect = [
[fake_instance_one], [fake_instance_two]
]
m_scope = [{"compute": [
{"host_aggregates": [{"id": 5}]},
@@ -337,6 +435,13 @@ class TestNovaModelBuilder(base.TestCase):
self.assertEqual(
m_nova.return_value.get_compute_node_by_name.call_count, 2)
m_nova.return_value.get_instance_list.assert_any_call(
filters={'host': 'hostone'}, limit=1)
m_nova.return_value.get_instance_list.assert_any_call(
filters={'host': 'hosttwo'}, limit=1)
self.assertEqual(
m_nova.return_value.get_instance_list.call_count, 2)
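A note on the mocking technique: assigning a list to side_effect makes successive calls return successive items, which is how the two get_compute_node_by_name calls above yield the two different compute nodes. In miniature:

from unittest import mock

m = mock.Mock(side_effect=[['node-one'], ['node-two']])
assert m() == ['node-one']  # first call
assert m() == ['node-two']  # second call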
@mock.patch.object(placement_helper, 'PlacementHelper')
@mock.patch.object(nova_helper, 'NovaHelper')
def test_add_physical_layer_with_baremetal_node(self, m_nova,