Documentation on concurrency for contributors

Documentation with details on general concurrency as well as OpenStack
specific libraries. Describes how different libraries are effectively used
across different Watcher services. This includes describing how futurist is
used in the Decision Engine and how taskflow is used in the Applier. Finally,
this documentation describes how contributors can use the new
DecisionEngineThreadpool effectively and includes examples.

https://docs.openstack.org/futurist/latest/
https://docs.openstack.org/taskflow/latest/

Change-Id: Ic1cd1f3733a0e9a239c9b8d49951e1e4ece49f3a
Partially Implements: blueprint general-purpose-decision-engine-threadpool
doc/source/contributor/concurrency.rst (new file, 248 lines)

@@ -0,0 +1,248 @@
===========
Concurrency
===========

Introduction
************

Modern processors typically contain multiple cores, all capable of executing
instructions in parallel. Ensuring applications can fully utilize the
underlying hardware requires developing with these concepts in mind. The
OpenStack community maintains a number of libraries to facilitate this
utilization. Combined with constructs like CPython's GIL_, these libraries
make the proper use of concurrency more straightforward than in many other
programming languages.

|
||||
The primary libraries maintained by OpenStack to facilitate concurrency are
|
||||
futurist_ and taskflow_. Here futurist is a more straightforward and
|
||||
lightweight library while taskflow is more advanced supporting features like
|
||||
rollback mechanisms. Within Watcher both libraries are used to facilitate
|
||||
concurrency.
|
||||
|
||||
.. _GIL: https://wiki.python.org/moin/GlobalInterpreterLock
|
||||
.. _futurist: https://docs.openstack.org/futurist/latest/
|
||||
.. _taskflow: https://docs.openstack.org/taskflow/latest/
|
||||
|
||||
Threadpool
**********

A threadpool is a collection of one or more threads, typically called
*workers*, to which tasks can be submitted. Submitted tasks are scheduled by
the threadpool and subsequently executed. In Python, tasks are typically
bound or unbound methods, while other programming languages like Java
require implementing an interface.

The order and the amount of concurrency with which these tasks are executed
are up to the threadpool to decide. Some libraries, like taskflow, allow for
either strong or loose ordering of tasks, while others, like futurist, might
only support loose ordering. Taskflow, for example, supports building
tree-based hierarchies of dependent tasks.

Upon submission of a task to a threadpool, a so-called future_ is returned.
These objects can be used to determine information about the task, such as
whether it is currently being executed or has finished execution. Once the
task has finished execution, the future can also be used to retrieve the
value returned by the method.

Some libraries, like futurist, provide synchronization primitives for
collections of futures, such as wait_for_any_. The following sections cover
the different types of concurrency used in the various services of Watcher.

.. _future: https://docs.python.org/3/library/concurrent.futures.html
.. _wait_for_any: https://docs.openstack.org/futurist/latest/reference/index.html#waiters

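To make these concepts concrete, below is a minimal, self-contained sketch
of submitting tasks to a futurist executor and synchronizing on the
resulting futures. The task function and worker count are invented for the
illustration, and ``futurist.ThreadPoolExecutor`` is used for portability
(the green variant additionally requires eventlet):

.. code-block:: python

    import futurist
    from futurist import waiters


    def fetch(item):
        # Stand-in for a network- or I/O-bound operation.
        return item * 2

    executor = futurist.ThreadPoolExecutor(max_workers=4)
    futures = [executor.submit(fetch, i) for i in range(8)]

    # Block until every future has completed, then collect the results.
    done, not_done = waiters.wait_for_all(futures)
    results = [future.result() for future in done]

    executor.shutdown()
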
Decision engine concurrency
***************************

The concurrency in the decision engine is governed by two independent
threadpools, both of which are GreenThreadPoolExecutor_ instances from the
futurist_ library. One of these is used automatically, and most contributors
will not interact with it while developing new features. The other
threadpool, known as the :class:`~.DecisionEngineThreadpool`, can frequently
be used while developing new features or updating existing ones, as it makes
it possible to achieve performance improvements in network- or I/O-bound
operations.

.. _GreenThreadPoolExecutor: https://docs.openstack.org/futurist/latest/reference/index.html#executors

AuditEndpoint
#############

The first threadpool is used to allow multiple audits to run in parallel.
In practice, however, only one audit can run at a time. This is because the
data model used by audits is a singleton: to prevent audits from destroying
each other's data model, one audit must wait for the other to complete
before being allowed to access it. A performance improvement could be
achieved by being more intelligent about the use, caching, and construction
of these data models.

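As a purely hypothetical illustration of why a singleton data model
serializes audits, the sketch below guards a shared model with a lock; the
class and method names are invented for the example and do not reflect
Watcher's actual implementation:

.. code-block:: python

    import threading


    class SharedDataModel(object):
        """Hypothetical singleton data model shared by all audits."""

        _lock = threading.Lock()

        def run_audit(self, audit_fn):
            # Only one audit may access the model at a time; a second
            # audit submitted in parallel simply blocks here until the
            # first one completes.
            with self._lock:
                return audit_fn(self)
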
DecisionEngineThreadPool
########################

The second threadpool is used for generic tasks; typically, networking and
I/O operations benefit the most from it. Upon execution of an audit, this
threadpool can be used, for instance, to retrieve information from the Nova
compute service. This second threadpool is a singleton and is shared amongst
concurrently running audits; as a result, its number of workers is static
and independent of the number of workers in the first threadpool. The use of
the :class:`~.DecisionEngineThreadpool` while building the Nova compute data
model is demonstrated below to show how it can be used effectively.

In the following example, a reference to the
:class:`~.DecisionEngineThreadpool` is stored in ``self.executor``. Two
tasks are submitted: one with the function ``self._collect_aggregates`` and
the other with ``self._collect_zones``. In both ``self.executor.submit``
calls, all subsequent arguments are passed on to the submitted function,
following the common ``(fn, *args, **kwargs)`` signature. One of the
original signatures, for example, is
``def _collect_aggregates(host_aggregates, compute_nodes)``.

.. code-block:: python

    zone_aggregate_futures = {
        self.executor.submit(
            self._collect_aggregates, host_aggregates, compute_nodes),
        self.executor.submit(
            self._collect_zones, availability_zones, compute_nodes)
    }
    waiters.wait_for_all(zone_aggregate_futures)

The last statement in the example above waits for all futures to complete.
Similarly, ``waiters.wait_for_any`` will wait for any future of the
specified collection to complete. To simplify the usage of
``wait_for_any``, the :class:`~.DecisionEngineThreadpool` defines a
``do_while_futures`` method. This method iterates, in a do-while loop, over
a collection of futures until all of them have completed. The advantage of
``do_while_futures`` is that it allows a method to be called immediately
when a future finishes. The arguments for this callback method can be
supplied when calling ``do_while_futures``; however, the first argument
passed to the callback is always the future itself! If the collection of
futures can safely be modified, ``do_while_futures_modify`` can be used
instead and should have slightly better performance. The following example
shows how ``do_while_futures`` is used in the decision engine.

.. code-block:: python

    # For every compute node in compute_nodes, submit a task to gather the
    # node's information. A list comprehension stores the futures of all
    # submitted tasks in node_futures.
    node_futures = [self.executor.submit(
        self.nova_helper.get_compute_node_by_name,
        node, servers=True, detailed=True)
        for node in compute_nodes]
    LOG.debug("submitted {0} jobs".format(len(compute_nodes)))

    future_instances = []
    # Iterate over node_futures and, upon completion of a future, call
    # self._compute_node_future with the future and future_instances
    # as arguments.
    self.executor.do_while_futures_modify(
        node_futures, self._compute_node_future, future_instances)

    # Wait for all instance jobs to finish.
    waiters.wait_for_all(future_instances)

Finally, let's demonstrate how powerful ``do_while_futures`` can be by
showing what the ``_compute_node_future`` callback does. First, it retrieves
the result from the future and adds the compute node to the data model.
Afterwards, it checks whether the compute node has any associated instances
and, if so, submits an additional task to the
:class:`~.DecisionEngineThreadpool`. The resulting future is appended to
``future_instances`` so that ``waiters.wait_for_all`` can be called on this
list. This is important, as otherwise the building of the data model might
return before all tasks for instances have finished.

.. code-block:: python

    # Get the result from the future.
    node_info = future.result()[0]

    # Filter out baremetal nodes.
    if node_info.hypervisor_type == 'ironic':
        LOG.debug("filtering out baremetal node: %s", node_info)
        return

    # Add the compute node to the data model.
    self.add_compute_node(node_info)
    # Get the instances from the compute node.
    instances = getattr(node_info, "servers", None)
    # Do not submit a job if there are no instances on the compute node.
    if instances is None:
        LOG.info("No instances on compute_node: {0}".format(node_info))
        return
    # Submit a job to retrieve detailed information about the instances.
    future_instances.append(
        self.executor.submit(
            self.add_instance_node, node_info, instances)
    )

Without ``do_while_futures``, an additional ``waiters.wait_for_all`` would
be required between the compute node tasks and the instance tasks. This
would cause the progress of the decision engine to stall, as fewer and
fewer tasks remain active before the instance tasks can be submitted. This
demonstrates how ``do_while_futures`` can be used to achieve a more
constant utilization of the underlying hardware.

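For contrast, the following is an illustrative sketch of the naive
alternative, reusing the names from the examples above; the exact calls are
assumptions, not code from Watcher:

.. code-block:: python

    # Naive alternative: first wait for *every* compute node future, then
    # submit the instance tasks. Towards the end of the first wait only a
    # few node tasks remain active, so the threadpool sits mostly idle
    # until the second batch is submitted.
    waiters.wait_for_all(node_futures)

    future_instances = []
    for future in node_futures:
        node_info = future.result()[0]
        instances = getattr(node_info, "servers", None)
        if instances is not None:
            future_instances.append(
                self.executor.submit(
                    self.add_instance_node, node_info, instances))

    waiters.wait_for_all(future_instances)
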
Applier concurrency
*******************

The applier does not use the futurist_ GreenThreadPoolExecutor_ directly but
instead uses taskflow_. However, taskflow still utilizes a green threadpool
internally. This threadpool is initialized in the workflow engine called
:class:`~.DefaultWorkFlowEngine`. Currently, Watcher supports one workflow
engine, but the base class allows contributors to develop other workflow
engines as well. In taskflow, tasks are created using different types of
flows, such as a linear, an unordered, or a graph flow. The linear and graph
flows allow for strong ordering between individual tasks, and it is for this
reason that the workflow engine utilizes a graph flow. The creation of
tasks, subsequently linking them into a graph-like structure, and submitting
them is shown below.

.. code-block:: python

    self.execution_rule = self.get_execution_rule(actions)
    flow = gf.Flow("watcher_flow")
    actions_uuid = {}
    for a in actions:
        task = TaskFlowActionContainer(a, self)
        flow.add(task)
        actions_uuid[a.uuid] = task

    for a in actions:
        for parent_id in a.parents:
            flow.link(actions_uuid[parent_id], actions_uuid[a.uuid],
                      decider=self.decider)

    e = engines.load(
        flow, executor='greenthreaded', engine='parallel',
        max_workers=self.config.max_workers)
    e.run()

    return flow

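For contributors unfamiliar with taskflow, below is a minimal,
self-contained sketch of the same graph-flow pattern with invented task
names; it relies on the parallel engine's default threaded executor so it
runs without eventlet:

.. code-block:: python

    from taskflow import engines
    from taskflow import task
    from taskflow.patterns import graph_flow as gf


    class Step(task.Task):
        def execute(self):
            print("executing %s" % self.name)

        def revert(self, *args, **kwargs):
            # Called when a dependent task fails after this one succeeded.
            print("reverting %s" % self.name)


    parent = Step("parent")
    child = Step("child")

    flow = gf.Flow("demo_flow")
    flow.add(parent, child)
    # Run "child" only after "parent"; the decider can veto execution of
    # the child based on the history of prior results.
    flow.link(parent, child, decider=lambda history: True)

    engine = engines.load(flow, engine='parallel', max_workers=2)
    engine.run()
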
In the applier, tasks are contained in a :class:`~.TaskFlowActionContainer`,
which allows them to trigger events in the workflow engine. This way, the
workflow engine can halt or take other actions while the action plan is
being executed, based on the success or failure of individual actions.
However, the base workflow engine simply uses these notifications to store
the result of individual actions in the database. Additionally, since
taskflow uses a graph flow, if any of the tasks fails, none of its children
will be executed, while ``do_revert`` will be triggered for all of its
parents.

.. code-block:: python

    class TaskFlowActionContainer(...):
        ...
        def do_execute(self, *args, **kwargs):
            ...
            # Notify the workflow engine of the action's outcome so that
            # it can persist the result.
            result = self.action.execute()
            if result is True:
                return self.engine.notify(self._db_action,
                                          objects.action.State.SUCCEEDED)
            else:
                self.engine.notify(self._db_action,
                                   objects.action.State.FAILED)

    class BaseWorkFlowEngine(...):
        ...
        def notify(self, action, state):
            # Store the new state of the action in the database.
            db_action = objects.Action.get_by_uuid(self.context, action.uuid,
                                                   eager=True)
            db_action.state = state
            db_action.save()
            return db_action

@@ -45,7 +45,6 @@ Introduction

   architecture
   contributor/contributing


Getting Started
---------------

@@ -68,6 +67,7 @@ Admin Guide
   :maxdepth: 2

   admin/index
   contributor/concurrency

User Guide
==========