Move engine options extraction to __init__ methods

Instead of repeatedly extracting engine options in
local functions/methods, extract the currently used
options once, in the __init__ methods of the objects
that use them.

This avoids repeated extraction work and establishes
the pattern of where engine options should be
extracted going forward.
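
For illustration only, a minimal sketch of the pattern being established here
(the class below is a simplified, hypothetical stand-in, not the real taskflow
code):

    from oslo_utils import strutils


    class Completer(object):  # hypothetical, simplified consumer of options
        def __init__(self, options):
            # Extract the option once, up front.
            self._defer_reverts = strutils.bool_from_string(
                options.get('defer_reverts', False))

        def resolve(self, parent_resolver, default_resolver):
            # Reuse the pre-extracted value; no repeated option parsing here.
            if self._defer_reverts:
                return parent_resolver
            return default_resolver

This also keeps option parsing in one obvious place per consumer.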

Change-Id: I4d24f7badef61fdd69db22f59931c374c32986ff
Authored by Joshua Harlow on 2015-11-10 11:36:29 -08:00
Committed by Thomas Goirand
Parent: 84cf42597a
Commit: 1150c27023
2 changed files with 55 additions and 15 deletions


@@ -113,6 +113,8 @@ class Completer(object):
         self._task_action = runtime.task_action
         self._retry_action = runtime.retry_action
         self._undefined_resolver = RevertAll(self._runtime)
+        self._defer_reverts = strutils.bool_from_string(
+            self._runtime.options.get('defer_reverts', False))
 
     def _complete_task(self, task, outcome, result):
         """Completes the given task, processes task failure."""
@@ -186,20 +188,11 @@ class Completer(object):
         elif strategy == retry_atom.REVERT:
             # Ask parent retry and figure out what to do...
             parent_resolver = self._determine_resolution(retry, failure)
             # In the future, this will be the only behavior. REVERT
             # should defer to the parent retry if it exists, or use the
-            # default REVERT_ALL if it doesn't. This lets you safely nest
-            # flows with retries inside flows without retries and it still
-            # behave as a user would expect, i.e. if the retry gets
-            # exhausted it reverts the outer flow unless the outer flow
-            # has a separate retry behavior.
-            defer_reverts = strutils.bool_from_string(
-                self._runtime.options.get('defer_reverts', False)
-            )
-            if defer_reverts:
+            # default REVERT_ALL if it doesn't.
+            if self._defer_reverts:
                 return parent_resolver
             # Ok if the parent resolver says something not REVERT, and
             # it isn't just using the undefined resolver, assume the
             # parent knows best.

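As a hedged usage sketch (not part of this change): assuming the standard
``taskflow.engines.load`` helper, which forwards extra keyword arguments as
engine options, the ``defer_reverts`` behaviour shown above could be enabled
like this:

    from taskflow import engines
    from taskflow import task
    from taskflow.patterns import linear_flow


    class Noop(task.Task):  # trivial task just so the flow is non-empty
        def execute(self):
            pass


    flow = linear_flow.Flow('outer').add(Noop('noop'))

    # With defer_reverts=True an exhausted inner retry defers the REVERT
    # decision to the parent retry instead of unconditionally reverting
    # everything (see the comment in the hunk above).
    engine = engines.load(flow, engine='serial', defer_reverts=True)
    engine.run()
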

@@ -70,6 +70,40 @@ class ActionEngine(base.Engine):
     which will cause the process of reversion or retrying to commence. See the
     valid states in the states module to learn more about what other states
     the tasks and flow being run can go through.
+
+    **Engine options:**
+
+    +-------------------+-----------------------+------+-----------+
+    | Name/key          | Description           | Type | Default   |
+    +===================+=======================+======+===========+
+    | defer_reverts     | This option lets you  | bool | ``False`` |
+    |                   | safely nest flows     |      |           |
+    |                   | with retries inside   |      |           |
+    |                   | flows without retries |      |           |
+    |                   | and it still behaves  |      |           |
+    |                   | as a user would       |      |           |
+    |                   | expect (for example   |      |           |
+    |                   | if the retry gets     |      |           |
+    |                   | exhausted it reverts  |      |           |
+    |                   | the outer flow unless |      |           |
+    |                   | the outer flow has a  |      |           |
+    |                   | separate retry        |      |           |
+    |                   | behavior).            |      |           |
+    +-------------------+-----------------------+------+-----------+
+    | inject_transient  | When true, values     | bool | ``True``  |
+    |                   | that are local to     |      |           |
+    |                   | each atom's scope     |      |           |
+    |                   | are injected into     |      |           |
+    |                   | storage in a          |      |           |
+    |                   | transient location    |      |           |
+    |                   | (typically a local    |      |           |
+    |                   | dictionary), when     |      |           |
+    |                   | false those values    |      |           |
+    |                   | are instead persisted |      |           |
+    |                   | into atom details     |      |           |
+    |                   | (and saved in a non-  |      |           |
+    |                   | transient manner).    |      |           |
+    +-------------------+-----------------------+------+-----------+
     """
 
     NO_RERAISING_STATES = frozenset([states.SUSPENDED, states.SUCCESS])
@@ -100,6 +134,8 @@ class ActionEngine(base.Engine):
         # Retries are not *currently* executed out of the engines process
         # or thread (this could change in the future if we desire it to).
         self._retry_executor = executor.SerialRetryExecutor()
+        self._inject_transient = strutils.bool_from_string(
+            self._options.get('inject_transient', True))
 
     def _check(self, name, check_compiled, check_storage_ensured):
         """Check (and raise) if the engine has not reached a certain stage."""
@@ -256,14 +292,12 @@
 
     def _ensure_storage(self):
         """Ensure all contained atoms exist in the storage unit."""
-        transient = strutils.bool_from_string(
-            self._options.get('inject_transient', True))
         self.storage.ensure_atoms(
             self._runtime.analyzer.iterate_nodes(compiler.ATOMS))
         for atom in self._runtime.analyzer.iterate_nodes(compiler.ATOMS):
             if atom.inject:
                 self.storage.inject_atom_args(atom.name, atom.inject,
-                                              transient=transient)
+                                              transient=self._inject_transient)
 
     @fasteners.locked
     def validate(self):
@@ -371,7 +405,7 @@ class _ExecutorTextMatch(collections.namedtuple('_ExecutorTextMatch',
 class ParallelActionEngine(ActionEngine):
     """Engine that runs tasks in parallel manner.
 
-    Supported option keys:
+    **Additional engine options:**
 
     * ``executor``: an object that implements a :pep:`3148` compatible executor
       interface; it will be used for scheduling tasks. The following
@@ -401,6 +435,19 @@ String (case insensitive) Executor used
     ``threads``                 :class:`~.executor.ParallelThreadTaskExecutor`
     =========================== ===============================================
 
+    * ``max_workers``: an integer that will affect the number of parallel
+      workers that are used to dispatch tasks into (this number is bounded
+      by the maximum parallelization your workflow can support).
+    * ``dispatch_periodicity``: a float (in seconds) that will affect the
+      parallel process task executor (and therefore is **only** applicable when
+      the executor provided above is of the process variant). This number
+      affects how much time the process task executor waits for messages from
+      child processes (typically indicating they have finished or failed). A
+      lower number will have high granularity but *currently* involves more
+      polling while a higher number will involve less polling but a slower time
+      for an engine to notice a task has completed.
+
     .. |cfp| replace:: concurrent.futures.process
     .. |cft| replace:: concurrent.futures.thread
     .. |cf| replace:: concurrent.futures
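
A hedged sketch of selecting the parallel engine with the options documented
above (``executor`` and ``max_workers``); the flow is a stand-in and the
kwargs-as-options loading behaviour is assumed:

    from taskflow import engines
    from taskflow import task
    from taskflow.patterns import unordered_flow


    class Sleepy(task.Task):  # placeholder tasks that could run concurrently
        def execute(self):
            pass


    flow = unordered_flow.Flow('parallel-demo').add(
        Sleepy('a'), Sleepy('b'), Sleepy('c'))

    # executor='threads' selects the thread-based parallel task executor;
    # max_workers bounds how many tasks may be dispatched concurrently.
    engine = engines.load(flow, engine='parallel',
                          executor='threads', max_workers=2)
    engine.run()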