diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py index a8cf14ed..d9b4a5a3 100644 --- a/taskflow/engines/action_engine/engine.py +++ b/taskflow/engines/action_engine/engine.py @@ -61,9 +61,6 @@ class ActionEngine(base.EngineBase): self._compiled = False self._lock = threading.RLock() self._state_lock = threading.RLock() - self._task_executor = None - self._task_action = None - self._retry_action = None self._storage_ensured = False def __str__(self): @@ -166,6 +163,19 @@ class ActionEngine(base.EngineBase): self._root.reset_all() self._change_state(states.PENDING) + @misc.cachedproperty + def _retry_action(self): + return self._retry_action_factory(self.storage, self.task_notifier) + + @misc.cachedproperty + def _task_executor(self): + return self._task_executor_factory() + + @misc.cachedproperty + def _task_action(self): + return self._task_action_factory(self.storage, self._task_executor, + self.task_notifier) + @lock_utils.locked def compile(self): if self._compiled: @@ -175,15 +185,6 @@ class ActionEngine(base.EngineBase): raise exc.Empty("Flow %s is empty." % self._flow.name) self._analyzer = self._graph_analyzer_factory(execution_graph, self.storage) - if self._task_executor is None: - self._task_executor = self._task_executor_factory() - if self._task_action is None: - self._task_action = self._task_action_factory(self.storage, - self._task_executor, - self.task_notifier) - if self._retry_action is None: - self._retry_action = self._retry_action_factory(self.storage, - self.task_notifier) self._root = self._graph_action_factory(self._analyzer, self.storage, self._task_action, diff --git a/taskflow/engines/base.py b/taskflow/engines/base.py index 402aaee5..eb8d76ee 100644 --- a/taskflow/engines/base.py +++ b/taskflow/engines/base.py @@ -34,17 +34,13 @@ class EngineBase(object): self._conf = {} else: self._conf = dict(conf) - self._storage = None self.notifier = misc.Notifier() self.task_notifier = misc.Notifier() - @property + @misc.cachedproperty def storage(self): """The storage unit for this flow.""" - if self._storage is None: - self._storage = self._storage_factory(self._flow_detail, - self._backend) - return self._storage + return self._storage_factory(self._flow_detail, self._backend) @abc.abstractproperty def _storage_factory(self): diff --git a/taskflow/utils/misc.py b/taskflow/utils/misc.py index 0a592689..10ba522f 100644 --- a/taskflow/utils/misc.py +++ b/taskflow/utils/misc.py @@ -21,6 +21,7 @@ import copy import datetime import errno import functools +import inspect import keyword import logging import os @@ -174,6 +175,37 @@ def decode_json(raw_data, root_types=(dict,)): return data +class cachedproperty(object): + """Descriptor that can be placed on instance methods to translate + those methods into properties that will be cached in the instance (avoiding + repeated creation checking logic to do the equivalent). + """ + def __init__(self, wrapped): + # If a name is provided (as an argument) then this will be the string + # to place the cached attribute under if not then it will be the + # function itself to be wrapped into a property. + if inspect.isfunction(wrapped): + self._wrapped = wrapped + self._wrapped_attr = "_%s" % (wrapped.__name__) + else: + self._wrapped_attr = wrapped + self._wrapped = None + + def __call__(self, fget): + # If __init__ received a string then this will be the function to be + # wrapped as a property (if __init__ got a function then this will not + # be called). + self._wrapped = fget + return self + + def __get__(self, source, owner): + try: + return getattr(source, self._wrapped_attr) + except AttributeError: + setattr(source, self._wrapped_attr, self._wrapped(source)) + return getattr(source, self._wrapped_attr) + + def wallclock(): # NOTE(harlowja): made into a function so that this can be easily mocked # out if we want to alter time related functionality (for testing